# -*- coding: utf-8 -*- # ============================================== # server_mcp.py — MCP Server (FastMCP) for OCI # ============================================== # Features # - SQLite database storing OCI resource OCIDs (name, type, ocid, compartment, tags) # - Phonetic + fuzzy search (accent-insensitive Soundex + difflib fallback) # - Tools to: add/update/list/search resources; resolve name→OCID; simple memory KV store # - Tool to create OCI resources via `oci` CLI (VM example + generic passthrough) # - Designed for MCP hosts; start with: `python server_mcp.py` # -------------------------------------------------------------- import asyncio import json import os import re import shlex import sqlite3 import subprocess import sys import unicodedata from datetime import datetime from typing import Any, Dict, List, Optional, Tuple from mcp.server.fastmcp import FastMCP import shutil import configparser, os, json import oracledb import json import oci import configparser with open("./config", "r") as f: config_data = json.load(f) mcp = FastMCP("oci-ops") # ------------------------------ # Save Text Log # ------------------------------ def append_line(file_path: str, base: list): """ Save the sequence of commands in `base` to a text file. Args: file_path (str): Path to the text file. base (list): List of command parts to save. """ with open(file_path, "a", encoding="utf-8") as f: # join each item in base with a space if it's a command string command_line = " ".join(map(str, base)) f.write(command_line + "\n") f.flush() # ------------------------------ # OCI CLI execution helper # ------------------------------ class OCI: def __init__(self, profile: Optional[str] = None, bin_path: Optional[str] = None): self.profile = config_data["oci_profile"] self.bin = config_data["OCI_CLI_BIN"] def run(self, args: List[str]) -> Tuple[int, str, str]: try: base = [self.bin] if self.profile: base += ["--profile", self.profile] cmd = base + args append_line("log.txt", cmd) proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) append_line("log.txt", proc.stdout) append_line("log.txt", proc.stderr) append_line("log.txt", "--------------------------") return proc.returncode, proc.stdout, proc.stderr except ex as Exception: append_line("log.txt", str(ex)) oci_cli = OCI(profile=config_data["oci_profile"]) # -------- OCI config helpers -------- import configparser def _read_oci_config(profile: Optional[str]) -> Dict[str, str]: cfg_path = os.path.expanduser("~/.oci/config") cp = configparser.ConfigParser() if os.path.exists(cfg_path): cp.read(cfg_path) prof = config_data["oci_profile"] if cp.has_section(prof): return {k: v for k, v in cp.items(prof)} return {} def _tenancy_ocid() -> Optional[str]: return _read_oci_config(config_data["oci_profile"]).get("tenancy") def _safe_json(s: str) -> Any: try: return json.loads(s) except Exception: return {"raw": s} # --------------------------------- # Phonetic + fuzzy helpers (pt-BR) # --------------------------------- _consonant_map = { "b": "1", "f": "1", "p": "1", "v": "1", "c": "2", "g": "2", "j": "2", "k": "2", "q": "2", "s": "2", "x": "2", "z": "2", "d": "3", "t": "3", "l": "4", "m": "5", "n": "5", "r": "6", } def _normalize(text: str) -> str: text = unicodedata.normalize("NFKD", text) text = "".join(ch for ch in text if not unicodedata.combining(ch)) text = re.sub(r"[^a-zA-Z0-9 ]+", " ", text) return re.sub(r"\s+", " ", text).strip().lower() def ptbr_soundex(word: str, maxlen: int = 6) -> str: w = _normalize(word) if not w: return "" first_letter = w[0] # Remove vowels and h/w/y after first letter, collapse duplicates digits = [] prev = "" for ch in w[1:]: if ch in "aeiouhwy ": code = "" else: code = _consonant_map.get(ch, "") if code and code != prev: digits.append(code) prev = code code = (first_letter + "".join(digits))[:maxlen] return code.ljust(maxlen, "0") from difflib import SequenceMatcher def similarity(a: str, b: str) -> float: return SequenceMatcher(None, _normalize(a), _normalize(b)).ratio() # ------------------------------ # MCP Tools # ------------------------------ @mcp.tool() async def find_subnet(query_text: str) -> dict: """ Find the subnet ocid by the name and the compartment ocid """ structured = f"query subnet resources where displayName =~ '.*{query_text}*.'" code, out, err = oci_cli.run(["search","resource","structured-search","--query-text", structured]) if code != 0: return {"status":"error","stderr": err, "stdout": out} data = json.loads(out) items = data.get("data",{}).get("items",[]) return {"status":"ok","data": items} @mcp.tool() async def list_availability_domains(compartment_ocid: Optional[str] = None) -> Dict[str, Any]: """Lista ADs via `oci iam availability-domain list`.""" cid = compartment_ocid or _tenancy_ocid() if not cid: return {"status": "error", "error": "Missing tenancy compartment OCID."} code, out, err = oci_cli.run(["iam", "availability-domain", "list", "--compartment-id", cid]) if code != 0: return {"status": "error", "stderr": err, "stdout": out} return {"status": "ok", "data": _safe_json(out)} @mcp.tool() async def find_ad(name_or_hint: str, compartment_ocid: Optional[str] = None) -> Dict[str, Any]: """Find the AD by a name (ex.: 'SAOPAULO-1-AD-1').""" lst = await list_availability_domains(compartment_ocid) if lst.get("status") != "ok": return lst items = lst["data"].get("data", []) if isinstance(lst["data"], dict) else [] q = _normalize(name_or_hint) scored = [] for ad in items: adname = ad.get("name") or ad.get("display-name") or "" s = similarity(q, adname) scored.append((s, adname)) scored.sort(reverse=True, key=lambda x: x[0]) if not scored: return {"status": "not_found", "candidates": []} best = scored[0] return {"status": "ok" if best[0] >= 0.6 else "ambiguous", "ad": scored[0][1], "candidates": [n for _, n in scored[:5]]} async def _list_shapes_from_oci(compartment_ocid: Optional[str] = None, ad: Optional[str] = None) -> Dict[str, Any]: """Lista shapes via `oci compute shape list --all` (precisa compartment; AD melhora a lista).""" cid = compartment_ocid or _tenancy_ocid() if not cid: return {"status": "error", "error": "Missing compartment OCID."} args = ["compute", "shape", "list", "--compartment-id", cid, "--all"] if ad: args += ["--availability-domain", ad] code, out, err = oci_cli.run(args) if code != 0: return {"status": "error", "stderr": err, "stdout": out} data = _safe_json(out) return {"status": "ok", "data": data.get("data", []) if isinstance(data, dict) else data} @mcp.tool() async def resolve_shape(hint: str, compartment_ocid: Optional[str] = None, ad: Optional[str] = None) -> Dict[str, Any]: """Resolve shape por dica como 'e4' → melhor match tipo 'VM.Standard.E4.Flex'.""" lst = await _list_shapes_from_oci(compartment_ocid=compartment_ocid, ad=ad) if lst.get("status") != "ok": return lst items = lst["data"] q = _normalize(hint) scored = [] for s in items: name = s.get("shape") or "" s1 = similarity(q, name) # bônus para begins-with no sufixo da família fam = _normalize(name.replace("VM.Standard.", "")) s1 += 0.2 if fam.startswith(q) or q in fam else 0 scored.append((s1, name)) scored.sort(reverse=True, key=lambda x: x[0]) if not scored: return {"status": "not_found", "candidates": []} best = scored[0] return {"status": "ok" if best[0] >= 0.6 else "ambiguous", "shape": best[1], "candidates": [n for _, n in scored[:5]]} @mcp.tool() async def list_shapes(compartment_ocid: Optional[str] = None, ad: Optional[str] = None) -> Dict[str, Any]: """ List all available compute shapes in the given compartment/availability domain. """ lst = await _list_shapes_from_oci(compartment_ocid=compartment_ocid, ad=ad) if lst.get("status") != "ok": return lst items = lst["data"] # simplificar a saída shapes = [{"shape": s.get("shape"), "ocpus": s.get("ocpus"), "memory": s.get("memoryInGBs")} for s in items] return {"status": "ok", "data": shapes} async def list_images(compartment_ocid: Optional[str] = None, operating_system: Optional[str] = None, operating_system_version: Optional[str] = None, shape: Optional[str] = None) -> Dict[str, Any]: """Find the image by a short name or similarity""" cid = compartment_ocid or _tenancy_ocid() if not cid: return {"status": "error", "error": "Missing compartment OCID."} args = ["compute", "image", "list", "--compartment-id", cid, "--all"] if operating_system: args += ["--operating-system", operating_system] if operating_system_version: args += ["--operating-system-version", operating_system_version] if shape: args += ["--shape", shape] code, out, err = oci_cli.run(args) if code != 0: return {"status": "error", "stderr": err, "stdout": out} data = _safe_json(out) items = data.get("data", []) if isinstance(data, dict) else [] return {"status": "ok", "data": items} @mcp.tool() async def resolve_image(query: str, compartment_ocid: Optional[str] = None, shape: Optional[str] = None) -> Dict[str, Any]: """Find the image by a short name or similarity""" # heurística simples para OS/versão q = query.strip() os_name, os_ver = None, None # exemplos: "Oracle Linux 9", "OracleLinux 9", "OL9" if "linux" in q.lower(): os_name = "Oracle Linux" m = re.search(r"(?:^|\\D)(\\d{1,2})(?:\\D|$)", q) if m: os_ver = m.group(1) # primeiro: filtro por OS/versão lst = await list_images(compartment_ocid=compartment_ocid, operating_system=os_name, operating_system_version=os_ver) if lst.get("status") != "ok": return lst items = lst["data"] if not items: # fallback: sem filtro, listar tudo e fazer fuzzy no display-name lst = await list_images(compartment_ocid=compartment_ocid) if lst.get("status") != "ok": return lst items = lst["data"] # rankear por similitude do display-name e data de criação ranked = [] for img in items: dn = img.get("display-name","") s = similarity(query, dn) ts = img.get("time-created") or img.get("time_created") or "" ranked.append((s, ts, img)) ranked.sort(key=lambda x: (x[0], x[1]), reverse=True) if not ranked: return {"status": "not_found", "candidates": []} best = ranked[0][2] # top-5 candidates cands = [] for s, _, img in ranked[:5]: cands.append({"name": img.get("display-name"), "ocid": img["id"], "score": round(float(s), 4)}) status = "ok" if cands and cands[0]["score"] >= 0.65 else "ambiguous" return {"status": status, "resource": cands[0] if cands else None, "candidates": cands} def _norm(s: str) -> str: return _normalize(s) @mcp.tool() async def find_compartment(query_text: str) -> dict: """ Find compartment OCID by the name. The correct OCID is always in the 'identifier' field. """ structured = f"query compartment resources where displayName =~ '.*{query_text}*.'" code, out, err = oci_cli.run([ "search", "resource", "structured-search", "--query-text", structured ]) if code != 0: return {"status": "error", "stderr": err, "stdout": out} data = json.loads(out) items = data.get("data", {}).get("items", []) results = [] for item in items: results.append({ "name": item.get("displayName"), "ocid": item.get("identifier"), # 🔑 este é o OCID correto "lifecycle_state": item.get("lifecycleState"), "time_created": item.get("timeCreated") }) return {"status": "ok", "data": results} @mcp.tool() async def create_compute_instance( compartment_ocid: Optional[str] = None, subnet_ocid: Optional[str] = None, availability_domain: Optional[str] = None, shape: Optional[str] = None, ocpus: Optional[int] = None, # Inteiro opcional memory: Optional[int] = None, # Inteiro opcional image_ocid: Optional[str] = None, display_name: Optional[str] = None, ssh_authorized_keys_path: Optional[str] = None, extra_args: Optional[List[str]] = None, ) -> Dict[str, Any]: """ Create an OCI Compute instance via `oci` CLI. Missing parameters should be asked upstream by the agent. Example: compartment_id: ocid1.compartment.oc1..aaaa... subnet_id: ocid1.subnet.oc1.sa-saopaulo-1.aaaa... shape: VM.Standard.E4.Flex availability_domain: IAfA:SA-SAOPAULO-1-AD-1 image_id: ocid1.image.oc1.sa-saopaulo-1.aaaa... display_name: teste_hoshikawa shape-config: {"ocpus": 2, "memoryInGBs": 16} """ # montar shape-config automaticamente shape_config = None if ocpus is not None and memory is not None: shape_config = json.dumps({"ocpus": ocpus, "memoryInGBs": memory}) args = [ "compute", "instance", "launch", "--compartment-id", compartment_ocid or "", "--subnet-id", subnet_ocid or "", "--shape", shape or "", "--availability-domain", availability_domain or "", "--image-id", image_ocid or "", ] if shape_config: args += ["--shape-config", shape_config] if display_name: args += ["--display-name", display_name] if ssh_authorized_keys_path: args += ["--metadata", json.dumps({ "ssh_authorized_keys": open(ssh_authorized_keys_path, "r", encoding="utf-8").read() })] if extra_args: args += extra_args # validação mínima for flag in ["--compartment-id", "--subnet-id", "--shape", "--availability-domain", "--image-id"]: if "" in [args[args.index(flag) + 1]]: return {"status": "error", "error": f"Missing required {flag} value"} code, out, err = oci_cli.run(args) if code != 0: return {"status": "error", "error": err.strip(), "stdout": out} try: payload = json.loads(out) except Exception: payload = {"raw": out} return {"status": "ok", "oci_result": payload} @mcp.tool() async def oci_cli_passthrough(raw: str) -> Dict[str, Any]: """Run an arbitrary `oci` CLI command (single string). Example: "network vcn list --compartment-id ocid1...""" args = shlex.split(raw) code, out, err = oci_cli.run(args) result = {"returncode": code, "stdout": out, "stderr": err} # try JSON parse try: result["json"] = json.loads(out) except Exception: pass return result # ------------- # Entrypoint # ------------- if __name__ == "__main__": # Start FastMCP server (stdio by default). A host (your agent/IDE) should launch this. mcp.run(transport="stdio")