Files
rfp_response_automation/files/graphrag_rerank.py
2026-01-14 17:12:23 -03:00

1104 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from langchain_community.chat_models.oci_generative_ai import ChatOCIGenAI
from langchain_core.prompts import PromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain_community.embeddings import OCIGenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.schema.runnable import RunnableMap
from langchain_community.document_loaders import UnstructuredPDFLoader, PyMuPDFLoader
from langchain_core.documents import Document
from langchain_core.runnables import RunnableLambda
from pathlib import Path
from tqdm import tqdm
import os
import pickle
import re
import atexit
import oracledb
import json
import base64
# =========================
# Oracle Autonomous Configuration
# =========================
WALLET_PATH = "Wallet_oradb23ai"  # directory containing the ADB wallet files
DB_ALIAS = "oradb23ai_high"  # tnsnames.ora service alias (HIGH consumer group)
USERNAME = "admin"
# SECURITY NOTE(review): credentials are hard-coded in source; move them to
# environment variables or OCI Vault before sharing/deploying this file.
PASSWORD = "**********"
os.environ["TNS_ADMIN"] = WALLET_PATH  # point the Oracle client at the wallet's tnsnames.ora
# =========================
# Global Configurations
# =========================
INDEX_PATH = "./faiss_index"  # on-disk FAISS index directory
PROCESSED_DOCS_FILE = os.path.join(INDEX_PATH, "processed_docs.pkl")  # pickle of already-indexed PDF paths
# Matches markdown headings (# .. ######) or fully-bold lines; used to split LLM output into chapters.
chapter_separator_regex = r"^(#{1,6} .+|\*\*.+\*\*)$"
GRAPH_NAME = "OCI_GRAPH"  # suffix for the ENTITIES_/RELATIONS_ tables and the property graph name
# =========================
# LLM Definitions
# =========================
# Decision LLM: deterministic sampling (temperature=0, top_p=1) for the final
# YES/NO/PARTIAL RFP answer.
llm = ChatOCIGenAI(
    model_id="meta.llama-3.1-405b-instruct",
    service_endpoint="https://inference.generativeai.us-chicago-1.oci.oraclecloud.com",
    compartment_id="ocid1.compartment.oc1..aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    auth_profile="DEFAULT",
    model_kwargs={"temperature": 0, "top_p": 1, "max_tokens": 4000},
)
# Helper LLM (provider-default sampling): used for triple extraction,
# requirement normalization and semantic chunking.
llm_for_rag = ChatOCIGenAI(
    model_id="meta.llama-3.1-405b-instruct",
    service_endpoint="https://inference.generativeai.us-chicago-1.oci.oraclecloud.com",
    compartment_id="ocid1.compartment.oc1..aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    auth_profile="DEFAULT",
)
# Embedding model backing the FAISS vector store.
embeddings = OCIGenAIEmbeddings(
    model_id="cohere.embed-multilingual-v3.0",
    service_endpoint="https://inference.generativeai.us-chicago-1.oci.oraclecloud.com",
    compartment_id="ocid1.compartment.oc1..aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    auth_profile="DEFAULT",
)
# Single module-wide connection to Oracle Autonomous DB (wallet/mTLS auth).
oracle_conn = oracledb.connect(
    user=USERNAME,
    password=PASSWORD,
    dsn=DB_ALIAS,
    config_dir=WALLET_PATH,
    wallet_location=WALLET_PATH,
    wallet_password=PASSWORD
)
# Close the shared connection when the interpreter exits.
atexit.register(lambda: oracle_conn.close())
def filename_to_url(filename: str, suffix: str = ".pdf") -> str:
    """Recover the original document URL from a base64url-encoded filename.

    PDF files in the corpus are named with the urlsafe-base64 encoding of
    their source URL plus *suffix* (default ".pdf"); strip the suffix if
    present and decode the remainder back to a UTF-8 URL string.
    """
    stem = filename[: -len(suffix)] if filename.endswith(suffix) else filename
    return base64.urlsafe_b64decode(stem.encode("ascii")).decode("utf-8")
# =========================
# Oracle Graph Client
# =========================
def ensure_oracle_text_index(
    conn,
    table_name: str,
    column_name: str,
    index_name: str
):
    """Create, recreate or synchronize an Oracle Text (CTXSYS.CONTEXT) index.

    Behaviour by catalog state:
    - index missing  -> create it and return (sync deferred)
    - index invalid  -> drop and recreate, then return (sync deferred)
    - index VALID    -> run CTX_DDL.SYNC_INDEX to pick up new rows
    Sync failures are logged and swallowed so the pipeline keeps running.
    """
    cursor = conn.cursor()
    # Look the index up in the current schema's catalog.
    cursor.execute("""
SELECT status
FROM user_indexes
WHERE index_name = :idx
""", {"idx": index_name.upper()})
    row = cursor.fetchone()
    index_exists = row is not None
    index_status = row[0] if row else None  # e.g. 'VALID' / 'UNUSABLE'
    if not index_exists:
        print(f"🛠️ Creating Oracle Text index {index_name}")
        # Identifiers cannot be bound; names come from trusted call sites only.
        cursor.execute(f"""
CREATE INDEX {index_name}
ON {table_name} ({column_name})
INDEXTYPE IS CTXSYS.CONTEXT
""")
        conn.commit()
        cursor.close()
        print(f"✅ Index {index_name} created (sync deferred)")
        return
    if index_status != "VALID":
        print(f"⚠️ Index {index_name} is {index_status}. Recreating...")
        try:
            cursor.execute(f"DROP INDEX {index_name}")
            conn.commit()
        except Exception as e:
            # If the drop fails we cannot safely recreate; bail out.
            print(f"❌ Failed to drop index {index_name}: {e}")
            cursor.close()
            return
        cursor.execute(f"""
CREATE INDEX {index_name}
ON {table_name} ({column_name})
INDEXTYPE IS CTXSYS.CONTEXT
""")
        conn.commit()
        cursor.close()
        print(f"♻️ Index {index_name} recreated (sync deferred)")
        return
    # Index exists and is VALID: synchronize it so new rows become searchable.
    print(f"🔄 Syncing Oracle Text index: {index_name}")
    try:
        cursor.execute(f"""
BEGIN
CTX_DDL.SYNC_INDEX('{index_name}', '2M');
END;
""")
        conn.commit()
        print(f"✅ Index {index_name} synced")
    except Exception as e:
        # Best-effort: a failed sync only delays searchability, so continue.
        print(f"⚠️ Sync failed for {index_name}: {e}")
        print("⚠️ Continuing without breaking pipeline")
    cursor.close()
