Files
rfp_response_automation/files/rfp_process.py
2026-02-18 20:34:33 -03:00

428 lines
12 KiB
Python

import pandas as pd
import requests
import json
from pathlib import Path
import os
import re
import logging
from config_loader import load_config
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from queue import Queue
import threading
from oci_genai_llm_graphrag_rerank_rfp import answer_question
# Runtime configuration; must expose at least ``service_port`` (see config_loader).
config = load_config()
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
)
# NOTE(review): appears unused in this file — confirm before removing.
EXCEL_QUEUE = Queue()
# =========================
# Settings
# =========================
# Local chat endpoint used when calling the service over HTTP.
API_URL = "http://127.0.0.1:" + str(config.service_port) + "/chat"
# Plain-text log of answers that came back NO or with low/medium confidence.
QUERY_LOG_FILE = Path("queries_with_low_confidence_or_no.txt")
CONTEXT_COLUMNS = [1, 2]  # USE IF YOU HAVE A NON-HIERARCHICAL STRUCTURE
ORDER_COLUMN = 0  # WHERE YOUR ORDER/LINE-NUMBER COLUMN IS
QUESTION_COLUMN = 4  # WHERE YOUR QUESTION/TEXT COLUMN IS (submitted to RFP AI)
# Hierarchy depths that get processed ("x.x" = depth 2, etc.).
ALLOWED_STRUCTURES = [
    "x.x",
    "x.x.x",
    "x.x.x.x",
    "x.x.x.x.x",
    "x.x.x.x.x.x"
]
# Any of these separators is normalized to "." before depth checks.
ALLOWED_SEPARATORS = [".", "-", "/", "_", ">"]
ANSWER_COL = "ANSWER"  # NAME YOUR COLUMN for the YES/NO/PARTIAL result
JSON_COL = "RESULT_JSON"  # NAME YOUR COLUMN for the RFP AI automation results
# NOTE(review): ARCH_PLAN_COL / MERMAID_COL are defined but not written by
# process_excel_rfp in this file — confirm they are used elsewhere.
ARCH_PLAN_COL = "ARCH_PLAN"
MERMAID_COL = "MERMAID"
CONFIDENCE_COL = "CONFIDENCE"
AMBIGUITY_COL = "AMBIGUITY"
CONF_REASON_COL = "CONFIDENCE_REASON"
JUSTIFICATION_COL = "JUSTIFICATION"
# =========================
# Helpers
# =========================
def normalize_structure(num: str, separators: list[str]) -> str:
    """Collapse every allowed separator in *num* to a dot; "" stays ""."""
    if not num:
        return ""
    sep_class = f"[{re.escape(''.join(separators))}]"
    return re.sub(sep_class, ".", num.strip())
def should_process(num: str, allowed_patterns: list[str], separators: list[str]) -> bool:
    """Return True when *num* should be submitted to the engine.

    Non-hierarchical identifiers are always processed; hierarchical ones
    must sit at one of the depths described by *allowed_patterns*
    (depth = number of dot-separated segments).
    """
    canonical = normalize_structure(num, separators)
    if not is_hierarchical(canonical):
        return True
    permitted_depths = {pattern.count(".") + 1 for pattern in allowed_patterns}
    return canonical.count(".") + 1 in permitted_depths
def register_failed_query(query: str, answer: str, confidence: str):
    """Append a negative / low-confidence result to the review log file."""
    QUERY_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    logger.info("Negative/Doubt result")
    entry = (
        "----------------------------\n"
        f"Query:\n{query}\n\n"
        f"Answer: {answer}\n"
        f"Confidence: {confidence}\n\n"
    )
    with QUERY_LOG_FILE.open("a", encoding="utf-8") as log:
        log.write(entry)
def normalize_num(num: str) -> str:
    """Trim surrounding whitespace, then any trailing dots ("1.2." -> "1.2")."""
    trimmed = num.strip()
    return trimmed.rstrip(".")
