mirror of
https://github.com/hoshikawa2/mdm_project.git
synced 2026-03-06 10:11:03 +00:00
First commit
This commit is contained in:
0
files/services/__init__.py
Normal file
0
files/services/__init__.py
Normal file
56
files/services/address_service.py
Normal file
56
files/services/address_service.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import httpx, json, itertools, logging, time, asyncio, re
|
||||
from pathlib import Path
|
||||
from files.config import settings
|
||||
from .common import safe_json_from_text
|
||||
|
||||
# Shared logger for the MDM service layer.
logger = logging.getLogger("mdm.services")
# Monotonic counter used by _ep() to round-robin across Ollama endpoints.
_rr = itertools.count()

# Prompt template lives under files/prompts/, two levels up from this module.
BASE_DIR = Path(__file__).resolve().parent.parent
PROMPT_PATH = BASE_DIR / "prompts" / "address_prompt.txt"
|
||||
|
||||
def _ep():
    """Return the next Ollama endpoint, rotating round-robin."""
    endpoints = settings.OLLAMA_ENDPOINTS
    return endpoints[next(_rr) % len(endpoints)]
|
||||
|
||||
async def parse_address(record: dict) -> dict:
    """Parse a free-form address record into structured fields via the LLM.

    Renders *record* into the address prompt template, posts it to an
    Ollama endpoint (round-robin via _ep), and returns the model's JSON
    object. Up to 3 attempts are made; on total failure an all-None
    address skeleton is returned instead of raising.

    Args:
        record: raw address data serialized into the prompt as JSON.

    Returns:
        dict with keys thoroughfare/house_number/neighborhood/city/state/
        postal_code/country_code/complement (values may be None).
    """
    # All-None skeleton used whenever the model output is unusable.
    empty = {
        "thoroughfare": None, "house_number": None, "neighborhood": None,
        "city": None, "state": None, "postal_code": None,
        "country_code": None, "complement": None,
    }
    prompt = PROMPT_PATH.read_text(encoding="utf-8").replace(
        "{input_json}", json.dumps(record, ensure_ascii=False)
    )
    payload = {
        "model": settings.MODEL_ADDRESS,
        "prompt": prompt,
        "format": "json",  # ask Ollama to constrain the output to JSON
        "options": {
            "num_ctx": settings.NUM_CTX,
            "num_batch": settings.NUM_BATCH,
            "num_gpu": settings.NUM_GPU,
            "num_thread": settings.NUM_THREAD,
            "temperature": settings.TEMPERATURE
        },
        "stream": False
    }
    ep = _ep()
    timeout = httpx.Timeout(connect=5.0, read=float(settings.REQUEST_TIMEOUT), write=30.0, pool=5.0)
    async with httpx.AsyncClient(timeout=timeout) as c:
        last_exc = None
        for attempt in range(1, 4):
            try:
                t0 = time.time()
                r = await c.post(f"{ep}/api/generate", json=payload)
                dt = time.time() - t0
                logger.info(f"[LLM] address status={r.status_code} time={dt:.2f}s ep={ep} attempt={attempt}")
                r.raise_for_status()
                # Ollama wraps the model output in an envelope; the actual
                # model text is the "response" string.
                data = safe_json_from_text(r.text)
                resp = (data.get("response", "{}") if isinstance(data, dict) else "{}").strip()
                out = json.loads(resp)
                if not isinstance(out, dict):
                    return dict(empty)
                # Canonicalize a bare 8-digit BR CEP into the 00000-000 form.
                # Guard on str: a non-string postal_code must not raise a
                # TypeError here, which would burn an LLM retry for nothing.
                pc = out.get("postal_code")
                if isinstance(pc, str) and re.fullmatch(r"\d{8}", pc):
                    out["postal_code"] = pc[:5] + "-" + pc[5:]
                return out
            except Exception as e:
                last_exc = e
                logger.warning(f"[LLM] address attempt {attempt}/3 failed: {e}")
    logger.error(f"[LLM] address failed after retries: {last_exc}")
    return dict(empty)
