# --- File-listing metadata (not Python code; kept as comments so the module parses) ---
# Source: rfp_response_automation/files/graphrag_rerank.py
# Snapshot: 2026-01-10 07:50:18 -03:00 — 856 lines, 26 KiB, Python
from langchain_community.chat_models.oci_generative_ai import ChatOCIGenAI
from langchain_core.prompts import PromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain_community.embeddings import OCIGenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.schema.runnable import RunnableMap
from langchain_community.document_loaders import UnstructuredPDFLoader, PyMuPDFLoader
from langchain_core.documents import Document
from langchain_core.runnables import RunnableLambda
from tqdm import tqdm
import os
import pickle
import re
import atexit
import oracledb
import json
# =========================
# Oracle Autonomous Configuration
# =========================
WALLET_PATH = "Wallet_oradb23ai"  # directory holding the ADB wallet files
DB_ALIAS = "oradb23ai_high"  # TNS alias from tnsnames.ora (high-consistency service)
USERNAME = "admin"
PASSWORD = "**********"  # placeholder -- inject the real secret at deploy time, do not commit it
os.environ["TNS_ADMIN"] = WALLET_PATH  # lets oracledb resolve DB_ALIAS via the wallet directory
GRAPH_NAME = "GRAPH_DB_1"  # suffix for the ENTITIES_/RELATIONS_ tables and the property-graph name
# =========================
# Global Configurations
# =========================
INDEX_PATH = "./faiss_index"  # on-disk FAISS index location
PROCESSED_DOCS_FILE = os.path.join(INDEX_PATH, "processed_docs.pkl")  # pickle of already-indexed PDF paths
chapter_separator_regex = r"^(#{1,6} .+|\*\*.+\*\*)$"  # markdown heading or fully-bold line starts a chapter
pdf_paths = ['<YOUR_KNOWLEDGE_BASE_FILE>.pdf']  # NOTE: chat() shadows this with its own local list
# =========================
# LLM Definitions
# =========================
# Chat model for the final RFP decision (sampling parameters tuned for generation).
llm = ChatOCIGenAI(
    model_id="meta.llama-3.1-405b-instruct",
    service_endpoint="https://inference.generativeai.us-chicago-1.oci.oraclecloud.com",
    compartment_id="ocid1.compartment.oc1..aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    auth_profile="DEFAULT",
    model_kwargs={"temperature": 0.7, "top_p": 0.75, "max_tokens": 4000},
)
# Second chat model used for extraction/parsing steps (provider-default sampling).
llm_for_rag = ChatOCIGenAI(
    model_id="meta.llama-3.1-405b-instruct",
    service_endpoint="https://inference.generativeai.us-chicago-1.oci.oraclecloud.com",
    compartment_id="ocid1.compartment.oc1..aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    auth_profile="DEFAULT"
)
# Embedding model backing the FAISS vector store.
embeddings = OCIGenAIEmbeddings(
    model_id="cohere.embed-multilingual-v3.0",
    service_endpoint="https://inference.generativeai.us-chicago-1.oci.oraclecloud.com",
    compartment_id="ocid1.compartment.oc1..aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    auth_profile="DEFAULT"
)
# Single module-wide Autonomous Database connection (mTLS via wallet), opened at import time.
# NOTE(review): wallet_password is assumed to equal the DB password here -- confirm.
oracle_conn = oracledb.connect(
    user=USERNAME,
    password=PASSWORD,
    dsn=DB_ALIAS,
    config_dir=WALLET_PATH,
    wallet_location=WALLET_PATH,
    wallet_password=PASSWORD
)
# Best-effort close at interpreter exit.
atexit.register(lambda: oracle_conn.close())
# =========================
# Oracle Graph Client
# =========================
def ensure_oracle_text_index(
    conn,
    table_name: str,
    column_name: str,
    index_name: str
):
    """
    Ensure an Oracle Text (CTXSYS.CONTEXT) index exists and is synchronized
    for a given table and column.

    The table/column/index names are interpolated into DDL, so they must come
    from trusted module constants, never from user input.
    """
    cursor = conn.cursor()
    try:
        # 1. Check whether the index already exists (USER_INDEXES stores names uppercased).
        cursor.execute("""
            SELECT COUNT(*)
            FROM user_indexes
            WHERE index_name = :idx_name
        """, {"idx_name": index_name.upper()})
        exists = cursor.fetchone()[0] > 0
        if not exists:
            print(f"🛠️ Creating Oracle Text index {index_name} on {table_name}.{column_name}")
            # DDL cannot take bind variables; names are trusted constants in this module.
            cursor.execute(f"""
                CREATE INDEX {index_name}
                ON {table_name} ({column_name})
                INDEXTYPE IS CTXSYS.CONTEXT
            """)
        else:
            print(f"✔️ Oracle Text index already exists: {index_name}")
        # 2. Synchronize the index (important when rows already exist).
        print(f"🔄 Syncing Oracle Text index: {index_name}")
        # Bind the index name instead of interpolating it into the PL/SQL block.
        cursor.execute(
            "BEGIN CTX_DDL.SYNC_INDEX(:idx); END;",
            {"idx": index_name},
        )
        conn.commit()
    finally:
        # Always release the cursor, even when a DDL/sync step fails.
        cursor.close()
