mirror of
https://github.com/hoshikawa2/agent_oci_automation.git
synced 2026-03-03 16:09:37 +00:00
411 lines
15 KiB
Python
411 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
|
# ==============================================
|
|
# server_mcp.py — MCP Server (FastMCP) for OCI
|
|
# ==============================================
|
|
# Features
|
|
# - SQLite database storing OCI resource OCIDs (name, type, ocid, compartment, tags)
|
|
# - Phonetic + fuzzy search (accent-insensitive Soundex + difflib fallback)
|
|
# - Tools to: add/update/list/search resources; resolve name→OCID; simple memory KV store
|
|
# - Tool to create OCI resources via `oci` CLI (VM example + generic passthrough)
|
|
# - Designed for MCP hosts; start with: `python server_mcp.py`
|
|
# --------------------------------------------------------------
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import re
|
|
import shlex
|
|
import sqlite3
|
|
import subprocess
|
|
import sys
|
|
import unicodedata
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
import shutil
|
|
import configparser, os, json
|
|
|
|
import oracledb
|
|
import json
|
|
import oci
|
|
import configparser
|
|
|
|
# Load the server settings (a JSON document) from the file named "config" in
# the current working directory. The keys read by this module are
# "oci_profile" and "OCI_CLI_BIN".
# JSON is UTF-8 by specification, so decode it explicitly instead of relying
# on the platform's locale encoding.
with open("./config", "r", encoding="utf-8") as f:
    config_data = json.load(f)

# FastMCP server instance; the @mcp.tool() decorators in this module register
# their functions on it.
mcp = FastMCP("oci-ops")
|
|
|
|
# ------------------------------
|
|
# Save Text Log
|
|
# ------------------------------
|
|
def append_line(file_path: str, base: list):
    """
    Append one line to a text log file.

    Args:
        file_path (str): Path to the text file. It is opened in append mode,
            so existing content is kept.
        base (list): Command parts to save. Each part is converted with
            ``str`` and the parts are joined with single spaces. A plain
            string is also accepted and is written unchanged.
    """
    if isinstance(base, str):
        # Joining a string iterates it character by character and would
        # write "a b c" instead of "abc", so strings are written verbatim.
        command_line = base
    else:
        command_line = " ".join(map(str, base))
    # Leaving the ``with`` block flushes and closes the file.
    with open(file_path, "a", encoding="utf-8") as f:
        f.write(command_line + "\n")
|
|
|
|
# ------------------------------
|
|
# OCI CLI execution helper
|
|
# ------------------------------
|
|
|
|
class OCI:
    """Thin wrapper that runs the `oci` command-line tool as a subprocess."""

    def __init__(self, profile: Optional[str] = None, bin_path: Optional[str] = None):
        """
        Args:
            profile: OCI CLI profile name. Falls back to
                ``config_data["oci_profile"]`` when omitted.
            bin_path: Path of the `oci` executable. Falls back to
                ``config_data["OCI_CLI_BIN"]`` when omitted.
        """
        # Honour explicit arguments; only consult the config file for the
        # values that were not supplied.
        self.profile = profile if profile is not None else config_data["oci_profile"]
        self.bin = bin_path if bin_path is not None else config_data["OCI_CLI_BIN"]

    def _log(self, parts: List[str]) -> None:
        """Append ``parts`` to log.txt; a logging failure never fails the command."""
        try:
            append_line("log.txt", parts)
        except OSError:
            # Best effort only: an unwritable log must not turn a CLI call
            # that may already have run into an error.
            pass

    def run(self, args: List[str]) -> Tuple[int, str, str]:
        """
        Run ``<bin> [--profile <profile>] <args...>`` and log the exchange.

        The command line, its stdout, its stderr and a separator line are
        appended to ``log.txt``.

        Args:
            args: CLI arguments placed after the binary and the optional
                ``--profile`` pair.

        Returns:
            ``(returncode, stdout, stderr)``. When the process cannot be
            started or its output cannot be read, the exception text is
            logged and ``(-1, "", <exception text>)`` is returned, so callers
            can always unpack three values and see a non-zero code.
        """
        base = [self.bin]
        if self.profile:
            base += ["--profile", self.profile]
        cmd = base + list(args)
        self._log(cmd)
        try:
            proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        except Exception as ex:
            message = str(ex)
            self._log([message])
            return -1, "", message
        # Output is wrapped in a list so it is logged as one piece of text.
        self._log([proc.stdout])
        self._log([proc.stderr])
        self._log(["--------------------------"])
        return proc.returncode, proc.stdout, proc.stderr