|
||||
25
files/services/common.py
Normal file
25
files/services/common.py
Normal file
@@ -0,0 +1,25 @@
|
||||
import json, logging
|
||||
|
||||
# Module logger; not referenced by the code visible in this module, kept
# for parity with the sibling service modules.
logger = logging.getLogger("mdm.services")
|
||||
|
||||
def safe_json_from_text(text: str):
    """Best-effort extraction of a JSON value from possibly noisy text.

    Strategy, in order:
      1. Parse the whole text as JSON.
      2. Parse the substring between the first '{' and the last '}'.
      3. Scan line by line, keeping the LAST line that parses as a JSON
         object literal.

    Args:
        text: arbitrary text that may contain a JSON payload.

    Returns:
        The parsed JSON value, or {} when nothing parseable is found.
        Never raises.
    """
    try:
        return json.loads(text)
    except Exception:
        pass

    first = text.find("{")
    last = text.rfind("}")
    # last > first implies both braces were found and in sane order.
    if first != -1 and last > first:
        try:
            return json.loads(text[first:last + 1])
        except Exception:
            pass

    last_ok = None
    for line in text.splitlines():
        line = line.strip()
        # Only lines that look like an object literal are considered.
        if not line.startswith("{"):
            continue
        try:
            last_ok = json.loads(line)
        except Exception:
            continue
    # Compare against None rather than truthiness so a valid-but-falsy
    # parse (e.g. an empty object from a line) is returned as-is.
    return {} if last_ok is None else last_ok
|
||||
23
files/services/dedupe_service.py
Normal file
23
files/services/dedupe_service.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from rapidfuzz import fuzz
|
||||
|
||||
def _sim(a: str, b: str) -> float:
|
||||
if not a or not b: return 0.0
|
||||
return fuzz.token_set_ratio(a, b) / 100.0
|
||||
|
||||
def _pairs(n: int):
|
||||
for i in range(n):
|
||||
for j in range(i+1, n):
|
||||
yield i, j
|
||||
|
||||
def dedupe_candidates(rows, threshold=0.87):
    """Return candidate duplicate pairs among *rows*.

    Each unordered pair is scored as the mean of: name similarity, the
    better of email/phone similarity, and address similarity. Pairs at or
    above *threshold* are reported as {"i", "j", "score"} dicts.
    """
    candidates = []
    for i, j in _pairs(len(rows)):
        left, right = rows[i], rows[j]
        contact = max(
            _sim(left.get("email", ""), right.get("email", "")),
            _sim(left.get("phone", ""), right.get("phone", "")),
        )
        score = (
            _sim(left.get("name", ""), right.get("name", ""))
            + contact
            + _sim(left.get("address", ""), right.get("address", ""))
        ) / 3.0
        if score >= threshold:
            candidates.append({"i": i, "j": j, "score": round(score, 3)})
    return candidates
|
||||
1
files/services/enrich_service.py
Normal file
1
files/services/enrich_service.py
Normal file
@@ -0,0 +1 @@
|
||||
def enrich(rows):
    """Enrichment stub: performs no enrichment and returns an empty list."""
    return []
|
||||
10
files/services/golden_service.py
Normal file
10
files/services/golden_service.py
Normal file
@@ -0,0 +1,10 @@
|
||||
def pick_golden(rows):
    """Merge candidate rows into a single "golden" record.

    The highest-scoring row seeds the result; a row's score is a +5 bonus
    when its source is ERP or CRM plus the count of populated fields.
    Empty fields of the seed are then back-filled from the remaining rows
    in their original order. Returns {} for an empty input.
    """
    if not rows:
        return {}

    empties = (None, "", [], {})

    def score(row):
        bonus = 5 if row.get("source") in ("ERP", "CRM") else 0
        return bonus + sum(1 for value in row.values() if value not in empties)

    golden = dict(max(rows, key=score))
    for row in rows:
        for key, value in row.items():
            if golden.get(key) in empties and value not in empties:
                golden[key] = value
    return golden
|
||||
1
files/services/harmonize_service.py
Normal file
1
files/services/harmonize_service.py
Normal file
@@ -0,0 +1 @@
|
||||
def harmonize(row):
    """Harmonization stub: always returns empty code/unit mappings."""
    return {'codes': [], 'units': []}
