# app_select_ai.py
# ---------------------------------------------------------
# Chat + dashboard (history) for SELECT AI on Autonomous Database
# ---------------------------------------------------------
"""Flask front end that sends natural-language prompts through SELECT AI.

The module reads connection settings from ``./config`` (JSON), creates an
``oracledb`` connection pool at import time, and exposes two routes:

* ``/``            - prompt form plus a per-session history of answers.
* ``/export/xlsx`` - turns a JSON list of answers into an XLSX download.
"""
import base64
import json
import os
import re
from io import BytesIO

import oracledb
from flask import (
    Flask,
    jsonify,
    render_template_string,
    request,
    send_file,
    session,
)
from openpyxl import Workbook
from openpyxl.drawing.image import Image as XLImage
from openpyxl.styles import Font
from openpyxl.utils import get_column_letter
from PIL import Image as PILImage

# ======================
# DATABASE CONFIGURATION
# ======================
with open("./config", "r", encoding="utf-8") as f:
    config_data = json.load(f)

WALLET_PATH = config_data["WALLET_PATH"]
DB_ALIAS = config_data["DB_ALIAS"]
USERNAME = config_data["USERNAME"]
PASSWORD = config_data["PASSWORD"]
# Optional key; falls back to the DB password, which is what was always used.
WALLET_PASSWORD = config_data.get("WALLET_PASSWORD", PASSWORD)
os.environ["TNS_ADMIN"] = WALLET_PATH

PROFILE_NAME = "OCI_GENERATIVE_AI_PROFILE"

# Number of answers kept in the per-session history.
MAX_TIMELINE = 10

# Accepts TABLE or SCHEMA.TABLE made of plain (unquoted) identifiers only.
_TABLE_NAME_RE = re.compile(
    r"[A-Za-z][A-Za-z0-9_$#]*(?:\.[A-Za-z][A-Za-z0-9_$#]*)?"
)

# Shared style for the label cells of the exported workbook.
_BOLD = Font(bold=True)


def set_select_ai_profile(conn, requested_tag):
    """Pool session callback: activate the Select AI profile on ``conn``.

    ``requested_tag`` is part of the callback signature and is not used.
    """
    with conn.cursor() as cur:
        cur.execute(
            "BEGIN DBMS_CLOUD_AI.SET_PROFILE(:p); END;", p=PROFILE_NAME
        )


pool = oracledb.create_pool(
    user=USERNAME,
    password=PASSWORD,
    dsn=DB_ALIAS,
    config_dir=WALLET_PATH,
    wallet_location=WALLET_PATH,
    wallet_password=WALLET_PASSWORD,
    min=1,
    max=5,
    increment=1,
    session_callback=set_select_ai_profile,
)

# ======================
# FLASK APP
# ======================
app = Flask(__name__)
# Needed for the session (history). The literal fallback is a placeholder:
# set FLASK_SECRET_KEY or "SECRET_KEY" in ./config for any real deployment.
app.secret_key = (
    os.environ.get("FLASK_SECRET_KEY")
    or config_data.get("SECRET_KEY")
    or "troque-esta-chave"
)

# NOTE(review): the template below contains only text and Jinja tags; the
# surrounding HTML markup appears to be missing - confirm against the
# original file. It is reproduced exactly as found.
PAGE = """ ER Analytics · Select AI
ER Analytics · Select AI
Respostas e gráficos no topo; pergunta no rodapé. A última resposta fica visível automaticamente.
{% if timeline %}
Conexão: {{ db_alias }} Usuário: {{ session_user }} Profile AI: {{ profile or '—' }} |
{% for item in timeline %}
Pergunta: {{ item.prompt }}
{% if item.sql %}
{{ item.sql }}
{% endif %} {% if item.chart %}
Barras Linhas Pizza {% if item.table %} Mostrar/ocultar Tabela {% endif %}
{% endif %} {% if item.table %}
{% for h in item.table.headers %}{% endfor %} {% for r in item.table.rows %} {% for c in r %}{% endfor %} {% endfor %}
{{ h }}
{{ c }}
{% endif %}
{% endfor %}
{% endif %}
"""


def _auto_width(ws, start_col, end_col, extra=2):
    """Size columns ``start_col..end_col`` (1-based) of ``ws`` to their text.

    Each width is the longest ``str(value)`` in the column plus ``extra``,
    clamped to the range 12..50.
    """
    for col_idx in range(start_col, end_col + 1):
        col_letter = get_column_letter(col_idx)
        max_len = max(
            (
                len(str(cell.value))
                for cell in ws[col_letter]
                if cell.value is not None
            ),
            default=0,
        )
        ws.column_dimensions[col_letter].width = min(
            50, max(12, max_len + extra)
        )


