# Mirror of https://github.com/hoshikawa2/rfp_response_automation.git
# (synced 2026-03-03 16:09:35 +00:00; 286 lines, 8.6 KiB, Python)
import pandas as pd
|
|
import requests
|
|
import json
|
|
from pathlib import Path
|
|
import os
|
|
import re
|
|
|
|
# =========================
# Configuration
# =========================
# Workbook to read; main() writes the results next to it as "<stem>_result.xlsx".
EXCEL_PATH = "<YOUR_EXCEL_XLSX_FILE>"
# Endpoint that receives each question as a JSON POST (see call_api).
API_URL = "http://demo-orcl-api-ai.hoshikawa.com.br:8101/rest/chat"
# Text file to which questions answered "NO" or with MEDIUM/LOW confidence are appended.
QUERY_LOG_FILE = Path("queries_with_low_confidence_or_no.txt")
# Passed as the `timeout` argument of requests.post.
TIMEOUT = 120
# Basic Auth credentials: read from the environment, falling back to the placeholders.
APP_USER = os.environ.get("APP_USER", "<YOUR_USER_NAME>")
APP_PASS = os.environ.get("APP_PASS", "<YOUR_PASSWORD>")

# Column settings are positional (used with `row.iloc[...]`), not column names.
CONTEXT_COLUMNS = [1, 2]  # USE IF YOU HAVE A NON-HIERARCHICAL STRUCTURE
ORDER_COLUMN = 0  # WHERE ARE YOUR ORDER LINE COLUMN
QUESTION_COLUMN = 4  # WHERE ARE YOUR QUESTION/TEXT to submit to RFP AI
# Only the number of levels of each pattern (dot count + 1) is used by should_process.
ALLOWED_STRUCTURES = [
    "x.x",
    "x.x.x",
    "x.x.x.x",
    "x.x.x.x.x",
    "x.x.x.x.x.x"
]
# Characters that normalize_structure rewrites to "." before the depth check.
ALLOWED_SEPARATORS = [".", "-", "/", "_", ">"]

ANSWER_COL = "ANSWER"  # NAME YOUR COLUMN for the YES/NO/PARTIAL result
JSON_COL = "RESULT_JSON"  # NAME YOUR COLUMN for the RFP AI automation results

# Output columns filled from the matching fields of the API response.
CONFIDENCE_COL = "CONFIDENCE"
AMBIGUITY_COL = "AMBIGUITY"
CONF_REASON_COL = "CONFIDENCE_REASON"
JUSTIFICATION_COL = "JUSTIFICATION"
|
|
|
|
# =========================
|
|
# Helpers
|
|
# =========================
|
|
|
|
def normalize_structure(num: str, separators: list[str]) -> str:
    """Rewrite every accepted separator in an item number to ".".

    Args:
        num: Raw item number, e.g. ``"1-2/3"``. Falsy values yield ``""``.
        separators: Separator strings to treat as equivalent to ``"."``.
            Multi-character separators (e.g. ``"->"``) are replaced as a
            whole. An empty or missing list leaves the number unchanged
            apart from trimming.

    Returns:
        The trimmed number with all separators replaced by ``"."``.
    """
    if not num:
        return ""

    trimmed = num.strip()

    # Longest first so that a separator which is a prefix of another one
    # (">" vs ">>") cannot pre-empt the longer match in the alternation.
    tokens = sorted({sep for sep in (separators or []) if sep},
                    key=lambda sep: (-len(sep), sep))
    if not tokens:
        # An empty character class / alternation is not a usable regex.
        return trimmed

    pattern = "|".join(re.escape(token) for token in tokens)
    return re.sub(pattern, ".", trimmed)
|
|
|
|
def should_process(num: str, allowed_patterns: list[str], separators: list[str]) -> bool:
    """Decide whether an item number qualifies for processing.

    The number is first normalized with ``normalize_structure``. Numbers that
    are not hierarchical (per ``is_hierarchical``) are always accepted.
    Hierarchical numbers are accepted only when their number of levels equals
    the number of levels of at least one entry in ``allowed_patterns``.
    """
    canonical = normalize_structure(num, separators)

    if is_hierarchical(canonical):
        levels = canonical.count(".") + 1
        # Only the level count of each pattern matters, not its characters.
        return any(levels == shape.count(".") + 1 for shape in allowed_patterns)

    return True
