# Source: rfp_response_automation/files/graphrag_rerank.py
# Snapshot: 2026-01-14 09:44:22 -03:00 (1009 lines, 33 KiB, Python)
# NOTE: the lines above were file-browser metadata accidentally pasted into
# this module; they are kept as comments so the file parses as valid Python.
from langchain_community.chat_models.oci_generative_ai import ChatOCIGenAI
from langchain_core.prompts import PromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain_community.embeddings import OCIGenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.schema.runnable import RunnableMap
from langchain_community.document_loaders import UnstructuredPDFLoader, PyMuPDFLoader
from langchain_core.documents import Document
from langchain_core.runnables import RunnableLambda
from pathlib import Path
from tqdm import tqdm
import os
import pickle
import re
import atexit
import oracledb
import json
import base64
# =========================
# Oracle Autonomous Configuration
# =========================
# Connection settings for the Oracle Autonomous Database wallet.
WALLET_PATH = "Wallet_oradb23ai"
DB_ALIAS = "oradb23ai_high"
USERNAME = "admin"
# SECURITY: the password used to be hard-coded here. Prefer the environment
# variable; the literal remains only as a backward-compatible fallback and
# should be removed once deployments set ORACLE_DB_PASSWORD.
PASSWORD = os.environ.get("ORACLE_DB_PASSWORD", "Moniquinha1972")
# Point the Oracle client libraries at the wallet directory for TNS resolution.
os.environ["TNS_ADMIN"] = WALLET_PATH
# =========================
# Global Configurations
# =========================
INDEX_PATH = "./faiss_index"  # on-disk location of the FAISS vector index
PROCESSED_DOCS_FILE = os.path.join(INDEX_PATH, "processed_docs.pkl")  # pickled set of already-indexed PDF paths
# Matches markdown headings (# .. ######) or fully bolded lines; used to split
# LLM-normalized text into chapters.
chapter_separator_regex = r"^(#{1,6} .+|\*\*.+\*\*)$"
GRAPH_NAME = "OCI_GRAPH"  # property-graph name, also used as a table-name suffix
# =========================
# LLM Definitions
# =========================
# Decision/answer model: low temperature for deterministic RFP verdicts.
llm = ChatOCIGenAI(
    model_id="meta.llama-3.1-405b-instruct",
    service_endpoint="https://inference.generativeai.us-chicago-1.oci.oraclecloud.com",
    compartment_id="ocid1.compartment.oc1..aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    auth_profile="DEFAULT",
    model_kwargs={"temperature": 0.1, "top_p": 0.75, "max_tokens": 4000},
)
# Helper model (default sampling params) used for extraction/normalization
# calls inside the RAG pipeline (triple extraction, semantic chunking, parsing).
llm_for_rag = ChatOCIGenAI(
    model_id="meta.llama-3.1-405b-instruct",
    service_endpoint="https://inference.generativeai.us-chicago-1.oci.oraclecloud.com",
    compartment_id="ocid1.compartment.oc1..aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    auth_profile="DEFAULT",
)
# Embedding model backing the FAISS vector store.
embeddings = OCIGenAIEmbeddings(
    model_id="cohere.embed-multilingual-v3.0",
    service_endpoint="https://inference.generativeai.us-chicago-1.oci.oraclecloud.com",
    compartment_id="ocid1.compartment.oc1..aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    auth_profile="DEFAULT",
)
# Single module-wide Oracle connection, opened at import time.
# NOTE(review): the wallet password is assumed to equal the DB password here —
# confirm; a shared connection is also not thread-safe.
oracle_conn = oracledb.connect(
    user=USERNAME,
    password=PASSWORD,
    dsn=DB_ALIAS,
    config_dir=WALLET_PATH,
    wallet_location=WALLET_PATH,
    wallet_password=PASSWORD
)
# Close the connection on interpreter shutdown.
atexit.register(lambda: oracle_conn.close())
def filename_to_url(filename: str, suffix: str = ".pdf") -> str:
    """Recover the original URL that was urlsafe-base64-encoded into a filename.

    The corpus files are named with the base64 encoding of their source URL
    plus *suffix*; this strips the suffix (when present) and decodes the rest.
    """
    stem = filename[: -len(suffix)] if filename.endswith(suffix) else filename
    return base64.urlsafe_b64decode(stem.encode("ascii")).decode("utf-8")