def _excel_safe(value):
    """Return ``value`` in a form a worksheet cell can hold.

    JSON arrays/objects cannot be written to a cell, so they are stored as
    their JSON text; everything else is passed through unchanged.
    """
    if isinstance(value, (list, dict)):
        return json.dumps(value, ensure_ascii=False)
    return value


def _add_chart_image(ws, chart_png, row):
    """Embed a base64 PNG (optionally a data URL) at ``A{row}``.

    Returns the next free row. Embedding is best effort: on any failure the
    problem is logged and ``row`` is returned unchanged so the export still
    succeeds without the chart.
    """
    try:
        # Accept a data URL by dropping the "data:image/png;base64," prefix.
        if "," in chart_png:
            chart_png = chart_png.split(",", 1)[1]
        img_bytes = base64.b64decode(chart_png)
        # Re-encode through PIL so the workbook gets a normalised PNG.
        pil = PILImage.open(BytesIO(img_bytes)).convert("RGBA")
        buf = BytesIO()
        pil.save(buf, format="PNG")
        buf.seek(0)
        ws.add_image(XLImage(buf), f"A{row}")
    except Exception:
        app.logger.warning(
            "Could not embed chart image; exporting sheet without it",
            exc_info=True,
        )
        return row
    # Leave room below the anchor so later content does not overlap the image.
    return row + 20


def _write_item_sheet(wb, index, item):
    """Create sheet ``Resp {index}`` in ``wb`` from one exported answer."""
    ws = wb.create_sheet(f"Resp {index}"[:31])  # Excel caps titles at 31

    # Question
    ws["A1"] = "Pergunta:"
    ws["A1"].font = _BOLD
    ws["B1"] = item.get("question") or f"Resposta {index}"
    ws.row_dimensions[1].height = 22

    # SQL
    sql = (item.get("sql") or "").strip()
    if sql:
        ws["A2"] = "SQL:"
        ws["A2"].font = _BOLD
        ws["B2"] = sql

    row = 4

    # Table
    table = item.get("table")
    if isinstance(table, dict) and table.get("headers"):
        headers = table["headers"]
        for col, header in enumerate(headers, start=1):
            cell = ws.cell(row=row, column=col, value=_excel_safe(header))
            cell.font = _BOLD
        row += 1
        for data_row in table.get("rows") or []:
            for col, val in enumerate(data_row, start=1):
                ws.cell(row=row, column=col, value=_excel_safe(val))
            row += 1
        _auto_width(ws, 1, len(headers))
        row += 1  # blank spacer row before the chart

    # Chart as PNG (if provided)
    chart_png = item.get("chartPng")
    if chart_png:
        _add_chart_image(ws, chart_png, row)


@app.route("/export/xlsx", methods=["POST"])
def export_xlsx():
    """Build an XLSX file with one sheet per answer.

    Expects a JSON object with an ``items`` list. Each item:
      {
        "question": str,
        "sql": str|None,
        "table": { "headers":[...], "rows":[[...], ...] } | None,
        "chartPng": "data:image/png;base64,..." | None
      }

    Returns the workbook as an attachment, a 400 JSON error when the payload
    is malformed or has no items, or a 500 JSON error if building the
    workbook fails.
    """
    payload = request.get_json(force=True, silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "JSON inválido"}), 400
    items = payload.get("items")
    if not isinstance(items, list) or not items:
        return jsonify({"error": "sem itens"}), 400
    if not all(isinstance(it, dict) for it in items):
        return jsonify({"error": "JSON inválido"}), 400

    try:
        wb = Workbook()
        # Use the active sheet as the summary page.
        ws0 = wb.active
        ws0.title = "Resumo"
        ws0["A1"] = "Export gerado pelo ER Analytics · Select AI"
        ws0["A2"] = f"Total de respostas: {len(items)}"

        for i, it in enumerate(items, start=1):
            _write_item_sheet(wb, i, it)

        # Drop any leftover default "Sheet*" page.
        for sh in list(wb.sheetnames):
            if sh.lower().startswith("sheet") and sh != ws0.title:
                try:
                    del wb[sh]
                except KeyError:
                    pass

        out = BytesIO()
        wb.save(out)
        out.seek(0)
        return send_file(
            out,
            mimetype=(
                "application/vnd.openxmlformats-officedocument"
                ".spreadsheetml.sheet"
            ),
            as_attachment=True,
            download_name="respostas-select-ai.xlsx",
        )
    except Exception as e:
        app.logger.exception("XLSX export failed")
        return jsonify({"error": str(e)}), 500


