Files
mdm_project/files/app.py
2025-09-02 22:21:05 -03:00

81 lines
2.8 KiB
Python

import asyncio, logging, time, json, httpx
from fastapi import FastAPI, HTTPException
from schemas import RequestPayload, ResponseTemplate
from services.address_service import parse_address
from services.normalize_service import normalize_customer
from services.dedupe_service import dedupe_candidates
from services.golden_service import pick_golden
from services.harmonize_service import harmonize
from services.enrich_service import enrich
from config import settings
# ASGI application object; the route decorators in this module register on it.
app = FastAPI()

# Map the configured level name (case-insensitive) onto a ``logging`` constant.
# A name that ``logging`` does not define falls back to INFO.
_log_level = getattr(logging, settings.LOG_LEVEL.upper(), logging.INFO)
logging.basicConfig(
    level=_log_level,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

# Module logger used by the request handlers.
logger = logging.getLogger("mdm.app")
@app.get("/health")
async def health():
    """Liveness probe: always answers with the constant payload ``{"ok": True}``."""
    status = {"ok": True}
    return status
@app.get("/llm/ping")
async def llm_ping():
    """Probe the first configured Ollama endpoint.

    Sends ``GET {endpoint}/api/tags`` with a 5-second timeout.

    Returns:
        A dict with the probed ``endpoint`` and the HTTP ``status`` code it
        answered with. The status code is reported as-is, not validated.

    Raises:
        HTTPException: 503 when ``settings.OLLAMA_ENDPOINTS`` is empty;
            502 when the request to the endpoint fails for any reason.
    """
    endpoints = settings.OLLAMA_ENDPOINTS
    if not endpoints:
        # Without this guard the [0] lookup raises IndexError, which surfaces
        # as an opaque 500 instead of a meaningful error.
        raise HTTPException(status_code=503, detail="No Ollama endpoints configured")

    ep = endpoints[0]
    try:
        async with httpx.AsyncClient(timeout=5) as c:
            r = await c.get(f"{ep}/api/tags")
    except Exception as e:
        # Boundary handler: any failure while talking to the endpoint is
        # reported to the client as 502. Chain the cause so the original
        # traceback is preserved.
        logger.warning("Ollama ping failed for %s: %s", ep, e)
        raise HTTPException(
            status_code=502,
            detail=f"Ollama not reachable at {ep}: {e}",
        ) from e
    return {"endpoint": ep, "status": r.status_code}
@app.post("/mdm/process", response_model=ResponseTemplate)
async def process(payload: RequestPayload):
    """Run the MDM pipeline over the records in ``payload``.

    Stages, in order:
      1. Normalize every record concurrently, bounded by
         ``settings.CONCURRENCY_NORMALIZE``.
      2. Parse the address of every normalized record that has a truthy
         ``address`` or ``cep`` value, bounded by
         ``settings.CONCURRENCY_ADDRESS``; the result is stored under the
         record's ``_parsed`` key.
      3. Compute duplicate candidates over the normalized records.
      4. Pick a golden record when ``"consolidate"`` or ``"dedupe"`` is among
         the requested operations (otherwise ``None``).
      5. Harmonize the golden record (an empty dict when there is none).
      6. Enrich the records when ``"enrich"`` is requested (otherwise ``[]``).

    A failure on a single record in stage 1 or 2 does not abort the request:
    it is logged and appended to ``issues`` as
    ``{"stage", "id", "error"}``. A record that fails normalization is
    replaced by ``{}`` so the result keeps one entry per input record.

    Returns:
        A ``ResponseTemplate`` carrying the cleaned records, golden record,
        matches, harmonization, enrichment and collected issues.
        ``confidence`` is 0.9 when a golden record is truthy, else 0.7.
    """
    # perf_counter is monotonic, so the logged durations cannot be skewed by
    # wall-clock adjustments. All durations are measured from request start.
    started = time.perf_counter()
    logger.info(
        "[STEP] domain=%s ops=%s records=%s",
        payload.domain,
        payload.operations,
        len(payload.records),
    )
    issues = []

    # Clamp to at least 1: Semaphore(0) would block every task forever and a
    # negative value raises ValueError.
    normalize_gate = asyncio.Semaphore(max(1, settings.CONCURRENCY_NORMALIZE))

    async def _normalize(record):
        try:
            async with normalize_gate:
                return await normalize_customer(record.model_dump())
        except Exception as exc:
            record_id = getattr(record, "id", None)
            logger.warning("normalize failed for record id=%s: %s", record_id, exc)
            issues.append({"stage": "normalize", "id": record_id, "error": str(exc)})
            # Placeholder keeps the output list aligned with the input list.
            return {}

    norm = await asyncio.gather(*(_normalize(r) for r in payload.records))
    logger.info("[STEP] normalize done in %.2fs", time.perf_counter() - started)

    address_gate = asyncio.Semaphore(max(1, settings.CONCURRENCY_ADDRESS))

    async def _parse(record):
        try:
            if record.get("address") or record.get("cep"):
                async with address_gate:
                    record["_parsed"] = await parse_address(record)
        except Exception as exc:
            # The record may not be a dict (that is one way to end up here),
            # so the id lookup must not raise a second time inside the handler.
            record_id = record.get("id") if isinstance(record, dict) else None
            logger.warning("address parse failed for record id=%s: %s", record_id, exc)
            issues.append({"stage": "address", "id": record_id, "error": str(exc)})
        return record

    norm = await asyncio.gather(*(_parse(r) for r in norm))
    logger.info("[STEP] address-parse done in %.2fs", time.perf_counter() - started)

    matches = dedupe_candidates(norm)
    wants_golden = any(op in payload.operations for op in ("consolidate", "dedupe"))
    golden = pick_golden(norm) if wants_golden else None
    harm = harmonize(golden or {})
    enr = enrich(norm) if "enrich" in payload.operations else []

    return ResponseTemplate(
        record_clean=norm,
        golden_record=golden,
        matches=matches,
        harmonization=harm,
        enrichment=enr,
        issues=issues,
        actions=[],
        pii_masks={},
        audit_log=[],
        confidence=0.9 if golden else 0.7,
    )