def create_tables_if_not_exist(conn):
    """Create the ENTITIES/RELATIONS tables backing the graph, if missing.

    Each DDL runs inside a PL/SQL block that swallows ORA-00955
    ("name is already used by an existing object"), so re-running is harmless.
    """
    entities_ddl = f"""
    BEGIN
        EXECUTE IMMEDIATE '
            CREATE TABLE ENTITIES_{GRAPH_NAME} (
                ID NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY PRIMARY KEY,
                NAME VARCHAR2(500)
            )
        ';
    EXCEPTION
        WHEN OTHERS THEN
            IF SQLCODE != -955 THEN
                RAISE;
            END IF;
    END;
    """
    relations_ddl = f"""
    BEGIN
        EXECUTE IMMEDIATE '
            CREATE TABLE RELATIONS_{GRAPH_NAME} (
                ID NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY PRIMARY KEY,
                SOURCE_ID NUMBER,
                TARGET_ID NUMBER,
                RELATION_TYPE VARCHAR2(100),
                SOURCE_TEXT VARCHAR2(4000)
            )
        ';
    EXCEPTION
        WHEN OTHERS THEN
            IF SQLCODE != -955 THEN
                RAISE;
            END IF;
    END;
    """
    cursor = conn.cursor()
    try:
        for ddl in (entities_ddl, relations_ddl):
            cursor.execute(ddl)
        conn.commit()
        print("✅ ENTITIES and RELATIONS tables created or already exist.")
    except Exception as e:
        # Best-effort: log and continue; downstream inserts will surface real failures.
        print(f"[ERROR] Failed to create tables: {e}")
    finally:
        cursor.close()
# =========================
# Schema bootstrap -- runs at import time, before any querying.
# =========================
create_tables_if_not_exist(oracle_conn)
# Oracle Text indexes back the CONTAINS() filters used by query_knowledge_graph.
ensure_oracle_text_index(
    oracle_conn,
    "ENTITIES_" + GRAPH_NAME,
    "NAME",
    "IDX_ENT_" + GRAPH_NAME + "_NAME"
)
ensure_oracle_text_index(
    oracle_conn,
    "RELATIONS_" + GRAPH_NAME,
    "RELATION_TYPE",
    "IDX_REL_" + GRAPH_NAME + "_RELTYPE"
)
def _ensure_property_graph(cursor):
    """Create the property graph over the entity/relation tables,
    ignoring ORA-55358 (graph already exists)."""
    try:
        cursor.execute(f"""
        BEGIN
            EXECUTE IMMEDIATE '
                CREATE PROPERTY GRAPH {GRAPH_NAME}
                VERTEX TABLES (ENTITIES_{GRAPH_NAME}
                    KEY (ID)
                    LABEL ENTITIES
                    PROPERTIES (NAME))
                EDGE TABLES (RELATIONS_{GRAPH_NAME}
                    KEY (ID)
                    SOURCE KEY (SOURCE_ID) REFERENCES ENTITIES_{GRAPH_NAME}(ID)
                    DESTINATION KEY (TARGET_ID) REFERENCES ENTITIES_{GRAPH_NAME}(ID)
                    LABEL RELATIONS
                    PROPERTIES (RELATION_TYPE, SOURCE_TEXT))
            ';
        EXCEPTION
            WHEN OTHERS THEN
                IF SQLCODE != -55358 THEN -- ORA-55358: Graph already exists
                    RAISE;
                END IF;
        END;
        """)
        print(f"🧠 Graph '{GRAPH_NAME}' created or already exists.")
    except Exception as e:
        print(f"[GRAPH ERROR] Failed to create graph: {e}")