# Shared CLI runner used by every tool in this module.
oci_cli = OCI(profile=config_data["oci_profile"])
|
|
|
|
# -------- OCI config helpers --------
|
|
import configparser
|
|
|
|
def _read_oci_config(profile: Optional[str]) -> Dict[str, str]:
    """
    Read one profile from the OCI configuration file ``~/.oci/config``.

    Args:
        profile: Profile (section) name. When empty or ``None`` the profile
            named by ``config_data["oci_profile"]`` is used.

    Returns:
        The profile's key/value pairs, or an empty dict when the file or the
        profile does not exist.
    """
    cfg_path = os.path.expanduser("~/.oci/config")
    if not os.path.exists(cfg_path):
        return {}
    # Interpolation is disabled so values containing '%' are returned
    # verbatim instead of raising an interpolation error.
    cp = configparser.ConfigParser(interpolation=None)
    cp.read(cfg_path)
    prof = profile or config_data["oci_profile"]
    if prof == cp.default_section:
        # has_section() never reports the DEFAULT section, so its values
        # have to be read through defaults().
        return dict(cp.defaults())
    if cp.has_section(prof):
        return dict(cp.items(prof))
    return {}
|
|
|
|
def _tenancy_ocid() -> Optional[str]:
    """Return the ``tenancy`` entry of the configured OCI profile, or ``None`` when it is absent."""
    profile_settings = _read_oci_config(config_data["oci_profile"])
    return profile_settings.get("tenancy")
|
|
|
|
def _safe_json(s: str) -> Any:
    """
    Decode ``s`` as JSON without ever raising.

    Returns:
        The decoded value, or ``{"raw": s}`` when decoding fails for any
        reason.
    """
    try:
        decoded = json.loads(s)
    except Exception:
        decoded = {"raw": s}
    return decoded
|
|
|
|
# ---------------------------------
|
|
# Phonetic + fuzzy helpers (pt-BR)
|
|
# ---------------------------------
|
|
# Consonant -> digit code table used by ptbr_soundex; consonants listed in
# the same group share one code.
_consonant_map = {
    letter: digit
    for digit, letters in (
        ("1", "bfpv"),
        ("2", "cgjkqsxz"),
        ("3", "dt"),
        ("4", "l"),
        ("5", "mn"),
        ("6", "r"),
    )
    for letter in letters
}
|
|
|
|
def _normalize(text: str) -> str:
    """
    Reduce ``text`` to a lowercase, accent-free, single-spaced form.

    Steps: NFKD-decompose, drop combining marks, replace every run of
    characters other than ASCII letters, digits and spaces with one space,
    collapse whitespace runs, strip both ends, lowercase.
    """
    decomposed = unicodedata.normalize("NFKD", text)
    # combining() returns 0 for characters that are not combining marks.
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    alnum_only = re.sub(r"[^a-zA-Z0-9 ]+", " ", "".join(base_chars))
    single_spaced = re.sub(r"\s+", " ", alnum_only)
    return single_spaced.strip().lower()