def _to_float(x):
    """Parse ``x`` as a float, or return ``None`` if it is not numeric.

    Accepts a decimal comma and a trailing ``%`` (the sign is dropped, the
    number is not rescaled).
    """
    if x is None:
        return None
    s = str(x).strip().replace(",", ".")
    if s.endswith("%"):
        s = s[:-1]
    try:
        return float(s)
    except Exception:
        return None


def _collect_series(rows, idx_label, idx_val):
    """Return ``(labels, values)`` from ``rows``, or ``None``.

    ``None`` is returned as soon as one value in column ``idx_val`` is not
    numeric, because a partially numeric column cannot be charted.
    """
    labels, values = [], []
    for r in rows:
        vf = _to_float(r[idx_val])
        if vf is None:
            return None
        labels.append(str(r[idx_label]))
        values.append(vf)
    return labels, values


def _chart(kind, series, series_label, x_label, y_label):
    """Assemble the chart metadata dict consumed by the page."""
    labels, values = series
    return {
        "type": kind,
        "labels": labels,
        "values": values,
        "seriesLabel": series_label,
        "title": f"{series_label} por {x_label}",
        "xLabel": x_label,
        "yLabel": y_label,
    }


def build_chart(headers, rows):
    """Derive chart metadata from a result set.

    Rules, in order:
      - 1x1 value readable as a percentage -> pie (Internados vs Não)
      - 2 columns (category, number)       -> bar
      - 3 columns with a temporal header (contains DATE/DATA/MONTH/YEAR/TIME)
        -> line, plotting the last column that is not the temporal one
      - 3 columns with a PERCENT header -> pie; otherwise with a
        COUNT/NUMBER header -> bar; labels come from the first column that
        is not the value column

    Returns a dict {type, labels, values, seriesLabel, title, xLabel, yLabel}
    or ``None`` when no rule applies or a value column is not numeric.
    """
    if not headers or not rows:
        return None

    upper = [str(h).upper() for h in headers]

    # 1) 1x1 percentage
    if len(headers) == 1 and len(rows) == 1:
        v = _to_float(rows[0][0])
        if v is None:
            return None
        # Values in [0, 1] are treated as fractions and scaled to percent.
        pct = v * 100 if 0 <= v <= 1 else v
        if 0 <= pct <= 100:
            return {
                "type": "pie",
                "labels": ["Internados", "Não internados"],
                "values": [round(pct, 2), round(100 - pct, 2)],
                "seriesLabel": "Percentual",
                "title": f"{headers[0]}",
                "xLabel": "Status",
                "yLabel": "%",
            }

    # 2) 2 columns: category x value
    if len(headers) == 2:
        series = _collect_series(rows, 0, 1)
        if series is None:
            return None
        return _chart("bar", series, headers[1], headers[0], headers[1])

    # 3) 3 columns: temporal, or category + (count, percentage)
    if len(headers) == 3:
        # 3a) temporal
        time_keys = ("DATE", "DATA", "MONTH", "YEAR", "TIME")
        idx_time = next(
            (i for i, h in enumerate(upper) if any(k in h for k in time_keys)),
            None,
        )
        if idx_time is not None:
            # Plot the last column, unless that *is* the time column; using
            # the same column for labels and values gives a useless chart.
            idx_val = 2 if idx_time != 2 else 1
            series = _collect_series(rows, idx_time, idx_val)
            if series is None:
                return None
            return _chart(
                "line",
                series,
                headers[idx_val],
                headers[idx_time],
                headers[idx_val],
            )

        # 3b) category + count + percentage (percentage wins if present)
        idx_pct = next(
            (i for i, h in enumerate(upper) if "PERCENT" in h), None
        )
        idx_cnt = next(
            (
                i
                for i, h in enumerate(upper)
                if any(k in h for k in ("COUNT", "NUMBER"))
            ),
            None,
        )
        idx_val = idx_pct if idx_pct is not None else idx_cnt
        if idx_val is not None:
            # Category is the first column that is not the value column.
            idx_cat = next(i for i in range(3) if i != idx_val)
            series = _collect_series(rows, idx_cat, idx_val)
            if series is None:
                return None
            ylab = headers[idx_val] + (" (%)" if idx_val == idx_pct else "")
            return _chart(
                "bar" if idx_val == idx_cnt else "pie",
                series,
                headers[idx_val],
                headers[idx_cat],
                ylab,
            )

    return None