def _extract_triples_text(text):
    """Ask the extraction LLM for REQUIREMENT triples.

    Returns the stripped response text, or None when the Gen AI call fails.
    """
    prompt = f"""
You are extracting structured RFP evidence from technical documentation.
Given the text below, identify ONLY explicit, verifiable facts.
Text:
{text}
Extract triples in ONE of the following formats ONLY:
1. REQUIREMENT -[HAS_SUBJECT]-> <subject>
2. REQUIREMENT -[HAS_METRIC]-> <metric name>
3. REQUIREMENT -[HAS_VALUE]-> <exact value or limit>
4. REQUIREMENT -[SUPPORTED_BY]-> <document section or sentence>
Rules:
- Use REQUIREMENT as the source entity
- Use UPPERCASE relation names
- Do NOT infer or assume
- If nothing explicit is found, return NONE
"""
    try:
        response = llm_for_rag.invoke(prompt)
        return response.content.strip()
    except Exception as e:
        print(f"[ERROR] Gen AI call error: {e}")
        return None


def _parse_triple_line(triple):
    """Parse one 'ENTITY -[RELATION]-> ENTITY' line.

    Returns (source_entity, RELATION, target_entity) or None when the line
    does not match the expected shape.  The source entity is forced to the
    canonical REQUIREMENT vertex.
    """
    parts = triple.split("-[")
    if len(parts) != 2:
        return None
    right_part = parts[1].split("]->")
    if len(right_part) != 2:
        return None
    raw_relation, entity2 = right_part
    # Normalize relation names to UPPER_SNAKE so CONTAINS() lookups stay predictable.
    relation = re.sub(r'\W+', '_', raw_relation.strip().upper())
    entity1 = parts[0].strip()
    entity2 = entity2.strip()
    if entity1.upper() != "REQUIREMENT":
        entity1 = "REQUIREMENT"
    return entity1, relation, entity2


def _insert_triple(cursor, entity1, relation, entity2, source):
    """Upsert both entities and record the relation edge.

    Failures are logged and skipped so one bad triple does not abort the batch.
    """
    merge_sql = (
        f"MERGE INTO ENTITIES_{GRAPH_NAME} e "
        "USING (SELECT :name AS NAME FROM dual) src ON (e.name = src.name) "
        "WHEN NOT MATCHED THEN INSERT (NAME) VALUES (:name)"
    )
    try:
        # Upsert entities.  Positional binding in a SQL statement requires one
        # value per placeholder occurrence, hence [name, name].
        for name in (entity1, entity2):
            cursor.execute(merge_sql, [name, name])
        # Resolve the (possibly pre-existing) vertex IDs.
        cursor.execute(f"SELECT ID FROM ENTITIES_{GRAPH_NAME} WHERE NAME = :name", [entity1])
        source_id = cursor.fetchone()[0]
        cursor.execute(f"SELECT ID FROM ENTITIES_{GRAPH_NAME} WHERE NAME = :name", [entity2])
        target_id = cursor.fetchone()[0]
        cursor.execute(
            f"""
            INSERT INTO RELATIONS_{GRAPH_NAME} (SOURCE_ID, TARGET_ID, RELATION_TYPE, SOURCE_TEXT)
            VALUES (:src, :tgt, :rel, :txt)
            """,
            [source_id, target_id, relation, source],
        )
        print(f"{entity1} -[{relation}]-> {entity2}")
    except Exception as e:
        print(f"[INSERT ERROR] {e}")


def create_knowledge_graph(chunks):
    """Build/extend the Oracle property graph from document chunks.

    For each chunk the extraction LLM proposes 'REQUIREMENT -[REL]-> value'
    triples, which are persisted into the ENTITIES_/RELATIONS_ tables.
    Commits once at the end; the cursor is released even on failure
    (the original version leaked it when an exception escaped the loop).
    """
    cursor = oracle_conn.cursor()
    try:
        _ensure_property_graph(cursor)
        for doc in chunks:
            text = doc.page_content
            source = doc.metadata.get("source", "unknown")
            if not text.strip():
                continue  # nothing to extract from whitespace-only chunks
            result = _extract_triples_text(text)
            if result is None or result.upper() == "NONE":
                continue
            for line in result.splitlines():
                parsed = _parse_triple_line(line)
                if parsed is not None:
                    entity1, relation, entity2 = parsed
                    _insert_triple(cursor, entity1, relation, entity2, source)
        oracle_conn.commit()
        print("💾 Knowledge graph updated.")
    finally:
        cursor.close()