|
|
|
|
def register_failed_query(query: str, answer: str, confidence: str):
    """Append one question with its answer and confidence to QUERY_LOG_FILE.

    Creates the log file's parent directory if needed and prints a short
    notice to stdout before writing.
    """
    QUERY_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    print("Negative/Doubt result")

    record = [
        "----------------------------\n",
        f"Query:\n{query}\n\n",
        f"Answer: {answer}\n",
        f"Confidence: {confidence}\n\n",
    ]
    # Append mode: earlier entries are kept.
    with QUERY_LOG_FILE.open("a", encoding="utf-8") as log:
        log.writelines(record)
|
|
|
|
def normalize_num(num: str) -> str:
    """Trim an item number and drop any trailing dots.

    Surrounding whitespace is removed, then every trailing run of dots and
    whitespace, so ``"1.2."``, ``" 1.2. "`` and ``"1.2 ."`` all become
    ``"1.2"``. Dots inside the number are kept.
    """
    # rstrip(".") alone would leave "1.2 " for "1.2 ."; strip dots and
    # whitespace together so the result never ends in either.
    return re.sub(r"[\s.]+\Z", "", num.strip())
|
|
|
|
def build_question_from_columns(row, context_cols: list[int], question_col: int) -> str:
    """Build a question from one row using positional context columns.

    Args:
        row: Row object supporting positional access through ``.iloc``.
        context_cols: Positions of the cells that provide context.
        question_col: Position of the cell holding the question text.

    Returns:
        The trimmed question alone when every context cell is empty;
        otherwise the question prefixed with the non-empty context values
        (duplicates removed, first occurrence kept) joined by `` > ``.
    """
    cells = (str(row.iloc[position]).strip() for position in context_cols)
    # dict.fromkeys de-duplicates while preserving first-seen order.
    labels = list(dict.fromkeys(cell for cell in cells if cell))

    question = str(row.iloc[question_col]).strip()

    if labels:
        context = " > ".join(labels)
        return f'Considering the context of "{context}", {question}'

    return question
|
|
|
|
def build_question(hierarchy: dict, current_num: str) -> str:
    """Build the text sent to the API for one item of a numbered hierarchy.

    Args:
        hierarchy: Mapping of item number to a dict with at least a
            ``"text"`` entry.
        current_num: Number of the item to build the question for.

    Returns:
        For a non-hierarchical number, the item's own text. Otherwise a
        ``Considering the context of "..."`` string listing the texts of
        every existing prefix of the number below the topmost existing one,
        or a ``What is the ... of ...?`` string when there are none.

    Raises:
        ValueError: If no prefix of ``current_num`` has a non-empty text in
            ``hierarchy``.
        KeyError: If ``current_num`` itself is missing from ``hierarchy``.
    """
    if not is_hierarchical(current_num):
        return hierarchy[current_num]["text"]

    segments = current_num.split(".")
    # Every prefix of the number, shortest first: "1", "1.2", "1.2.3", ...
    prefixes = [".".join(segments[:end]) for end in range(1, len(segments) + 1)]

    # Topmost existing ancestor (possibly the item itself).
    root_key = next((prefix for prefix in prefixes if prefix in hierarchy), None)
    root_text = hierarchy[root_key]["text"] if root_key is not None else None
    if not root_text:
        raise ValueError(f"No valid root subject for {current_num}")

    trail = [
        hierarchy[prefix]["text"]
        for prefix in prefixes[1:]
        if prefix in hierarchy and prefix != root_key
    ]

    own_text = hierarchy[current_num]["text"]

    if not trail:
        # NOTE(review): this branch appears reachable only when the item is
        # its own topmost ancestor, in which case both placeholders hold the
        # same text ("What is the X of X?") - confirm this is intended.
        return f'What is the {own_text} of {root_text}?'

    # NOTE(review): the root subject is not part of this string and the
    # item's own text appears only as the last context element - confirm
    # that this is the intended prompt.
    context = " > ".join(trail)
    return (
        f'Considering the context of "{context}"'
    )