# =========================
# Oracle Graph Client
# =========================
def ensure_oracle_text_index(
    conn,
    table_name: str,
    column_name: str,
    index_name: str
):
    """Ensure a CTXSYS.CONTEXT index exists, is VALID, and is synced.

    Behavior by current index state:
    - missing  -> create the index (sync deferred to a later call)
    - invalid  -> drop and recreate it (sync deferred)
    - valid    -> run CTX_DDL.SYNC_INDEX so newly inserted rows become searchable

    Args:
        conn: open oracledb connection.
        table_name / column_name: target of the text index.
        index_name: index identifier (compared upper-cased against USER_INDEXES).

    BUGFIX: the original leaked the cursor whenever a DDL statement raised;
    the cursor is now always released via try/finally.
    """
    cursor = conn.cursor()
    try:
        cursor.execute("""
SELECT status
FROM user_indexes
WHERE index_name = :idx
""", {"idx": index_name.upper()})
        row = cursor.fetchone()
        index_status = row[0] if row else None

        if row is None:
            print(f"🛠️ Creating Oracle Text index {index_name}")
            cursor.execute(f"""
CREATE INDEX {index_name}
ON {table_name} ({column_name})
INDEXTYPE IS CTXSYS.CONTEXT
""")
            conn.commit()
            print(f"✅ Index {index_name} created (sync deferred)")
            return

        if index_status != "VALID":
            print(f"⚠️ Index {index_name} is {index_status}. Recreating...")
            try:
                cursor.execute(f"DROP INDEX {index_name}")
                conn.commit()
            except Exception as e:
                # Cannot recreate on top of an undroppable index; give up.
                print(f"❌ Failed to drop index {index_name}: {e}")
                return
            cursor.execute(f"""
CREATE INDEX {index_name}
ON {table_name} ({column_name})
INDEXTYPE IS CTXSYS.CONTEXT
""")
            conn.commit()
            print(f"♻️ Index {index_name} recreated (sync deferred)")
            return

        # Index exists and is VALID: just sync it.
        print(f"🔄 Syncing Oracle Text index: {index_name}")
        try:
            cursor.execute(f"""
BEGIN
CTX_DDL.SYNC_INDEX('{index_name}', '2M');
END;
""")
            conn.commit()
            print(f"✅ Index {index_name} synced")
        except Exception as e:
            # A failed sync only leaves the index slightly stale — non-fatal.
            print(f"⚠️ Sync failed for {index_name}: {e}")
            print("⚠️ Continuing without breaking pipeline")
    finally:
        cursor.close()
def create_tables_if_not_exist(conn):
    """Create the ENTITIES_* and RELATIONS_* graph tables when absent.

    Each DDL runs inside a PL/SQL block that swallows ORA-00955
    ("name is already used by an existing object") so the call is idempotent;
    any other SQLCODE is re-raised by the block itself.
    """
    ddl_statements = (
        f"""
BEGIN
EXECUTE IMMEDIATE '
CREATE TABLE ENTITIES_{GRAPH_NAME} (
ID NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY PRIMARY KEY,
NAME VARCHAR2(500)
)
';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
""",
        f"""
BEGIN
EXECUTE IMMEDIATE '
CREATE TABLE RELATIONS_{GRAPH_NAME} (
ID NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY PRIMARY KEY,
SOURCE_ID NUMBER,
TARGET_ID NUMBER,
RELATION_TYPE VARCHAR2(100),
SOURCE_TEXT VARCHAR2(4000)
)
';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
""",
    )
    cursor = conn.cursor()
    try:
        for ddl in ddl_statements:
            cursor.execute(ddl)
        conn.commit()
        print("✅ ENTITIES and RELATIONS tables created or already exist.")
    except Exception as e:
        print(f"[ERROR] Failed to create tables: {e}")
    finally:
        cursor.close()