def create_tables_if_not_exist(conn):
    """Create the ENTITIES_/RELATIONS_ graph tables if they do not exist.

    ORA-00955 ("name is already used by an existing object") is swallowed
    inside the PL/SQL block, so repeated runs are idempotent.
    """
    cursor = conn.cursor()
    try:
        # Vertex table: one row per distinct entity name.
        cursor.execute(f"""
BEGIN
EXECUTE IMMEDIATE '
CREATE TABLE ENTITIES_{GRAPH_NAME} (
ID NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY PRIMARY KEY,
NAME VARCHAR2(500)
)
';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
""")
        # Edge table: (source, target, relation type, provenance text).
        cursor.execute(f"""
BEGIN
EXECUTE IMMEDIATE '
CREATE TABLE RELATIONS_{GRAPH_NAME} (
ID NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY PRIMARY KEY,
SOURCE_ID NUMBER,
TARGET_ID NUMBER,
RELATION_TYPE VARCHAR2(100),
SOURCE_TEXT VARCHAR2(4000)
)
';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
""")
        conn.commit()
        print("✅ ENTITIES and RELATIONS tables created or already exist.")
    except Exception as e:
        print(f"[ERROR] Failed to create tables: {e}")
    finally:
        cursor.close()
# Ensure the graph tables exist as soon as this module is imported.
create_tables_if_not_exist(oracle_conn)
# IF GRAPH INDEX PROBLEM, Reindex
# ensure_oracle_text_index(
# oracle_conn,
# "ENTITIES_" + GRAPH_NAME,
# "NAME",
# "IDX_ENT_" + GRAPH_NAME + "_NAME"
# )
#
# ensure_oracle_text_index(
# oracle_conn,
# "RELATIONS_" + GRAPH_NAME,
# "RELATION_TYPE",
# "IDX_REL_" + GRAPH_NAME + "_RELTYPE"
# )
def create_knowledge_graph(chunks):
    """Extract REQUIREMENT-centric triples from document chunks and persist them.

    Ensures the SQL property graph exists (ORA-55358 = already exists is
    tolerated), then for each Document asks the helper LLM for explicit
    triples of the form "REQUIREMENT -[RELATION]-> target" and MERGEs the
    entities / INSERTs the relations into the graph tables. Commits once at
    the end.
    """
    cursor = oracle_conn.cursor()
    # Creates graph if it does not exist
    try:
        cursor.execute(f"""
BEGIN
EXECUTE IMMEDIATE '
CREATE PROPERTY GRAPH {GRAPH_NAME}
VERTEX TABLES (ENTITIES_{GRAPH_NAME}
KEY (ID)
LABEL ENTITIES
PROPERTIES (NAME))
EDGE TABLES (RELATIONS_{GRAPH_NAME}
KEY (ID)
SOURCE KEY (SOURCE_ID) REFERENCES ENTITIES_{GRAPH_NAME}(ID)
DESTINATION KEY (TARGET_ID) REFERENCES ENTITIES_{GRAPH_NAME}(ID)
LABEL RELATIONS
PROPERTIES (RELATION_TYPE, SOURCE_TEXT))
';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -55358 THEN -- ORA-55358: Graph already exists
RAISE;
END IF;
END;
""")
        print(f"🧠 Graph '{GRAPH_NAME}' created or already exists.")
    except Exception as e:
        print(f"[GRAPH ERROR] Failed to create graph: {e}")
    # Inserting vertices and edges into the tables
    for doc in chunks:
        text = doc.page_content
        source = doc.metadata.get("source", "unknown")
        if not text.strip():
            continue  # nothing to extract from an empty chunk
        prompt = f"""
You are extracting structured RFP evidence from technical documentation.
Given the text below, identify ONLY explicit, verifiable facts.
Text:
{text}
Extract triples in ONE of the following formats ONLY:
1. REQUIREMENT -[HAS_SUBJECT]-> <subject>
2. REQUIREMENT -[HAS_METRIC]-> <metric name>
3. REQUIREMENT -[HAS_VALUE]-> <exact value or limit>
4. REQUIREMENT -[SUPPORTED_BY]-> <document section or sentence>
Rules:
- Use REQUIREMENT as the source entity
- Use UPPERCASE relation names
- Do NOT infer or assume
- If nothing explicit is found, return NONE
"""
        try:
            response = llm_for_rag.invoke(prompt)
            result = response.content.strip()
        except Exception as e:
            print(f"[ERROR] Gen AI call error: {e}")
            continue
        if result.upper() == "NONE":
            continue
        triples = result.splitlines()
        for triple in triples:
            # Expected line shape: "REQUIREMENT -[RELATION]-> target text".
            parts = triple.split("-[")
            if len(parts) != 2:
                continue
            right_part = parts[1].split("]->")
            if len(right_part) != 2:
                continue
            raw_relation, entity2 = right_part
            # Normalize the relation into a safe identifier-like token.
            relation = re.sub(r'\W+', '_', raw_relation.strip().upper())
            entity1 = parts[0].strip()
            entity2 = entity2.strip()
            # Force the source side to REQUIREMENT regardless of LLM drift.
            if entity1.upper() != "REQUIREMENT":
                entity1 = "REQUIREMENT"
            try:
                # Insertion of entities (with existence check)
                # NOTE: the duplicate :name placeholders are bound by position.
                cursor.execute(f"MERGE INTO ENTITIES_{GRAPH_NAME} e USING (SELECT :name AS NAME FROM dual) src ON (e.name = src.name) WHEN NOT MATCHED THEN INSERT (NAME) VALUES (:name)", [entity1, entity1])
                cursor.execute(f"MERGE INTO ENTITIES_{GRAPH_NAME} e USING (SELECT :name AS NAME FROM dual) src ON (e.name = src.name) WHEN NOT MATCHED THEN INSERT (NAME) VALUES (:name)", [entity2, entity2])
                # Retrieve the IDs
                cursor.execute(f"SELECT ID FROM ENTITIES_{GRAPH_NAME} WHERE NAME = :name", [entity1])
                source_id = cursor.fetchone()[0]
                cursor.execute(f"SELECT ID FROM ENTITIES_{GRAPH_NAME} WHERE NAME = :name", [entity2])
                target_id = cursor.fetchone()[0]
                # Create relations
                cursor.execute(f"""
INSERT INTO RELATIONS_{GRAPH_NAME} (SOURCE_ID, TARGET_ID, RELATION_TYPE, SOURCE_TEXT)
VALUES (:src, :tgt, :rel, :txt)
""", [source_id, target_id, relation, source])
                print(f"{entity1} -[{relation}]-> {entity2}")
            except Exception as e:
                print(f"[INSERT ERROR] {e}")
    oracle_conn.commit()
    cursor.close()
    print("💾 Knowledge graph updated.")