|
||||
57
files/services/normalize_service.py
Normal file
57
files/services/normalize_service.py
Normal file
@@ -0,0 +1,57 @@
|
||||
import httpx, json, itertools, logging, time, asyncio
|
||||
from pathlib import Path
|
||||
from files.config import settings
|
||||
from .common import safe_json_from_text
|
||||
from files.services.zipcode_service import enrich_address_with_zipcode
|
||||
|
||||
# Shared logger for the MDM service layer.
logger = logging.getLogger("mdm.services")
# Monotonic counter used by _ep() to round-robin across Ollama endpoints.
_rr = itertools.count()

# Prompt template lives under files/prompts/, two levels up from this module.
BASE_DIR = Path(__file__).resolve().parent.parent
PROMPT_PATH = BASE_DIR / "prompts" / "customer_prompt.txt"
|
||||
|
||||
def _ep():
    """Pick the next configured Ollama endpoint in round-robin order."""
    idx = next(_rr) % len(settings.OLLAMA_ENDPOINTS)
    return settings.OLLAMA_ENDPOINTS[idx]
|
||||
|
||||
async def normalize_customer(record: dict) -> dict:
    """Normalize a raw customer record through the configured LLM.

    The record is first enriched with zip-code data, rendered into the
    customer prompt template, and posted to an Ollama endpoint. Up to 3
    attempts are made; returns the model's JSON object, or {} on failure.

    NOTE(review): mutates the input mapping in place (email is stripped
    and lower-cased before prompting).
    """
    record = await enrich_address_with_zipcode(record)
    # Minimal pre-cleaning: lower email only to reduce model ambiguity
    if record.get("email"):
        record["email"] = record["email"].strip().lower()

    # Prompt template is re-read from disk on every call.
    prompt = PROMPT_PATH.read_text(encoding="utf-8").replace("{input_json}", json.dumps(record, ensure_ascii=False))
    payload = {
        "model": settings.MODEL_NORMALIZE,
        "prompt": prompt,
        "format": "json",  # ask Ollama to constrain the output to JSON
        "options": {
            "num_ctx": settings.NUM_CTX,
            "num_batch": settings.NUM_BATCH,
            "num_gpu": settings.NUM_GPU,
            "num_thread": settings.NUM_THREAD,
            "temperature": settings.TEMPERATURE
        },
        "stream": False
    }
    # One endpoint is chosen up front; all retries hit the same endpoint.
    ep = _ep()
    timeout = httpx.Timeout(connect=5.0, read=float(settings.REQUEST_TIMEOUT), write=30.0, pool=5.0)
    async with httpx.AsyncClient(timeout=timeout) as c:
        last_exc = None
        for attempt in range(1,4):
            try:
                t0=time.time()
                r = await c.post(f"{ep}/api/generate", json=payload)
                dt=time.time()-t0
                logger.info(f"[LLM] normalize status={r.status_code} time={dt:.2f}s ep={ep} attempt={attempt}")
                r.raise_for_status()
                # Ollama wraps the model output in an envelope; the model's
                # text is the "response" string inside it.
                data = safe_json_from_text(r.text)
                resp = (data.get("response","{}") if isinstance(data, dict) else "{}").strip()
                out = json.loads(resp)
                if isinstance(out, dict):
                    return out
                # Model returned valid JSON that is not an object.
                return {}
            except Exception as e:
                last_exc = e
                logger.warning(f"[LLM] normalize attempt {attempt}/3 failed: {e}")
    logger.error(f"[LLM] normalize failed after retries: {last_exc}")
    return {}
|
||||
172
files/services/zipcode_service.py
Normal file
172
files/services/zipcode_service.py
Normal file
@@ -0,0 +1,172 @@
|
||||
# services/zipcode_service.py
|
||||
import os, asyncio, time, random, httpx, logging
|
||||
from typing import Dict, Any
|
||||
|
||||
logger = logging.getLogger("services.zipcode_service")

# Zipcodebase credentials/endpoint; the API key comes from the environment
# and an empty key disables the Zipcodebase path (see enrich_address_with_zipcode).
ZIPCODEBASE_KEY = os.getenv("ZIPCODEBASE_KEY", "")
ZIPCODEBASE_URL = "https://app.zipcodebase.com/api/v1/search"