|
|
|
|
def ptbr_soundex(word: str, maxlen: int = 6) -> str:
    """
    Build a Soundex-style phonetic key for ``word``.

    The word is normalized first. The key is the first normalized character
    followed by the digit codes (from ``_consonant_map``) of the remaining
    characters; a code equal to the code of the immediately preceding
    character is skipped. Vowels, ``h``, ``w``, ``y``, spaces and characters
    missing from the table carry no code. The key is cut or right-padded
    with ``"0"`` to exactly ``maxlen`` characters.

    Returns:
        The phonetic key, or ``""`` when the normalized word is empty.
    """
    normalized = _normalize(word)
    if not normalized:
        return ""
    head, tail = normalized[0], normalized[1:]
    codes = ["" if ch in "aeiouhwy " else _consonant_map.get(ch, "") for ch in tail]
    # NOTE(review): each code is compared with the code of the character
    # right before it, so an uncoded character between two equal codes keeps
    # both (classic Soundex behaviour) - confirm this is the intended rule.
    kept = [cur for before, cur in zip([""] + codes, codes) if cur and cur != before]
    key = (head + "".join(kept))[:maxlen]
    return key.ljust(maxlen, "0")
|
|
|
|
from difflib import SequenceMatcher
|
|
|
|
def similarity(a: str, b: str) -> float:
    """Return the ``SequenceMatcher`` ratio of the two strings after both are normalized."""
    left, right = _normalize(a), _normalize(b)
    matcher = SequenceMatcher(None, left, right)
    return matcher.ratio()
|
|
|
|
# ------------------------------
|
|
# MCP Tools
|
|
# ------------------------------
|
|
@mcp.tool()
async def find_subnet(query_text: str) -> dict:
    """
    Find the subnet ocid by the name and the compartment ocid

    Runs `oci search resource structured-search` over subnet resources whose
    display name matches ``query_text``.

    Args:
        query_text: Subnet name (or part of it) to look for.

    Returns:
        ``{"status": "ok", "data": [...]}`` with the search items, or
        ``{"status": "error", "stderr": ..., "stdout": ...}`` when the CLI
        exits with a non-zero code.
    """
    structured = f"query subnet resources where displayName =~ '.*{query_text}*.'"
    code, out, err = oci_cli.run(["search", "resource", "structured-search", "--query-text", structured])
    if code != 0:
        return {"status": "error", "stderr": err, "stdout": out}
    # _safe_json never raises, so empty or non-JSON stdout produces an empty
    # result instead of an unhandled exception inside the tool.
    payload = _safe_json(out)
    result = payload.get("data") if isinstance(payload, dict) else None
    items = (result.get("items") or []) if isinstance(result, dict) else []
    return {"status": "ok", "data": items}
|
|
|
|
@mcp.tool()
async def list_availability_domains(compartment_ocid: Optional[str] = None) -> Dict[str, Any]:
    """List availability domains (ADs) via `oci iam availability-domain list`.

    Args:
        compartment_ocid: Compartment to query; the tenancy OCID from the
            OCI config is used when omitted.

    Returns:
        ``{"status": "ok", "data": <parsed CLI output>}`` on success, or a
        ``{"status": "error", ...}`` dict when no compartment can be
        determined or the CLI exits with a non-zero code.
    """
    cid = compartment_ocid if compartment_ocid else _tenancy_ocid()
    if not cid:
        return {"status": "error", "error": "Missing tenancy compartment OCID."}
    cli_args = ["iam", "availability-domain", "list", "--compartment-id", cid]
    code, out, err = oci_cli.run(cli_args)
    if code == 0:
        return {"status": "ok", "data": _safe_json(out)}
    return {"status": "error", "stderr": err, "stdout": out}