def parse_rfp_requirement(question: str) -> dict:
    """Classify an RFP question via the extraction LLM.

    Returns the parsed JSON dict, or a permissive fallback structure when the
    model response contains no usable <json> block.
    """
    prompt = f"""
You are an RFP requirement extractor.
Return the result STRICTLY between the tags <json> and </json>.
Do NOT write anything outside these tags.
Question:
{question}
<json>
{{
"requirement_type": "COMPLIANCE | FUNCTIONAL | NON_FUNCTIONAL",
"subject": "<short subject>",
"expected_value": "<value or condition if any>",
"decision_type": "YES_NO | YES_NO_PARTIAL",
"keywords": ["keyword1", "keyword2"]
}}
</json>
"""
    raw = llm_for_rag.invoke(prompt).content.strip()
    try:
        # Drop ```json / ``` code fences before locating the tagged block.
        raw = re.sub(r"```json|```", "", raw).strip()
        match = re.search(r"<json>\s*(\{.*?\})\s*</json>", raw, re.DOTALL)
        if match is None:
            raise ValueError("No JSON block found")
        return json.loads(match.group(1))
    except Exception:
        print("⚠️ RFP PARSER FAILED")
        print("RAW RESPONSE:")
        print(raw)
        # Fallback keeps the pipeline alive with the question itself as subject.
        return {
            "requirement_type": "UNKNOWN",
            "subject": question,
            "expected_value": "",
            "decision_type": "YES_NO_PARTIAL",
            "keywords": re.findall(r"\b\w+\b", question.lower())[:5]
        }
def extract_graph_keywords_from_requirement(req: dict) -> str:
    """Collapse a parsed requirement into a sorted, comma-separated keyword string."""
    terms = set(req.get("keywords", []))
    subject = req.get("subject")
    if subject:
        terms.add(subject.lower())
    expected = req.get("expected_value")
    if expected:
        terms.add(str(expected).lower())
    return ", ".join(sorted(terms))
def build_oracle_text_query(text: str) -> str | None:
ORACLE_TEXT_STOPWORDS = {
"and", "or", "the", "with", "between", "of", "to", "for",
"in", "on", "by", "is", "are", "was", "were", "be"
}
tokens = []
text = text.lower()
text = re.sub(r"[^a-z0-9\s]", " ", text)
for token in text.split():
if len(token) >= 4 and token not in ORACLE_TEXT_STOPWORDS:
tokens.append(f"{token}")
tokens = sorted(set(tokens))
return " OR ".join(tokens) if tokens else None
def query_knowledge_graph(raw_keywords: str):
    """Fetch REQUIREMENT-rooted triples whose target or relation matches the keywords.

    Returns a list of (source_name, relation_type, target_name) tuples.
    When no usable keyword survives sanitization, every REQUIREMENT edge
    is returned unfiltered.
    """
    safe_query = build_oracle_text_query(raw_keywords)
    base_sql = f"""
    SELECT
        e1.NAME AS source_name,
        r.RELATION_TYPE,
        e2.NAME AS target_name
    FROM RELATIONS_{GRAPH_NAME} r
    JOIN ENTITIES_{GRAPH_NAME} e1 ON e1.ID = r.SOURCE_ID
    JOIN ENTITIES_{GRAPH_NAME} e2 ON e2.ID = r.TARGET_ID
    WHERE e1.NAME = 'REQUIREMENT'
    """
    binds = {}
    if safe_query:
        # Bind the Oracle Text query instead of interpolating it into the SQL.
        base_sql += """
    AND (
        CONTAINS(e2.NAME, :q1) > 0
        OR CONTAINS(r.RELATION_TYPE, :q2) > 0
    )
    """
        binds = {"q1": safe_query, "q2": safe_query}
    print("🔎 GRAPH QUERY:")
    print(base_sql)
    cursor = oracle_conn.cursor()
    try:
        cursor.execute(base_sql, binds)
        rows = cursor.fetchall()
    finally:
        # Release the cursor even when execution fails (previously leaked).
        cursor.close()
    print("📊 GRAPH FACTS:")
    for s, r, t in rows:
        print(f" REQUIREMENT -[{r}]-> {t}")
    return rows