# Simple in-memory cache keyed by CEP digits (could be LRU/Redis)
_ZIP_CACHE: Dict[str, Dict[str, Any]] = {}
# "In-flight" futures used to coalesce concurrent lookups of the same zip code
_INFLIGHT: Dict[str, asyncio.Future] = {}
# Bounds the number of concurrent external lookups.
# NOTE(review): this Semaphore is created at import time; on Python < 3.10
# asyncio primitives bind the loop at creation — confirm the runtime version.
_SEM = asyncio.Semaphore(int(os.getenv("ZIPCODEBASE_MAX_CONCURRENCY", "4")))
|
||||
|
||||
def _norm_cep(cep: str) -> str:
|
||||
if not cep:
|
||||
return ""
|
||||
d = "".join(c for c in cep if c.isdigit())
|
||||
return d
|
||||
|
||||
async def _via_cep(cep_digits: str) -> Dict[str, Any]:
    """Look up a Brazilian CEP via the free ViaCEP API (fallback path).

    Args:
        cep_digits: digits-only CEP string.

    Returns:
        A parsed-address dict (same shape the Zipcodebase path produces),
        or {} on any failure, unknown CEP, or non-200 response.
    """
    url = f"https://viacep.com.br/ws/{cep_digits}/json/"
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            r = await client.get(url)
            if r.status_code != 200:
                logger.warning(f"[ViaCEP] status={r.status_code} for {cep_digits}")
                return {}
            data = r.json()
            # ViaCEP signals an unknown CEP with an "erro" field instead of
            # a non-200 status.
            if data.get("erro"):
                return {}
            # Format postal_code in the pattern 00000-000
            pc = f"{cep_digits[:5]}-{cep_digits[5:]}" if len(cep_digits) == 8 else None
            return {
                "thoroughfare": None,
                "house_number": None,
                "neighborhood": data.get("bairro"),
                "city": data.get("localidade"),
                "state": data.get("uf"),
                "postal_code": pc,
                "country_code": "BR",
                "complement": None
            }
    except Exception as e:
        logger.error(f"[ViaCEP] error for {cep_digits}: {e}")
        return {}
|
||||
|
||||
async def _zipcodebase_lookup(cep_digits: str, country: str) -> Dict[str, Any]:
    """Zipcodebase query with retry/backoff and respect for Retry-After.

    Args:
        cep_digits: digits-only postal code.
        country: ISO country code passed through to the API.

    Returns:
        A parsed-address dict, or {} when the code is unknown, retries are
        exhausted, or a non-retryable error occurs. Never raises.
    """
    params = {"codes": cep_digits, "country": country, "apikey": ZIPCODEBASE_KEY}
    retries = 3
    base_delay = float(os.getenv("ZIPCODEBASE_BASE_DELAY", "1.0"))

    timeout = httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=5.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        for attempt in range(1, retries + 1):
            try:
                r = await client.get(ZIPCODEBASE_URL, params=params)
                if r.status_code == 429:
                    # Honor the Retry-After header when present
                    ra = r.headers.get("Retry-After")
                    if ra:
                        try:
                            wait_s = int(ra)
                        except ValueError:
                            # Non-numeric Retry-After (e.g. HTTP-date): fall
                            # back to linear backoff.
                            wait_s = base_delay * attempt
                    else:
                        wait_s = base_delay * attempt
                    # Slight jitter to avoid synchronization
                    wait_s += random.uniform(0, 0.5)
                    logger.warning(f"[Zipcodebase] 429 on {cep_digits}, attempt {attempt}/{retries}, sleeping {wait_s:.2f}s")
                    await asyncio.sleep(wait_s)
                    continue

                r.raise_for_status()
                data = r.json()
                results = (data.get("results") or {}).get(cep_digits, [])
                if not results:
                    return {}
                enriched = results[0]
                # Assembles minimum stable output (Zipcodebase varies by plan)
                return {
                    "thoroughfare": enriched.get("street") or None,
                    "house_number": None,
                    "neighborhood": enriched.get("district") or None,
                    "city": enriched.get("city") or None,
                    "state": (enriched.get("state_code") or enriched.get("state")) or None,
                    "postal_code": f"{cep_digits[:5]}-{cep_digits[5:]}" if len(cep_digits) == 8 else cep_digits,
                    "country_code": country.upper(),
                    "complement": None
                }
            except httpx.HTTPStatusError as e:
                # For 4xx (except 429) there is not much point in retrying
                if 400 <= e.response.status_code < 500 and e.response.status_code != 429:
                    logger.error(f"[Zipcodebase] {e.response.status_code} for {cep_digits}: {e.response.text[:200]}")
                    return {}
                # For 5xx try again
                wait_s = base_delay * attempt + random.uniform(0, 0.5)
                logger.warning(f"[Zipcodebase] {e.response.status_code} retry {attempt}/{retries} in {wait_s:.2f}s")
                await asyncio.sleep(wait_s)
            except (httpx.ConnectError, httpx.ReadTimeout) as e:
                # Transient network problems: back off and retry.
                wait_s = base_delay * attempt + random.uniform(0, 0.5)
                logger.warning(f"[Zipcodebase] network {type(e).__name__} retry {attempt}/{retries} in {wait_s:.2f}s")
                await asyncio.sleep(wait_s)
            except Exception as e:
                # Anything unexpected is non-retryable by design.
                logger.error(f"[Zipcodebase] unexpected error: {e}")
                return {}
        # Retries exhausted.
        return {}