def parse_rfp_requirement(question: str) -> dict:
    """Normalize a raw RFP question into a structured requirement dict.

    Asks the helper LLM to classify the question against a fixed OCI service
    catalog and return JSON between <json> tags. Returns a dict with keys:
    requirement_type, subject, expected_value, decision_type, keywords.
    On any parse failure it falls back to a permissive default using the
    first 5 words of the question as keywords, so the pipeline never stops.
    """
    prompt = f"""
You are an RFP requirement NORMALIZER for Oracle Cloud Infrastructure (OCI).
Your job is NOT to summarize the question.
Your job is to STRUCTURE the requirement so it can be searched in:
- Technical documentation
- Knowledge Graph
- Vector databases
────────────────────────────────
STEP 1 — Understand the requirement
────────────────────────────────
From the question, identify:
1. The PRIMARY OCI SERVICE CATEGORY involved
2. The MAIN TECHNICAL SUBJECT (short and precise)
3. The EXPECTED TECHNICAL CAPABILITY or CONDITION (if any)
IMPORTANT:
- Ignore marketing language
- Ignore phrases like "possui", "permite", "oferece"
- Focus ONLY on concrete technical meaning
────────────────────────────────
STEP 2 — Mandatory service classification
────────────────────────────────
You MUST choose ONE primary technology from the list below
and INCLUDE IT EXPLICITLY in the keywords list.
Choose the MOST SPECIFIC applicable item.
Serviços da Oracle Cloud Infrastructure (OCI):
Compute (IaaS)
• Compute Instances (VM)
• Bare Metal Instances
• Dedicated VM Hosts
• GPU Instances
• Confidential Computing
• Capacity Reservations
• Autoscaling (Instance Pools)
• Live Migration
• Oracle Cloud VMware Solution (OCVS)
• HPC (High Performance Computing)
• Arm-based Compute (Ampere)
Storage
Object Storage
• Object Storage
• Object Storage Archive
• Pre-Authenticated Requests
• Replication
Block & File
• Block Volume
• Boot Volume
• Volume Groups
• File Storage
• File Storage Snapshots
• Data Transfer Service
Networking
• Virtual Cloud Network (VCN)
• Subnets
• Internet Gateway
• NAT Gateway
• Service Gateway
• Dynamic Routing Gateway (DRG)
• FastConnect
• Load Balancer (L7 / L4)
• Network Load Balancer
• DNS
• Traffic Management Steering Policies
• IP Address Management (IPAM)
• Network Firewall
• Web Application Firewall (WAF)
• Bastion
• Capture Traffic (VTAP)
• Private Endpoints
Security, Identity & Compliance
• Identity and Access Management (IAM)
• Compartments
• Policies
• OCI Vault
• OCI Key Management (KMS)
• OCI Certificates
• OCI Secrets
• OCI Bastion
• Cloud Guard
• Security Zones
• Vulnerability Scanning Service
• Data Safe
• Logging
• Audit
• OS Management / OS Management Hub
• Shielded Instances
• Zero Trust Packet Routing
Databases
Autonomous
• Autonomous Database (ATP)
• Autonomous Data Warehouse (ADW)
• Autonomous JSON Database
Databases Gerenciados
• Oracle Database Service
• Oracle Exadata Database Service
• Exadata Cloud@Customer
• Base Database Service
• MySQL Database Service
• MySQL HeatWave
• NoSQL Database Cloud Service
• TimesTen
• PostgreSQL (OCI managed)
• MongoDB API (OCI NoSQL compatibility)
Analytics & BI
• Oracle Analytics Cloud (OAC)
• OCI Data Catalog
• OCI Data Integration
• OCI Streaming Analytics
• OCI GoldenGate
• OCI Big Data Service (Hadoop/Spark)
• OCI Data Science
• OCI AI Anomaly Detection
• OCI AI Forecasting
AI & Machine Learning
Generative AI
• OCI Generative AI
• OCI Generative AI Agents
• OCI Generative AI RAG
• OCI Generative AI Embeddings
• OCI AI Gateway (OpenAI-compatible)
AI Services
• OCI Vision (OCR, image analysis)
• OCI Speech (STT / TTS)
• OCI Language (NLP)
• OCI Document Understanding
• OCI Anomaly Detection
• OCI Forecasting
• OCI Data Labeling
Containers & Cloud Native
• OCI Container Engine for Kubernetes (OKE)
• Container Registry (OCIR)
• Service Mesh
• API Gateway
• OCI Functions (FaaS)
• OCI Streaming (Kafka-compatible)
• OCI Queue
• OCI Events
• OCI Resource Manager (Terraform)
Integration & Messaging
• OCI Integration Cloud (OIC)
• OCI Service Connector Hub
• OCI Streaming
• OCI GoldenGate
• OCI API Gateway
• OCI Events Service
• OCI Queue
• Real Applications Clusters (RAC)
Developer Services
• OCI DevOps (CI/CD)
• OCI Code Repository
• OCI Build Pipelines
• OCI Artifact Registry
• OCI Logging Analytics
• OCI Monitoring
• OCI Notifications
• OCI Bastion
• OCI CLI
• OCI SDKs
Observability & Management
• OCI Monitoring
• OCI Alarms
• OCI Logging
• OCI Logging Analytics
• OCI Application Performance Monitoring (APM)
• OCI Operations Insights
• OCI Management Agent
• OCI Resource Discovery
Enterprise & Hybrid
• Oracle Cloud@Customer
• Exadata Cloud@Customer
• Compute Cloud@Customer
• Dedicated Region Cloud@Customer
• OCI Roving Edge Infrastructure
• OCI Alloy
Governance & FinOps
• OCI Budgets
• Cost Analysis
• Usage Reports
• Quotas
• Tagging
• Compartments
• Resource Search
Regions & Edge
• OCI Regions (Commercial, Government, EU Sovereign)
• OCI Edge Services
• OCI Roving Edge
• OCI Dedicated Region
────────────────────────────────
STEP 3 — Keywords rules (CRITICAL)
────────────────────────────────
The "keywords" field MUST:
- ALWAYS include at least ONE OCI service keyword (e.g. "compute", "object storage", "oke")
- Include technical capability terms (e.g. resize, autoscaling, encryption)
- NEVER include generic verbs (permitir, possuir, oferecer)
- NEVER include full sentences
────────────────────────────────
STEP 4 — Output rules
────────────────────────────────
Return ONLY valid JSON between <json> tags.
Do NOT explain your reasoning.
Question:
{question}
<json>
{{
"requirement_type": "COMPLIANCE | FUNCTIONAL | NON_FUNCTIONAL",
"subject": "<short technical subject, e.g. 'Compute Instances'>",
"expected_value": "<technical capability or condition, or empty string>",
"decision_type": "YES_NO | YES_NO_PARTIAL",
"keywords": ["mandatory_oci_service", "technical_capability", "additional_term"]
}}
</json>
"""
    resp = llm_for_rag.invoke(prompt)
    raw = resp.content.strip()
    try:
        # Strip markdown code fences (```json ... ``` or ``` ... ```) the model sometimes adds.
        raw = re.sub(r"```json|```", "", raw).strip()
        match = re.search(r"<json>\s*(\{.*?\})\s*</json>", raw, re.DOTALL)
        if not match:
            raise ValueError("No JSON block found")
        json_text = match.group(1)
        return json.loads(json_text)
    except Exception as e:
        print("⚠️ RFP PARSER FAILED")
        print("RAW RESPONSE:")
        print(raw)
        # Fallback keeps the pipeline alive with a best-effort requirement.
        return {
            "requirement_type": "UNKNOWN",
            "subject": question,
            "expected_value": "",
            "decision_type": "YES_NO_PARTIAL",
            "keywords": re.findall(r"\b\w+\b", question.lower())[:5]
        }