# Ensure the graph tables exist before anything else touches them (runs at import).
create_tables_if_not_exist(oracle_conn)
# IF GRAPH INDEX PROBLEM, Reindex
# ensure_oracle_text_index(
# oracle_conn,
# "ENTITIES_" + GRAPH_NAME,
# "NAME",
# "IDX_ENT_" + GRAPH_NAME + "_NAME"
# )
#
# ensure_oracle_text_index(
# oracle_conn,
# "RELATIONS_" + GRAPH_NAME,
# "RELATION_TYPE",
# "IDX_REL_" + GRAPH_NAME + "_RELTYPE"
# )
def create_knowledge_graph(chunks):
    """Extract REQUIREMENT triples from document chunks and persist them.

    For each chunk, asks the RAG LLM for explicit "REQUIREMENT -[REL]-> value"
    triples, then upserts the entities and inserts the relations into the
    ENTITIES_/RELATIONS_ tables backing the Oracle property graph.
    Commits once at the end; also creates the property graph if missing.
    """
    cursor = oracle_conn.cursor()
    # Creates graph if it does not exist
    try:
        cursor.execute(f"""
BEGIN
EXECUTE IMMEDIATE '
CREATE PROPERTY GRAPH {GRAPH_NAME}
VERTEX TABLES (ENTITIES_{GRAPH_NAME}
KEY (ID)
LABEL ENTITIES
PROPERTIES (NAME))
EDGE TABLES (RELATIONS_{GRAPH_NAME}
KEY (ID)
SOURCE KEY (SOURCE_ID) REFERENCES ENTITIES_{GRAPH_NAME}(ID)
DESTINATION KEY (TARGET_ID) REFERENCES ENTITIES_{GRAPH_NAME}(ID)
LABEL RELATIONS
PROPERTIES (RELATION_TYPE, SOURCE_TEXT))
';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -55358 THEN -- ORA-55358: Graph already exists
RAISE;
END IF;
END;
""")
        print(f"🧠 Graph '{GRAPH_NAME}' created or already exists.")
    except Exception as e:
        print(f"[GRAPH ERROR] Failed to create graph: {e}")
    # Inserting vertices and edges into the tables
    for doc in chunks:
        text = doc.page_content
        source = doc.metadata.get("source", "unknown")
        if not text.strip():
            continue  # nothing to extract from an empty chunk
        # Prompt constrains the LLM to explicit, verifiable facts only.
        prompt = f"""
You are extracting structured RFP evidence from technical documentation.
Given the text below, identify ONLY explicit, verifiable facts.
Text:
{text}
Extract triples in ONE of the following formats ONLY:
1. REQUIREMENT -[HAS_SUBJECT]-> <subject>
2. REQUIREMENT -[HAS_METRIC]-> <metric name>
3. REQUIREMENT -[HAS_VALUE]-> <exact value or limit>
4. REQUIREMENT -[SUPPORTED_BY]-> <document section or sentence>
Rules:
- Use REQUIREMENT as the source entity
- Use UPPERCASE relation names
- Do NOT infer or assume
- If nothing explicit is found, return NONE
"""
        try:
            response = llm_for_rag.invoke(prompt)
            result = response.content.strip()
        except Exception as e:
            # Skip this chunk; a single LLM failure must not kill the batch.
            print(f"[ERROR] Gen AI call error: {e}")
            continue
        if result.upper() == "NONE":
            continue
        triples = result.splitlines()
        for triple in triples:
            # Expected line shape: ENTITY -[RELATION]-> ENTITY; anything else is skipped.
            parts = triple.split("-[")
            if len(parts) != 2:
                continue
            right_part = parts[1].split("]->")
            if len(right_part) != 2:
                continue
            raw_relation, entity2 = right_part
            # Normalize the relation to an UPPER_SNAKE identifier.
            relation = re.sub(r'\W+', '_', raw_relation.strip().upper())
            entity1 = parts[0].strip()
            entity2 = entity2.strip()
            if entity1.upper() != "REQUIREMENT":
                entity1 = "REQUIREMENT"  # force the canonical source vertex
            try:
                # Insertion of entities (with existence check)
                cursor.execute(f"MERGE INTO ENTITIES_{GRAPH_NAME} e USING (SELECT :name AS NAME FROM dual) src ON (e.name = src.name) WHEN NOT MATCHED THEN INSERT (NAME) VALUES (:name)", [entity1, entity1])
                cursor.execute(f"MERGE INTO ENTITIES_{GRAPH_NAME} e USING (SELECT :name AS NAME FROM dual) src ON (e.name = src.name) WHEN NOT MATCHED THEN INSERT (NAME) VALUES (:name)", [entity2, entity2])
                # Retrieve the IDs
                cursor.execute(f"SELECT ID FROM ENTITIES_{GRAPH_NAME} WHERE NAME = :name", [entity1])
                source_id = cursor.fetchone()[0]
                cursor.execute(f"SELECT ID FROM ENTITIES_{GRAPH_NAME} WHERE NAME = :name", [entity2])
                target_id = cursor.fetchone()[0]
                # Create relations
                cursor.execute(f"""
INSERT INTO RELATIONS_{GRAPH_NAME} (SOURCE_ID, TARGET_ID, RELATION_TYPE, SOURCE_TEXT)
VALUES (:src, :tgt, :rel, :txt)
""", [source_id, target_id, relation, source])
                print(f"{entity1} -[{relation}]-> {entity2}")
            except Exception as e:
                # Log and keep going; one bad triple must not abort the batch.
                print(f"[INSERT ERROR] {e}")
    oracle_conn.commit()
    cursor.close()
    print("💾 Knowledge graph updated.")