# RE-RANK
def extract_terms_from_graph_text(graph_context):
    """Collect lowercase entity/relation terms from graph output.

    Accepts either DB rows (list of tuples from query_knowledge_graph) or a
    text rendering of 'ENTITY -[RELATION]-> ENTITY' lines.  Returns a set of
    lowercase terms; empty set for falsy or unrecognized input.
    """
    if not graph_context:
        return set()
    if isinstance(graph_context, list):
        return {
            col.lower()
            for row in graph_context
            for col in row
            if isinstance(col, str)
        }
    if isinstance(graph_context, str):
        terms = set()
        # Fixed: the previous pattern was corrupted by pasted LaTeX markup
        # ("$begin:math:display$...$end:math:display$") and could never match.
        # This matches 'ENTITY -[RELATION]-> ENTITY' lines.
        for e1, e2 in re.findall(r"([\w\s]+)-\[[\w_]+\]->([\w\s]+)", graph_context):
            terms.add(e1.strip().lower())
            terms.add(e2.strip().lower())
        return terms
    return set()
def rerank_documents_with_graph_terms(docs, query, graph_terms):
    """Score each document by how many query/graph terms appear in its text
    (plain substring match) and return the page_content of the top five,
    best first; ties keep the incoming document order."""
    vocabulary = set(re.findall(r'\b\w+\b', query.lower())) | set(graph_terms)

    def term_hits(doc):
        body = doc.page_content.lower()
        return sum(term in body for term in vocabulary)

    # sorted() is stable, so equal-score documents preserve retrieval order.
    ranked = sorted(docs, key=term_hits, reverse=True)
    return [doc.page_content for doc in ranked[:5]]
# SEMANTIC CHUNKING
def split_llm_output_into_chapters(llm_text):
    """Split LLM-formatted text into chapters.

    A line matching the module-level chapter_separator_regex (markdown
    heading or fully-bold line) closes the running chapter and starts a new
    one; each chapter is returned stripped of surrounding whitespace.
    """
    heading = re.compile(chapter_separator_regex)
    chapters = []
    current = []
    for line in llm_text.splitlines():
        if heading.match(line):
            if current:
                chapters.append("\n".join(current).strip())
            current = [line]
        else:
            current.append(line)
    if current:
        chapters.append("\n".join(current).strip())
    return chapters
def semantic_chunking(text, max_retries: int = 5):
    """Ask the extraction LLM to reformat OCR text into headed sections.

    Returns the raw LLM response object (callers read ``.content``).

    Fix: the original retried forever inside a bare ``except:`` (which also
    trapped KeyboardInterrupt).  Transient Gen AI failures are now retried up
    to ``max_retries`` times; the last error is re-raised when exhausted.
    """
    prompt = f"""
You received the following text extracted via OCR:
{text}
Your task:
1. Identify headings (short uppercase or bold lines, no period at the end) putting the Product Name (Application Name) and the Subject
2. Separate paragraphs by heading
3. Indicate columns with [COLUMN 1], [COLUMN 2] if present
4. Indicate tables with [TABLE] in markdown format
5. Indicate explicity metrics (if it exists)
Examples:
- Oracle Financial Services RTO is 1 hour
- The Oracle Banking Supply Chain Finance Cloud Service A maximum number of 10K Hosted Transactions
- The Oracle Banking Payments Cloud Service, Additional Non-Production Environment: You may purchase up to a maximum of ten (10) additional Non-Production Environments
"""
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return llm_for_rag.invoke(prompt)
        except Exception as e:
            last_error = e
            print(f"[ERROR] Gen AI call error (attempt {attempt}/{max_retries}): {e}")
    raise last_error
def read_pdfs(pdf_path):
    """Load a PDF and return its full text, pages joined by newlines.

    Paths containing "-ocr" are routed through PyMuPDF (scanned/OCR output);
    everything else uses the Unstructured loader.
    """
    if "-ocr" in pdf_path:
        loader = PyMuPDFLoader(str(pdf_path))
    else:
        loader = UnstructuredPDFLoader(str(pdf_path))
    pages = loader.load()
    return "\n".join(page.page_content for page in pages)
