mirror of
https://github.com/hoshikawa2/mdm_project.git
synced 2026-03-03 16:09:37 +00:00
81 lines
2.8 KiB
Python
81 lines
2.8 KiB
Python
import asyncio, logging, time, json, httpx
|
|
from fastapi import FastAPI, HTTPException
|
|
from schemas import RequestPayload, ResponseTemplate
|
|
from services.address_service import parse_address
|
|
from services.normalize_service import normalize_customer
|
|
from services.dedupe_service import dedupe_candidates
|
|
from services.golden_service import pick_golden
|
|
from services.harmonize_service import harmonize
|
|
from services.enrich_service import enrich
|
|
from config import settings
|
|
|
|
app = FastAPI()
|
|
logging.basicConfig(level=getattr(logging, settings.LOG_LEVEL.upper(), logging.INFO),
|
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s")
|
|
logger = logging.getLogger("mdm.app")
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
return {"ok": True}
|
|
|
|
@app.get("/llm/ping")
|
|
async def llm_ping():
|
|
ep = settings.OLLAMA_ENDPOINTS[0]
|
|
try:
|
|
async with httpx.AsyncClient(timeout=5) as c:
|
|
r = await c.get(f"{ep}/api/tags")
|
|
return {"endpoint": ep, "status": r.status_code}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=502, detail=f"Ollama not reachable at {ep}: {e}")
|
|
|
|
@app.post("/mdm/process", response_model=ResponseTemplate)
|
|
async def process(payload: RequestPayload):
|
|
t0 = time.time()
|
|
logger.info(f"[STEP] domain={payload.domain} ops={payload.operations} records={len(payload.records)}")
|
|
|
|
issues = []
|
|
|
|
sem_n = asyncio.Semaphore(settings.CONCURRENCY_NORMALIZE)
|
|
async def _n(r):
|
|
try:
|
|
async with sem_n:
|
|
return await normalize_customer(r.model_dump())
|
|
except Exception as e:
|
|
issues.append({"stage":"normalize","id": getattr(r,'id',None),"error":str(e)})
|
|
return {}
|
|
|
|
norm = await asyncio.gather(*[_n(r) for r in payload.records])
|
|
logger.info(f"[STEP] normalize done in {time.time()-t0:.2f}s")
|
|
|
|
sem_a = asyncio.Semaphore(settings.CONCURRENCY_ADDRESS)
|
|
async def _a(r):
|
|
try:
|
|
if r.get("address") or r.get("cep"):
|
|
async with sem_a:
|
|
r["_parsed"] = await parse_address(r)
|
|
return r
|
|
except Exception as e:
|
|
issues.append({"stage":"address","id": r.get('id'),"error":str(e)})
|
|
return r
|
|
|
|
norm = await asyncio.gather(*[_a(r) for r in norm])
|
|
logger.info(f"[STEP] address-parse done in {time.time()-t0:.2f}s")
|
|
|
|
matches = dedupe_candidates(norm)
|
|
golden = pick_golden(norm) if any(op in payload.operations for op in ("consolidate","dedupe")) else None
|
|
harm = harmonize(golden or {})
|
|
enr = enrich(norm) if "enrich" in payload.operations else []
|
|
|
|
return ResponseTemplate(
|
|
record_clean=norm,
|
|
golden_record=golden,
|
|
matches=matches,
|
|
harmonization=harm,
|
|
enrichment=enr,
|
|
issues=issues,
|
|
actions=[],
|
|
pii_masks={},
|
|
audit_log=[],
|
|
confidence=0.9 if golden else 0.7
|
|
)
|