def parse_rfp_requirement(question: str) -> dict:
    """Normalize a free-text RFP question into a structured requirement dict.

    Asks the RAG LLM to classify the requirement and emit JSON between
    <json> tags; falls back to a crude keyword guess when parsing fails.

    Returns a dict with keys: requirement_type, subject, expected_value,
    decision_type, keywords.
    """
    prompt = f"""
You are an RFP requirement NORMALIZER for Oracle Cloud Infrastructure (OCI).
Your job is NOT to summarize the question.
Your job is to STRUCTURE the requirement so it can be searched in:
- Technical documentation
- Knowledge Graph
- Vector databases
────────────────────────────────
STEP 1 — Understand the requirement
────────────────────────────────
From the question, identify:
1. The PRIMARY OCI SERVICE CATEGORY involved
2. The MAIN TECHNICAL SUBJECT (short and precise)
3. The EXPECTED TECHNICAL CAPABILITY or CONDITION (if any)
IMPORTANT:
- Ignore marketing language
- Ignore phrases like "possui", "permite", "oferece"
- Focus ONLY on concrete technical meaning
────────────────────────────────
STEP 2 — Mandatory service classification
────────────────────────────────
You MUST choose ONE primary technology from the list below
and INCLUDE IT EXPLICITLY in the keywords list.
Choose the MOST SPECIFIC applicable item.
☁️ OCI SERVICE CATEGORIES (MANDATORY)
🖥️ Compute (IaaS)
- compute
- compute instances
- virtual machine
- bare metal
- gpu
- hpc
- confidential computing
- autoscaling
- instance pools
- live migration
- ocvs (vmware)
- arm compute
💾 Storage
- object storage
- archive storage
- block volume
- boot volume
- file storage
- volume groups
- snapshots
- replication
🌐 Networking
- vcn
- load balancer
- network load balancer
- dns
- fastconnect
- drg
- firewall
- waf
- bastion
- vtap
- private endpoint
🔐 Security & Identity
- iam
- compartments
- policies
- oci vault
- key management
- certificates
- secrets
- cloud guard
- security zones
- vulnerability scanning
- data safe
- audit
- logging
- shielded instances
📦 Containers & Cloud Native
- oke
- kubernetes
- container registry
- api gateway
- functions
- streaming
- events
- service mesh
🗄️ Databases
- autonomous database
- adw
- atp
- base database
- exadata
- mysql
- nosql
📊 Analytics & AI
- analytics cloud
- data science
- data catalog
- big data service
- generative ai
- ai services
────────────────────────────────
STEP 3 — Keywords rules (CRITICAL)
────────────────────────────────
The "keywords" field MUST:
- ALWAYS include at least ONE OCI service keyword (e.g. "compute", "object storage", "oke")
- Include technical capability terms (e.g. resize, autoscaling, encryption)
- NEVER include generic verbs (permitir, possuir, oferecer)
- NEVER include full sentences
────────────────────────────────
STEP 4 — Output rules
────────────────────────────────
Return ONLY valid JSON between <json> tags.
Do NOT explain your reasoning.
Question:
{question}
<json>
{{
"requirement_type": "COMPLIANCE | FUNCTIONAL | NON_FUNCTIONAL",
"subject": "<short technical subject, e.g. 'Compute Instances'>",
"expected_value": "<technical capability or condition, or empty string>",
"decision_type": "YES_NO | YES_NO_PARTIAL",
"keywords": ["mandatory_oci_service", "technical_capability", "additional_term"]
}}
</json>
"""
    resp = llm_for_rag.invoke(prompt)
    raw = resp.content.strip()
    try:
        # remove ```json ``` or ``` ``` code fences
        raw = re.sub(r"```json|```", "", raw).strip()
        # Extract the JSON object between the <json> tags.
        match = re.search(r"<json>\s*(\{.*?\})\s*</json>", raw, re.DOTALL)
        if not match:
            raise ValueError("No JSON block found")
        json_text = match.group(1)
        return json.loads(json_text)
    except Exception as e:
        # Fallback: degrade gracefully with a naive keyword guess so the
        # pipeline can still retrieve something.
        print("⚠️ RFP PARSER FAILED")
        print("RAW RESPONSE:")
        print(raw)
        return {
            "requirement_type": "UNKNOWN",
            "subject": question,
            "expected_value": "",
            "decision_type": "YES_NO_PARTIAL",
            "keywords": re.findall(r"\b\w+\b", question.lower())[:5]
        }
def extract_graph_keywords_from_requirement(req: dict) -> str:
    """Flatten a parsed requirement into a sorted, comma-separated keyword string.

    Combines the "keywords" list with the lower-cased subject and expected
    value (when present) and de-duplicates via a set.
    """
    terms = set(req.get("keywords", []))
    subject = req.get("subject")
    if subject:
        terms.add(subject.lower())
    expected = req.get("expected_value")
    if expected:
        terms.add(str(expected).lower())
    return ", ".join(sorted(terms))
def build_oracle_text_query(text: str) -> str | None:
ORACLE_TEXT_STOPWORDS = {
"and", "or", "the", "with", "between", "of", "to", "for",
"in", "on", "by", "is", "are", "was", "were", "be", "within", "between"
}
tokens = []
text = text.lower()
text = re.sub(r"[^a-z0-9\s]", " ", text)
for token in text.split():
if len(token) >= 4 and token not in ORACLE_TEXT_STOPWORDS:
tokens.append(f"{token}")
tokens = sorted(set(tokens))
return " OR ".join(tokens) if tokens else None
def query_knowledge_graph(raw_keywords: str, top_k: int = 20, min_score: int = 50):
    """Fetch REQUIREMENT triples whose target/relation matches the keywords.

    Builds an Oracle Text OR-query from raw_keywords, scores matches with
    CONTAINS/SCORE, keeps those at or above min_score, and returns up to
    top_k (source, relation, target) tuples ranked by relevance.

    BUGFIX: the query text, score threshold, and row limit were interpolated
    into the SQL string; they are now passed as bind variables (safer and
    cursor-cache friendly). Table names still come from the module constant.
    """
    cursor = oracle_conn.cursor()
    safe_query = build_oracle_text_query(raw_keywords)
    if not safe_query:
        cursor.close()
        return []
    sql = f"""
SELECT
e1.NAME AS source_name,
r.RELATION_TYPE,
e2.NAME AS target_name,
GREATEST(SCORE(1), SCORE(2)) AS relevance_score
FROM RELATIONS_{GRAPH_NAME} r
JOIN ENTITIES_{GRAPH_NAME} e1 ON e1.ID = r.SOURCE_ID
JOIN ENTITIES_{GRAPH_NAME} e2 ON e2.ID = r.TARGET_ID
WHERE e1.NAME = 'REQUIREMENT'
AND (
CONTAINS(e2.NAME, :q1, 1) > 0
OR CONTAINS(r.RELATION_TYPE, :q2, 2) > 0
)
AND GREATEST(SCORE(1), SCORE(2)) >= :min_score
ORDER BY relevance_score DESC
FETCH FIRST :top_k ROWS ONLY
"""
    print("🔎 GRAPH QUERY (ranked):")
    print(sql)
    try:
        cursor.execute(sql, {
            "q1": safe_query,
            "q2": safe_query,
            "min_score": min_score,
            "top_k": top_k,
        })
        rows = cursor.fetchall()
    finally:
        cursor.close()
    print(f"📊 GRAPH FACTS (top {top_k}):")
    for s, r, t, sc in rows:
        print(f" [{sc:>3}] REQUIREMENT -[{r}]-> {t}")
    # keep compatibility with the current pipeline (drop the score column)
    return [(s, r, t) for s, r, t, _ in rows]