def smart_split_text(text, max_chunk_size=10_000):
    """Split text into chunks of at most max_chunk_size characters.

    Prefers to cut just after the last sentence ender ('.', '!', '?') or at a
    paragraph break ('\\n\\n') inside the window; falls back to a hard cut when
    none is found.  Chunks are stripped and empty ones dropped.
    """
    chunks = []
    cursor = 0
    total = len(text)
    while cursor < total:
        window_end = min(cursor + max_chunk_size, total)
        # Best (right-most) natural break inside the current window.
        cut = max(
            text.rfind('.', cursor, window_end),
            text.rfind('!', cursor, window_end),
            text.rfind('?', cursor, window_end),
            text.rfind('\n\n', cursor, window_end),
        )
        if cut <= cursor:  # covers -1 (not found) and a break at the very start
            cut = window_end
        else:
            cut += 1  # include the break character in this chunk
        piece = text[cursor:cut].strip()
        if piece:
            chunks.append(piece)
        cursor = cut
    return chunks
def load_previously_indexed_docs():
    """Return the set of already-indexed document paths.

    Yields an empty set when the tracking pickle does not exist yet
    (first run, or the FAISS directory was wiped).
    """
    try:
        with open(PROCESSED_DOCS_FILE, "rb") as fh:
            return pickle.load(fh)
    except FileNotFoundError:
        return set()
def save_indexed_docs(docs):
    """Persist the set of already-indexed document paths to the tracking pickle."""
    with open(PROCESSED_DOCS_FILE, "wb") as f:
        pickle.dump(docs, f)