|
|
|
|
@mcp.tool()
async def find_ad(name_or_hint: str, compartment_ocid: Optional[str] = None) -> Dict[str, Any]:
    """Find the AD by a name (ex.: 'SAOPAULO-1-AD-1').

    Lists the availability domains and ranks their names by similarity to
    the normalized hint.

    Returns:
        ``{"status", "ad", "candidates"}`` where ``ad`` is the best-scoring
        name, ``candidates`` holds up to five names in ranking order and
        ``status`` is ``"ok"`` when the best score is at least 0.6, otherwise
        ``"ambiguous"``. ``{"status": "not_found", "candidates": []}`` when
        there is nothing to rank. A failed listing is returned unchanged.
    """
    listing = await list_availability_domains(compartment_ocid)
    if listing.get("status") != "ok":
        return listing

    payload = listing["data"]
    domains = payload.get("data", []) if isinstance(payload, dict) else []
    wanted = _normalize(name_or_hint)
    names = [ad.get("name") or ad.get("display-name") or "" for ad in domains]
    ranked = sorted(
        ((similarity(wanted, name), name) for name in names),
        key=lambda pair: pair[0],
        reverse=True,
    )
    if not ranked:
        return {"status": "not_found", "candidates": []}

    top_score, top_name = ranked[0]
    verdict = "ok" if top_score >= 0.6 else "ambiguous"
    return {"status": verdict, "ad": top_name, "candidates": [name for _, name in ranked[:5]]}
|
|
|
|
async def _list_shapes_from_oci(compartment_ocid: Optional[str] = None, ad: Optional[str] = None) -> Dict[str, Any]:
    """List shapes via `oci compute shape list --all` (a compartment is required; an AD is passed as an extra filter).

    Args:
        compartment_ocid: Compartment to query; the tenancy OCID from the
            OCI config is used when omitted.
        ad: Optional availability domain passed as ``--availability-domain``.

    Returns:
        ``{"status": "ok", "data": ...}`` where ``data`` is the ``"data"``
        entry of the parsed CLI output (or the parsed output itself when it
        is not a dict), or a ``{"status": "error", ...}`` dict.
    """
    cid = compartment_ocid or _tenancy_ocid()
    if not cid:
        return {"status": "error", "error": "Missing compartment OCID."}

    ad_filter = ["--availability-domain", ad] if ad else []
    cli_args = ["compute", "shape", "list", "--compartment-id", cid, "--all", *ad_filter]
    code, out, err = oci_cli.run(cli_args)
    if code != 0:
        return {"status": "error", "stderr": err, "stdout": out}

    parsed = _safe_json(out)
    if isinstance(parsed, dict):
        shapes = parsed.get("data", [])
    else:
        shapes = parsed
    return {"status": "ok", "data": shapes}
|
|
|
|
@mcp.tool()
async def resolve_shape(hint: str, compartment_ocid: Optional[str] = None, ad: Optional[str] = None) -> Dict[str, Any]:
    """Resolves shape by hint like 'e4' → best match type 'VM.Standard.E4.Flex'.

    Each listed shape is scored by similarity to the normalized hint, plus a
    0.2 bonus when the hint occurs in the shape name with the
    ``VM.Standard.`` prefix removed.

    Returns:
        ``{"status", "shape", "candidates"}`` where ``shape`` is the
        best-scoring name, ``candidates`` holds up to five names in ranking
        order and ``status`` is ``"ok"`` when the best score is at least 0.6,
        otherwise ``"ambiguous"``. ``{"status": "not_found", "candidates":
        []}`` when there is nothing to rank. A failed listing is returned
        unchanged.
    """
    listing = await _list_shapes_from_oci(compartment_ocid=compartment_ocid, ad=ad)
    if listing.get("status") != "ok":
        return listing

    wanted = _normalize(hint)
    ranked = []
    for entry in listing["data"]:
        shape_name = entry.get("shape") or ""
        family = _normalize(shape_name.replace("VM.Standard.", ""))
        bonus = 0.2 if (family.startswith(wanted) or wanted in family) else 0
        ranked.append((similarity(wanted, shape_name) + bonus, shape_name))
    ranked = sorted(ranked, key=lambda pair: pair[0], reverse=True)
    if not ranked:
        return {"status": "not_found", "candidates": []}

    top_score, top_name = ranked[0]
    verdict = "ok" if top_score >= 0.6 else "ambiguous"
    return {"status": verdict, "shape": top_name, "candidates": [name for _, name in ranked[:5]]}