def extract_graph_keywords_from_requirement(req: dict) -> str:
    """Flatten a parsed requirement into a deduplicated, sorted keyword string.

    Combines the requirement's keyword list with its lowercased subject and
    expected value (when present) and joins them with ", " in sorted order.
    """
    terms = set(req.get("keywords", []))
    subject = req.get("subject")
    if subject:
        terms.add(subject.lower())
    expected = req.get("expected_value")
    if expected:
        terms.add(str(expected).lower())
    return ", ".join(sorted(terms))
def build_oracle_text_query(text: str) -> str | None:
ORACLE_TEXT_STOPWORDS = {
"and", "or", "the", "with", "between", "of", "to", "for",
"in", "on", "by", "is", "are", "was", "were", "be", "within", "between"
}
tokens = []
text = text.lower()
text = re.sub(r"[^a-z0-9\s]", " ", text)
for token in text.split():
if len(token) >= 4 and token not in ORACLE_TEXT_STOPWORDS:
tokens.append(f"{token}")
tokens = sorted(set(tokens))
return " OR ".join(tokens) if tokens else None
def query_knowledge_graph(raw_keywords: str, top_k: int = 20, min_score: int = 50):
    """Rank REQUIREMENT-* graph facts against the keywords via Oracle Text.

    Returns up to top_k (source, relation, target) tuples whose target entity
    name or relation type matches the CONTAINS query with score >= min_score.
    Returns [] when no usable query can be built from raw_keywords.
    """
    cursor = oracle_conn.cursor()
    safe_query = build_oracle_text_query(raw_keywords)
    if not safe_query:
        cursor.close()
        return []
    # safe_query is sanitized down to [a-z0-9 ] tokens by build_oracle_text_query,
    # which is what makes this f-string interpolation into CONTAINS tolerable;
    # top_k / min_score are ints from the signature. Do not pass raw user text here.
    sql = f"""
SELECT
e1.NAME AS source_name,
r.RELATION_TYPE,
e2.NAME AS target_name,
GREATEST(SCORE(1), SCORE(2)) AS relevance_score
FROM RELATIONS_{GRAPH_NAME} r
JOIN ENTITIES_{GRAPH_NAME} e1 ON e1.ID = r.SOURCE_ID
JOIN ENTITIES_{GRAPH_NAME} e2 ON e2.ID = r.TARGET_ID
WHERE e1.NAME = 'REQUIREMENT'
AND (
CONTAINS(e2.NAME, '{safe_query}', 1) > 0
OR CONTAINS(r.RELATION_TYPE, '{safe_query}', 2) > 0
)
AND GREATEST(SCORE(1), SCORE(2)) >= {min_score}
ORDER BY relevance_score DESC
FETCH FIRST {top_k} ROWS ONLY
"""
    print("🔎 GRAPH QUERY (ranked):")
    print(sql)
    cursor.execute(sql)
    rows = cursor.fetchall()
    cursor.close()
    print(f"📊 GRAPH FACTS (top {top_k}):")
    for s, r, t, sc in rows:
        print(f" [{sc:>3}] REQUIREMENT -[{r}]-> {t}")
    # Keep compatibility with the current pipeline: drop the score column.
    return [(s, r, t) for s, r, t, _ in rows]