def _json_safe(value):
    """Keep basic JSON scalars as they are; store anything else as text."""
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    return str(value)


def format_table(headers, rows, limit=500):
    """Return ``{"headers", "rows"}`` with at most ``limit`` rows, or ``None``.

    ``None`` is returned when there are no headers. The result is kept in
    the Flask session, which is JSON-serialised, so cells that are not plain
    JSON scalars are stored by their string form to keep unusual driver
    types from breaking session serialisation.
    """
    if not headers:
        return None
    safe_rows = [[_json_safe(c) for c in r] for r in rows[:limit]]
    return {"headers": headers, "rows": safe_rows}


def run_select_ai(nl_prompt, table_name):
    """Run ``SELECT AI '<prompt>' FROM <table>`` and fetch every row.

    Args:
        nl_prompt: natural-language question; single quotes are doubled
            before it is embedded in the statement.
        table_name: ``TABLE`` or ``SCHEMA.TABLE`` using plain identifiers.

    Returns:
        ``(sql, headers, rows, user, profile)`` where ``user`` and
        ``profile`` are read from the session that executed the statement.

    Raises:
        ValueError: if ``table_name`` is not a plain identifier. The name is
            spliced into the statement text, so anything else is rejected
            to prevent SQL injection.
    """
    if not _TABLE_NAME_RE.fullmatch(table_name or ""):
        raise ValueError(f"Nome de tabela inválido: {table_name!r}")

    frase_segura = nl_prompt.replace("'", "''")
    sql = f"SELECT AI '{frase_segura}' FROM {table_name}"
    with pool.acquire() as conn:
        with conn.cursor() as cur:
            cur.execute("select user, dbms_cloud_ai.get_profile() from dual")
            user_, profile_ = cur.fetchone()
            cur.execute(sql)
            rows = cur.fetchall()
            headers = (
                [d[0] for d in cur.description] if cur.description else []
            )
    return sql, headers, rows, user_, profile_


def ensure_timeline():
    """Return the session's history list, creating it when absent."""
    if "timeline" not in session:
        session["timeline"] = []
    return session["timeline"]


def _remember(timeline, entry):
    """Append ``entry`` to the history, keep the newest ones, and persist."""
    # NOTE(review): with Flask's default cookie-based session the history
    # travels in the cookie; large result tables may exceed browser cookie
    # limits - consider a server-side session store.
    timeline.append(entry)
    if len(timeline) > MAX_TIMELINE:
        timeline[:] = timeline[-MAX_TIMELINE:]
    session["timeline"] = timeline


def _render(timeline, default_table, session_user, profile):
    """Render ``PAGE`` with the history and connection details."""
    return render_template_string(
        PAGE,
        timeline=timeline,
        default_table=default_table,
        db_alias=DB_ALIAS,
        session_user=session_user,
        profile=profile,
    )


@app.route("/", methods=["GET", "POST"])
def index():
    """Show the history; on POST, run the submitted prompt first.

    Failures while running the prompt are recorded in the history as a
    one-cell "Erro" table instead of producing an error page.
    """
    default_table = "MEU_USUARIO.NLU_ED_ADMISSION"
    timeline = ensure_timeline()

    if request.method != "POST":
        return _render(timeline, default_table, USERNAME, None)

    frase = request.form["frase"].strip()
    # A blank table field falls back to the default instead of producing
    # a statement with no table name.
    tabela = request.form.get("tabela", default_table).strip() or default_table

    try:
        sql, headers, rows, user_, profile_ = run_select_ai(frase, tabela)
        entry = {
            "prompt": frase,
            "sql": sql,
            "table": format_table(headers, rows),
            "chart": build_chart(headers, rows),
        }
    except Exception as e:
        app.logger.exception("SELECT AI request failed")
        _remember(
            timeline,
            {
                "prompt": frase,
                "sql": None,
                "table": {"headers": ["Erro"], "rows": [[str(e)]]},
                "chart": None,
            },
        )
        return _render(timeline, default_table, USERNAME, None)

    _remember(timeline, entry)
    return _render(timeline, default_table, user_, profile_)


if __name__ == "__main__":
    # Debug mode stays on by default (as before); export FLASK_DEBUG=0 to
    # turn it off. Never expose the debug server outside a dev machine.
    app.run(debug=os.environ.get("FLASK_DEBUG", "1") != "0", port=5001)