|
|
|
|
@mcp.tool()
async def list_shapes(compartment_ocid: Optional[str] = None, ad: Optional[str] = None) -> Dict[str, Any]:
    """
    List all available compute shapes in the given compartment/availability domain.

    Returns:
        ``{"status": "ok", "data": [{"shape", "ocpus", "memory"}, ...]}``,
        or the error dict of the underlying listing unchanged.
    """
    lst = await _list_shapes_from_oci(compartment_ocid=compartment_ocid, ad=ad)
    if lst.get("status") != "ok":
        return lst

    items = lst["data"]
    shapes = [
        {
            "shape": s.get("shape"),
            "ocpus": s.get("ocpus"),
            # The OCI CLI prints kebab-case keys ("memory-in-gbs"); the
            # camelCase spelling is kept as a fallback.
            "memory": s.get("memory-in-gbs", s.get("memoryInGBs")),
        }
        for s in items
    ]
    return {"status": "ok", "data": shapes}
|
|
|
|
async def list_images(compartment_ocid: Optional[str] = None,
                      operating_system: Optional[str] = None,
                      operating_system_version: Optional[str] = None,
                      shape: Optional[str] = None) -> Dict[str, Any]:
    """List compute images via `oci compute image list --all`.

    Args:
        compartment_ocid: Compartment to query; the tenancy OCID from the
            OCI config is used when omitted.
        operating_system: Passed as ``--operating-system`` when set.
        operating_system_version: Passed as ``--operating-system-version``
            when set.
        shape: Passed as ``--shape`` when set.

    Returns:
        ``{"status": "ok", "data": [...]}`` with the ``"data"`` list of the
        parsed CLI output (empty when the output is not a JSON object), or a
        ``{"status": "error", ...}`` dict.
    """
    cid = compartment_ocid or _tenancy_ocid()
    if not cid:
        return {"status": "error", "error": "Missing compartment OCID."}

    cli_args = ["compute", "image", "list", "--compartment-id", cid, "--all"]
    optional_flags = (
        ("--operating-system", operating_system),
        ("--operating-system-version", operating_system_version),
        ("--shape", shape),
    )
    for flag, value in optional_flags:
        if value:
            cli_args += [flag, value]

    code, out, err = oci_cli.run(cli_args)
    if code != 0:
        return {"status": "error", "stderr": err, "stdout": out}

    parsed = _safe_json(out)
    images = parsed.get("data", []) if isinstance(parsed, dict) else []
    return {"status": "ok", "data": images}