|
|
|
|
def normalize_api_response(api_response: dict) -> dict:
    """Unwrap an API payload nested under a ``"result"`` key.

    When ``api_response`` is a dict whose ``"result"`` value is itself a dict
    containing ``"answer"``, that inner dict is returned. In every other case
    the input is returned unchanged.
    """
    if not isinstance(api_response, dict):
        return api_response

    inner = api_response.get("result")
    if isinstance(inner, dict) and "answer" in inner:
        return inner

    return api_response
|
|
|
|
def call_api(question: str) -> dict:
    """POST one question to API_URL and return the decoded JSON body.

    The question is sent as ``{"question": ...}`` with HTTP Basic Auth
    (APP_USER / APP_PASS) and the TIMEOUT setting. ``raise_for_status`` is
    called on the response before its JSON is decoded.
    """
    # NOTE(review): API_URL as configured in this file is plain http://, so
    # the Basic Auth credentials are not protected by TLS on the wire.
    reply = requests.post(
        API_URL,
        json={"question": question},
        auth=(APP_USER, APP_PASS),  # 🔐 BASIC AUTH
        timeout=TIMEOUT,
    )
    reply.raise_for_status()
    return reply.json()
|
|
|
|
def is_explicit_url(source: str) -> bool:
    """Return True when ``source`` begins with ``http://`` or ``https://``.

    The check is case-sensitive and does not trim whitespace.
    """
    return source.startswith(("http://", "https://"))
|
|
|
|
def is_hierarchical(num: str) -> bool:
    """Tell whether ``num`` is a dotted, all-numeric item number.

    True only for a non-empty value that contains at least one "." and whose
    dot-separated segments are all digit strings (so "1.2" qualifies, while
    "1", "1." and "a.1" do not).
    """
    if not num or "." not in num:
        return False

    for segment in num.split("."):
        # An empty segment ("1..2", "1.") fails isdigit() as well.
        if not segment.isdigit():
            return False

    return True
|
|
|
|
def normalize_evidence_sources(evidence: list[dict]) -> list[dict]:
    """Normalize the ``source`` / ``quote`` fields of evidence entries.

    Args:
        evidence: Evidence entries from the API response. ``None`` is
            treated as an empty list.

    Returns:
        A new list. Entries whose trimmed ``source`` is an explicit
        http(s) URL are passed through unchanged. Every other dict entry is
        rebuilt with only its trimmed ``quote`` and ``source``, the source
        defaulting to "Oracle Cloud Infrastructure documentation" when
        empty. Non-dict entries with content are kept as a quote with the
        default source; ``None`` or blank entries are skipped.
    """
    default_source = "Oracle Cloud Infrastructure documentation"
    normalized = []

    # A JSON null for the whole evidence field arrives here as None.
    for ev in evidence or []:
        if not isinstance(ev, dict):
            # Keep bare values (e.g. a plain string) rather than crashing on
            # .get(); there is nothing to keep for None / blank values.
            text = "" if ev is None else str(ev).strip()
            if text:
                normalized.append({"quote": text, "source": default_source})
            continue

        # `or ""` covers keys that are present but null; str() covers
        # non-string values so .strip() cannot fail.
        source = str(ev.get("source") or "").strip()
        quote = str(ev.get("quote") or "").strip()

        if is_explicit_url(source):
            normalized.append(ev)
            continue

        normalized.append({
            "quote": quote,
            "source": source or default_source
        })

    return normalized
|
|
|
|
# =========================
|
|
# Main
|
|
# =========================
|
|
def _build_hierarchy(df) -> dict:
    """Map each usable item number to its question text and row label."""
    hierarchy = {}
    for idx, row in df.iterrows():
        raw_num = normalize_num(str(row.iloc[ORDER_COLUMN]))
        text = str(row.iloc[QUESTION_COLUMN]).strip()

        if not (raw_num and text):
            continue

        # Key hierarchical numbers by their dotted form so that "1-2-3" and
        # "1.2.3" are handled the same way by should_process and
        # build_question. Other identifiers keep their original spelling.
        canonical = normalize_structure(raw_num, ALLOWED_SEPARATORS)
        key = canonical if is_hierarchical(canonical) else raw_num

        if key in hierarchy:
            # The later row wins; make the overwrite visible.
            print(
                f"⚠️ DUPLICATE order number {key}: "
                f"row {hierarchy[key]['row']} replaced by row {idx}"
            )

        hierarchy[key] = {
            "text": text,
            "row": idx
        }
    return hierarchy