def build_question_from_columns(row, context_cols: list[int], question_col: int) -> str:
    """Compose a prompt from flat context columns plus the question cell.

    Duplicate context values are dropped (first occurrence wins); with no
    context the bare question text is returned.
    """
    question = str(row.iloc[question_col]).strip()
    raw_parts = (str(row.iloc[col]).strip() for col in context_cols)
    unique_parts = dict.fromkeys(part for part in raw_parts if part)
    if not unique_parts:
        return question
    context = " > ".join(unique_parts)
    return f'Considering the context of "{context}", {question}'
def build_question(hierarchy: dict, current_num: str) -> str:
    """Build a contextualized question for a hierarchical item number.

    The highest existing ancestor of ``current_num`` supplies the main
    subject, intermediate ancestors supply extra context, and the item's
    own text is the question itself.

    Raises:
        ValueError: when no ancestor of ``current_num`` exists in *hierarchy*.
    """
    if not is_hierarchical(current_num):
        return hierarchy[current_num]["text"]
    parts = current_num.split(".")
    main_subject = None
    main_key = None
    # Highest existing ancestor becomes the main subject.
    for i in range(1, len(parts) + 1):
        key = ".".join(parts[:i])
        if key in hierarchy:
            main_subject = hierarchy[key]["text"]
            main_key = key
            break
    if not main_subject:
        raise ValueError(f"No valid root subject for {current_num}")
    # Intermediate ancestors only. BUG FIX: the original loop also appended
    # the item's own text (key == current_num), duplicating the question
    # inside its context.
    subtopics = []
    for i in range(1, len(parts)):
        key = ".".join(parts[: i + 1])
        if key in hierarchy and key not in (main_key, current_num):
            subtopics.append(hierarchy[key]["text"])
    specific = hierarchy[current_num]["text"]
    if subtopics:
        context = " > ".join([main_subject, *subtopics])
        # BUG FIX: the original returned only the context sentence and
        # dropped the question text entirely; mirror the format used by
        # build_question_from_columns().
        return f'Considering the context of "{context}", {specific}'
    return f'What is the {specific} of {main_subject}?'
def normalize_api_response(api_response) -> dict:
    """Coerce an engine/API reply into a flat result dict.

    Accepts a JSON string, a dict already carrying "answer" or "error",
    or an envelope with a dict under "result". Anything else is reported
    as an "error" dict rather than raised.
    """
    # String payload: attempt to decode it first.
    if isinstance(api_response, str):
        try:
            api_response = json.loads(api_response)
        except Exception:
            return {"error": f"Invalid string response: {api_response[:300]}"}
    if not isinstance(api_response, dict):
        return {"error": f"Invalid type: {type(api_response)}"}
    if "error" in api_response:
        return api_response
    inner = api_response.get("result")
    if isinstance(inner, dict):
        return inner
    if "answer" in api_response:
        return api_response
    return {"error": f"Unexpected format: {str(api_response)[:300]}"}
def call_api(
    question: str,
    *,
    api_url: str,
    timeout: int,
    auth_user: str | None,
    auth_pass: str | None,
) -> dict:
    """POST *question* to the RFP chat endpoint and return the JSON body.

    Raises:
        RuntimeError: on 5xx responses, gateway-timeout bodies, or a
            response body that is not valid JSON.
    """
    payload = {"question": question}
    response = requests.post(
        api_url,
        json=payload,
        auth=(auth_user, auth_pass) if auth_user else None,
        timeout=timeout
    )
    if response.status_code >= 500:
        # BUG FIX: RuntimeError accepts no ``response`` keyword — the
        # original raise itself crashed with a TypeError.
        raise RuntimeError(
            f"Server error {response.status_code}: {response.text}"
        )
    text = response.text.lower()
    # Some gateways return 200 with a timeout page in the body.
    if "gateway time" in text or "timeout" in text:
        raise RuntimeError(response.text)
    try:
        return response.json()
    except ValueError as exc:
        # Narrowed from a bare ``except:``; requests' JSON decode errors
        # subclass ValueError.
        raise RuntimeError(
            f"Invalid JSON: {response.text[:300]}"
        ) from exc
def is_explicit_url(source: str) -> bool:
    """True when *source* is an absolute http(s) URL."""
    return source.startswith(("http://", "https://"))