# =========================
# Main Function
# =========================
def chat():
    """Interactive entry point: index any new PDFs into FAISS, extend the
    knowledge graph, then answer RFP questions in a REPL loop until 'quit'."""
    # Shadows the module-level pdf_paths placeholder with the real input file.
    pdf_paths = ['RFP - Financial v2.pdf']
    already_indexed_docs = load_previously_indexed_docs()
    updated_docs = set()
    try:
        vectorstore = FAISS.load_local(INDEX_PATH, embeddings, allow_dangerous_deserialization=True)
        print("✔️ FAISS index loaded.")
    except Exception:
        print("⚠️ FAISS index not found, creating a new one.")
        vectorstore = None
    new_chunks = []
    for pdf_path in tqdm(pdf_paths, desc=f"📄 Processing PDFs"):
        print(f" {os.path.basename(pdf_path)}")
        if pdf_path in already_indexed_docs:
            print(f"✅ Document already indexed: {pdf_path}")
            continue
        full_text = read_pdfs(pdf_path=pdf_path)
        text_chunks = smart_split_text(full_text, max_chunk_size=10_000)
        # Carries a trailing, possibly cut-off chapter forward into the next chunk.
        overflow_buffer = ""
        for chunk in tqdm(text_chunks, desc=f"📄 Processing text chunks", dynamic_ncols=True, leave=False):
            current_text = overflow_buffer + chunk
            treated_text = semantic_chunking(current_text)
            # semantic_chunking returns an LLM message; .content holds the text.
            if hasattr(treated_text, "content"):
                chapters = split_llm_output_into_chapters(treated_text.content)
                last_chapter = chapters[-1] if chapters else ""
                # A chapter that doesn't end in sentence punctuation is assumed
                # cut mid-thought; defer it so it is prepended to the next chunk.
                if last_chapter and not last_chapter.strip().endswith((".", "!", "?", "\n\n")):
                    print("📌 Last chapter seems incomplete, saving for the next cycle")
                    overflow_buffer = last_chapter
                    chapters = chapters[:-1]
                else:
                    overflow_buffer = ""
                for chapter_text in chapters:
                    doc = Document(page_content=chapter_text, metadata={"source": pdf_path})
                    new_chunks.append(doc)
                    print(f"✅ New chapter indexed:\n{chapter_text}...\n")
            else:
                print(f"[ERROR] semantic_chunking returned unexpected type: {type(treated_text)}")
        updated_docs.add(str(pdf_path))
    if new_chunks:
        if vectorstore:
            vectorstore.add_documents(new_chunks)
        else:
            vectorstore = FAISS.from_documents(new_chunks, embedding=embeddings)
        vectorstore.save_local(INDEX_PATH)
        save_indexed_docs(already_indexed_docs.union(updated_docs))
        print(f"💾 {len(new_chunks)} chunks added to FAISS index.")
        print("🧠 Building knowledge graph...")
        create_knowledge_graph(new_chunks)
    else:
        print("📁 No new documents to index.")
    retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 50, "fetch_k": 100})
    # Local copy of the decision prompt (a duplicate exists at module level).
    RFP_DECISION_TEMPLATE = """
You are answering an RFP requirement with risk awareness.
Requirement:
Type: {requirement_type}
Subject: {subject}
Expected value: {expected_value}
Document evidence:
{text_context}
Graph evidence:
{graph_context}
Decision rules:
- Answer ONLY with YES, NO or PARTIAL
- Do NOT assume anything not explicitly stated
- If value differs, answer PARTIAL
- If not found, answer NO
Confidence rules:
- HIGH: Explicit evidence directly answers the requirement
- MEDIUM: Evidence partially matches or requires light interpretation
- LOW: Requirement is ambiguous OR evidence is indirect OR missing
Ambiguity rules:
- ambiguity_detected = true if:
- The requirement can be interpreted in more than one way
- Keywords are vague (e.g. "support", "integration", "capability")
- Evidence does not clearly bind to subject + expected value
OUTPUT CONSTRAINTS (MANDATORY):
- Return ONLY a valid JSON object
- Do NOT include explanations, comments, markdown, lists, or code fences
- Do NOT write any text before or after the JSON
- The response must start with an opening curly brace and end with a closing curly brace
JSON schema (return exactly this structure):
{{
"answer": "YES | NO | PARTIAL",
"confidence": "HIGH | MEDIUM | LOW",
"ambiguity_detected": true,
"confidence_reason": "<short reason>",
"justification": "<short factual explanation>",
"evidence": [
{{
"quote": "<exact text>",
"source": "<document or section if available>"
}}
]
}}
"""
    prompt = PromptTemplate.from_template(RFP_DECISION_TEMPLATE)

    def get_context(x):
        """Build a reranked text context for a raw query.
        NOTE(review): not referenced by the chain below."""
        query = x.get("input") if isinstance(x, dict) else x
        # 1. Retrieve vector chunks as usual
        docs = retriever.invoke(query)
        req = parse_rfp_requirement(query)
        query_terms = extract_graph_keywords_from_requirement(req)
        graph_context = query_knowledge_graph(query_terms)
        graph_terms = extract_terms_from_graph_text(graph_context)
        reranked_chunks = rerank_documents_with_graph_terms(docs, query, graph_terms)
        return "\n\n".join(reranked_chunks)

    def get_context_from_requirement(req: dict):
        """Build the prompt inputs from an already-parsed requirement.
        NOTE(review): unlike get_context, this variant skips reranking."""
        query_terms = extract_graph_keywords_from_requirement(req)
        docs = retriever.invoke(query_terms)
        graph_context = query_knowledge_graph(query_terms)
        return {
            "text_context": "\n\n".join(doc.page_content for doc in docs),
            "graph_context": graph_context,
            "requirement_type": req["requirement_type"],
            "subject": req["subject"],
            "expected_value": req.get("expected_value", "")
        }

    parse_requirement_runnable = RunnableLambda(
        lambda q: parse_rfp_requirement(q)
    )
    # NOTE(review): each mapped key invokes get_context_from_requirement again,
    # so retrieval and the graph query run twice per question.
    chain = (
        parse_requirement_runnable
        | RunnableMap({
            "text_context": RunnableLambda(
                lambda req: get_context_from_requirement(req)["text_context"]
            ),
            "graph_context": RunnableLambda(
                lambda req: get_context_from_requirement(req)["graph_context"]
            ),
            "requirement_type": lambda req: req["requirement_type"],
            "subject": lambda req: req["subject"],
            "expected_value": lambda req: req.get("expected_value", "")
        })
        | prompt
        | llm
        | StrOutputParser()
    )
    print("✅ READY")
    # Simple REPL: each question flows through the full parse -> retrieve ->
    # decide pipeline; 'quit' exits.
    while True:
        query = input("❓ Question (or 'quit' to exit): ")
        if query.lower() == "quit":
            break
        response = chain.invoke(query)
        print("\n📜 RESPONSE:\n")
        print(response)
        print("\n" + "=" * 80 + "\n")
def get_context(x):
    """Resolve a query (raw string or {"input": ...} dict) into a reranked,
    newline-joined context string combining vector hits and graph evidence."""
    question = x.get("input") if isinstance(x, dict) else x
    candidate_docs = retriever.invoke(question)
    requirement = parse_rfp_requirement(question)
    keyword_string = extract_graph_keywords_from_requirement(requirement)
    graph_rows = query_knowledge_graph(keyword_string)
    graph_terms = extract_terms_from_graph_text(graph_rows)
    best_chunks = rerank_documents_with_graph_terms(candidate_docs, question, graph_terms)
    return "\n\n".join(best_chunks)