|
||||
|
||||
async def enrich_address_with_zipcode(record: Dict[str, Any]) -> Dict[str, Any]:
    """Enriches record['_parsed'] via Zipcodebase with ViaCEP caching, coalescing, and fallback.

    Reads record['cep'] (and optionally record['country_code'], default
    BR) and, on success, stores the parsed address dict under
    record['_parsed']. Mutates and returns the same *record* mapping.
    Concurrent lookups for the same CEP share one external request via the
    _INFLIGHT future map; successful results are cached in _ZIP_CACHE.
    """
    cep_digits = _norm_cep(record.get("cep", ""))
    country = (record.get("country_code") or "BR").upper()

    # Too short to be a usable postal code — leave the record untouched.
    if not cep_digits or len(cep_digits) < 5:
        return record

    # 1) cache hit
    if cep_digits in _ZIP_CACHE:
        record["_parsed"] = _ZIP_CACHE[cep_digits]
        return record

    # 2) coalesce concurrent calls from the same zip code
    # (whoever arrives later awaits the future response to the first call)
    fut = _INFLIGHT.get(cep_digits)
    if fut:
        try:
            parsed = await fut
            if parsed:
                _ZIP_CACHE[cep_digits] = parsed
                record["_parsed"] = parsed
            return record
        except Exception:
            # If future failed, let's try ourselves
            pass

    # 3) first thread: creates the future and executes the query under semaphore
    loop = asyncio.get_running_loop()
    _INFLIGHT[cep_digits] = loop.create_future()

    try:
        async with _SEM:
            parsed = {}
            if ZIPCODEBASE_KEY:
                parsed = await _zipcodebase_lookup(cep_digits, country)

            # Fall back to ViaCEP if Zipcodebase fails or rate-limits
            if not parsed and country == "BR":
                parsed = await _via_cep(cep_digits)

            # Store in cache
            if parsed:
                _ZIP_CACHE[cep_digits] = parsed

            # Resolves coalesced waits (even with an empty result, so
            # waiters are never left pending)
            if not _INFLIGHT[cep_digits].done():
                _INFLIGHT[cep_digits].set_result(parsed)

            if parsed:
                record["_parsed"] = parsed
            return record
    except Exception as e:
        logger.error(f"[Zip] enrich error for {cep_digits}: {e}")
        # Resolve waiters with {} so they fall through to their own lookup.
        if cep_digits in _INFLIGHT and not _INFLIGHT[cep_digits].done():
            _INFLIGHT[cep_digits].set_result({})
        return record
    finally:
        # Clean the in-flight marker (prevents leakage)
        _INFLIGHT.pop(cep_digits, None)
|
||||
Reference in New Issue
Block a user