def _store_result(df, row_label, result: dict) -> None:
    """Copy one API result (or error record) into the output columns of a row."""
    df.at[row_label, ANSWER_COL] = result.get("answer", "ERROR")
    df.at[row_label, CONFIDENCE_COL] = result.get("confidence", "")
    df.at[row_label, AMBIGUITY_COL] = str(result.get("ambiguity_detected", ""))
    df.at[row_label, CONF_REASON_COL] = result.get("confidence_reason", "")
    df.at[row_label, JUSTIFICATION_COL] = result.get("justification", "")
    df.at[row_label, JSON_COL] = json.dumps(result, ensure_ascii=False)


def main():
    """Send every eligible spreadsheet item to the API and save the answers.

    Reads EXCEL_PATH, builds one question per item (hierarchical context for
    dotted numbers, column context otherwise), calls the API, logs answers
    that are "NO" or have MEDIUM/LOW confidence, and writes the results into
    the output columns. Any per-item failure is recorded in that row as an
    ERROR result instead of aborting the run. The workbook is saved next to
    the input as "<stem>_result.xlsx".
    """
    df = pd.read_excel(EXCEL_PATH, dtype=str).fillna("")

    for col in [
        ANSWER_COL,
        JSON_COL,
        CONFIDENCE_COL,
        AMBIGUITY_COL,
        CONF_REASON_COL,
        JUSTIFICATION_COL
    ]:
        if col not in df.columns:
            df[col] = ""

    hierarchy = _build_hierarchy(df)

    for num, info in hierarchy.items():
        if not should_process(num, ALLOWED_STRUCTURES, ALLOWED_SEPARATORS):
            print(f"⏭️ SKIPPED (structure not allowed): {num}")
            continue

        try:
            row = df.loc[info["row"]]

            if is_hierarchical(num):
                question = build_question(hierarchy, num)
            else:
                question = build_question_from_columns(
                    row,
                    CONTEXT_COLUMNS,
                    QUESTION_COLUMN
                )

            print(f"\n❓ QUESTION SENT TO API:\n{question}")

            api_response_raw = call_api(question)
            api_response = normalize_api_response(api_response_raw)

            if not isinstance(api_response, dict):
                # Fail with a readable reason instead of an AttributeError
                # from .get() further down.
                raise ValueError(
                    f"Unexpected API response type: {type(api_response).__name__}"
                )

            if "evidence" in api_response:
                api_response["evidence"] = normalize_evidence_sources(
                    api_response.get("evidence", [])
                )

            if (
                api_response.get("answer") == "NO"
                or api_response.get("confidence") in ("MEDIUM", "LOW")
            ):
                register_failed_query(
                    query=question,
                    answer=api_response.get("answer", ""),
                    confidence=api_response.get("confidence", "")
                )

            print("📄 JSON RESPONSE (normalized):")
            print(json.dumps(api_response, ensure_ascii=False, indent=2))
            print("-" * 80)

            _store_result(df, info["row"], api_response)

        except Exception as e:
            # Per-item boundary: record the failure in the row and continue
            # with the next item.
            error_json = {
                "answer": "ERROR",
                "confidence": "LOW",
                "ambiguity_detected": True,
                "confidence_reason": "Processing error",
                "justification": str(e),
                "evidence": []
            }

            _store_result(df, info["row"], error_json)

            print(f"❌ ERROR processing item {num}: {e}")

    output_path = Path(EXCEL_PATH).with_name(
        Path(EXCEL_PATH).stem + "_result.xlsx"
    )
    df.to_excel(output_path, index=False)

    print(f"\n✅ Saved in: {output_path}")
|
|
|
|
# Run the batch only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()