def is_hierarchical(num: str) -> bool:
    """True for dotted all-numeric identifiers such as "1.2.3"."""
    if not num or "." not in num:
        return False
    return all(segment.isdigit() for segment in num.split("."))
def normalize_evidence_sources(evidence: list[dict]) -> list[dict]:
    """Ensure every evidence item carries a usable source.

    Items whose source is an explicit URL pass through untouched; anything
    else is rebuilt with a stripped quote and a generic OCI-documentation
    fallback source.
    """
    normalized = []
    for ev in evidence:
        # ROBUSTNESS FIX: ``or ""`` guards against explicit JSON nulls —
        # dict.get's default only covers a *missing* key, so a present
        # None value previously crashed on .strip().
        source = (ev.get("source") or "").strip()
        quote = (ev.get("quote") or "").strip()
        if is_explicit_url(source):
            normalized.append(ev)
            continue
        normalized.append({
            "quote": quote,
            "source": source or "Oracle Cloud Infrastructure documentation"
        })
    return normalized
def build_justification_with_links(justification: str, evidence: list[dict]) -> str:
    """
    Combine justification text + evidence URLs in a readable format for Excel.
    """
    if not evidence:
        return justification or ""
    url_set = {
        ev.get("source", "").strip()
        for ev in evidence
        if is_explicit_url(ev.get("source", "").strip())
    }
    if not url_set:
        return justification or ""
    links_text = "\n".join(f"- {u}" for u in sorted(url_set))
    if justification:
        return f"{justification}\n\nSources:\n{links_text}"
    return f"Sources:\n{links_text}"
def call_api_with_retry(question, max_minutes=30, **kwargs):
    """Call the API with capped exponential backoff until *max_minutes*.

    Non-retryable failures (auth/bad-request markers in the message, or a
    malformed-JSON body) are re-raised immediately; everything else is
    retried with a delay growing from 5s up to 60s.

    Raises:
        RuntimeError: when the retry budget is exhausted.
    """
    start = time.time()
    attempt = 0
    delay = 5
    while True:
        try:
            return call_api(question, **kwargs)
        except Exception as e:
            attempt += 1
            elapsed = time.time() - start
            msg = str(e).lower()
            # BUG FIX: call_api raises "Invalid JSON: ..." — the previous
            # check for "invalid json format" never matched, so malformed
            # responses were retried for the full budget.
            if any(x in msg for x in ["401", "403", "400", "invalid json"]):
                raise
            if elapsed > max_minutes * 60:
                raise RuntimeError(
                    f"Timeout after {attempt} attempts / {int(elapsed)}s"
                )
            logger.info(
                f"🔁 Retry {attempt} | waiting {delay}s | {e}"
            )
            time.sleep(delay)
            delay = min(delay * 1.5, 60)
def call_local_engine(question: str) -> dict:
    """Answer *question* in-process via the GraphRAG engine (no HTTP hop)."""
    return answer_question(question)