|
|
|
|
@mcp.tool()
async def resolve_image(query: str,
                        compartment_ocid: Optional[str] = None,
                        shape: Optional[str] = None) -> Dict[str, Any]:
    """Find the image by a short name or similarity

    When the query mentions "linux", the listing is filtered to
    "Oracle Linux" and, if the query contains a one- or two-digit number, to
    that OS version. If the filtered listing is empty, images are listed
    again without the OS filters. Images are ranked by display-name
    similarity to the query, ties broken by creation time (newest first).

    Args:
        query: Free-text image name or hint, e.g. "Oracle Linux 9".
        compartment_ocid: Compartment to query; resolved by ``list_images``
            when omitted.
        shape: Optional shape name; when set, only images listed for that
            shape are considered.

    Returns:
        ``{"status", "resource", "candidates"}`` where ``candidates`` holds
        up to five ``{"name", "ocid", "score"}`` dicts in ranking order,
        ``resource`` is the first of them and ``status`` is ``"ok"`` when its
        score is at least 0.65, otherwise ``"ambiguous"``.
        ``{"status": "not_found", "candidates": []}`` when no image is
        listed. A failed listing is returned unchanged.
    """
    q = (query or "").strip()
    os_name, os_ver = None, None
    if "linux" in q.lower():
        os_name = "Oracle Linux"
        # Single backslashes: inside a raw string "\\d" would look for a
        # literal backslash and the version number would never be found.
        m = re.search(r"(?:^|\D)(\d{1,2})(?:\D|$)", q)
        if m:
            os_ver = m.group(1)

    lst = await list_images(compartment_ocid=compartment_ocid, operating_system=os_name,
                            operating_system_version=os_ver, shape=shape)
    if lst.get("status") != "ok":
        return lst
    items = lst["data"]
    if not items:
        # Fallback: drop the OS filters, list again and rely on the fuzzy
        # display-name ranking. The shape filter is kept because an image
        # that is not available for the requested shape is not a usable
        # answer.
        lst = await list_images(compartment_ocid=compartment_ocid, shape=shape)
        if lst.get("status") != "ok":
            return lst
        items = lst["data"]

    # Rank by display-name similarity, then by creation timestamp.
    ranked = []
    for img in items:
        dn = img.get("display-name") or ""
        ts = img.get("time-created") or img.get("time_created") or ""
        ranked.append((similarity(q, dn), ts, img))
    ranked.sort(key=lambda x: (x[0], x[1]), reverse=True)

    if not ranked:
        return {"status": "not_found", "candidates": []}

    # Top-5 candidates; .get() keeps a malformed entry from raising.
    cands = [
        {"name": img.get("display-name"), "ocid": img.get("id"), "score": round(float(s), 4)}
        for s, _, img in ranked[:5]
    ]
    status = "ok" if cands[0]["score"] >= 0.65 else "ambiguous"
    return {"status": status, "resource": cands[0], "candidates": cands}
|
|
|
|
def _norm(s: str) -> str:
    """Short alias for ``_normalize``; returns its result unchanged."""
    normalized = _normalize(s)
    return normalized
|
|
|
|
@mcp.tool()
async def find_compartment(query_text: str) -> dict:
    """
    Find compartment ocid by the name, the compartment ocid is the identifier field

    Runs `oci search resource structured-search` over compartment resources
    whose display name matches ``query_text``.

    Args:
        query_text: Compartment name (or part of it) to look for.

    Returns:
        ``{"status": "ok", "data": [...]}`` with the search items, or
        ``{"status": "error", "stderr": ..., "stdout": ...}`` when the CLI
        exits with a non-zero code.
    """
    structured = f"query compartment resources where displayName =~ '.*{query_text}*.'"
    code, out, err = oci_cli.run(["search", "resource", "structured-search", "--query-text", structured])
    if code != 0:
        return {"status": "error", "stderr": err, "stdout": out}
    # _safe_json never raises, so empty or non-JSON stdout produces an empty
    # result instead of an unhandled exception inside the tool.
    payload = _safe_json(out)
    result = payload.get("data") if isinstance(payload, dict) else None
    items = (result.get("items") or []) if isinstance(result, dict) else []
    return {"status": "ok", "data": items}