# RE-RANK
def extract_terms_from_graph_text(graph_context):
    """Collect lowercase entity/relation terms from knowledge-graph output.

    Accepts either the tuple-list form returned by query_knowledge_graph
    ([(source, relation, target), ...]) or a rendered string of
    "ENTITY -[RELATION]-> ENTITY" lines. Returns a set of lowercased terms;
    empty set for falsy or unsupported input.
    """
    if not graph_context:
        return set()
    if isinstance(graph_context, list):
        return {
            col.lower()
            for row in graph_context
            for col in row
            if isinstance(col, str)
        }
    if isinstance(graph_context, str):
        # BUGFIX: the previous pattern contained corrupted markup residue
        # ("$begin:math:display$...$end:math:display$") and could never match.
        # Restore the intended "A -[REL]-> B" triple pattern.
        terms = set()
        for left, right in re.findall(r"([\w\s]+)-\[\w+\]->([\w\s]+)", graph_context):
            terms.add(left.strip().lower())
            terms.add(right.strip().lower())
        return terms
    return set()
def rerank_documents_with_graph_terms(docs, query, graph_terms):
    """Score documents by term overlap and return the top-5 chunk texts.

    The vocabulary is the union of the query's word tokens and the supplied
    graph terms; a document's score is the number of vocabulary terms that
    appear (as substrings) in its lowercased text. Ties keep original order.
    """
    vocab = set(re.findall(r'\b\w+\b', query.lower())) | set(graph_terms)

    def overlap(doc):
        body = doc.page_content.lower()
        return sum(term in body for term in vocab)

    ranked = sorted(docs, key=overlap, reverse=True)
    return [doc.page_content for doc in ranked[:5]]
# SEMANTIC CHUNKING
def split_llm_output_into_chapters(llm_text):
    """Split LLM-formatted text into chapters at heading lines.

    A heading is any line matching the module-level chapter_separator_regex
    (markdown '#'-style headings or fully-bold lines). Text before the first
    heading forms its own chapter; each chapter is stripped of surrounding
    whitespace.
    """
    lines = llm_text.splitlines()
    if not lines:
        return []
    starts = [i for i, line in enumerate(lines) if re.match(chapter_separator_regex, line)]
    if not starts or starts[0] != 0:
        starts = [0] + starts
    ends = starts[1:] + [len(lines)]
    return ["\n".join(lines[a:b]).strip() for a, b in zip(starts, ends)]
