first commit

This commit is contained in:
Oracle Public Cloud User
2025-09-02 23:15:17 +00:00
commit b51b2f5e1e
30 changed files with 598 additions and 0 deletions

0
__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

80
app.py Normal file
View File

@@ -0,0 +1,80 @@
import asyncio, logging, time, json, httpx
from fastapi import FastAPI, HTTPException
from schemas import RequestPayload, ResponseTemplate
from services.address_service import parse_address
from services.normalize_service import normalize_customer
from services.dedupe_service import dedupe_candidates
from services.golden_service import pick_golden
from services.harmonize_service import harmonize
from services.enrich_service import enrich
from config import settings
# FastAPI application object plus process-wide logging configuration.
app = FastAPI()
# Log level comes from settings; falls back to INFO for unknown names.
logging.basicConfig(level=getattr(logging, settings.LOG_LEVEL.upper(), logging.INFO),
                    format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("mdm.app")
@app.get("/health")
async def health():
    """Liveness probe: always reports the service as up."""
    return dict(ok=True)
@app.get("/llm/ping")
async def llm_ping():
    """Check reachability of the first configured Ollama endpoint."""
    endpoint = settings.OLLAMA_ENDPOINTS[0]
    try:
        async with httpx.AsyncClient(timeout=5) as client:
            resp = await client.get(f"{endpoint}/api/tags")
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Ollama not reachable at {endpoint}: {exc}")
    return {"endpoint": endpoint, "status": resp.status_code}
@app.post("/mdm/process", response_model=ResponseTemplate)
async def process(payload: RequestPayload):
    """Run the MDM pipeline over payload.records.

    Stages: LLM normalization -> address parsing -> dedupe candidates ->
    golden record / harmonization / enrichment. Per-record failures are
    collected into `issues` rather than aborting the request.
    """
    t0 = time.time()
    logger.info(f"[STEP] domain={payload.domain} ops={payload.operations} records={len(payload.records)}")
    issues = []
    # Stage 1: LLM normalization, bounded by CONCURRENCY_NORMALIZE.
    # A failed record contributes {} to `norm` and an entry in `issues`.
    sem_n = asyncio.Semaphore(settings.CONCURRENCY_NORMALIZE)
    async def _n(r):
        try:
            async with sem_n:
                return await normalize_customer(r.model_dump())
        except Exception as e:
            issues.append({"stage":"normalize","id": getattr(r,'id',None),"error":str(e)})
            return {}
    norm = await asyncio.gather(*[_n(r) for r in payload.records])
    logger.info(f"[STEP] normalize done in {time.time()-t0:.2f}s")
    # Stage 2: address parsing, bounded by CONCURRENCY_ADDRESS.
    # Only records carrying an address or CEP are sent to the parser; the
    # parsed output is attached under the private "_parsed" key.
    sem_a = asyncio.Semaphore(settings.CONCURRENCY_ADDRESS)
    async def _a(r):
        try:
            if r.get("address") or r.get("cep"):
                async with sem_a:
                    r["_parsed"] = await parse_address(r)
            return r
        except Exception as e:
            issues.append({"stage":"address","id": r.get('id'),"error":str(e)})
            return r
    norm = await asyncio.gather(*[_a(r) for r in norm])
    logger.info(f"[STEP] address-parse done in {time.time()-t0:.2f}s")
    # Stage 3: pairwise dedupe candidates are always computed; the golden
    # record only when "consolidate" or "dedupe" was requested.
    matches = dedupe_candidates(norm)
    golden = pick_golden(norm) if any(op in payload.operations for op in ("consolidate","dedupe")) else None
    harm = harmonize(golden or {})
    enr = enrich(norm) if "enrich" in payload.operations else []
    # Confidence is a coarse heuristic: higher when a golden record exists.
    return ResponseTemplate(
        record_clean=norm,
        golden_record=golden,
        matches=matches,
        harmonization=harm,
        enrichment=enr,
        issues=issues,
        actions=[],
        pii_masks={},
        audit_log=[],
        confidence=0.9 if golden else 0.7
    )

25
config.py Normal file
View File

@@ -0,0 +1,25 @@
from pydantic import BaseModel
from typing import List
import os
class Settings(BaseModel):
    """Runtime configuration read from environment variables.

    All values are snapshotted when this module is imported; changing the
    environment afterwards has no effect on the running process.
    """
    # HTTP bind address/port for the FastAPI app.
    APP_HOST: str = os.getenv("APP_HOST", "0.0.0.0")
    APP_PORT: int = int(os.getenv("APP_PORT", "8001"))
    # Comma-separated list of Ollama base URLs (round-robined by the services).
    OLLAMA_ENDPOINTS: List[str] = [e.strip() for e in os.getenv("OLLAMA_ENDPOINTS","http://localhost:11434").split(",") if e.strip()]
    # Model names for the address-parsing and customer-normalization prompts.
    MODEL_ADDRESS: str = os.getenv("MODEL_ADDRESS", "qwen2.5:7b")
    MODEL_NORMALIZE: str = os.getenv("MODEL_NORMALIZE", "qwen2.5:7b")
    # Ollama generation options (forwarded in the request "options" payload).
    NUM_GPU: int = int(os.getenv("NUM_GPU", "22"))
    NUM_BATCH: int = int(os.getenv("NUM_BATCH", "512"))
    NUM_CTX: int = int(os.getenv("NUM_CTX", "4096"))
    NUM_THREAD: int = int(os.getenv("NUM_THREAD", "16"))
    TEMPERATURE: float = float(os.getenv("TEMPERATURE", "0.0"))
    TOP_P: float = float(os.getenv("TOP_P", "1.0"))
    TOP_K: int = int(os.getenv("TOP_K", "40"))
    # Read timeout (seconds) for LLM HTTP calls.
    REQUEST_TIMEOUT: float = float(os.getenv("REQUEST_TIMEOUT", "180"))
    LOG_LEVEL: str = os.getenv("LOG_LEVEL","INFO")
    # Concurrency caps for the normalize / address pipeline stages.
    CONCURRENCY_NORMALIZE: int = int(os.getenv("CONCURRENCY_NORMALIZE","8"))
    CONCURRENCY_ADDRESS: int = int(os.getenv("CONCURRENCY_ADDRESS","8"))
    # Optional: best-effort postal lookup (disabled by default).
    # Accept common truthy spellings case-insensitively ("1","true","yes","on");
    # the original check only matched "1"/"true"/"True" exactly.
    USE_POSTAL_LOOKUP: bool = os.getenv("USE_POSTAL_LOOKUP","0").strip().lower() in ("1", "true", "yes", "on")
    ZIPCODEBASE_KEY: str = os.getenv("ZIPCODEBASE_KEY","")


settings = Settings()

0
prompts/__init__.py Normal file
View File

View File

@@ -0,0 +1,69 @@
You are a strict address parser/normalizer. Output ONLY valid JSON with exactly these keys:
{"thoroughfare":null,"house_number":null,"neighborhood":null,"city":null,"state":null,"postal_code":null,"country_code":null,"complement":null}
GLOBAL RULES
- Parse logradouro and number from free text (e.g., "Rua X, 123 - ap 4").
- Normalize country_code to ISO-2 UPPER.
- Be conservative: if uncertain, set null. Do NOT invent values.
- Never add extra keys. Never return explanations. JSON ONLY.
## 🇧🇷 Brazil (BR) — Neighborhood vs City Rules (Mandatory)
**Definitions:**
- `neighborhood` (bairro) ≠ `city` (municipality).
- Examples of **neighborhoods** (not exhaustive): Copacabana, Ipanema, Leblon, Botafogo, Flamengo, Centro, Santa Teresa, Barra da Tijuca, Tijuca, Méier, Madureira, Bangu, Grajaú, Maracanã, Jardim Botânico, Urca, Laranjeiras, Recreio dos Bandeirantes, Jacarepaguá, Campo Grande, etc.
**Precedence and Conflict Handling (deterministic):**
1. **Postal code is mandatory** and must be canonical `00000-000` when valid.
2. **Postal code lookup** (`find_postal_code`) is the **source of truth** for `city` and `state` when available.
3. If the **input `city`** contains a **neighborhood** (e.g. “Copacabana”):
- `neighborhood = <input city>`
- `city = <lookup city>` (e.g. “Rio de Janeiro”)
- **Never** assign a neighborhood to `city`.
4. If there is a **conflict** (`city_input` ≠ `city_lookup`):
- Prefer `city_lookup` if it exists and is a valid municipality.
- Move `city_input` to `neighborhood` **only if** it matches a known neighborhood.
- If both are real cities and different → keep `city_lookup` and add an `issues` entry:
`{"field": "city", "type": "conflict", "observed": "<city_input>", "tool": "<city_lookup>"}`.
5. If **no lookup** is available and `city_input` is a neighborhood:
- `neighborhood = <city_input>`
- `city = null`
- Add to `enrichment`:
`{"status":"pending","reason":"postal_lookup_missing_city","hint":"neighborhood_detected","value":"<neighborhood>"}`
6. **Never overwrite `city` with a neighborhood**, even if the input provided it under `city`.
7. `state`: always use **UF code** (e.g. RJ). If input has full name (“Rio de Janeiro”), map to UF when possible.
8. `neighborhood` must never override `city`. If both exist, keep **both** in their proper fields.
**Output format (Address) — Keys must always be present:**
```json
{
"thoroughfare": null,
"house_number": null,
"neighborhood": null,
"city": null,
"state": null,
"postal_code": null,
"country_code": null,
"complement": null
}
FEW-SHOTS (BR)
INPUT:
{"address":"Rua Figueiredo Magalhães, 123 - Copacabana","cep":"22041001","city":"Rio de Janeiro","state":"RJ","country_code":"BR"}
OUTPUT:
{"thoroughfare":"Rua Figueiredo Magalhães","house_number":"123","neighborhood":"Copacabana","city":"Rio de Janeiro","state":"RJ","postal_code":"22041-001","country_code":"BR","complement":null}
INPUT:
{"address":"Av. Paulista 1000, Bela Vista, ap 121","cep":"01310100","city":"São Paulo","state":"SP","country_code":"BR"}
OUTPUT:
{"thoroughfare":"Avenida Paulista","house_number":"1000","neighborhood":"Bela Vista","city":"São Paulo","state":"SP","postal_code":"01310-100","country_code":"BR","complement":"ap 121"}
INPUT:
{"address":"Rua Jericó, 227 - Sumarezinho","cep":"05435040","country_code":"BR","state":"SP","city":"São Paulo"}
OUTPUT:
{"thoroughfare":"Rua Jericó","house_number":"227","neighborhood":"Sumarezinho","city":"São Paulo","state":"SP","postal_code":"05435-040","country_code":"BR","complement":null}
NOW PARSE THIS INPUT JSON STRICTLY AND RETURN ONLY THE JSON OBJECT:
{input_json}

View File

@@ -0,0 +1,15 @@
You are a strict MDM normalizer. Return ONLY valid JSON for a single **customer** record with exactly these keys:
{"source":null,"id":null,"name":null,"cpf":null,"cnpj":null,"email":null,"phone":null,"cep":null,"address":null,"birth_date":null,"city":null,"state":null,"country_code":null}
Rules (Brazil-first but generic):
- Trim whitespace; names in Title Case; states (UF) UPPER (e.g., "RJ"), country_code ISO-2 UPPER.
- Email: lowercase; RFC-like format; if invalid keep null.
- CPF/CNPJ: if check digits valid, return canonical format (CPF=000.000.000-00, CNPJ=00.000.000/0000-00). If invalid, set null.
- CEP (BR): canonical 00000-000; if cannot canonicalize, set null.
- Phone (BR): return **E.164** (+55DDDNXXXXXXX). If only 8 digits after DDD, prefix '9'. If can't ensure E.164, set null.
- City in Title Case; State as UF 2 letters when BR, else leave state as given or null.
- Never invent values; be conservative: if uncertain set null.
- Never add extra keys. Never return explanations.
Input JSON:
{input_json}

6
requirements.txt Normal file
View File

@@ -0,0 +1,6 @@
fastapi==0.111.1
uvicorn[standard]==0.30.5
httpx==0.27.0
pydantic==2.8.2
python-dotenv==1.0.1
rapidfuzz==3.9.6

10
run.sh Executable file
View File

@@ -0,0 +1,10 @@
#!/bin/bash
# Install dependencies and launch the MDM orchestrator API with uvicorn.
# Fail fast on errors, unset variables, and failures inside pipelines.
set -euo pipefail
pip install -r requirements.txt
echo "== MDM Orchestrator LLM-centric =="
echo "APP_PORT=${APP_PORT:-8001} LOG_LEVEL=${LOG_LEVEL:-INFO}"
echo "OLLAMA_ENDPOINTS=${OLLAMA_ENDPOINTS:-http://localhost:11434}"
echo "NUM_GPU=${NUM_GPU:-22} NUM_BATCH=${NUM_BATCH:-512} NUM_CTX=${NUM_CTX:-4096} NUM_THREAD=${NUM_THREAD:-16}"
echo "CONCURRENCY_NORMALIZE=${CONCURRENCY_NORMALIZE:-8} CONCURRENCY_ADDRESS=${CONCURRENCY_ADDRESS:-8}"
# Quote the command substitution so paths containing spaces survive.
export PYTHONPATH="$(pwd)"
uvicorn app:app --host 0.0.0.0 --port "${APP_PORT:-8001}" --workers 1

48
schemas.py Normal file
View File

@@ -0,0 +1,48 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Literal, Any, Dict
# Supported master-data domains and pipeline operations for /mdm/process.
Domain = Literal["customer","product","supplier","financial","address"]
Operation = Literal["normalize","validate","dedupe","consolidate","harmonize","enrich","mask","outlier_check"]
class InputRecord(BaseModel):
    """A single raw customer record as submitted by a source system.

    All fields are optional free-form strings; cleaning and canonicalization
    happen later in the pipeline.
    """
    source: Optional[str] = None        # originating system identifier
    id: Optional[str] = None
    name: Optional[str] = None
    cpf: Optional[str] = None           # Brazilian individual taxpayer id
    cnpj: Optional[str] = None          # Brazilian company taxpayer id
    email: Optional[str] = None
    phone: Optional[str] = None
    cep: Optional[str] = None           # Brazilian postal code
    address: Optional[str] = None       # free-text street address
    birth_date: Optional[str] = None
    city: Optional[str] = None
    state: Optional[str] = None
    country_code: Optional[str] = None  # expected ISO-2 downstream
class RequestPayload(BaseModel):
    """Request body for /mdm/process: domain, requested operations, records."""
    domain: Domain
    operations: List[Operation]
    policies: Dict[str, Any] = Field(default_factory=dict)  # free-form policy options
    records: List[InputRecord]
class AddressOut(BaseModel):
    """Canonical parsed-address shape produced by the address prompt."""
    thoroughfare: Optional[str] = None  # street/avenue name (logradouro)
    house_number: Optional[str] = None
    neighborhood: Optional[str] = None  # bairro; distinct from city
    city: Optional[str] = None
    state: Optional[str] = None         # UF code for BR (e.g. "RJ")
    postal_code: Optional[str] = None   # BR canonical form 00000-000
    country_code: Optional[str] = None  # ISO-2 upper-case
    complement: Optional[str] = None    # apartment/suite/etc.
class ResponseTemplate(BaseModel):
    """Response envelope for /mdm/process; every section defaults to empty."""
    record_clean: List[dict] = Field(default_factory=list)   # normalized records
    golden_record: Optional[dict] = None                     # consolidated record, when requested
    matches: List[dict] = Field(default_factory=list)        # duplicate-candidate pairs
    harmonization: dict = Field(default_factory=lambda: {"codes": [], "units": []})
    enrichment: List[dict] = Field(default_factory=list)
    issues: List[dict] = Field(default_factory=list)         # per-stage error entries
    actions: List[dict] = Field(default_factory=list)
    pii_masks: dict = Field(default_factory=dict)
    audit_log: List[dict] = Field(default_factory=list)
    confidence: float = 0.0

0
services/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,56 @@
import httpx, json, itertools, logging, time, asyncio, re
from pathlib import Path
from config import settings
from .common import safe_json_from_text
logger = logging.getLogger("mdm.services")
# Module-wide round-robin counter over settings.OLLAMA_ENDPOINTS.
_rr = itertools.count()
# Prompt template located relative to the repo root: prompts/address_prompt.txt
BASE_DIR = Path(__file__).resolve().parent.parent
PROMPT_PATH = BASE_DIR / "prompts" / "address_prompt.txt"
def _ep():
    """Return the next Ollama endpoint in round-robin order."""
    endpoints = settings.OLLAMA_ENDPOINTS
    return endpoints[next(_rr) % len(endpoints)]
async def parse_address(record: dict) -> dict:
    """Parse/normalize a free-text address via the configured LLM.

    Renders the address prompt with the record JSON and posts it to an
    Ollama endpoint (up to 3 attempts). Returns the parsed address dict;
    on non-dict output or exhausted retries returns an all-None address
    skeleton instead of raising.
    """
    empty = {"thoroughfare": None, "house_number": None, "neighborhood": None, "city": None,
             "state": None, "postal_code": None, "country_code": None, "complement": None}
    prompt = PROMPT_PATH.read_text(encoding="utf-8").replace("{input_json}", json.dumps(record, ensure_ascii=False))
    payload = {
        "model": settings.MODEL_ADDRESS,
        "prompt": prompt,
        "format": "json",
        # Ollama generation options, all sourced from Settings.
        "options": {
            "num_ctx": settings.NUM_CTX,
            "num_batch": settings.NUM_BATCH,
            "num_gpu": settings.NUM_GPU,
            "num_thread": settings.NUM_THREAD,
            "temperature": settings.TEMPERATURE
        },
        "stream": False
    }
    timeout = httpx.Timeout(connect=5.0, read=float(settings.REQUEST_TIMEOUT), write=30.0, pool=5.0)
    async with httpx.AsyncClient(timeout=timeout) as c:
        last_exc = None
        for attempt in range(1, 4):
            # Pick the endpoint per attempt (was chosen once before the loop),
            # so retries can fail over to another node in the round-robin pool.
            ep = _ep()
            try:
                t0 = time.time()
                r = await c.post(f"{ep}/api/generate", json=payload)
                dt = time.time() - t0
                logger.info(f"[LLM] address status={r.status_code} time={dt:.2f}s ep={ep} attempt={attempt}")
                r.raise_for_status()
                # Ollama wraps the model output in an envelope; the model's own
                # JSON lives in the "response" field.
                data = safe_json_from_text(r.text)
                resp = (data.get("response","{}") if isinstance(data, dict) else "{}").strip()
                out = json.loads(resp)
                if isinstance(out, dict):
                    # Canonicalize BR CEP if the model returned bare '00000000'.
                    if out.get("postal_code") and re.fullmatch(r"\d{8}", out["postal_code"]):
                        out["postal_code"] = out["postal_code"][:5] + "-" + out["postal_code"][5:]
                    return out
                return dict(empty)
            except Exception as e:
                last_exc = e
                logger.warning(f"[LLM] address attempt {attempt}/3 failed: {e}")
    logger.error(f"[LLM] address failed after retries: {last_exc}")
    return dict(empty)

25
services/common.py Normal file
View File

@@ -0,0 +1,25 @@
import json, logging
logger = logging.getLogger("mdm.services")
def safe_json_from_text(text: str):
try:
return json.loads(text)
except Exception:
pass
first = text.find("{"); last = text.rfind("}")
if first != -1 and last != -1 and last > first:
chunk = text[first:last+1]
try:
return json.loads(chunk)
except Exception:
pass
last_ok = None
for line in text.splitlines():
line=line.strip()
if not line or not line.startswith("{"): continue
try:
last_ok = json.loads(line)
except Exception:
continue
return last_ok or {}

View File

@@ -0,0 +1,23 @@
from rapidfuzz import fuzz
def _sim(a: str, b: str) -> float:
    """Fuzzy token-set similarity scaled to [0, 1]; empty inputs score 0.0."""
    if a and b:
        return fuzz.token_set_ratio(a, b) / 100.0
    return 0.0
def _pairs(n: int):
for i in range(n):
for j in range(i+1, n):
yield i, j
def dedupe_candidates(rows, threshold=0.87):
    """Return duplicate-candidate pairs as {i, j, score}.

    Score is the mean of name similarity, best-of email/phone similarity,
    and address similarity; pairs at or above `threshold` are kept.
    """
    candidates = []
    for i, j in _pairs(len(rows)):
        left, right = rows[i], rows[j]
        name_s = _sim(left.get("name", ""), right.get("name", ""))
        contact_s = max(
            _sim(left.get("email", ""), right.get("email", "")),
            _sim(left.get("phone", ""), right.get("phone", "")),
        )
        addr_s = _sim(left.get("address", ""), right.get("address", ""))
        score = (name_s + contact_s + addr_s) / 3.0
        if score >= threshold:
            candidates.append({"i": i, "j": j, "score": round(score, 3)})
    return candidates

View File

@@ -0,0 +1 @@
def enrich(rows):
    """Enrichment stage stub: no external enrichment is wired in yet, so
    every input produces an empty enrichment list."""
    _ = rows  # accepted for interface compatibility; intentionally unused
    return []

View File

@@ -0,0 +1,10 @@
def pick_golden(rows):
    """Select a golden record: the best-scored row, gap-filled from the rest.

    Scoring favors records sourced from ERP/CRM (+5) plus one point per
    non-empty field. Empty slots in the winner are then filled with the
    first non-empty value seen across all rows. Returns {} for no rows.
    """
    if not rows:
        return {}
    empties = (None, "", [], {})

    def _score(rec):
        bonus = 5 if rec.get("source") in ("ERP", "CRM") else 0
        filled = sum(1 for val in rec.values() if val not in empties)
        return bonus + filled

    golden = dict(max(rows, key=_score))
    for rec in rows:
        for key, val in rec.items():
            if golden.get(key) in empties and val not in empties:
                golden[key] = val
    return golden

View File

@@ -0,0 +1 @@
def harmonize(row): return {'codes':[],'units':[]}

View File

@@ -0,0 +1,57 @@
import httpx, json, itertools, logging, time, asyncio
from pathlib import Path
from config import settings
from .common import safe_json_from_text
from services.zipcode_service import enrich_address_with_zipcode
logger = logging.getLogger("mdm.services")
# Module-wide round-robin counter over settings.OLLAMA_ENDPOINTS.
_rr = itertools.count()
# Prompt template located relative to the repo root: prompts/customer_prompt.txt
BASE_DIR = Path(__file__).resolve().parent.parent
PROMPT_PATH = BASE_DIR / "prompts" / "customer_prompt.txt"
def _ep():
    """Round-robin selection of the next configured Ollama endpoint."""
    pool = settings.OLLAMA_ENDPOINTS
    return pool[next(_rr) % len(pool)]
async def normalize_customer(record: dict) -> dict:
    """Normalize a raw customer record via the LLM.

    First pre-enriches the record's parsed address from its CEP, renders the
    customer prompt with the record JSON, and posts it to an Ollama endpoint
    (up to 3 attempts). Returns the model's JSON dict, or {} when the output
    is not a dict or all retries fail.
    """
    record = await enrich_address_with_zipcode(record)
    # Minimal pre-cleaning: lower email only to reduce model ambiguity
    if record.get("email"):
        record["email"] = record["email"].strip().lower()
    prompt = PROMPT_PATH.read_text(encoding="utf-8").replace("{input_json}", json.dumps(record, ensure_ascii=False))
    payload = {
        "model": settings.MODEL_NORMALIZE,
        "prompt": prompt,
        "format": "json",
        # Ollama generation options, all sourced from Settings.
        "options": {
            "num_ctx": settings.NUM_CTX,
            "num_batch": settings.NUM_BATCH,
            "num_gpu": settings.NUM_GPU,
            "num_thread": settings.NUM_THREAD,
            "temperature": settings.TEMPERATURE
        },
        "stream": False
    }
    ep = _ep()
    timeout = httpx.Timeout(connect=5.0, read=float(settings.REQUEST_TIMEOUT), write=30.0, pool=5.0)
    async with httpx.AsyncClient(timeout=timeout) as c:
        last_exc = None
        for attempt in range(1, 4):
            try:
                t0 = time.time()
                r = await c.post(f"{ep}/api/generate", json=payload)
                dt = time.time() - t0
                logger.info(f"[LLM] normalize status={r.status_code} time={dt:.2f}s ep={ep} attempt={attempt}")
                r.raise_for_status()
                # Ollama wraps the model output in an envelope; the model's own
                # JSON lives in the "response" field.
                data = safe_json_from_text(r.text)
                resp = (data.get("response","{}") if isinstance(data, dict) else "{}").strip()
                out = json.loads(resp)
                if isinstance(out, dict):
                    return out
                return {}
            except Exception as e:
                last_exc = e
                logger.warning(f"[LLM] normalize attempt {attempt}/3 failed: {e}")
    logger.error(f"[LLM] normalize failed after retries: {last_exc}")
    return {}

172
services/zipcode_service.py Normal file
View File

@@ -0,0 +1,172 @@
# services/zipcode_service.py
import os, asyncio, time, random, httpx, logging
from typing import Dict, Any
logger = logging.getLogger("services.zipcode_service")
ZIPCODEBASE_KEY = os.getenv("ZIPCODEBASE_KEY", "")
ZIPCODEBASE_URL = "https://app.zipcodebase.com/api/v1/search"
# Simple in-memory cache (could become LRU/Redis later).
_ZIP_CACHE: Dict[str, Dict[str, Any]] = {}
# "In-flight" futures used to coalesce concurrent lookups of the same CEP.
_INFLIGHT: Dict[str, asyncio.Future] = {}
# Global concurrency cap for the external lookup calls.
_SEM = asyncio.Semaphore(int(os.getenv("ZIPCODEBASE_MAX_CONCURRENCY", "4")))
def _norm_cep(cep: str) -> str:
if not cep:
return ""
d = "".join(c for c in cep if c.isdigit())
return d
async def _via_cep(cep_digits: str) -> Dict[str, Any]:
    """Free Brazilian fallback lookup via ViaCEP (no aggressive rate limits).

    Returns an address dict in the parser's canonical shape, or {} on any
    failure or unknown CEP.
    """
    url = f"https://viacep.com.br/ws/{cep_digits}/json/"
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            r = await client.get(url)
            if r.status_code != 200:
                logger.warning(f"[ViaCEP] status={r.status_code} for {cep_digits}")
                return {}
            data = r.json()
            if data.get("erro"):
                # ViaCEP reports unknown CEPs with {"erro": true} and HTTP 200.
                return {}
            # Format postal_code in the canonical 00000-000 shape.
            pc = f"{cep_digits[:5]}-{cep_digits[5:]}" if len(cep_digits) == 8 else None
            return {
                "thoroughfare": None,
                "house_number": None,
                "neighborhood": data.get("bairro"),
                "city": data.get("localidade"),
                "state": data.get("uf"),
                "postal_code": pc,
                "country_code": "BR",
                "complement": None
            }
    except Exception as e:
        logger.error(f"[ViaCEP] error for {cep_digits}: {e}")
        return {}
async def _zipcodebase_lookup(cep_digits: str, country: str) -> Dict[str, Any]:
    """Query Zipcodebase with retry/backoff, honoring Retry-After on 429.

    Returns a canonical address dict, or {} on empty results, hard 4xx
    errors, unexpected exceptions, or exhausted retries.
    """
    params = {"codes": cep_digits, "country": country, "apikey": ZIPCODEBASE_KEY}
    retries = 3
    base_delay = float(os.getenv("ZIPCODEBASE_BASE_DELAY", "1.0"))
    timeout = httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=5.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        for attempt in range(1, retries + 1):
            try:
                r = await client.get(ZIPCODEBASE_URL, params=params)
                if r.status_code == 429:
                    # Honor the Retry-After header when present.
                    ra = r.headers.get("Retry-After")
                    if ra:
                        try:
                            wait_s = int(ra)
                        except ValueError:
                            wait_s = base_delay * attempt
                    else:
                        wait_s = base_delay * attempt
                    # Add light jitter to avoid synchronized retries.
                    wait_s += random.uniform(0, 0.5)
                    logger.warning(f"[Zipcodebase] 429 on {cep_digits}, attempt {attempt}/{retries}, sleeping {wait_s:.2f}s")
                    await asyncio.sleep(wait_s)
                    continue
                r.raise_for_status()
                data = r.json()
                results = (data.get("results") or {}).get(cep_digits, [])
                if not results:
                    return {}
                enriched = results[0]
                # Build a minimal, stable output (Zipcodebase fields vary by plan).
                return {
                    "thoroughfare": enriched.get("street") or None,
                    "house_number": None,
                    "neighborhood": enriched.get("district") or None,
                    "city": enriched.get("city") or None,
                    "state": (enriched.get("state_code") or enriched.get("state")) or None,
                    "postal_code": f"{cep_digits[:5]}-{cep_digits[5:]}" if len(cep_digits) == 8 else cep_digits,
                    "country_code": country.upper(),
                    "complement": None
                }
            except httpx.HTTPStatusError as e:
                # Retrying 4xx (other than 429) is pointless: give up immediately.
                if 400 <= e.response.status_code < 500 and e.response.status_code != 429:
                    logger.error(f"[Zipcodebase] {e.response.status_code} for {cep_digits}: {e.response.text[:200]}")
                    return {}
                # 5xx: back off and try again.
                wait_s = base_delay * attempt + random.uniform(0, 0.5)
                logger.warning(f"[Zipcodebase] {e.response.status_code} retry {attempt}/{retries} in {wait_s:.2f}s")
                await asyncio.sleep(wait_s)
            except (httpx.ConnectError, httpx.ReadTimeout) as e:
                wait_s = base_delay * attempt + random.uniform(0, 0.5)
                logger.warning(f"[Zipcodebase] network {type(e).__name__} retry {attempt}/{retries} in {wait_s:.2f}s")
                await asyncio.sleep(wait_s)
            except Exception as e:
                logger.error(f"[Zipcodebase] unexpected error: {e}")
                return {}
        return {}
async def enrich_address_with_zipcode(record: Dict[str, Any]) -> Dict[str, Any]:
    """Enrich record['_parsed'] via Zipcodebase, with an in-memory cache,
    coalescing of concurrent same-CEP lookups, and a ViaCEP fallback for BR.

    Always returns the (possibly unmodified) record; lookup failures never
    raise to the caller.
    """
    cep_digits = _norm_cep(record.get("cep", ""))
    country = (record.get("country_code") or "BR").upper()
    if not cep_digits or len(cep_digits) < 5:
        return record
    # 1) Cache hit.
    if cep_digits in _ZIP_CACHE:
        record["_parsed"] = _ZIP_CACHE[cep_digits]
        return record
    # 2) Coalesce concurrent lookups of the same CEP:
    #    later arrivals await the future created by the first caller.
    fut = _INFLIGHT.get(cep_digits)
    if fut:
        try:
            parsed = await fut
            if parsed:
                _ZIP_CACHE[cep_digits] = parsed
                record["_parsed"] = parsed
            return record
        except Exception:
            # The shared future failed; fall through and try ourselves.
            pass
    # 3) First caller: register the future and run the lookup under the semaphore.
    loop = asyncio.get_running_loop()
    _INFLIGHT[cep_digits] = loop.create_future()
    try:
        async with _SEM:
            parsed = {}
            if ZIPCODEBASE_KEY:
                parsed = await _zipcodebase_lookup(cep_digits, country)
            # Fall back to ViaCEP when Zipcodebase fails or is rate-limited.
            if not parsed and country == "BR":
                parsed = await _via_cep(cep_digits)
        # Store in the cache.
        if parsed:
            _ZIP_CACHE[cep_digits] = parsed
        # Resolve any coalesced waiters.
        if not _INFLIGHT[cep_digits].done():
            _INFLIGHT[cep_digits].set_result(parsed)
        if parsed:
            record["_parsed"] = parsed
        return record
    except Exception as e:
        logger.error(f"[Zip] enrich error for {cep_digits}: {e}")
        if cep_digits in _INFLIGHT and not _INFLIGHT[cep_digits].done():
            _INFLIGHT[cep_digits].set_result({})
        return record
    finally:
        # Drop the in-flight entry (prevents key leakage).
        _INFLIGHT.pop(cep_digits, None)