From b51b2f5e1e1f6d7b7f1122121f4941d24dc10877 Mon Sep 17 00:00:00 2001 From: Oracle Public Cloud User Date: Tue, 2 Sep 2025 23:15:17 +0000 Subject: [PATCH] first commit --- __init__.py | 0 __pycache__/app.cpython-310.pyc | Bin 0 -> 3716 bytes __pycache__/config.cpython-310.pyc | Bin 0 -> 1538 bytes __pycache__/schemas.cpython-310.pyc | Bin 0 -> 2517 bytes app.py | 80 ++++++++ config.py | 25 +++ prompts/__init__.py | 0 prompts/address_prompt.txt | 69 +++++++ prompts/customer_prompt.txt | 15 ++ requirements.txt | 6 + run.sh | 10 + schemas.py | 48 +++++ services/__init__.py | 0 services/__pycache__/__init__.cpython-310.pyc | Bin 0 -> 153 bytes .../address_service.cpython-310.pyc | Bin 0 -> 2435 bytes services/__pycache__/common.cpython-310.pyc | Bin 0 -> 723 bytes .../dedupe_service.cpython-310.pyc | Bin 0 -> 1079 bytes .../enrich_service.cpython-310.pyc | Bin 0 -> 258 bytes .../golden_service.cpython-310.pyc | Bin 0 -> 824 bytes .../harmonize_service.cpython-310.pyc | Bin 0 -> 285 bytes .../normalize_service.cpython-310.pyc | Bin 0 -> 2344 bytes .../zipcode_service.cpython-310.pyc | Bin 0 -> 5056 bytes services/address_service.py | 56 ++++++ services/common.py | 25 +++ services/dedupe_service.py | 23 +++ services/enrich_service.py | 1 + services/golden_service.py | 10 + services/harmonize_service.py | 1 + services/normalize_service.py | 57 ++++++ services/zipcode_service.py | 172 ++++++++++++++++++ 30 files changed, 598 insertions(+) create mode 100644 __init__.py create mode 100644 __pycache__/app.cpython-310.pyc create mode 100644 __pycache__/config.cpython-310.pyc create mode 100644 __pycache__/schemas.cpython-310.pyc create mode 100644 app.py create mode 100644 config.py create mode 100644 prompts/__init__.py create mode 100644 prompts/address_prompt.txt create mode 100644 prompts/customer_prompt.txt create mode 100644 requirements.txt create mode 100755 run.sh create mode 100644 schemas.py create mode 100644 services/__init__.py create mode 100644 
services/__pycache__/__init__.cpython-310.pyc create mode 100644 services/__pycache__/address_service.cpython-310.pyc create mode 100644 services/__pycache__/common.cpython-310.pyc create mode 100644 services/__pycache__/dedupe_service.cpython-310.pyc create mode 100644 services/__pycache__/enrich_service.cpython-310.pyc create mode 100644 services/__pycache__/golden_service.cpython-310.pyc create mode 100644 services/__pycache__/harmonize_service.cpython-310.pyc create mode 100644 services/__pycache__/normalize_service.cpython-310.pyc create mode 100644 services/__pycache__/zipcode_service.cpython-310.pyc create mode 100644 services/address_service.py create mode 100644 services/common.py create mode 100644 services/dedupe_service.py create mode 100644 services/enrich_service.py create mode 100644 services/golden_service.py create mode 100644 services/harmonize_service.py create mode 100644 services/normalize_service.py create mode 100644 services/zipcode_service.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/__pycache__/app.cpython-310.pyc b/__pycache__/app.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..100db9e6f9358b58d98cff16fd7921651f2c90a3 GIT binary patch literal 3716 zcmai0&2JRR6|bu9>G|+@@E68rft`hgFiF%D$e z_1^D&gu}xng6B79{@nh2h>*Xl$>5X2w6r zaeu;|(Bq}f&QkBhei6;tacc2DjR%uj6_e2AByF!0db zxB2iMwX0%!_Z?o@qx=XTc}TXYeOj-KVr7hv>ylY3OYgtV!drc7~ZWw@4Sr)f9M4a4kHoef{>BzE( zqcDiYs_-MvNkomN!;vFn(Qr7IB95^z7KGAwJhvkn&5bw-eIce=(2xFfv4 z>CLjihgkP<5T^wmTp_2KQfo~eD(U2X?$0<;)XC4bh2tgdrqaUT!!Y>^zF0x)0p*l4 zSLw^Br!2g+`hv(25DuDu&&;-Q$JsDyVi338Th3i)7TYvpA@2yOhuHDLESi~UGqtjf zN7WuC8g{?H;;iTSvk|WP#;FvOI*dNmA2E196S9tzO_MFs`Dwz|4bHgngwBy` zK;b9k_b0b#m+qQf@|1A%IoqSz`WEYwE?dv>+%tBQFz}jqw~dTS$xX6Ce$tZ>&)ubz zsP{8`H^I9C-o)Bmp=8UTq*G4v+~Rq?N1e8RqlUu_v*8F%SjQY+OZ9FKgf z{1ArBE6ar&2`=hH?rh?;U?IC80v?5Kkfc_eILSurzQj+RsU^>Ie5V?ONmUA`*>=A4 zMAb>E9rj@rh8@b78_kdl+u|Z|T(4G;Wo%`SEG;h1U!QL*)UV!Hx>jFZNptNaiSF4$ 
z^YLcTyyCe6+Zip9$Vu?0#Xjhzp^j>%OhhSdrc|a>9N1baqooB{siCZN6th?fVE0Rm z(Mh}|I`^t#Owq!9_FA3-sZ&+}n2luE)b3$d;3nFK8a__$rTHjxuBOt|{*%W*-DDeI zybOvm`7YTbTQs4&jME)s9V+Jxy@}pPau`|L6lSpYV7L}}%#!?WAt|nx_UPl>BlakV z5Mc7$!@^^l zqa7ENtsEayx4%Q=H_2$m(IQ(`GS+2Xvukv7U2BK6hznhGP5B2tj!;-yA$%h1Ym@a( z<@1%*g&SW~dFVTCaH*54hEaT}Qv~3eq2%$UH;kepgYv^~lu@3cc|be0odd5s3gwV4 z*`<3pUglH(*UQd)r+mutyK`)Z>>8XV@MZWod^O2+p%iXB)5dV&nHD@0s-a&Y zN@czFJ+qz``;jtaGKIZ&yAC2=tq}ll{jU`3DSA%&>0T+I@3L5^fd^K1fuE&)s8Xq!CFe=L&xny z<4=$eXv_f$C^`0MT-GCmVR>NyE+_#K+U-LH1~xA0K>KD;?qY2qEYA!ACZi!Nv(1}5 zz)WU<(L0)Z3OEWbxeP9f|9jwK)zf@0hua1!06X8%iJfb-$kVt}EhneYNlP^fB=TL= zoKuZ5ca6#O=rTAs0|sCX=K=_^!j6K&1OPEXM_yZO@|DFZ^u+&wg>y*H`>>Edg1CX$ z{9{aEe6`cT-f{-hOKM4(zH_o?*4|Cu(^u@9_Rv-;8R(NA$KZiNU_oWp9kNdDkWY~p zZ^I{18_+tkTPci$bX4VptR~;XA1D|ioak)~4^EU8FSbM=?nUzQJP5rKV2X5(AI9T8 zBeLXB6LLdOmrg3(sbWf^<|VDa{Z&2x{HV32pMQK*E7|2NfHj()Z~}WctFRi`Vwy)G z)NG5`9^S9F^r`JqR&n{rjoOfQ-a)N}ZS7*@x((lnZ(-08osRaTeOlrEpT9{f{V3l)98HOiL*fAHA|vzw+zl z<%Rl{&l>fmv0&! zV;I``HB{*+o1_X~x=gY2G0X_FNaYgxuJ)8FI$e|+Gd+mwDijOaYwhEGazC*38FU^v z)@SI|>%F;Fv3)@8A!?th)IK15A6KF;T$yCx_Y7%$s={$=vj>K`wqMP#W(jXe1a@MUpI)zrEMOU)=m_EKsd wh!?ty&kDMW$bK81xU5RBSXur&89_`czwshk!GzZ95-YG0Ef^(QE>6(@0K%t_CjbBd literal 0 HcmV?d00001 diff --git a/__pycache__/config.cpython-310.pyc b/__pycache__/config.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..ea42bb8369866fec6dc003b410d6e327fb476263 GIT binary patch literal 1538 zcmZvc-E!JS6vuZZ@nIPPHrU4TN9v?W%}mFDZcg~(Y`yqMhw2Gi@{qpBUGltOL zymP-P(D@F2!2t+G6uW4EEsVJ>xza$kWMRv$VkyUnDm3;1(U^m8Wh>^yZjlwIat9UT z|G;=u#BA=!4xFY(9Tz%EqaTE=9(UbuP{?%Bzc4}$vM@!K)J2Ybr%+6#i3F+RPJ-e? 
znw;hZA3X9(Jw^os>tyu z;#)KRQ1Go;j>q)LM513_BWulBhZ-Nx_*41}a&L&-PhtPRf3!?MtR|w|Uams&z}MykzsH)-<$s z)$F#lu?+joVy12oV=Pxb*=LrtS7w>E_Cr_en8ZA7YI@hSBw$SZp#DLSTHb_I)&$6~fQ7*9^6an=DDl&>lF1FqM#jN3UFNrwF;cvfY_S@&Xn=cS2`)?eotC zli~PTh7fx=^g=uAd&2+*$^BvI$GJZijNE?cr@{7RE*UOaE(=^1x#YOyL0Igq>)D~d z#KAI`6)puXt6bK&Y;f5GVaglNbNwg0AKkmDHP0Ep$-K(o8HkGVxPZ5D9-hK~o*9t} zSd1wqSaNhj?P1vO`TP#Dcz82{_i4I>Z6G3!C4<>}_J7l}8Ay0m4Iv3B9#vt68erp%(OBxX7~66rIaFsw z6>Q-P_ietz53(2Eb z0OUIS)sFzwp>FLoZtgZ-?zzal+HZn9Xu>>fqC9HiJZ_RaaUB*>f7i)Ja1Y>~e&*1S zMn5|=V(w3VJ_a7s1bAY327E+Q;Hl*k;A5Hr&n%zP$=A+e`Zw&tS$Jl2lPLD3psY6G zr<~Pv5t`sOSK1_po#vt?wTW+Y%_KDZ8|~Tzw|J=+VV;z|(xPF~#GMqhFLj=%zUykv zWIkH*mb4|urwOH!DYZzpyHNVG9au@EDXl5eHXx}k2E=8-vgU(S!WFuOf8L?LK zvNAzK)JZ-Oy~Z~bWyQ*q62Icn%Lj5D{_0Nv=HxDQa_?J+cJ8x)g}WF{Zy$L)aNqJ| z;DP0%frpl-1CJ~p4?MO!8+c;*WZ)yqrvpzdpACF$`F!A+<>v-Iv3zUbQ{<4tnHhi4 zcD+7grI7T%L`w9sWX9vv1TATp@yl*$g0k(tHz8{Xha7j6XmNmb67_1*V=i@7*h~Z^ z*J~3hjcLhB(YIQzixR}KIB$}oz%`4)q(#v{zH4k}MRBJm^=3q#hZSXr%lb-7Gb;+x zwgQ#|9jT&_+c^Fd!VbdI2+ttwB3wYgP042wE+RaK@I1hMN4|n>gnzE=RUo`Q(Us6p z6+aFvwrgiTSy-RL$bJ1k@oh$}#Gv6v;)|HXxpP1%@bOV5sgtYFr}woy_Mq%bzqBX*}}N`FPxwICF# zK|rEflZp;X&IazjYi5)!Nnh(?NlGo`+9VxRO39frv9#$|2aBnE0rnu9C~L%FBl^9#&*rQT@x&En0IcNhsHR~x7c~u@eJBN zWOZ)lM@)6#H|!e*sQ@dJ3;0}GA7xtNpnoewbyYW4))iC0}wT3BCZtJ1stlS^#(KQI4|&IRAxka%5_nT)e2r=FVthZ=_b kWUIig`nWAYS~kbnH5VRGbAY*X(M`O>oqO}yJlanG1-FqgKmY&$ literal 0 HcmV?d00001 diff --git a/app.py b/app.py new file mode 100644 index 0000000..fe3e683 --- /dev/null +++ b/app.py @@ -0,0 +1,80 @@ +import asyncio, logging, time, json, httpx +from fastapi import FastAPI, HTTPException +from schemas import RequestPayload, ResponseTemplate +from services.address_service import parse_address +from services.normalize_service import normalize_customer +from services.dedupe_service import dedupe_candidates +from services.golden_service import pick_golden +from services.harmonize_service import harmonize +from services.enrich_service import enrich +from config import settings + +app = 
FastAPI() +logging.basicConfig(level=getattr(logging, settings.LOG_LEVEL.upper(), logging.INFO), + format="%(asctime)s %(levelname)s %(name)s: %(message)s") +logger = logging.getLogger("mdm.app") + +@app.get("/health") +async def health(): + return {"ok": True} + +@app.get("/llm/ping") +async def llm_ping(): + ep = settings.OLLAMA_ENDPOINTS[0] + try: + async with httpx.AsyncClient(timeout=5) as c: + r = await c.get(f"{ep}/api/tags") + return {"endpoint": ep, "status": r.status_code} + except Exception as e: + raise HTTPException(status_code=502, detail=f"Ollama not reachable at {ep}: {e}") + +@app.post("/mdm/process", response_model=ResponseTemplate) +async def process(payload: RequestPayload): + t0 = time.time() + logger.info(f"[STEP] domain={payload.domain} ops={payload.operations} records={len(payload.records)}") + + issues = [] + + sem_n = asyncio.Semaphore(settings.CONCURRENCY_NORMALIZE) + async def _n(r): + try: + async with sem_n: + return await normalize_customer(r.model_dump()) + except Exception as e: + issues.append({"stage":"normalize","id": getattr(r,'id',None),"error":str(e)}) + return {} + + norm = await asyncio.gather(*[_n(r) for r in payload.records]) + logger.info(f"[STEP] normalize done in {time.time()-t0:.2f}s") + + sem_a = asyncio.Semaphore(settings.CONCURRENCY_ADDRESS) + async def _a(r): + try: + if r.get("address") or r.get("cep"): + async with sem_a: + r["_parsed"] = await parse_address(r) + return r + except Exception as e: + issues.append({"stage":"address","id": r.get('id'),"error":str(e)}) + return r + + norm = await asyncio.gather(*[_a(r) for r in norm]) + logger.info(f"[STEP] address-parse done in {time.time()-t0:.2f}s") + + matches = dedupe_candidates(norm) + golden = pick_golden(norm) if any(op in payload.operations for op in ("consolidate","dedupe")) else None + harm = harmonize(golden or {}) + enr = enrich(norm) if "enrich" in payload.operations else [] + + return ResponseTemplate( + record_clean=norm, + golden_record=golden, + 
matches=matches, + harmonization=harm, + enrichment=enr, + issues=issues, + actions=[], + pii_masks={}, + audit_log=[], + confidence=0.9 if golden else 0.7 + ) diff --git a/config.py b/config.py new file mode 100644 index 0000000..eb2328d --- /dev/null +++ b/config.py @@ -0,0 +1,25 @@ +from pydantic import BaseModel +from typing import List +import os + +class Settings(BaseModel): + APP_HOST: str = os.getenv("APP_HOST", "0.0.0.0") + APP_PORT: int = int(os.getenv("APP_PORT", "8001")) + OLLAMA_ENDPOINTS: List[str] = [e.strip() for e in os.getenv("OLLAMA_ENDPOINTS","http://localhost:11434").split(",") if e.strip()] + MODEL_ADDRESS: str = os.getenv("MODEL_ADDRESS", "qwen2.5:7b") + MODEL_NORMALIZE: str = os.getenv("MODEL_NORMALIZE", "qwen2.5:7b") + NUM_GPU: int = int(os.getenv("NUM_GPU", "22")) + NUM_BATCH: int = int(os.getenv("NUM_BATCH", "512")) + NUM_CTX: int = int(os.getenv("NUM_CTX", "4096")) + NUM_THREAD: int = int(os.getenv("NUM_THREAD", "16")) + TEMPERATURE: float = float(os.getenv("TEMPERATURE", "0.0")) + TOP_P: float = float(os.getenv("TOP_P", "1.0")) + TOP_K: int = int(os.getenv("TOP_K", "40")) + REQUEST_TIMEOUT: float = float(os.getenv("REQUEST_TIMEOUT", "180")) + LOG_LEVEL: str = os.getenv("LOG_LEVEL","INFO") + CONCURRENCY_NORMALIZE: int = int(os.getenv("CONCURRENCY_NORMALIZE","8")) + CONCURRENCY_ADDRESS: int = int(os.getenv("CONCURRENCY_ADDRESS","8")) + # Optional: best-effort postal lookup (disabled by default) + USE_POSTAL_LOOKUP: bool = os.getenv("USE_POSTAL_LOOKUP","0") in ("1","true","True") + ZIPCODEBASE_KEY: str = os.getenv("ZIPCODEBASE_KEY","") +settings = Settings() diff --git a/prompts/__init__.py b/prompts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prompts/address_prompt.txt b/prompts/address_prompt.txt new file mode 100644 index 0000000..fde450b --- /dev/null +++ b/prompts/address_prompt.txt @@ -0,0 +1,69 @@ +You are a strict address parser/normalizer. 
Output ONLY valid JSON with exactly these keys: +{"thoroughfare":null,"house_number":null,"neighborhood":null,"city":null,"state":null,"postal_code":null,"country_code":null,"complement":null} + +GLOBAL RULES +- Parse logradouro and number from free text (e.g., "Rua X, 123 - ap 4"). +- Normalize country_code to ISO-2 UPPER. +- Be conservative: if uncertain, set null. Do NOT invent values. +- Never add extra keys. Never return explanations. JSON ONLY. + +## 🇧🇷 Brazil (BR) — Neighborhood vs City Rules (Mandatory) + +**Definitions:** +- `neighborhood` (bairro) ≠ `city` (municipality). +- Examples of **neighborhoods** (not exhaustive): Copacabana, Ipanema, Leblon, Botafogo, Flamengo, Centro, Santa Teresa, Barra da Tijuca, Tijuca, Méier, Madureira, Bangu, Grajaú, Maracanã, Jardim Botânico, Urca, Laranjeiras, Recreio dos Bandeirantes, Jacarepaguá, Campo Grande, etc. + +**Precedence and Conflict Handling (deterministic):** +1. **Postal code is mandatory** and must be canonical `00000-000` when valid. +2. **Postal code lookup** (`find_postal_code`) is the **source of truth** for `city` and `state` when available. +3. If the **input `city`** contains a **neighborhood** (e.g. “Copacabana”): + - `neighborhood = ` + - `city = ` (e.g. “Rio de Janeiro”) + - **Never** assign a neighborhood to `city`. +4. If there is a **conflict** (`city_input` ≠ `city_lookup`): + - Prefer `city_lookup` if it exists and is a valid municipality. + - Move `city_input` to `neighborhood` **only if** it matches a known neighborhood. + - If both are real cities and different → keep `city_lookup` and add an `issues` entry: + `{"field": "city", "type": "conflict", "observed": "", "tool": ""}`. +5. If **no lookup** is available and `city_input` is a neighborhood: + - `neighborhood = ` + - `city = null` + - Add to `enrichment`: + `{"status":"pending","reason":"postal_lookup_missing_city","hint":"neighborhood_detected","value":""}` +6. 
**Never overwrite `city` with a neighborhood**, even if the input provided it under `city`. +7. `state`: always use **UF code** (e.g. RJ). If input has full name (“Rio de Janeiro”), map to UF when possible. +8. `neighborhood` must never override `city`. If both exist, keep **both** in their proper fields. + +**Output format (Address) — Keys must always be present:** +```json +{ + "thoroughfare": null, + "house_number": null, + "neighborhood": null, + "city": null, + "state": null, + "postal_code": null, + "country_code": null, + "complement": null +} + + +FEW-SHOTS (BR) + +INPUT: +{"address":"Rua Figueiredo Magalhães, 123 - Copacabana","cep":"22041001","city":"Rio de Janeiro","state":"RJ","country_code":"BR"} +OUTPUT: +{"thoroughfare":"Rua Figueiredo Magalhães","house_number":"123","neighborhood":"Copacabana","city":"Rio de Janeiro","state":"RJ","postal_code":"22041-001","country_code":"BR","complement":null} + +INPUT: +{"address":"Av. Paulista 1000, Bela Vista, ap 121","cep":"01310100","city":"São Paulo","state":"SP","country_code":"BR"} +OUTPUT: +{"thoroughfare":"Avenida Paulista","house_number":"1000","neighborhood":"Bela Vista","city":"São Paulo","state":"SP","postal_code":"01310-100","country_code":"BR","complement":"ap 121"} + +INPUT: +{"address":"Rua Jericó, 227 - Sumarezinho","cep":"05435040","country_code":"BR","state":"SP","city":"São Paulo"} +OUTPUT: +{"thoroughfare":"Rua Jericó","house_number":"227","neighborhood":"Sumarezinho","city":"São Paulo","state":"SP","postal_code":"05435-040","country_code":"BR","complement":null} + +NOW PARSE THIS INPUT JSON STRICTLY AND RETURN ONLY THE JSON OBJECT: +{input_json} diff --git a/prompts/customer_prompt.txt b/prompts/customer_prompt.txt new file mode 100644 index 0000000..93e2a6a --- /dev/null +++ b/prompts/customer_prompt.txt @@ -0,0 +1,15 @@ +You are a strict MDM normalizer. 
Return ONLY valid JSON for a single **customer** record with exactly these keys: +{"source":null,"id":null,"name":null,"cpf":null,"cnpj":null,"email":null,"phone":null,"cep":null,"address":null,"birth_date":null,"city":null,"state":null,"country_code":null} + +Rules (Brazil-first but generic): +- Trim whitespace; names in Title Case; states (UF) UPPER (e.g., "RJ"), country_code ISO-2 UPPER. +- Email: lowercase; RFC-like format; if invalid keep null. +- CPF/CNPJ: if check digits valid, return canonical format (CPF=000.000.000-00, CNPJ=00.000.000/0000-00). If invalid, set null. +- CEP (BR): canonical 00000-000; if cannot canonicalize, set null. +- Phone (BR): return **E.164** (+55DDDNXXXXXXX). If only 8 digits after DDD, prefix '9'. If can't ensure E.164, set null. +- City in Title Case; State as UF 2 letters when BR, else leave state as given or null. +- Never invent values; be conservative: if uncertain set null. +- Never add extra keys. Never return explanations. + +Input JSON: +{input_json} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..078c1ff --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +fastapi==0.111.1 +uvicorn[standard]==0.30.5 +httpx==0.27.0 +pydantic==2.8.2 +python-dotenv==1.0.1 +rapidfuzz==3.9.6 diff --git a/run.sh b/run.sh new file mode 100755 index 0000000..96bb78a --- /dev/null +++ b/run.sh @@ -0,0 +1,10 @@ +#!/bin/bash +set -e +pip install -r requirements.txt +echo "== MDM Orchestrator LLM-centric ==" +echo "APP_PORT=${APP_PORT:-8001} LOG_LEVEL=${LOG_LEVEL:-INFO}" +echo "OLLAMA_ENDPOINTS=${OLLAMA_ENDPOINTS:-http://localhost:11434}" +echo "NUM_GPU=${NUM_GPU:-22} NUM_BATCH=${NUM_BATCH:-512} NUM_CTX=${NUM_CTX:-4096} NUM_THREAD=${NUM_THREAD:-16}" +echo "CONCURRENCY_NORMALIZE=${CONCURRENCY_NORMALIZE:-8} CONCURRENCY_ADDRESS=${CONCURRENCY_ADDRESS:-8}" +export PYTHONPATH=$(pwd) +uvicorn app:app --host 0.0.0.0 --port "${APP_PORT:-8001}" --workers 1 diff --git a/schemas.py b/schemas.py new file mode 100644 index 
0000000..74adf09 --- /dev/null +++ b/schemas.py @@ -0,0 +1,48 @@ +from pydantic import BaseModel, Field +from typing import List, Optional, Literal, Any, Dict + +Domain = Literal["customer","product","supplier","financial","address"] +Operation = Literal["normalize","validate","dedupe","consolidate","harmonize","enrich","mask","outlier_check"] + +class InputRecord(BaseModel): + source: Optional[str] = None + id: Optional[str] = None + name: Optional[str] = None + cpf: Optional[str] = None + cnpj: Optional[str] = None + email: Optional[str] = None + phone: Optional[str] = None + cep: Optional[str] = None + address: Optional[str] = None + birth_date: Optional[str] = None + city: Optional[str] = None + state: Optional[str] = None + country_code: Optional[str] = None + +class RequestPayload(BaseModel): + domain: Domain + operations: List[Operation] + policies: Dict[str, Any] = Field(default_factory=dict) + records: List[InputRecord] + +class AddressOut(BaseModel): + thoroughfare: Optional[str] = None + house_number: Optional[str] = None + neighborhood: Optional[str] = None + city: Optional[str] = None + state: Optional[str] = None + postal_code: Optional[str] = None + country_code: Optional[str] = None + complement: Optional[str] = None + +class ResponseTemplate(BaseModel): + record_clean: List[dict] = Field(default_factory=list) + golden_record: Optional[dict] = None + matches: List[dict] = Field(default_factory=list) + harmonization: dict = Field(default_factory=lambda: {"codes": [], "units": []}) + enrichment: List[dict] = Field(default_factory=list) + issues: List[dict] = Field(default_factory=list) + actions: List[dict] = Field(default_factory=list) + pii_masks: dict = Field(default_factory=dict) + audit_log: List[dict] = Field(default_factory=list) + confidence: float = 0.0 diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/__pycache__/__init__.cpython-310.pyc 
b/services/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..343c1519160ad15a300deac17afba48c2c19231a GIT binary patch literal 153 zcmd1j<>g`kf&}I586f&Gh(HF6K#l_t7qb9~6oz01O-8?!3`HPe1o6v8KO;XkRX@KV z8OSKk$jnYGPt?y%$&D{gEh8P^qPSEO)g&kG+5k&k^BiJf(Q(zk(%_{1W%V{sK4A_wpZv1 zF}4sfw~)O`ilp>8vTLMF*N8>eIvS~v>gSrhPVgyeEIb2i(P0A%?bzi#fM!BD_rq@T z48sl(mJ_!_oqiI9PKQMUhtmr_vIb;uBcbe5-=m3b#IOIpGGs;GR6- zLJPqJ zp*oBSHK5_25tE_Grsummy=XvpqSyl@>G{8O&)pp`){(|imZ5GOzqj+k_?;If&j~l; zOHfT?OPZ*_w2qm?MS)Q-VnjkdWT7YB^Ws|oL^AyqMzRl5zrpf01QX>7PZgqawXdaF z7_XF>GR9o*7g9_L01X%>j3SH@j50Bv=n!!8i9S}iH9=#QSNc^_^T3YO{0q0U$BFm38lFO~8#$)t_Il~yG#p(JCVZ=9p zb8~{nWxkb``D+t}{Vpv}Une!PHd8`O)~4HVw#C)y_0*c-aRvA)8Q+&_1^Cuym_6n7 z>37IRS|RHjXr{2IIR-_r`CDZJk*zf}r)&}=Df#*XpnilDutLrRY4A_f{7xA12;^V= zUwdNyKYk7IH!h!JG_GPaT28Cr$<^tb3q?C%Ku{+MAkz6aHJ1;=iOVfJZb1f1nP5S zJ@UhN$g|2E{}8q;8YV+V9XIiOUud#o8@g?T!-3=R3%e+hhc5SeIjtKH?UJPN9-}U? z%bX5k%3Mfp+R%kQhzJctA+Kan=tOMba$!U<_oFZog@l9HpqrP#%Dp*q_g+I2h8Kk) z^|;WaW}%-mpHrd5Q51lk2KNUv8geE(!819HDm!lM?{sM>&!-K{u1V_Z&-V8ZKB>@rg=N*;8;fDUNLN3~b&o?BKQm3Q8&cU(W9|qIyi2fG}~tWPtpL-OW}pJoSP9kjdvw z6rfl;em8$?VKEp)VIG7cID0>f9D84K>UqYS5;q;E1DWhNG7*y~_>@AcfFcUHZGPB1 zZ8>`nPS`uZnPn2YEiq~DOCHM}?aXcF2hePW0lh2V)MN#wTgU=mSh%E^cu}#isnpc} E092x;4gdfE literal 0 HcmV?d00001 diff --git a/services/__pycache__/common.cpython-310.pyc b/services/__pycache__/common.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..27567a62bd869fcd4ca701317337309bb67ce684 GIT binary patch literal 723 zcmYjP&2AGh5FUH&&5u&3Xc2JX!YL9;4%`q_NQ4`z1U*zlLSb2Z6O;Xm?UJ_a-rzY% zjywTx!2|fpDX+k#jF&`YEYCMHJ2U?5+2Y`!3n(8R{hWQL0KdJnF9A9)aGS@d1PGGC z_#cunkz_>$U6XMj0?868nBZU8m-sLUkQlSdj|WR(RHig$nk7; z4vSI4^5d@4VuUPNnJDDn=ckT*airedpltlj`)$WQnId;QEh1rD=hbB_ORF%#5|szxnfU2Q-I!5XJ{j(&HH?caEAl~k<&I30$cbk?Npz7- z{1`blH3L+X^l&jkZ;t7Ga!CIBjP^*oiQL*|w5h&}CCI*o;96-hovM85I#X$1qb;=y jeGD`P2kH00R8CTTIY@hrCXp`YNwChII7Q4H1rFNc#? 
zuRNq4p88lTJJH2}ett>;OX3Y;bD4x=S~@T>!X~rq03360$dRuy3M+csbfl&^c(~r- zIt1x2+Hyw~TSnO}o!YDQR5+ezfz;@fa5r%Sk?3wVoQD?hV|lFGWcd2|U3L4+=WloZ89>NA?8OGL`AA}$O}S;38b5g`kf&}I58PY)dF^Gc(44TZzj6fk60I@;X z8HkHnfJ8b2Pz|HsN+5BIr6|9=xCkT-CVn~TXXNLm>gN|E0~y5`nc0cuiTb%Ix$(uR zMP;c)U}j=Lfw4Y_pP8Imte=`!l$o3XmIw0n3My~0L4-ht7K0qg!cYXV)=!fWOe1WA Wu$X~zw>WHa^HWN5QtjZnco+Z={4-Vn literal 0 HcmV?d00001 diff --git a/services/__pycache__/golden_service.cpython-310.pyc b/services/__pycache__/golden_service.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..3124866d5502c9c09bf7aa3d07d878e57eac183b GIT binary patch literal 824 zcmZ`$%Zd|06uq~qIu9pN7vjLU8%1aslL2uTf;hS~$OK#jhcR}QNt1NCQ`N~l=!TJG zEBFbSrC;E`)Yesg!G(CMM`kjD1?Sd1^*Hz3s<>Xi12jH9{xST906!hs9E`|If~irE zAaMyjkgN+n59s{$C{yNeKros@21bO?r5lnhA;*+fGL-x)%UOyUTVT#JcnwFpxyZzd zUAp|y2bOzzDdQ`id6!7_!~jD0EasqJWTg#`j%ReL1=F4BgN&0SuXD?iv*!`I&eC3C zX+?HOmXTal@(D_jqV!~GN#?FBEO3F_bj2@sY{|qk9Vnmm_?yFbJ$`WbcI^b$1h`eo zy;6x_9G+14LV`#29I0a}`L_quOtSZYFlEw};2jHV=kh!F%(BM5#kqDeMfmlqIgBIK zjFW&8GYm)llYTRj(Xml_u5^+1ljM2R@i0)PIf%znO^SWujpWQ$TLU#wr-^QJXH8IG z7rSu&ED3|r@zwc8Ya9ptv1vC<5NmZqukZTY@_|Y%H?zo==Kb+Z86Pd`8@IYj($*^! 
zic;=L_TYTy|Ak0_kJOoVrhH*})IYUChCyn*FjbM!TW+Ilg^o{*zDJ}dJ}Hx0;e6}6 zv|w?hD>VP>V{4WBUJD{KBdSm*l)r0Hf?o$EanZJ;vb5pDy=A%v$1Ly JO0KUO`~d|4w~_z= literal 0 HcmV?d00001 diff --git a/services/__pycache__/harmonize_service.cpython-310.pyc b/services/__pycache__/harmonize_service.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..c1ef709116754696d2fb52f44f5828277b6ea8dd GIT binary patch literal 285 zcmd1j<>g`kf&}I58Ad?*F^Gc(44TZzj6fk60I`{Y z*cph6K}MznVG83MkV+;$O{QC{$@wX%#kW{X^D;|{S2Aca-eNAwFE0Y=0TaJm^)vEw zQ}y!;l7WokjLhuB@C~ng61$NJx9&0?H*wh*#Y+VHHHTs^05Yugbso>i4P~ z*XtDoPjc^1<8m3Hzp9UyA00j(LQDS$9Yque$Sx$dasEwgl1t6ja;e*TE)ClNNrF;n z+9pb{TY?8bKwj+SS}%3Tk29u4BZsWrLb4Y_Bo0Y_5=fiPwoFR`xN3cr)<{cJs-y1tFrJSXybK#4zGxHNKrfgOVX}Fs* z{)z9gRGJCQOGG-`@TRGVL&lxLvMVlyrvSYCsAN8ZmbRf2Xo3Y1+5!t*=?mOK+X#T; zlUpPn2X51vRp0*F#`IG0+$qCX&B1IN7ks=qe|0DE!bfs&5> zU$__U9-ws;TE=tLO_KY2H|Y1Gn1^oQ&sgpP0qrJFVR4q!6-WYI8}o)jQsT^uIaN|j zdE}`C-aKodC=`EzmafCBUt@tXw7>_j>5tJI^d+Wvh51$n1lmSRvO-I31$K(xMnaz$ zlu+#oFH2Mx=EMTKK}%O;MdnzPCzT9S6J!-y4O$&q1KJw3o*Up(&swZZ)ZBKU*R910@d$+KL>iwOQNMnvmpEEqu8->8nnt55%i(Y%4tY$Izx)Z5F~=(a*g?_>?YsVkiC zqYq&1dE-}&Uw&A~3w(jV_jS6HmB5QAuj04<7sL0jGy7Ydk_+%t?+4JR;`A%W(qN(M z2Qy8C z^06ztabX)JQ&~|qG3LyrvMN}ZFzy0V8O$?6sM|o6io%qop<=y|W}FB=j#61l1xyP^ z1)nz`UMufB?CR3=;wWOCkb1tdalw7Tq@Kia060zIhb*27t_u5w>SpylH}UsIEK(+;dhescEzOJ`UDga&jU{>m=B_7PQ-8Y9bBki!{!K~bIg^EBtkQSUj31DP1 z^67lm{^lyzew|r$?|x?p*=6*t&g|Yd-HMTpI~0s}pn$p0(ziO@@A490liG+0X+RGA zL>fVS!MLoRo}C<>4xH28;IS;L$n&F7nw%wp>#;m2Y0znyq{T_Kc674eKXi^y&W?JA z2XOLy<^`yh%P zF9Z3x0Pe%Zr;-Z^;6H*hh2!sGgxeDkuUUVHihwrtS949khFW zuio511t-hE$-{j2_>a{>h8IsG!PQ?CX*tdinC>`=j%gfx!l1NWu1H|L+k4V?_7Bc@ x7d&$|@t)FDBrhd2zccf!T5rSQFb&u{>OxCf(7l0bkRSC{h=rS^hAq<4{sZ8#k7fV> literal 0 HcmV?d00001 diff --git a/services/__pycache__/zipcode_service.cpython-310.pyc b/services/__pycache__/zipcode_service.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..31b2264b1b71edd6a54e02394b378160b0bda3ab GIT binary patch 
literal 5056 zcmbtYU5FgV5uTozot@p=+xyl1=_IdzrM0C!OP1rry1Kf$y6daz^ZWa=8h-ohzgS+M*0euUXY0>K=P^Lk%Q|#& zG&AEg8k_u=oIsSSejPM-=E}aAKsd6x9A_(-UYR{IdtxrmR8GzxKXvkz6KBiYw+emj z$oDFBdW?jtCjE>gTq#m(1HKtw0HGu>+Uu1FG4Y<4H zhA^96dNyX&+nUT{=AGG@Wxp44Wj~GP{hyQ_g8R z=9@Ux8%8pZfwrJUM#t)EZ8I`EI?r^OJpFLqFdgB@d3IHoJc2ad_YQm!E9KDZ>;$*f5x=7 z#kAH4_MG9v%4#E#wROi9=0FqcZ3`Dy)oW}`D~>kz!rsT7TJ4fkT`erhkGr886po%L z><&b|P;={UD1w5sBt;OoSNxz@YE5qX;uY7SYXTQ6U6M0v!A#4VSqXe^W`hp$Hg0g} z)`i~;=Qn-}LTm8Co31lE_xi;`5ISKq_|BbFVbPa`vMHqWH|VZxkmKB8d&|}(#|5#R zRd>+k4i{y;xftt5&)l}-T)6B@zqz!$=tvRgmi=ZRDqgdGNys?o33qAvk}sEipT}m^ z4cFroEhyr?hL1+ALTD4`s(#Z8<$BV~Vi%2?s0%M_joj6f^pZ-)JNiq7Vm40E-mk`e zM}l>)I$LwW9ve#{jQf%;CBm$lzqABZ(rb?NTyF_Z1GB_NO?Yu;?kbEHy6C5r_+na; z2gXKI)?%xg$Ye4$xf42Ny-|&sc+ku23T#h4I*ysaODObfIbCOCY>;Ky1k>5XzwK0( z>Mu$e9V52-(?QXh`T2o=_d+R*4|un+!^I>u$4!!$Nzur0@nOvZ_R0C!D5sLitGQ2l{WXXMiJtH`{p9b zsc53SFxc1{PiU<8bnEb}?*+|T=oHFHT1IJB0q9sjZY0-dC{Fl`iv@vx&=79u7o5Tw z8rpkgF%+^?G~yHiY#_f4=qOvr&Q`_x2GBz*k8MrDj#7GeZ0^*N^DSdv>DeVP;5T}> z*LdP^(Ii;%Jb{$al)~Y0YE2>@#b(3zYjOC3Hc7q)EcV&TC47Yn{uXl3^loG=viMp(GRW?FrF3PDYX2C%!(GD`awV;00|Y#4?K z%2p6cA;PlFU9^zU%Ez!qIje$#+g&{8O6VhgAg)^4HvO5`=(QCia83CNS+;! z`}H{8^jw7Xy4(YPN%=*bb|_xDJ_0eBH6E0Xi---bN)k6^iFjv-H`Q!35IYA>oIU&c zg31#T@iyHz+)W|(5t|u`t6`kOZKeB>u;APs-rElt)*B+WD;3YFi%La4i&@116>HVa zl!r+S4i!fzf^wGbd4-Fbvo0yB%IDN@?^ehI=#_Qpl&v+#4J(0so_J=7Y6}mSU|Dea zEuvA1z9k_Zsq~U>CBNqTtIY<94z8h~sBIz+59>K@e^q*d71?-23# z>%SlT%6xv}6Z3ib7L=e@Pt}+F;pzB4a9^(@>_=J$NjcY5s7ES*9v-TV0gi+jK@f?! 
zoC=MIAu&#^;?_r$RI|{$qRHK%8L>%7;MPHu3tE`!;O&Dzj+9-#+Ctz(xKAV8BmI^y zM@S?h22OKx1|{f#XKVetn<=n|1U5`}Y^7QK~%HKZCQ0_R%TPkP@Qk?l<#Nb024$#=EjhrRYdWD+Dp)00rcox{EN`gI|mVA8iF+q%SY1-TejdPpSMf;ZSE7&lZNjlDU`4!iST?w42R9t~%aYQ{Q6;jDFG@HOfT2!Q|Aa|FiQN)9QyNYajl!;g1 zfQ`7nif9r7f0}shBb-zg=3Y}Msr(^Hh)wQ$BF+XhSK)qaH2g+_X#_w>+b_}*l)EcD zi4BNTaR}ClO@5=ZBQR(gMb3t$ieBeCV@g$ zxxWpE2Tj+%v{O{)K_w8$8E6@HQUffI{Qo__ed;qx^+#DA9C-uo|Aqdyo!jLy7Aa22 zIaG<2OnR%y*QlSw1$x8K+rJq99H$WqGV&x9bOy13$QTb) zDvsy*0C7Bb0$?zmOl%!b%$`{gb*F)coRF{5JckPUAAvf6CG4RwwkFHpcWZeORBec1}Na=szck@wNZ} literal 0 HcmV?d00001 diff --git a/services/address_service.py b/services/address_service.py new file mode 100644 index 0000000..38af1c5 --- /dev/null +++ b/services/address_service.py @@ -0,0 +1,56 @@ +import httpx, json, itertools, logging, time, asyncio, re +from pathlib import Path +from config import settings +from .common import safe_json_from_text + +logger = logging.getLogger("mdm.services") +_rr = itertools.count() + +BASE_DIR = Path(__file__).resolve().parent.parent +PROMPT_PATH = BASE_DIR / "prompts" / "address_prompt.txt" + +def _ep(): + return settings.OLLAMA_ENDPOINTS[next(_rr)%len(settings.OLLAMA_ENDPOINTS)] + +async def parse_address(record: dict) -> dict: + prompt = PROMPT_PATH.read_text(encoding="utf-8").replace("{input_json}", json.dumps(record, ensure_ascii=False)) + payload = { + "model": settings.MODEL_ADDRESS, + "prompt": prompt, + "format": "json", + "options": { + "num_ctx": settings.NUM_CTX, + "num_batch": settings.NUM_BATCH, + "num_gpu": settings.NUM_GPU, + "num_thread": settings.NUM_THREAD, + "temperature": settings.TEMPERATURE + }, + "stream": False + } + ep = _ep() + timeout = httpx.Timeout(connect=5.0, read=float(settings.REQUEST_TIMEOUT), write=30.0, pool=5.0) + async with httpx.AsyncClient(timeout=timeout) as c: + last_exc = None + for attempt in range(1,4): + try: + t0=time.time() + r = await c.post(f"{ep}/api/generate", json=payload) + dt=time.time()-t0 + logger.info(f"[LLM] address 
status={r.status_code} time={dt:.2f}s ep={ep} attempt={attempt}") + r.raise_for_status() + data = safe_json_from_text(r.text) + resp = (data.get("response","{}") if isinstance(data, dict) else "{}").strip() + out = json.loads(resp) + if isinstance(out, dict): + # canonicalize BR CEP if model returned '00000000' + if out.get("postal_code") and re.fullmatch(r"\d{8}", out["postal_code"]): + out["postal_code"] = out["postal_code"][:5] + "-" + out["postal_code"][5:] + return out + return { + "thoroughfare": None,"house_number": None,"neighborhood": None,"city": None,"state": None,"postal_code": None,"country_code": None,"complement": None + } + except Exception as e: + last_exc = e + logger.warning(f"[LLM] address attempt {attempt}/3 failed: {e}") + logger.error(f"[LLM] address failed after retries: {last_exc}") + return {"thoroughfare": None,"house_number": None,"neighborhood": None,"city": None,"state": None,"postal_code": None,"country_code": None,"complement": None} diff --git a/services/common.py b/services/common.py new file mode 100644 index 0000000..bfb88f7 --- /dev/null +++ b/services/common.py @@ -0,0 +1,25 @@ +import json, logging + +logger = logging.getLogger("mdm.services") + +def safe_json_from_text(text: str): + try: + return json.loads(text) + except Exception: + pass + first = text.find("{"); last = text.rfind("}") + if first != -1 and last != -1 and last > first: + chunk = text[first:last+1] + try: + return json.loads(chunk) + except Exception: + pass + last_ok = None + for line in text.splitlines(): + line=line.strip() + if not line or not line.startswith("{"): continue + try: + last_ok = json.loads(line) + except Exception: + continue + return last_ok or {} diff --git a/services/dedupe_service.py b/services/dedupe_service.py new file mode 100644 index 0000000..840f4e6 --- /dev/null +++ b/services/dedupe_service.py @@ -0,0 +1,23 @@ +from rapidfuzz import fuzz + +def _sim(a: str, b: str) -> float: + if not a or not b: return 0.0 + return 
fuzz.token_set_ratio(a, b) / 100.0 + +def _pairs(n: int): + for i in range(n): + for j in range(i+1, n): + yield i, j + +def dedupe_candidates(rows, threshold=0.87): + out = [] + for i,j in _pairs(len(rows)): + a,b = rows[i], rows[j] + s = ( + _sim(a.get("name",""), b.get("name","")) + + max(_sim(a.get("email",""), b.get("email","")), _sim(a.get("phone",""), b.get("phone",""))) + + _sim(a.get("address",""), b.get("address","")) + ) / 3.0 + if s >= threshold: + out.append({"i": i, "j": j, "score": round(s,3)}) + return out diff --git a/services/enrich_service.py b/services/enrich_service.py new file mode 100644 index 0000000..be2e843 --- /dev/null +++ b/services/enrich_service.py @@ -0,0 +1 @@ +def enrich(rows): return [] \ No newline at end of file diff --git a/services/golden_service.py b/services/golden_service.py new file mode 100644 index 0000000..84ebf57 --- /dev/null +++ b/services/golden_service.py @@ -0,0 +1,10 @@ +def pick_golden(rows): + if not rows: return {} + def score(r): return (5 if r.get("source") in ("ERP","CRM") else 0) + sum(1 for v in r.values() if v not in (None,"",[],{})) + best = max(rows, key=score) + gold = dict(best) + for r in rows: + for k,v in r.items(): + if gold.get(k) in (None,"",[],{}) and v not in (None,"",[],{}): + gold[k]=v + return gold diff --git a/services/harmonize_service.py b/services/harmonize_service.py new file mode 100644 index 0000000..2892172 --- /dev/null +++ b/services/harmonize_service.py @@ -0,0 +1 @@ +def harmonize(row): return {'codes':[],'units':[]} \ No newline at end of file diff --git a/services/normalize_service.py b/services/normalize_service.py new file mode 100644 index 0000000..a3874a7 --- /dev/null +++ b/services/normalize_service.py @@ -0,0 +1,57 @@ +import httpx, json, itertools, logging, time, asyncio +from pathlib import Path +from config import settings +from .common import safe_json_from_text +from services.zipcode_service import enrich_address_with_zipcode + +logger = 
logging.getLogger("mdm.services") +_rr = itertools.count() + +BASE_DIR = Path(__file__).resolve().parent.parent +PROMPT_PATH = BASE_DIR / "prompts" / "customer_prompt.txt" + +def _ep(): + return settings.OLLAMA_ENDPOINTS[next(_rr)%len(settings.OLLAMA_ENDPOINTS)] + +async def normalize_customer(record: dict) -> dict: + record = await enrich_address_with_zipcode(record) + # Minimal pre-cleaning: lower email only to reduce model ambiguity + if record.get("email"): + record["email"] = record["email"].strip().lower() + + prompt = PROMPT_PATH.read_text(encoding="utf-8").replace("{input_json}", json.dumps(record, ensure_ascii=False)) + payload = { + "model": settings.MODEL_NORMALIZE, + "prompt": prompt, + "format": "json", + "options": { + "num_ctx": settings.NUM_CTX, + "num_batch": settings.NUM_BATCH, + "num_gpu": settings.NUM_GPU, + "num_thread": settings.NUM_THREAD, + "temperature": settings.TEMPERATURE + }, + "stream": False + } + ep = _ep() + timeout = httpx.Timeout(connect=5.0, read=float(settings.REQUEST_TIMEOUT), write=30.0, pool=5.0) + async with httpx.AsyncClient(timeout=timeout) as c: + last_exc = None + for attempt in range(1,4): + try: + t0=time.time() + r = await c.post(f"{ep}/api/generate", json=payload) + dt=time.time()-t0 + logger.info(f"[LLM] normalize status={r.status_code} time={dt:.2f}s ep={ep} attempt={attempt}") + r.raise_for_status() + data = safe_json_from_text(r.text) + resp = (data.get("response","{}") if isinstance(data, dict) else "{}").strip() + out = json.loads(resp) + if isinstance(out, dict): + return out + return {} + except Exception as e: + last_exc = e + logger.warning(f"[LLM] normalize attempt {attempt}/3 failed: {e}") + logger.error(f"[LLM] normalize failed after retries: {last_exc}") + return {} diff --git a/services/zipcode_service.py b/services/zipcode_service.py new file mode 100644 index 0000000..6cb35b8 --- /dev/null +++ b/services/zipcode_service.py @@ -0,0 +1,172 @@ +# services/zipcode_service.py +import os, asyncio, time, 
logger = logging.getLogger("services.zipcode_service")

ZIPCODEBASE_KEY = os.getenv("ZIPCODEBASE_KEY", "")
ZIPCODEBASE_URL = "https://app.zipcodebase.com/api/v1/search"

# Simple in-memory cache (could be LRU/Redis).  Keys are "COUNTRY:digits".
_ZIP_CACHE: Dict[str, Dict[str, Any]] = {}
# In-flight lookups, used to coalesce concurrent calls for the same postal code.
_INFLIGHT: Dict[str, asyncio.Future] = {}
# Global cap on concurrent external lookups.
_SEM = asyncio.Semaphore(int(os.getenv("ZIPCODEBASE_MAX_CONCURRENCY", "4")))
# Upper bound (seconds) for one backoff sleep, including a server-sent
# Retry-After; the sleep happens while a semaphore slot is held.
_MAX_BACKOFF_S = float(os.getenv("ZIPCODEBASE_MAX_BACKOFF", "30"))


def _norm_cep(cep: str) -> str:
    """Return only the digits of *cep*; ``""`` for ``None``/empty input.

    Non-string values (e.g. a CEP sent as a JSON number) are converted with
    ``str()`` first instead of raising ``TypeError``.
    """
    if not cep:
        return ""
    return "".join(c for c in str(cep) if c.isdigit())


def _backoff_seconds(attempt: int, base_delay: float, retry_after: Any = None) -> float:
    """Return how long to wait before the next attempt.

    Uses the integer ``Retry-After`` header value when present and parseable,
    otherwise a linear backoff ``base_delay * attempt``.  A small jitter is
    added to avoid synchronized retries and the result is clamped to
    ``[0, _MAX_BACKOFF_S]``.
    """
    wait_s = base_delay * attempt
    if retry_after:
        try:
            wait_s = int(retry_after)
        except ValueError:
            # HTTP-date or malformed header: keep the linear backoff.
            pass
    wait_s += random.uniform(0, 0.5)
    return min(max(wait_s, 0.0), _MAX_BACKOFF_S)


def _zipcodebase_address(data: Dict[str, Any], cep_digits: str, country: str) -> Dict[str, Any]:
    """Map a Zipcodebase search payload to the minimal address dict (``{}`` if no hit)."""
    results = (data.get("results") or {}).get(cep_digits, [])
    if not results:
        return {}
    enriched = results[0]
    # Minimal, stable output (Zipcodebase fields vary by plan).
    return {
        "thoroughfare": enriched.get("street") or None,
        "house_number": None,
        "neighborhood": enriched.get("district") or None,
        "city": enriched.get("city") or None,
        "state": (enriched.get("state_code") or enriched.get("state")) or None,
        "postal_code": f"{cep_digits[:5]}-{cep_digits[5:]}" if len(cep_digits) == 8 else cep_digits,
        "country_code": country.upper(),
        "complement": None
    }


async def _via_cep(cep_digits: str) -> Dict[str, Any]:
    """Free Brazilian fallback lookup (ViaCEP).

    Args:
        cep_digits: CEP containing digits only.

    Returns:
        The minimal address dict (``postal_code`` formatted as ``00000-000``),
        or ``{}`` when the CEP is not 8 digits long, is unknown to ViaCEP, or
        the request fails for any reason.
    """
    if len(cep_digits) != 8:
        # ViaCEP rejects anything that is not a full 8-digit CEP, so skip the
        # network round-trip altogether.
        return {}
    url = f"https://viacep.com.br/ws/{cep_digits}/json/"
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            r = await client.get(url)
            if r.status_code != 200:
                logger.warning(f"[ViaCEP] status={r.status_code} for {cep_digits}")
                return {}
            data = r.json()
            if data.get("erro"):
                return {}
            # NOTE(review): ViaCEP also returns a street field that is not
            # mapped here; thoroughfare is deliberately left as None as before.
            return {
                "thoroughfare": None,
                "house_number": None,
                "neighborhood": data.get("bairro"),
                "city": data.get("localidade"),
                "state": data.get("uf"),
                "postal_code": f"{cep_digits[:5]}-{cep_digits[5:]}",
                "country_code": "BR",
                "complement": None
            }
    except Exception as e:
        # Best-effort fallback: never let a lookup failure break the caller.
        logger.error(f"[ViaCEP] error for {cep_digits}: {e}")
        return {}


async def _zipcodebase_lookup(cep_digits: str, country: str) -> Dict[str, Any]:
    """Query Zipcodebase with retry/backoff, honouring ``Retry-After`` on 429.

    Retries (up to 3 attempts) on 429, non-4xx HTTP errors, timeouts and
    network errors.  Other 4xx answers and unexpected errors give up
    immediately.  No sleep is performed after the final attempt.

    Returns:
        The minimal address dict, or ``{}`` when nothing was found or every
        attempt failed.
    """
    params = {"codes": cep_digits, "country": country, "apikey": ZIPCODEBASE_KEY}
    retries = 3
    base_delay = float(os.getenv("ZIPCODEBASE_BASE_DELAY", "1.0"))

    timeout = httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=5.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        for attempt in range(1, retries + 1):
            retry_after = None
            try:
                r = await client.get(ZIPCODEBASE_URL, params=params)
                if r.status_code != 429:
                    r.raise_for_status()
                    return _zipcodebase_address(r.json(), cep_digits, country)
                retry_after = r.headers.get("Retry-After")
                problem = f"429 on {cep_digits}"
            except httpx.HTTPStatusError as e:
                status = e.response.status_code
                # 429 never reaches raise_for_status(), so every 4xx seen here
                # is a client error that a retry will not fix.
                if 400 <= status < 500:
                    logger.error(f"[Zipcodebase] {status} for {cep_digits}: {e.response.text[:200]}")
                    return {}
                problem = f"{status} on {cep_digits}"
            except (httpx.TimeoutException, httpx.NetworkError) as e:
                problem = f"network {type(e).__name__} on {cep_digits}"
            except Exception as e:
                logger.error(f"[Zipcodebase] unexpected error: {e}")
                return {}

            if attempt == retries:
                logger.warning(f"[Zipcodebase] {problem}, attempt {attempt}/{retries}, giving up")
                break
            wait_s = _backoff_seconds(attempt, base_delay, retry_after)
            logger.warning(f"[Zipcodebase] {problem}, attempt {attempt}/{retries}, retrying in {wait_s:.2f}s")
            await asyncio.sleep(wait_s)
    return {}


async def enrich_address_with_zipcode(record: Dict[str, Any]) -> Dict[str, Any]:
    """Fill ``record["_parsed"]`` from the record's postal code.

    Looks the code up on Zipcodebase (when ``ZIPCODEBASE_KEY`` is set) and
    falls back to ViaCEP for Brazilian codes.  Results are cached in memory
    per country + code, and concurrent requests for the same key share a
    single external lookup.

    Args:
        record: Dict that may contain ``cep`` and ``country_code`` (defaults
            to ``"BR"``).  It is modified in place.

    Returns:
        The same ``record`` object.  ``_parsed`` is only set when a lookup
        produced data; each record receives its own copy of the address dict.
    """
    cep_digits = _norm_cep(record.get("cep", ""))
    country = (record.get("country_code") or "BR").upper()

    if len(cep_digits) < 5:
        return record

    # The lookup is country-specific, so the cache/in-flight key must be too.
    key = f"{country}:{cep_digits}"

    # 1) Cache hit.  Hand out a copy so callers cannot mutate the cached entry.
    cached = _ZIP_CACHE.get(key)
    if cached:
        record["_parsed"] = dict(cached)
        return record

    # 2) Coalesce: a lookup for this key is already running, wait for it.
    #    shield() keeps the cancellation of one waiter from cancelling the
    #    shared future for every other waiter.
    pending = _INFLIGHT.get(key)
    if pending is not None:
        parsed = await asyncio.shield(pending)
        if parsed:
            record["_parsed"] = dict(parsed)
        return record

    # 3) First caller: publish a future and run the lookup under the semaphore.
    fut = asyncio.get_running_loop().create_future()
    _INFLIGHT[key] = fut
    parsed: Dict[str, Any] = {}
    try:
        async with _SEM:
            if ZIPCODEBASE_KEY:
                parsed = await _zipcodebase_lookup(cep_digits, country)

            # Fall back to ViaCEP when Zipcodebase fails or is rate-limited.
            if not parsed and country == "BR":
                parsed = await _via_cep(cep_digits)

        if parsed:
            _ZIP_CACHE[key] = parsed
            record["_parsed"] = dict(parsed)
    except Exception as e:
        logger.error(f"[Zip] enrich error for {cep_digits}: {e}")
        parsed = {}
    finally:
        # Always clear the in-flight entry and release the waiters, including
        # when this task is cancelled (CancelledError is not an Exception).
        _INFLIGHT.pop(key, None)
        if not fut.done():
            fut.set_result(parsed)
    return record