def semantic_chunking(text):
    """Ask the helper LLM to restructure raw OCR text into labelled sections.

    Returns the raw LLM response object (callers read its ``.content``).
    Retries forever on Gen AI call errors, preserving the original
    never-give-up behaviour for a chunk.
    """
    prompt = f"""
You received the following text extracted via OCR:
{text}
Your task:
1. Identify headings (short uppercase or bold lines, no period at the end) putting the Product Name (Application Name) and the Subject
2. Separate paragraphs by heading
3. Indicate columns with [COLUMN 1], [COLUMN 2] if present
4. Indicate tables with [TABLE] in markdown format
5. ALWAYS PUT THE URL if there is a Reference
6. Indicate explicity metrics (if it exists)
Examples:
- Oracle Financial Services RTO is 1 hour
- The Oracle Banking Supply Chain Finance Cloud Service A maximum number of 10K Hosted Transactions
- The Oracle Banking Payments Cloud Service, Additional Non-Production Environment: You may purchase up to a maximum of ten (10) additional Non-Production Environments
"""
    while True:
        try:
            return llm_for_rag.invoke(prompt)
        except Exception as e:
            # BUGFIX: was a bare `except:` that also swallowed
            # KeyboardInterrupt/SystemExit, making the retry loop
            # impossible to interrupt. Error detail now matches the
            # style used elsewhere in this module.
            print(f"[ERROR] Gen AI call error: {e}")
def read_pdfs(pdf_path):
    """Load one PDF and return its pages concatenated into a single string.

    Files whose name contains "-ocr" are loaded with PyMuPDFLoader; all
    others go through UnstructuredPDFLoader. Pages are joined by newlines.
    """
    loader_cls = PyMuPDFLoader if "-ocr" in pdf_path else UnstructuredPDFLoader
    pages = loader_cls(str(pdf_path)).load()
    return "\n".join(page.page_content for page in pages)
def smart_split_text(text, max_chunk_size=10_000):
    """Split *text* into chunks of at most max_chunk_size characters.

    Each chunk ends at the last sentence boundary ('.', '!', '?' or a blank
    line) inside the size window when one exists past the window start;
    otherwise the chunk is a hard cut at the window edge. Chunks are
    stripped and empty chunks are dropped.
    """
    pieces = []
    cursor = 0
    total = len(text)
    while cursor < total:
        window_end = min(cursor + max_chunk_size, total)
        # Rightmost boundary marker within the current window (-1 if none).
        boundary = max(
            text.rfind(mark, cursor, window_end)
            for mark in ('.', '!', '?', '\n\n')
        )
        if boundary == -1 or boundary <= cursor:
            boundary = window_end
        else:
            boundary += 1  # include the punctuation in this chunk
        piece = text[cursor:boundary].strip()
        if piece:
            pieces.append(piece)
        cursor = boundary
    return pieces
def load_previously_indexed_docs():
    """Return the set of already-indexed PDF paths persisted on disk.

    Reads the pickle at PROCESSED_DOCS_FILE; an absent file means nothing
    has been indexed yet, so an empty set is returned.
    """
    if not os.path.exists(PROCESSED_DOCS_FILE):
        return set()
    with open(PROCESSED_DOCS_FILE, "rb") as fh:
        return pickle.load(fh)
def save_indexed_docs(docs):
    """Persist the set of indexed PDF paths to PROCESSED_DOCS_FILE as pickle."""
    with open(PROCESSED_DOCS_FILE, "wb") as fh:
        pickle.dump(docs, fh)