# RE-RANK
def extract_terms_from_graph_text(graph_context):
    """Collect lower-cased entity/relation terms from graph query output.

    Accepts either the list-of-tuples shape returned by query_knowledge_graph
    or a rendered "A -[REL]-> B" text block; any other input yields an empty set.

    BUGFIX: the string-branch regex had been mangled by a document converter
    (literal "$begin:math:display$...$end:math:display$" markers), so it never
    matched anything; this restores the intended "<entity> -[RELATION]-> <entity>"
    pattern.
    """
    if not graph_context:
        return set()
    if isinstance(graph_context, list):
        terms = set()
        for row in graph_context:
            for col in row:
                if isinstance(col, str):
                    terms.add(col.lower())
        return terms
    if isinstance(graph_context, str):
        terms = set()
        for e1, e2 in re.findall(r"([\w\s]+)-\[[\w_]+\]->([\w\s]+)", graph_context):
            terms.add(e1.strip().lower())
            terms.add(e2.strip().lower())
        return terms
    return set()
def rerank_documents_with_graph_terms(docs, query, graph_terms, top_n: int = 5):
    """Re-rank retrieved documents by overlap with query + graph terms.

    Each document scores one point per term (case-insensitive substring
    match) from the union of the query's words and graph_terms. Returns the
    page_content of the top_n highest-scoring documents; ties keep the
    retriever's original order because Python's sort is stable.

    Generalization: the previously hard-coded top-5 cutoff is now the
    top_n parameter (default 5, so existing callers are unchanged).
    """
    query_terms = set(re.findall(r'\b\w+\b', query.lower()))
    all_terms = query_terms | graph_terms
    scored_docs = [
        (sum(1 for term in all_terms if term in doc.page_content.lower()), doc)
        for doc in docs
    ]
    top_docs = sorted(scored_docs, key=lambda pair: pair[0], reverse=True)[:top_n]
    return [doc.page_content for _, doc in top_docs]
# SEMANTIC CHUNKING
def split_llm_output_into_chapters(llm_text, separator_regex=r"^(#{1,6} .+|\*\*.+\*\*)$"):
    """Split LLM-normalized text into chapters at heading lines.

    A line matching separator_regex starts a new chapter; the default is the
    same pattern as the module-level chapter_separator_regex (markdown heading
    or fully bolded line). Text before the first heading forms its own chapter.

    Generalization: the separator was a hard-coded module global; it is now a
    parameter (with an identical default) so alternative heading styles can
    be used and the function is testable in isolation.
    """
    chapters = []
    current_chapter = []
    for line in llm_text.splitlines():
        if re.match(separator_regex, line):
            if current_chapter:
                chapters.append("\n".join(current_chapter).strip())
            current_chapter = [line]
        else:
            current_chapter.append(line)
    if current_chapter:
        chapters.append("\n".join(current_chapter).strip())
    return chapters
def semantic_chunking(text, max_retries: int = 5):
    """Ask the RAG LLM to re-structure OCR text into headed, chaptered markdown.

    Returns the raw LLM response object (callers read its ``.content``).

    BUGFIX: the original retried forever with a bare ``except:`` — which also
    swallowed KeyboardInterrupt/SystemExit and could reference ``response``
    before assignment. Now only Exception is caught, attempts are bounded by
    max_retries, and a RuntimeError (chained to the last error) is raised
    when every attempt fails.
    """
    prompt = f"""
You received the following text extracted via OCR:
{text}
Your task:
1. Identify headings (short uppercase or bold lines, no period at the end) putting the Product Name (Application Name) and the Subject
2. Separate paragraphs by heading
3. Indicate columns with [COLUMN 1], [COLUMN 2] if present
4. Indicate tables with [TABLE] in markdown format
5. ALWAYS PUT THE URL if there is a Reference
6. Indicate explicity metrics (if it exists)
Examples:
- Oracle Financial Services RTO is 1 hour
- The Oracle Banking Supply Chain Finance Cloud Service A maximum number of 10K Hosted Transactions
- The Oracle Banking Payments Cloud Service, Additional Non-Production Environment: You may purchase up to a maximum of ten (10) additional Non-Production Environments
"""
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return llm_for_rag.invoke(prompt)
        except Exception as e:
            last_error = e
            print(f"[ERROR] Gen AI call error (attempt {attempt}/{max_retries}): {e}")
    raise RuntimeError("semantic_chunking: all LLM attempts failed") from last_error