def get_context_from_requirement(req: dict):
    """Build the decision-prompt inputs (text/graph context plus requirement
    fields) from a requirement already parsed by parse_rfp_requirement."""
    keyword_string = extract_graph_keywords_from_requirement(req)
    candidate_docs = retriever.invoke(keyword_string)
    graph_rows = query_knowledge_graph(keyword_string)
    graph_terms = extract_terms_from_graph_text(graph_rows)
    best_chunks = rerank_documents_with_graph_terms(
        candidate_docs,
        keyword_string,
        graph_terms
    )
    return {
        "text_context": "\n\n".join(best_chunks),
        "graph_context": graph_rows,
        "requirement_type": req["requirement_type"],
        "subject": req["subject"],
        "expected_value": req.get("expected_value", "")
    }
# Module-level retriever used by get_context / get_context_from_requirement.
# Pre-bind both names so a failed load leaves them defined (None) instead of
# producing a NameError on first use.
vectorstore = None
retriever = None
try:
    vectorstore = FAISS.load_local(
        INDEX_PATH,
        embeddings,
        allow_dangerous_deserialization=True
    )
    retriever = vectorstore.as_retriever(
        search_type="similarity",
        search_kwargs={"k": 50, "fetch_k": 100}
    )
except Exception as e:
    # Was a bare `except:` that also swallowed SystemExit/KeyboardInterrupt
    # and hid the actual failure reason.
    print(f"No Faiss ({e})")
# Decision prompt: forces a strict JSON verdict (YES/NO/PARTIAL) with
# confidence and ambiguity flags.  Double braces escape literal JSON braces
# for PromptTemplate's {placeholder} syntax.
RFP_DECISION_TEMPLATE = """
You are answering an RFP requirement with risk awareness.
Requirement:
Type: {requirement_type}
Subject: {subject}
Expected value: {expected_value}
Document evidence:
{text_context}
Graph evidence:
{graph_context}
Decision rules:
- Answer ONLY with YES, NO or PARTIAL
- Do NOT assume anything not explicitly stated
- If value differs, answer PARTIAL
- If not found, answer NO
Confidence rules:
- HIGH: Explicit evidence directly answers the requirement
- MEDIUM: Evidence partially matches or requires light interpretation
- LOW: Requirement is ambiguous OR evidence is indirect OR missing
Ambiguity rules:
- ambiguity_detected = true if:
- The requirement can be interpreted in more than one way
- Keywords are vague (e.g. "support", "integration", "capability")
- Evidence does not clearly bind to subject + expected value
OUTPUT CONSTRAINTS (MANDATORY):
- Return ONLY a valid JSON object
- Do NOT include explanations, comments, markdown, lists, or code fences
- Do NOT write any text before or after the JSON
- The response must start with an opening curly brace and end with a closing curly brace
JSON schema (return exactly this structure):
{{
"answer": "YES | NO | PARTIAL",
"confidence": "HIGH | MEDIUM | LOW",
"ambiguity_detected": true,
"confidence_reason": "<short reason>",
"justification": "<short factual explanation>",
"evidence": [
{{
"quote": "<exact text>",
"source": "<document or section if available>"
}}
]
}}
"""
prompt = PromptTemplate.from_template(RFP_DECISION_TEMPLATE)
# Wraps the parser so it can be composed into the LCEL chain below.
parse_requirement_runnable = RunnableLambda(
    lambda q: parse_rfp_requirement(q)
)
# LCEL pipeline: parse the question into a structured requirement, build the
# prompt inputs in ONE pass, then ask the decision LLM for the JSON verdict.
#
# Fix: the previous RunnableMap invoked get_context_from_requirement once per
# mapped key, doubling the retriever call, the RFP parse, and the graph query
# for every question.  get_context_from_requirement already returns every key
# the prompt needs (text_context, graph_context, requirement_type, subject,
# expected_value), so it is composed directly.
chain = (
    parse_requirement_runnable
    | RunnableLambda(get_context_from_requirement)
    | prompt
    | llm
    | StrOutputParser()
)
def answer_question(question: str) -> str:
    """Run a single RFP question through the module-level decision chain and
    return the raw JSON string produced by the LLM."""
    return chain.invoke(question)
# 🚀 Run
if __name__ == "__main__":
    # Interactive mode: index PDFs, build the graph, then answer questions.
    chat()