|
|
|
|
@mcp.tool()
async def create_compute_instance(
    compartment_ocid: Optional[str] = None,
    subnet_ocid: Optional[str] = None,
    availability_domain: Optional[str] = None,
    shape: Optional[str] = None,
    ocpus: Optional[int] = None,  # optional integer
    memory: Optional[int] = None,  # optional integer
    image_ocid: Optional[str] = None,
    display_name: Optional[str] = None,
    ssh_authorized_keys_path: Optional[str] = None,
    extra_args: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """
    Create an OCI Compute instance via `oci` CLI.
    Missing parameters should be asked upstream by the agent.

    Example:
        compartment_id: ocid1.compartment.oc1..aaaa...
        subnet_id: ocid1.subnet.oc1.sa-saopaulo-1.aaaa...
        shape: VM.Standard.E4.Flex
        availability_domain: IAfA:SA-SAOPAULO-1-AD-1
        image_id: ocid1.image.oc1.sa-saopaulo-1.aaaa...
        display_name: teste_hoshikawa
        shape-config: {"ocpus": 2, "memoryInGBs": 16}

    Args:
        compartment_ocid, subnet_ocid, availability_domain, shape,
        image_ocid: Required; an error dict is returned when any is empty.
        ocpus, memory: Optional values sent as ``--shape-config``
            (``ocpus`` / ``memoryInGBs``); each is included when given.
        display_name: Optional ``--display-name``.
        ssh_authorized_keys_path: Optional path of a public key file whose
            content is sent as the ``ssh_authorized_keys`` metadata entry.
        extra_args: Optional extra CLI arguments appended as-is.

    Returns:
        ``{"status": "ok", "oci_result": <parsed CLI output>}`` on success,
        otherwise ``{"status": "error", "error": ...}``.
    """
    required = [
        ("--compartment-id", compartment_ocid),
        ("--subnet-id", subnet_ocid),
        ("--shape", shape),
        ("--availability-domain", availability_domain),
        ("--image-id", image_ocid),
    ]

    # Minimal validation, done before any file access or CLI call.
    for flag, value in required:
        if not value:
            return {"status": "error", "error": f"Missing required {flag} value"}

    args = ["compute", "instance", "launch"]
    for flag, value in required:
        args += [flag, value]

    # Build shape-config from whichever sizing values were supplied, so a
    # lone ocpus or memory value is not silently dropped.
    shape_config = {}
    if ocpus is not None:
        shape_config["ocpus"] = ocpus
    if memory is not None:
        shape_config["memoryInGBs"] = memory
    if shape_config:
        args += ["--shape-config", json.dumps(shape_config)]

    if display_name:
        args += ["--display-name", display_name]

    if ssh_authorized_keys_path:
        # open() does not expand "~", so do it explicitly.
        key_path = os.path.expanduser(ssh_authorized_keys_path)
        try:
            with open(key_path, "r", encoding="utf-8") as key_file:
                public_keys = key_file.read()
        except OSError as exc:
            return {"status": "error", "error": f"Cannot read SSH public key file {key_path}: {exc}"}
        args += ["--metadata", json.dumps({"ssh_authorized_keys": public_keys})]

    if extra_args:
        args += extra_args

    code, out, err = oci_cli.run(args)
    if code != 0:
        return {"status": "error", "error": err.strip(), "stdout": out}

    # _safe_json falls back to {"raw": out} when stdout is not JSON.
    return {"status": "ok", "oci_result": _safe_json(out)}
|
|
|
|
@mcp.tool()
async def oci_cli_passthrough(raw: str) -> Dict[str, Any]:
    """
    Run an arbitrary `oci` CLI command (single string).

    Example: "network vcn list --compartment-id ocid1..."

    Args:
        raw: The command line without the binary name. A leading ``oci``
            token is tolerated and removed, because the binary is added by
            the CLI runner.

    Returns:
        ``{"returncode", "stdout", "stderr"}`` plus ``"json"`` with the
        parsed stdout when it is valid JSON. An empty or unparsable command
        is not executed and yields ``returncode`` 2 with an explanation in
        ``stderr``.
    """
    try:
        # ``raw or ""`` matters: shlex.split(None) would read from stdin,
        # which is the MCP transport of this server.
        args = shlex.split(raw or "")
    except ValueError as exc:
        # Typically unbalanced quotes.
        return {"returncode": 2, "stdout": "", "stderr": f"Could not parse command: {exc}"}

    if args and args[0] == "oci":
        args = args[1:]
    if not args:
        return {"returncode": 2, "stdout": "", "stderr": "Empty command"}

    code, out, err = oci_cli.run(args)
    result = {"returncode": code, "stdout": out, "stderr": err}
    try:
        result["json"] = json.loads(out)
    except ValueError:
        # stdout is plain text or empty; the "json" key is simply left out.
        pass
    return result
|
|
|
|
# -------------
|
|
# Entrypoint
|
|
# -------------
|
|
if __name__ == "__main__":
    # Start the FastMCP server over stdio. A host (your agent/IDE) should launch this.
    mcp.run(transport="stdio")