def read_pdfs(pdf_path):
    """Load a PDF and return its full concatenated text.

    Files whose path contains "-ocr" are loaded with PyMuPDF; everything
    else goes through the Unstructured loader. Page texts are joined with
    newlines.
    """
    loader_cls = PyMuPDFLoader if "-ocr" in pdf_path else UnstructuredPDFLoader
    pages = loader_cls(str(pdf_path)).load()
    return "\n".join(page.page_content for page in pages)
def smart_split_text(text, max_chunk_size=10_000):
    """Split text into chunks of at most max_chunk_size characters.

    Within each window the cut is placed just after the last sentence ender
    ('.', '!', '?') or paragraph break ('\\n\\n'); when none is found the
    window is cut at its hard limit. Empty (whitespace-only) pieces are
    dropped.
    """
    chunks = []
    cursor = 0
    total = len(text)
    while cursor < total:
        window_end = min(cursor + max_chunk_size, total)
        cut = max(
            text.rfind(delim, cursor, window_end)
            for delim in ('.', '!', '?', '\n\n')
        )
        if cut <= cursor:  # covers both "not found" (-1) and zero progress
            cut = window_end
        else:
            cut += 1  # keep the delimiter with the preceding chunk
        piece = text[cursor:cut].strip()
        if piece:
            chunks.append(piece)
        cursor = cut
    return chunks
def load_previously_indexed_docs(path: str = None):
    """Return the persisted set of already-indexed PDF paths.

    Args:
        path: pickle file to read; defaults to PROCESSED_DOCS_FILE.

    Returns an empty set on the first run (file does not exist yet).
    Uses EAFP instead of the original exists()/open() pair, which was
    race-prone between the check and the open.
    """
    if path is None:
        path = PROCESSED_DOCS_FILE
    try:
        with open(path, "rb") as f:
            return pickle.load(f)
    except FileNotFoundError:
        return set()