# =========================
# Main Function
# =========================
def chat():
    """Interactive entry point: index new PDFs, update the graph, then answer
    RFP questions in a terminal REPL until the user types 'quit'."""
    PDF_FOLDER = Path("docs")  # folder containing the source PDFs
    pdf_paths = sorted(
        str(p) for p in PDF_FOLDER.glob("*.pdf")
    )
    already_indexed_docs = load_previously_indexed_docs()
    updated_docs = set()
    # Load the persisted FAISS index if present; otherwise start fresh.
    try:
        vectorstore = FAISS.load_local(INDEX_PATH, embeddings, allow_dangerous_deserialization=True)
        print("✔️ FAISS index loaded.")
    except Exception:
        print("⚠️ FAISS index not found, creating a new one.")
        vectorstore = None
    new_chunks = []
    for pdf_path in tqdm(pdf_paths, desc=f"📄 Processing PDFs"):
        print(f" {os.path.basename(pdf_path)}")
        if pdf_path in already_indexed_docs:
            print(f"✅ Document already indexed: {pdf_path}")
            continue
        full_text = read_pdfs(pdf_path=pdf_path)
        # Filenames are the urlsafe-base64 encoding of the source URL.
        path_url = filename_to_url(os.path.basename(pdf_path))
        text_chunks = smart_split_text(full_text, max_chunk_size=10_000)
        # Carries a possibly-incomplete trailing chapter into the next chunk.
        overflow_buffer = ""
        for chunk in tqdm(text_chunks, desc=f"📄 Processing text chunks", dynamic_ncols=True, leave=False):
            current_text = overflow_buffer + chunk
            treated_text = semantic_chunking(current_text)
            if hasattr(treated_text, "content"):
                chapters = split_llm_output_into_chapters(treated_text.content)
                last_chapter = chapters[-1] if chapters else ""
                # If the final chapter does not end on sentence punctuation,
                # assume it was cut mid-sentence and defer it to the next chunk.
                if last_chapter and not last_chapter.strip().endswith((".", "!", "?", "\n\n")):
                    print("📌 Last chapter seems incomplete, saving for the next cycle")
                    overflow_buffer = last_chapter
                    chapters = chapters[:-1]
                else:
                    overflow_buffer = ""
                for chapter_text in chapters:
                    # Append the source URL so the LLM can cite it as evidence.
                    reference_url = "Reference: " + path_url
                    chapter_text = chapter_text + "\n" + reference_url
                    doc = Document(page_content=chapter_text, metadata={"source": pdf_path, "reference": reference_url})
                    new_chunks.append(doc)
                    print(f"✅ New chapter indexed:\n{chapter_text}...\n")
            else:
                print(f"[ERROR] semantic_chunking returned unexpected type: {type(treated_text)}")
        updated_docs.add(str(pdf_path))
    if new_chunks:
        if vectorstore:
            vectorstore.add_documents(new_chunks)
        else:
            vectorstore = FAISS.from_documents(new_chunks, embedding=embeddings)
        vectorstore.save_local(INDEX_PATH)
        save_indexed_docs(already_indexed_docs.union(updated_docs))
        print(f"💾 {len(new_chunks)} chunks added to FAISS index.")
        print("🧠 Building knowledge graph...")
        create_knowledge_graph(new_chunks)
    else:
        print("📁 No new documents to index.")
    # NOTE(review): if the index failed to load AND no new chunks were found,
    # vectorstore is still None here and this raises — confirm intended.
    retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 50, "fetch_k": 100})
    RFP_DECISION_TEMPLATE = """
You are answering an RFP requirement with risk awareness.
Requirement:
Type: {requirement_type}
Subject: {subject}
Expected value: {expected_value}
Document evidence:
{text_context}
Graph evidence:
{graph_context}
Decision rules:
- Answer ONLY with YES, NO or PARTIAL
- If value differs, answer PARTIAL
- If not found, answer NO
Interpretation rules (MANDATORY):
- If a capability is supported but requires reboot, downtime, or restart, it STILL counts as YES unless the requirement explicitly forbids it.
- "Servidor em funcionamento" means the resource exists and is active before the operation, not that it must remain online without interruption.
- Only answer NO if the operation is NOT supported at all or requires destroying and recreating the resource.
- Reboot, restart, or brief unavailability MUST NOT be interpreted as lack of support.
Confidence rules:
- HIGH: Explicit evidence directly answers the requirement
- MEDIUM: Evidence partially matches or requires light interpretation
- LOW: Requirement is ambiguous OR evidence is indirect OR missing
Ambiguity rules:
- ambiguity_detected = true if:
- The requirement can be interpreted in more than one way
- Keywords are vague (e.g. "support", "integration", "capability")
- Evidence does not clearly bind to subject + expected value
Service scope rules (MANDATORY):
- Do NOT use evidence from a different Oracle Cloud service to justify another.
OUTPUT CONSTRAINTS (MANDATORY):
- Return ONLY a valid JSON object
- Do NOT include explanations, comments, markdown, lists, or code fences
- Do NOT write any text before or after the JSON
- The response must start with an opening curly brace and end with a closing curly brace
JSON schema (return exactly this structure):
{{
"answer": "YES | NO | PARTIAL",
"confidence": "HIGH | MEDIUM | LOW",
"ambiguity_detected": true,
"confidence_reason": "<short reason>",
"justification": "<short factual explanation>",
"evidence": [
{{
"quote": "<exact text>",
"source": "<document or section if available>"
}}
]
}}
"""
    prompt = PromptTemplate.from_template(RFP_DECISION_TEMPLATE)
    # NOTE(review): this local helper does NOT apply the graph-term re-rank
    # used by the module-level get_context_from_requirement, and the chain
    # below calls it twice per question (once per RunnableMap entry) —
    # confirm whether that duplication is intended.
    def get_context_from_requirement(req: dict):
        """Assemble prompt inputs (retrieved text + graph facts) for one requirement."""
        query_terms = extract_graph_keywords_from_requirement(req)
        docs = retriever.invoke(query_terms)
        graph_context = query_knowledge_graph(query_terms)
        return {
            "text_context": "\n\n".join(doc.page_content for doc in docs),
            "graph_context": graph_context,
            "requirement_type": req["requirement_type"],
            "subject": req["subject"],
            "expected_value": req.get("expected_value", "")
        }
    # Let the chain be invoked with a plain question string.
    parse_requirement_runnable = RunnableLambda(
        lambda q: parse_rfp_requirement(q)
    )
    # Pipeline: question -> structured requirement -> context map -> prompt -> LLM -> text.
    chain = (
        parse_requirement_runnable
        | RunnableMap({
            "text_context": RunnableLambda(
                lambda req: get_context_from_requirement(req)["text_context"]
            ),
            "graph_context": RunnableLambda(
                lambda req: get_context_from_requirement(req)["graph_context"]
            ),
            "requirement_type": lambda req: req["requirement_type"],
            "subject": lambda req: req["subject"],
            "expected_value": lambda req: req.get("expected_value", "")
        })
        | prompt
        | llm
        | StrOutputParser()
    )
    print("✅ READY")
    # Simple blocking REPL; 'quit' (case-insensitive) exits.
    while True:
        query = input("❓ Question (or 'quit' to exit): ")
        if query.lower() == "quit":
            break
        response = chain.invoke(query)
        print("\n📜 RESPONSE:\n")
        print(response)
        print("\n" + "=" * 80 + "\n")