# =========================
# Main
# =========================
def process_excel_rfp(
    input_excel: Path,
    output_excel: Path,
    *,
    api_url: str,
    timeout: int = 120,
    auth_user: str | None = None,
    auth_pass: str | None = None,
) -> Path:
    """Answer every eligible question in an RFP spreadsheet and save results.

    Reads *input_excel*, builds a hierarchy of item numbers -> question
    text, submits each eligible item to the local engine, and writes the
    answers (plus confidence/justification/raw JSON columns) to
    *output_excel*.

    NOTE(review): the api_url/timeout/auth_* parameters are only consumed
    by the commented-out HTTP path below; the active path calls the local
    engine. Confirm before removing them.

    Returns:
        The *output_excel* path, for chaining.
    """
    # dtype=str + fillna("") keeps every cell a plain string.
    df = pd.read_excel(input_excel, dtype=str).fillna("")
    # Ensure all result columns exist before any row is written.
    for col in [
        ANSWER_COL,
        JSON_COL,
        CONFIDENCE_COL,
        AMBIGUITY_COL,
        CONF_REASON_COL,
        JUSTIFICATION_COL
    ]:
        if col not in df.columns:
            df[col] = ""
    # Map normalized item number -> its question text and row index.
    hierarchy = {}
    for idx, row in df.iterrows():
        num = normalize_num(str(row.iloc[ORDER_COLUMN]))
        text = str(row.iloc[QUESTION_COLUMN]).strip()
        if num and text:
            hierarchy[num] = {"text": text, "row": idx}
    # =========================================
    # 🔥 PARALLEL WORKER
    # =========================================
    def process_row(num, info):
        # Build the prompt for one row, call the engine, and return
        # (row index, prompt, normalized response). Never raises: errors
        # come back as an {"error": ...} payload so one bad row cannot
        # kill the batch.
        try:
            row = df.loc[info["row"]]
            if is_hierarchical(num):
                question = build_question(hierarchy, num)
            else:
                question = build_question_from_columns(
                    row,
                    CONTEXT_COLUMNS,
                    QUESTION_COLUMN
                )
            logger.info(f"\n🔸 QUESTION {num} SENT TO API:\n{question}")
            # Remote HTTP path, kept for when the service is used again:
            # raw = call_api_with_retry(
            #     question,
            #     api_url=api_url,
            #     timeout=timeout,
            #     auth_user=auth_user,
            #     auth_pass=auth_pass
            # )
            raw = call_local_engine(question)
            resp = normalize_api_response(raw)
            return info["row"], question, resp
        except Exception as e:
            return info["row"], "", {"error": str(e)}
    # =========================================
    # PARALLEL EXECUTION - FUTURE - OCI ACCEPTS ONLY 1 HERE
    # =========================================
    futures = []
    with ThreadPoolExecutor(max_workers=1) as executor:
        for num, info in hierarchy.items():
            if not should_process(num, ALLOWED_STRUCTURES, ALLOWED_SEPARATORS):
                continue
            futures.append(executor.submit(process_row, num, info))
        # Consume results as they complete and write them into df.
        for f in as_completed(futures):
            row_idx, question, api_response = f.result()
            # Re-normalizing an already-normalized dict is a harmless no-op.
            api_response = normalize_api_response(api_response)
            try:
                if "error" in api_response:
                    raise Exception(api_response["error"])
                if "evidence" in api_response:
                    api_response["evidence"] = normalize_evidence_sources(
                        api_response["evidence"]
                    )
                # NO answers and shaky confidence go to the review log.
                if (
                    api_response.get("answer") == "NO"
                    or api_response.get("confidence") in ("MEDIUM", "LOW")
                ):
                    register_failed_query(
                        query=question,
                        answer=api_response.get("answer", ""),
                        confidence=api_response.get("confidence", "")
                    )
                df.at[row_idx, ANSWER_COL] = api_response.get("answer", "ERROR")
                df.at[row_idx, CONFIDENCE_COL] = api_response.get("confidence", "")
                df.at[row_idx, AMBIGUITY_COL] = str(api_response.get("ambiguity_detected", ""))
                df.at[row_idx, CONF_REASON_COL] = api_response.get("confidence_reason", "")
                df.at[row_idx, JUSTIFICATION_COL] = build_justification_with_links(
                    api_response.get("justification", ""),
                    api_response.get("evidence", [])
                )
                df.at[row_idx, JSON_COL] = json.dumps(api_response, ensure_ascii=False)
                logger.info(json.dumps(api_response, indent=2))
            except Exception as e:
                # Any per-row failure is recorded in the sheet itself.
                df.at[row_idx, ANSWER_COL] = "ERROR"
                df.at[row_idx, CONFIDENCE_COL] = "LOW"
                df.at[row_idx, JUSTIFICATION_COL] = str(e)
                logger.info(f"❌ ERROR: {e}")
    df.to_excel(output_excel, index=False)
    return output_excel
if __name__ == "__main__":
    import sys

    # ROBUSTNESS FIX: fail with a usage message instead of a bare
    # IndexError when the input workbook path is missing.
    if len(sys.argv) < 2:
        sys.exit("Usage: python rfp_process.py <input_excel>")
    input_path = Path(sys.argv[1])
    # Write results next to the input as "<name>_result.xlsx".
    output_path = input_path.with_name(input_path.stem + "_result.xlsx")
    process_excel_rfp(
        input_excel=input_path,
        output_excel=output_path,
        api_url=API_URL,
    )