def save_indexed_docs(docs, path: str = None):
    """Persist the set of indexed PDF paths.

    Args:
        docs: set of document paths to pickle.
        path: destination file; defaults to PROCESSED_DOCS_FILE.

    Robustness: ensures the parent directory exists so a fresh checkout
    (no ./faiss_index yet) does not crash on the first save.
    """
    if path is None:
        path = PROCESSED_DOCS_FILE
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(docs, f)
# =========================
# Main Function
# =========================
def chat():
    """Interactive entry point.

    Indexes any new PDFs from ./docs into the FAISS vector store and the
    Oracle knowledge graph, then answers RFP questions in a REPL loop until
    the user types 'quit'.

    NOTE(review): this function builds its own retriever/prompt/chain and a
    local get_context_from_requirement that does NOT apply graph-based
    re-ranking, duplicating (and shadowing) the module-level pipeline defined
    further below — consider consolidating.
    """
    PDF_FOLDER = Path("docs")  # folder containing the source PDFs
    pdf_paths = sorted(
        str(p) for p in PDF_FOLDER.glob("*.pdf")
    )
    already_indexed_docs = load_previously_indexed_docs()
    updated_docs = set()
    try:
        vectorstore = FAISS.load_local(INDEX_PATH, embeddings, allow_dangerous_deserialization=True)
        print("✔️ FAISS index loaded.")
    except Exception:
        print("⚠️ FAISS index not found, creating a new one.")
        vectorstore = None
    new_chunks = []
    for pdf_path in tqdm(pdf_paths, desc=f"📄 Processing PDFs"):
        print(f" {os.path.basename(pdf_path)}")
        if pdf_path in already_indexed_docs:
            print(f"✅ Document already indexed: {pdf_path}")
            continue
        full_text = read_pdfs(pdf_path=pdf_path)
        # Filenames are base64-encoded source URLs; recover the URL for citations.
        path_url = filename_to_url(os.path.basename(pdf_path))
        text_chunks = smart_split_text(full_text, max_chunk_size=10_000)
        # Carries a possibly-truncated trailing chapter into the next chunk.
        overflow_buffer = ""
        for chunk in tqdm(text_chunks, desc=f"📄 Processing text chunks", dynamic_ncols=True, leave=False):
            current_text = overflow_buffer + chunk
            treated_text = semantic_chunking(current_text)
            if hasattr(treated_text, "content"):
                chapters = split_llm_output_into_chapters(treated_text.content)
                last_chapter = chapters[-1] if chapters else ""
                # Heuristic: a chapter not ending in punctuation was likely cut mid-sentence.
                if last_chapter and not last_chapter.strip().endswith((".", "!", "?", "\n\n")):
                    print("📌 Last chapter seems incomplete, saving for the next cycle")
                    overflow_buffer = last_chapter
                    chapters = chapters[:-1]
                else:
                    overflow_buffer = ""
                for chapter_text in chapters:
                    # Append the source URL so the LLM can cite it as evidence.
                    reference_url = "Reference: " + path_url
                    chapter_text = chapter_text + "\n" + reference_url
                    doc = Document(page_content=chapter_text, metadata={"source": pdf_path, "reference": reference_url})
                    new_chunks.append(doc)
                    print(f"✅ New chapter indexed:\n{chapter_text}...\n")
            else:
                print(f"[ERROR] semantic_chunking returned unexpected type: {type(treated_text)}")
        updated_docs.add(str(pdf_path))
    if new_chunks:
        if vectorstore:
            vectorstore.add_documents(new_chunks)
        else:
            vectorstore = FAISS.from_documents(new_chunks, embedding=embeddings)
        vectorstore.save_local(INDEX_PATH)
        save_indexed_docs(already_indexed_docs.union(updated_docs))
        print(f"💾 {len(new_chunks)} chunks added to FAISS index.")
        print("🧠 Building knowledge graph...")
        create_knowledge_graph(new_chunks)
    else:
        print("📁 No new documents to index.")
    retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 50, "fetch_k": 100})
    # Decision prompt: forces a strict JSON verdict with confidence/ambiguity flags.
    RFP_DECISION_TEMPLATE = """
You are answering an RFP requirement with risk awareness.
Requirement:
Type: {requirement_type}
Subject: {subject}
Expected value: {expected_value}
Document evidence:
{text_context}
Graph evidence:
{graph_context}
Decision rules:
- Answer ONLY with YES, NO or PARTIAL
- If value differs, answer PARTIAL
- If not found, answer NO
Interpretation rules (MANDATORY):
- If a capability is supported but requires reboot, downtime, or restart, it STILL counts as YES unless the requirement explicitly forbids it.
- "Servidor em funcionamento" means the resource exists and is active before the operation, not that it must remain online without interruption.
- Only answer NO if the operation is NOT supported at all or requires destroying and recreating the resource.
- Reboot, restart, or brief unavailability MUST NOT be interpreted as lack of support.
Confidence rules:
- HIGH: Explicit evidence directly answers the requirement
- MEDIUM: Evidence partially matches or requires light interpretation
- LOW: Requirement is ambiguous OR evidence is indirect OR missing
Ambiguity rules:
- ambiguity_detected = true if:
- The requirement can be interpreted in more than one way
- Keywords are vague (e.g. "support", "integration", "capability")
- Evidence does not clearly bind to subject + expected value
Service scope rules (MANDATORY):
- Evidence is valid ONLY if it refers to the SAME service category as the requirement.
- Do NOT use evidence from a different Oracle Cloud service to justify another.
OUTPUT CONSTRAINTS (MANDATORY):
- Return ONLY a valid JSON object
- Do NOT include explanations, comments, markdown, lists, or code fences
- Do NOT write any text before or after the JSON
- The response must start with an opening curly brace and end with a closing curly brace
JSON schema (return exactly this structure):
{{
"answer": "YES | NO | PARTIAL",
"confidence": "HIGH | MEDIUM | LOW",
"ambiguity_detected": true,
"confidence_reason": "<short reason>",
"justification": "<short factual explanation>",
"evidence": [
{{
"quote": "<exact text>",
"source": "<document or section if available>"
}}
]
}}
"""
    prompt = PromptTemplate.from_template(RFP_DECISION_TEMPLATE)

    def get_context_from_requirement(req: dict):
        # Local variant (no graph-based re-ranking): top docs + graph facts.
        query_terms = extract_graph_keywords_from_requirement(req)
        docs = retriever.invoke(query_terms)
        graph_context = query_knowledge_graph(query_terms)
        return {
            "text_context": "\n\n".join(doc.page_content for doc in docs),
            "graph_context": graph_context,
            "requirement_type": req["requirement_type"],
            "subject": req["subject"],
            "expected_value": req.get("expected_value", "")
        }

    parse_requirement_runnable = RunnableLambda(
        lambda q: parse_rfp_requirement(q)
    )
    # NOTE(review): each RunnableMap branch calls get_context_from_requirement
    # independently, so retrieval + graph query run twice per question.
    chain = (
        parse_requirement_runnable
        | RunnableMap({
            "text_context": RunnableLambda(
                lambda req: get_context_from_requirement(req)["text_context"]
            ),
            "graph_context": RunnableLambda(
                lambda req: get_context_from_requirement(req)["graph_context"]
            ),
            "requirement_type": lambda req: req["requirement_type"],
            "subject": lambda req: req["subject"],
            "expected_value": lambda req: req.get("expected_value", "")
        })
        | prompt
        | llm
        | StrOutputParser()
    )
    print("✅ READY")
    # REPL: one question per iteration until the user quits.
    while True:
        query = input("❓ Question (or 'quit' to exit): ")
        if query.lower() == "quit":
            break
        response = chain.invoke(query)
        print("\n📜 RESPONSE:\n")
        print(response)
        print("\n" + "=" * 80 + "\n")