def get_context_from_requirement(req: dict):
    """Build the prompt context for one parsed requirement (module-level variant).

    Retrieves candidate chunks, queries the knowledge graph, harvests terms
    from the graph facts and re-ranks the chunks with them before joining.
    NOTE(review): depends on the module-level `retriever`, which is only
    bound when the FAISS index loads successfully — verify before serving
    questions through answer_question().
    """
    query_terms = extract_graph_keywords_from_requirement(req)
    docs = retriever.invoke(query_terms)
    graph_context = query_knowledge_graph(query_terms)
    graph_terms = extract_terms_from_graph_text(graph_context)
    reranked_chunks = rerank_documents_with_graph_terms(
        docs,
        query_terms,
        graph_terms
    )
    return {
        "text_context": "\n\n".join(reranked_chunks),
        "graph_context": graph_context,
        "requirement_type": req["requirement_type"],
        "subject": req["subject"],
        "expected_value": req.get("expected_value", "")
    }
# Module-level vector store / retriever used by get_context_from_requirement
# and answer_question when this file is imported as a library.
try:
    vectorstore = FAISS.load_local(
        INDEX_PATH,
        embeddings,
        allow_dangerous_deserialization=True
    )
    retriever = vectorstore.as_retriever(
        search_type="similarity",
        search_kwargs={"k": 50, "fetch_k": 100}
    )
except Exception:
    # BUGFIX: was a bare `except:` (also swallowed KeyboardInterrupt) and it
    # left `vectorstore`/`retriever` undefined, so later use failed with a
    # confusing NameError. Bind them to None so failures are explicit.
    vectorstore = None
    retriever = None
    print("No Faiss")
# Decision prompt for the module-level chain; the placeholders are filled by
# the RunnableMap built from the parsed requirement. Double braces escape the
# literal JSON braces for PromptTemplate.
RFP_DECISION_TEMPLATE = """
You are answering an RFP requirement with risk awareness.
Requirement:
Type: {requirement_type}
Subject: {subject}
Expected value: {expected_value}
Document evidence:
{text_context}
Graph evidence:
{graph_context}
Decision rules:
- Answer ONLY with YES, NO or PARTIAL
- If value differs, answer PARTIAL
- If not found, answer NO
Interpretation rules (MANDATORY):
- If a capability is supported but requires reboot, downtime, or restart, it STILL counts as YES unless the requirement explicitly forbids it.
- "Servidor em funcionamento" means the resource exists and is active before the operation, not that it must remain online without interruption.
- Only answer NO if the operation is NOT supported at all or requires destroying and recreating the resource.
- Reboot, restart, or brief unavailability MUST NOT be interpreted as lack of support.
Confidence rules:
- HIGH: Explicit evidence directly answers the requirement
- MEDIUM: Evidence partially matches or requires light interpretation
- LOW: Requirement is ambiguous OR evidence is indirect OR missing
Ambiguity rules:
- ambiguity_detected = true if:
- The requirement can be interpreted in more than one way
- Keywords are vague (e.g. "support", "integration", "capability")
- Evidence does not clearly bind to subject + expected value
Service scope rules (MANDATORY):
- Do NOT use evidence from a different Oracle Cloud service to justify another.
OUTPUT CONSTRAINTS (MANDATORY):
- Return ONLY a valid JSON object
- Do NOT include explanations, comments, markdown, lists, or code fences
- Do NOT write any text before or after the JSON
- The response must start with an opening curly brace and end with a closing curly brace
JSON schema (return exactly this structure):
{{
"answer": "YES | NO | PARTIAL",
"confidence": "HIGH | MEDIUM | LOW",
"ambiguity_detected": true,
"confidence_reason": "<short reason>",
"justification": "<short factual explanation>",
"evidence": [
{{
"quote": "<exact text>",
"source": "<document or section if available>"
}}
]
}}
"""
prompt = PromptTemplate.from_template(RFP_DECISION_TEMPLATE)
# Wrap the parser so the chain can be invoked with a raw question string.
parse_requirement_runnable = RunnableLambda(
    lambda q: parse_rfp_requirement(q)
)
# Module-level pipeline: question -> structured requirement -> context map
# -> decision prompt -> deterministic LLM -> plain text.
# NOTE(review): get_context_from_requirement is invoked twice per question
# (once for text_context, once for graph_context) — consider computing the
# context dict once and fanning it out.
chain = (
    parse_requirement_runnable
    | RunnableMap({
        "text_context": RunnableLambda(
            lambda req: get_context_from_requirement(req)["text_context"]
        ),
        "graph_context": RunnableLambda(
            lambda req: get_context_from_requirement(req)["graph_context"]
        ),
        "requirement_type": lambda req: req["requirement_type"],
        "subject": lambda req: req["subject"],
        "expected_value": lambda req: req.get("expected_value", "")
    })
    | prompt
    | llm
    | StrOutputParser()
)
def answer_question(question: str) -> str:
    """Answer a single RFP question through the module-level GraphRAG chain
    (parse -> retrieve + graph -> decide); returns the raw LLM text."""
    return chain.invoke(question)
# 🚀 Run
# Interactive REPL only when executed directly; importing exposes answer_question().
if __name__ == "__main__":
    chat()