def get_context_from_requirement(req: dict):
    """Build the prompt context dict for a parsed requirement.

    Retrieves candidate documents with the module-level retriever, queries
    the knowledge graph, re-ranks the documents using graph-derived terms,
    and returns the dict shape the decision prompt expects.
    """
    query_terms = extract_graph_keywords_from_requirement(req)
    candidate_docs = retriever.invoke(query_terms)
    graph_context = query_knowledge_graph(query_terms)
    graph_terms = extract_terms_from_graph_text(graph_context)
    top_chunks = rerank_documents_with_graph_terms(
        candidate_docs,
        query_terms,
        graph_terms,
    )
    context = {
        "text_context": "\n\n".join(top_chunks),
        "graph_context": graph_context,
        "requirement_type": req["requirement_type"],
        "subject": req["subject"],
        "expected_value": req.get("expected_value", ""),
    }
    return context
# Load the persisted FAISS index for the module-level answer pipeline.
# BUGFIX: was a bare ``except:`` which also swallowed SystemExit and
# KeyboardInterrupt; now only Exception is caught.
# NOTE(review): on failure ``retriever`` stays undefined, so answer_question()
# will raise NameError until an index is built via chat().
try:
    vectorstore = FAISS.load_local(
        INDEX_PATH,
        embeddings,
        allow_dangerous_deserialization=True
    )
    retriever = vectorstore.as_retriever(
        search_type="similarity",
        search_kwargs={"k": 50, "fetch_k": 100}
    )
except Exception:
    print("No Faiss")
# Module-level decision prompt. Stricter than the one inside chat(): it adds
# PaaS-vs-IaaS scope rules and requires URL references as evidence.
RFP_DECISION_TEMPLATE = """
You are answering an RFP requirement with risk awareness.
Requirement:
Type: {requirement_type}
Subject: {subject}
Expected value: {expected_value}
Document evidence:
{text_context}
Graph evidence:
{graph_context}
Decision rules:
- Answer ONLY with YES, NO or PARTIAL
- If value differs, answer PARTIAL
- If not found, answer NO
Interpretation rules (MANDATORY):
- If a capability is supported but requires reboot, downtime, or restart, it STILL counts as YES unless the requirement explicitly forbids it.
- "Servidor em funcionamento" means the resource exists and is active before the operation, not that it must remain online without interruption.
- Only answer NO if the operation is NOT supported at all or requires destroying and recreating the resource.
- Reboot, restart, or brief unavailability MUST NOT be interpreted as lack of support.
Confidence rules:
- HIGH: Explicit evidence directly answers the requirement
- MEDIUM: Evidence partially matches or requires light interpretation
- LOW: Requirement is ambiguous OR evidence is indirect OR missing
Ambiguity rules:
- ambiguity_detected = true if:
- The requirement can be interpreted in more than one way
- Keywords are vague (e.g. "support", "integration", "capability")
- Evidence does not clearly bind to subject + expected value
Service scope rules (MANDATORY):
- Evidence is valid ONLY if it refers to the SAME service category as the requirement.
- Do NOT use evidence from a different Oracle Cloud service to justify another.
- PaaS services (e.g. Autonomous Database) MUST NOT be used as evidence for IaaS/Compute requirements.
- If the requirement is under Compute/IaaS, evidence MUST explicitly mention Compute, IaaS, VM, Bare Metal, or equivalent infrastructure services.
- Cross-service inference (vendor-level capability applied to another service) is strictly forbidden.
- Get all URL references or sources as evidences
OUTPUT CONSTRAINTS (MANDATORY):
- Return ONLY a valid JSON object
- Do NOT include explanations, comments, markdown, lists, or code fences
- Do NOT write any text before or after the JSON
- The response must start with an opening curly brace and end with a closing curly brace
JSON schema (return exactly this structure):
{{
"answer": "YES | NO | PARTIAL",
"confidence": "HIGH | MEDIUM | LOW",
"ambiguity_detected": true,
"confidence_reason": "<short reason>",
"justification": "<short factual explanation>",
"evidence": [
{{
"quote": "<exact text>",
"source": "<document or section if available>"
}}
]
}}
"""
# Compile the template once for the module-level chain.
prompt = PromptTemplate.from_template(RFP_DECISION_TEMPLATE)
# Wrap the parser so it can be the first stage of the LCEL chain.
parse_requirement_runnable = RunnableLambda(
    lambda q: parse_rfp_requirement(q)
)
# Module-level GraphRAG pipeline: question -> structured requirement ->
# (reranked docs + graph facts) -> decision prompt -> LLM -> string.
# NOTE(review): the two RunnableMap branches each call
# get_context_from_requirement, so retrieval + graph query run twice per question.
chain = (
    parse_requirement_runnable
    | RunnableMap({
        "text_context": RunnableLambda(
            lambda req: get_context_from_requirement(req)["text_context"]
        ),
        "graph_context": RunnableLambda(
            lambda req: get_context_from_requirement(req)["graph_context"]
        ),
        "requirement_type": lambda req: req["requirement_type"],
        "subject": lambda req: req["subject"],
        "expected_value": lambda req: req.get("expected_value", "")
    })
    | prompt
    | llm
    | StrOutputParser()
)
def answer_question(question: str) -> str:
    """Answer a single RFP question through the module-level GraphRAG chain.

    Returns the raw LLM output (expected to be the JSON verdict string).
    """
    return chain.invoke(question)
# 🚀 Run
if __name__ == "__main__":
    chat()