First commit

This commit is contained in:
2026-02-27 09:28:39 -03:00
commit 00b64c2534
19 changed files with 1895 additions and 0 deletions

View File

@@ -0,0 +1,232 @@
from __future__ import annotations
import argparse
import json
import os
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional
from pptx import Presentation
# ============================================================
# PATHS / CONFIG (flexible via env vars and CLI)
# ============================================================
def _env_path(name: str, default: Optional[str] = None) -> Optional[Path]:
v = os.getenv(name, default)
return Path(v).expanduser() if v else None
# Base working directory; every default artifact path below hangs off this.
OPENCLAW_WORKDIR = _env_path("OPENCLAW_WORKDIR", ".") # default: current directory
# Input template deck and output deck locations (overridable via env vars or CLI flags).
PPTX_TEMPLATE_PATH = _env_path(
    "PPTX_TEMPLATE_PATH",
    str(OPENCLAW_WORKDIR / "template_openclaw_oci_clean.pptx"),
)
PPTX_OUTPUT_PATH = _env_path(
    "PPTX_OUTPUT_PATH",
    str(OPENCLAW_WORKDIR / "openclaw_oci_presentation.pptx"),
)
# Prefer OCI_CONTENT_FILE (policy name) but accept PPTX_CONTENT_PATH too
PPTX_CONTENT_PATH = _env_path(
    "OCI_CONTENT_FILE",
    os.getenv("PPTX_CONTENT_PATH", str(OPENCLAW_WORKDIR / "content.json")),
)
# Defaults for the Demo-slide link and the cover's presenter/subtitle fields.
DEFAULT_LINK = "https://docs.oracle.com/en-us/iaas/Content/generative-ai/home.htm"
DEFAULT_PRESENTER = os.getenv("PPTX_PRESENTER", "Cristiano Hoshikawa")
DEFAULT_COVER_SUBTITLE = os.getenv("PPTX_COVER_SUBTITLE", "Architecture")
# ============================================================
# TEMPLATE ENGINE
# ============================================================
class RedwoodSafePPT:
    """
    Loads a PPTX template, wipes all existing slides safely, and builds a new deck
    using named layouts from the template.
    """
    # Layout names expected to exist in the template; looked up by exact name in _layout().
    LAYOUT_COVER = "Cover 1 - Full Image"
    LAYOUT_CONTENT = "Full Page - Light"

    def __init__(self, template_path: Path):
        """Open the template, delete every pre-existing slide, and index its layouts.

        Raises FileNotFoundError if the template path does not exist.
        """
        template_path = Path(template_path).expanduser()
        if not template_path.exists():
            raise FileNotFoundError(f"Template not found: {template_path}")
        self.prs = Presentation(str(template_path))
        # Remove ALL slides.
        # NOTE: python-pptx has no public "delete slide" API, so this walks the
        # internal slide-id list (_sldIdLst), dropping each slide's relationship
        # before removing its id entry. Relies on python-pptx internals.
        while len(self.prs.slides) > 0:
            rId = self.prs.slides._sldIdLst[0].rId
            self.prs.part.drop_rel(rId)
            del self.prs.slides._sldIdLst[0]
        # Index the template's layouts by display name for lookup in _layout().
        self.layouts = {layout.name: layout for layout in self.prs.slide_layouts}

    def _layout(self, name: str):
        """Return the layout called *name*, or raise ValueError listing the available names."""
        if name not in self.layouts:
            available = ", ".join(sorted(self.layouts.keys()))
            raise ValueError(f"Layout '{name}' not found in template. Available: {available}")
        return self.layouts[name]

    def add_content(self, title: str, subhead: str, body: str):
        """Append a content slide: placeholder 0 gets the title, placeholder 1 gets subhead + body."""
        slide = self.prs.slides.add_slide(self._layout(self.LAYOUT_CONTENT))
        text_placeholders = [ph for ph in slide.placeholders if getattr(ph, "has_text_frame", False)]
        if len(text_placeholders) < 2:
            raise RuntimeError("Content layout must have at least 2 text placeholders.")
        text_placeholders[0].text = title
        text_placeholders[1].text = f"{subhead}\n\n{body}"

    def add_cover(self, title: str, subtitle: str, presenter: str):
        """Append the cover slide; fill date/presenter placeholders when present."""
        slide = self.prs.slides.add_slide(self._layout(self.LAYOUT_COVER))
        text_placeholders = [ph for ph in slide.placeholders if getattr(ph, "has_text_frame", False)]
        if len(text_placeholders) < 2:
            raise RuntimeError("Cover layout must have at least 2 text placeholders.")
        text_placeholders[0].text = title
        text_placeholders[1].text = subtitle
        # Optional placeholders by name.
        # NOTE(review): matching is by substring of the placeholder name, so a
        # placeholder named e.g. "Date and Presenter" would receive both writes,
        # and a "date"/"presenter" placeholder at index 0/1 gets overwritten here.
        for ph in text_placeholders:
            name = (getattr(ph, "name", "") or "").lower()
            if "date" in name:
                ph.text = datetime.now().strftime("%d %b %Y")
            if "presenter" in name:
                ph.text = presenter

    def save(self, output_path: Path):
        """Write the deck to *output_path*, creating parent dirs and replacing any existing file."""
        output_path = Path(output_path).expanduser()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        if output_path.exists():
            output_path.unlink()
        self.prs.save(str(output_path))
# ============================================================
# DECK (fixed 7 slides)
# ============================================================
class OCIStrategicArchitectDeck:
    """Builds the fixed 7-slide deck (cover + six content chapters) on top of RedwoodSafePPT."""

    def __init__(self, template_path: Path):
        self.ppt = RedwoodSafePPT(template_path)

    def _format_section(self, section: Dict[str, Any]) -> str:
        """Render one chapter dict (bullets / evidence / keywords) as slide body text."""
        rendered = []
        for bullet in (section.get("bullets", []) or []):
            rendered.append(str(bullet).strip())
        evidence_items = section.get("evidence", []) or []
        if evidence_items:
            rendered.extend(["", "Evidence:"])
            # At most two evidence excerpts per slide.
            rendered.extend(f"- {str(item).strip()}" for item in evidence_items[:2])
        keyword_items = section.get("keywords", []) or []
        if keyword_items:
            # At most eight keywords, comma-joined on a single line.
            rendered.extend(["", "Keywords: " + ", ".join(str(k).strip() for k in keyword_items[:8])])
        return "\n".join(rendered).strip()

    def build(self, material_link: str, content: Dict[str, Any], presenter: str, cover_subtitle: str):
        """Assemble cover + six chapters; raises RuntimeError if the deck is not exactly 7 slides."""
        # 1) Cover
        self.ppt.add_cover(
            title=str(content["cover_title"]).strip(),
            subtitle=cover_subtitle,
            presenter=presenter,
        )
        # 2-7) Content chapters, in fixed order: (title, subhead, content key).
        chapter_plan = (
            ("Intro", "Context and Motivation", "introduction"),
            ("Technologies", "Stack OCI", "technologies"),
            ("Architecture", "Architecture Flow", "architecture"),
            ("Problems", "Technical Challenges", "problems"),
            ("Demo", "Materials", "demo"),
            ("Conclusion", "Strategies", "conclusion"),
        )
        for title, subhead, key in chapter_plan:
            body = self._format_section(content[key])
            if key == "demo":
                # The Demo slide leads with the source material link.
                body = f"{material_link}\n\n{body}"
            self.ppt.add_content(title=title, subhead=subhead, body=body)
        if len(self.ppt.prs.slides) != 7:
            raise RuntimeError("Deck must contain exactly 7 slides.")

    def save(self, output_path: Path):
        self.ppt.save(output_path)
# ============================================================
# CLI
# ============================================================
def _load_json(path: Path) -> Dict[str, Any]:
path = Path(path).expanduser()
if not path.exists():
raise FileNotFoundError(f"content.json not found: {path}")
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def main():
    """CLI entry point: parse arguments, load content.json, build and save the deck."""
    parser = argparse.ArgumentParser(
        description="Generate a 7-slide OCI strategic PPTX from a template + content.json."
    )
    parser.add_argument("--template", default=str(PPTX_TEMPLATE_PATH), help="Path to the PPTX template file.")
    parser.add_argument("--output", default=str(PPTX_OUTPUT_PATH), help="Path to the output PPTX to be written.")
    parser.add_argument("--content", default=str(PPTX_CONTENT_PATH), help="Path to content.json.")
    parser.add_argument("--link", default=os.getenv("OCI_LINK_DEMO", DEFAULT_LINK), help="Source link shown on Demo slide.")
    parser.add_argument("--presenter", default=DEFAULT_PRESENTER, help="Presenter name on cover (if placeholder exists).")
    parser.add_argument("--cover-subtitle", default=DEFAULT_COVER_SUBTITLE, help="Cover subtitle.")
    args = parser.parse_args()

    # Load the content first so a missing content.json fails before the template is opened.
    content = _load_json(Path(args.content))
    deck = OCIStrategicArchitectDeck(Path(args.template))
    deck.build(
        args.link,
        content,
        presenter=args.presenter,
        cover_subtitle=args.cover_subtitle,
    )
    deck.save(Path(args.output))
    print("✅ PPT generated:", Path(args.output).expanduser().resolve())


if __name__ == "__main__":
    main()

752
project/oci_openai_proxy.py Normal file
View File

@@ -0,0 +1,752 @@
import os
import time
import json
import uuid
from typing import Optional, List, Dict, Any
import re
import subprocess
import requests
import oci
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, ConfigDict
import requests
import os
import requests
# ============================================================
# CONFIG
# ============================================================
# OCI SDK config file / profile used to sign outbound requests (see get_signer()).
OCI_CONFIG_FILE = os.getenv("OCI_CONFIG_FILE", os.path.expanduser("~/.oci/config"))
OCI_PROFILE = os.getenv("OCI_PROFILE", "DEFAULT")
# Target compartment and Generative AI inference endpoint.
OCI_COMPARTMENT_ID = os.getenv("OCI_COMPARTMENT_ID", "<YOUR_COMPARTMENT_ID>")
OCI_GENAI_ENDPOINT = os.getenv(
    "OCI_GENAI_ENDPOINT",
    "https://inference.generativeai.<region>.oci.oraclecloud.com"
)
# NOTE(review): this guard can never fire — the getenv default above is the
# non-empty placeholder string, so OCI_COMPARTMENT_ID is always truthy.
if not OCI_COMPARTMENT_ID:
    raise RuntimeError("OCI_COMPARTMENT_ID not defined")
# True  -> /v1/chat/completions drives the strict (exec ...)/(done ...) runner loop.
# False -> /v1/chat/completions uses the JSON-protocol agent_loop with local tools.
OPENCLAW_TOOLS_ACTIVE = True
# ============================================================
# PROMPTS to adapt for OCI
# ============================================================
SYSTEM_AGENT_PROMPT = """
You are an autonomous software agent.
You have full access to the local machine.
Available tools:
- weather(city: string)
- exec(command: string)
If a system command is required, respond ONLY with:
{
"action": "call_tool",
"tool": "exec",
"arguments": {
"command": "<shell command>"
}
}
***VERY IMPORTANT***: A TASK IS CONSIDERED COMPLETED WHEN IT RESULTS IN A ARTIFACT ASKED FROM THE USER
If task is completed:
{
"action": "final_answer",
"content": "<result>"
}
"""
# Optional extra policy text prepended to the runner prompt (empty when absent).
# NOTE(review): this is a relative path, so expanduser() is a no-op here —
# presumably the file is expected in the process working directory; confirm.
PROMPT_PATH = os.path.expanduser("pptx_runner_policy_strict.txt")


def load_runner_policy():
    """Return the contents of PROMPT_PATH, or an empty string when the file does not exist."""
    if not os.path.exists(PROMPT_PATH):
        return ""
    with open(PROMPT_PATH, "r", encoding="utf-8") as handle:
        return handle.read()


RUNNER_POLICY = load_runner_policy()
RUNNER_PROMPT = (
RUNNER_POLICY + "\n\n"
"You are a Linux execution agent.\n"
"\n"
"OUTPUT CONTRACT (MANDATORY):\n"
"- You must output EXACTLY ONE of the following per response:\n"
" A) (exec <command>)\n"
" B) (done <final answer>)\n"
"\n"
"STRICT RULES:\n"
"1) NEVER output raw commands without (exec <command>). Raw commands will be ignored.\n"
"2) NEVER output explanations, markdown, code fences, bullets, or extra text.\n"
"3) If you need to create multi-line files, you MUST use heredoc inside (exec <command>), e.g.:\n"
" (exec cat > file.py << 'EOF'\n"
" ...\n"
" EOF)\n"
"4) If the previous tool result shows an error, your NEXT response must be (exec <command>) to fix it.\n"
"5) When the artifact is created successfully, end with (done ...).\n"
"\n"
"REMINDER: Your response must be only a single parenthesized block."
)
# Mapeamento OpenAI → OCI
MODEL_MAP = {
"gpt-5": "openai.gpt-4.1",
"openai/gpt-5": "openai.gpt-4.1",
"openai-compatible/gpt-5": "openai.gpt-4.1",
}
# ============================================================
# FASTAPI APP
# ============================================================
# OpenAI-compatible HTTP facade; all endpoints below are registered on this app.
app = FastAPI(title="OCI OpenAI-Compatible Gateway")
# ============================================================
# OCI SIGNER
# ============================================================
def get_signer():
    """Build an OCI request signer from the configured config file and profile."""
    cfg = oci.config.from_file(OCI_CONFIG_FILE, OCI_PROFILE)
    signer_kwargs = {
        "tenancy": cfg["tenancy"],
        "user": cfg["user"],
        "fingerprint": cfg["fingerprint"],
        "private_key_file_location": cfg["key_file"],
        # pass_phrase is optional in the OCI config; None when absent.
        "pass_phrase": cfg.get("pass_phrase"),
    }
    return oci.signer.Signer(**signer_kwargs)
# ============================================================
# OCI CHAT CALL (OPENAI FORMAT)
# ============================================================
def _openai_messages_to_generic(messages: list) -> list:
"""
OpenAI: {"role":"user","content":"..."}
Generic: {"role":"USER","content":[{"type":"TEXT","text":"..."}]}
"""
out = []
for m in messages or []:
role = (m.get("role") or "user").upper()
# OCI GENERIC geralmente espera USER/ASSISTANT
if role == "SYSTEM":
role = "USER"
elif role == "TOOL":
role = "USER"
content = m.get("content", "")
# Se vier lista (OpenAI multimodal), extrai texto
if isinstance(content, list):
parts = []
for item in content:
if isinstance(item, dict) and item.get("type") in ("text", "TEXT"):
parts.append(item.get("text", ""))
content = "\n".join(parts)
out.append({
"role": role,
"content": [{"type": "TEXT", "text": str(content)}]
})
return out
def build_generic_messages(openai_messages: list, system_prompt: str) -> list:
    """
    Convert OpenAI-style messages to the OCI GENERIC shape, injecting
    *system_prompt* as the FIRST USER message (prefixed with "SYSTEM:") and
    dropping any original system messages.
    """
    def _entry(role: str, text: str) -> dict:
        return {"role": role, "content": [{"type": "TEXT", "text": text}]}

    generic = [_entry("USER", "SYSTEM:\n" + system_prompt.strip())]
    for message in openai_messages or []:
        role = (message.get("role") or "user").lower()
        if role == "system":
            continue  # the system prompt was already injected above
        target_role = "USER" if role in ("user", "tool") else "ASSISTANT"
        content = message.get("content", "")
        if isinstance(content, list):
            # Multimodal payload: keep only the text fragments.
            content = "\n".join(
                part.get("text", "")
                for part in content
                if isinstance(part, dict) and part.get("type") in ("text", "TEXT")
            )
        generic.append(_entry(target_role, str(content)))
    return generic
def call_oci_chat(body: dict, system_prompt: str):
    """
    POST an OpenAI-style request *body* to the OCI Generative AI chat action and
    return the raw "chatResponse" payload.

    Raises HTTPException mirroring OCI's status/detail on any non-200 answer.
    """
    signer = get_signer()
    requested_model = body.get("model")
    # Map OpenAI aliases to OCI model ids; unknown names pass through as-is.
    oci_model = MODEL_MAP.get(requested_model, requested_model)
    generic_messages = build_generic_messages(body.get("messages", []), system_prompt)
    payload = {
        "compartmentId": OCI_COMPARTMENT_ID,
        "servingMode": {
            "servingType": "ON_DEMAND",
            "modelId": oci_model
        },
        "chatRequest": {
            "apiFormat": "GENERIC",
            "messages": generic_messages,
            "maxTokens": int(body.get("max_tokens", 4000)),
            "temperature": float(body.get("temperature", 0.0)),
            "topP": float(body.get("top_p", 1.0)),
        }
    }
    # IMPORTANT: with GENERIC apiFormat do NOT forward tools/tool_choice/stream —
    # tool orchestration happens in this proxy. Sending them can trigger a 400
    # ("correct format of request") from OCI.
    url = f"{OCI_GENAI_ENDPOINT}/20231130/actions/chat"
    response = requests.post(url, json=payload, auth=signer)
    if response.status_code != 200:
        print("OCI ERROR:", response.text)
        raise HTTPException(status_code=response.status_code, detail=response.text)
    return response.json()["chatResponse"]
def detect_tool_call(text: str):
    """
    Scan *text* for an exec-style invocation such as ``exec(ls -la)``.

    Returns {"tool": "exec", "args_raw": "<command and args>"} on a match,
    otherwise None.
    """
    match = re.search(r"exec\s*\(\s*([^\s]+)\s*(.*?)\s*\)", text)
    if match is None:
        return None
    command, args = match.group(1), match.group(2)
    return {
        "tool": "exec",
        "args_raw": f"{command} {args}".strip()
    }
def execute_exec_command(command: str):
    """
    Run *command* through the shell and return its combined stdout+stderr.

    Returns a "(no output) exit=N" placeholder when the command printed
    nothing, and an error string (never raises) on timeout.
    """
    print(f"LOG: EXEC COMMAND: {command}")
    try:
        proc = subprocess.run(
            command,
            shell=True,  # the command string comes from the model and is run as-is
            capture_output=True,
            text=True,
            timeout=120,  # seconds; tune as needed
        )
    except subprocess.TimeoutExpired:
        return "ERROR: command timed out"
    combined = (proc.stdout or "") + (proc.stderr or "")
    if not combined.strip():
        return f"(no output) exit={proc.returncode}"
    return combined
# Tool registry used by agent_loop(). The lambdas are deliberate: they defer
# name resolution to call time, so get_weather_from_api (defined further down
# in this file) does not need to exist yet when this dict is built at import.
TOOLS = {
    "weather": lambda city: get_weather_from_api(city),
    "exec": lambda command: execute_exec_command(command)
}
def execute_real_tool(name, args):
    """Dispatch a tool call by name; only 'weather' is implemented here."""
    if name != "weather":
        return "Tool not implemented"
    return get_weather_from_api(args.get("city"))
def _extract_generic_text(oci_message: dict) -> str:
content = oci_message.get("content")
if isinstance(content, list):
r = "".join([i.get("text", "") for i in content if isinstance(i, dict) and i.get("type") == "TEXT"])
# print("r", r)
return r
if isinstance(content, str):
# print("content", content)
return content
return str(content)
def agent_loop(body: dict, max_iterations=10000):
    """
    JSON-protocol agent loop.

    Keeps an OpenAI-style message history (call_oci_chat converts it to OCI
    GENERIC), asks the model for the next step, and:
      * executes {"action": "call_tool", ...} requests via the TOOLS registry,
        feeding the result back into the history, or
      * stops on {"action": "final_answer", ...} or any non-JSON reply.

    Returns the last raw OCI chatResponse dict (None only if max_iterations <= 0).
    """
    messages = [{"role": "system", "content": SYSTEM_AGENT_PROMPT}]
    messages.extend(body.get("messages", []))
    response = None  # defined even when max_iterations <= 0
    for _ in range(max_iterations):
        response = call_oci_chat({**body, "messages": messages}, SYSTEM_AGENT_PROMPT)
        text = _extract_generic_text(response["choices"][0]["message"])
        try:
            agent_output = json.loads(text)
        except json.JSONDecodeError:
            # Model broke the JSON-only contract: return its reply unchanged.
            # (Narrowed from a bare `except:` that also swallowed KeyboardInterrupt.)
            return response
        if not isinstance(agent_output, dict):
            # Valid JSON but not an action object (e.g. a list) — treat as final.
            return response
        if agent_output.get("action") == "call_tool":
            tool_name = agent_output.get("tool")
            args = agent_output.get("arguments", {})
            if tool_name not in TOOLS:
                # Report the unknown tool back to the model and keep looping.
                messages.append({"role": "assistant", "content": text})
                messages.append({"role": "user", "content": json.dumps({
                    "tool_error": f"Tool '{tool_name}' not implemented"
                })})
                continue
            tool_result = TOOLS[tool_name](**args)
            # Keep the history: (1) the agent's decision, (2) the tool result.
            messages.append({"role": "assistant", "content": text})
            messages.append({"role": "user", "content": json.dumps({
                "tool_result": {
                    "tool": tool_name,
                    "arguments": args,
                    "result": tool_result
                }
            }, ensure_ascii=False)})
            continue
        if agent_output.get("action") == "final_answer":
            return response
        # Unknown action: fall through and ask again (history unchanged).
    return response
# Matches "(exec <command>)" ending the reply; DOTALL lets the command span lines (heredocs).
EXEC_RE = re.compile(r"\(exec\s+(.+?)\)\s*$", re.DOTALL)
# Matches "(done <final answer>)" at the end of any line.
DONE_RE = re.compile(r"\(done\s+(.+?)\)\s*$", re.MULTILINE)
def run_exec_loop(body: dict, max_steps: int = 10000) -> dict:
    """
    Strict (exec ...)/(done ...) runner loop.

    Repeatedly asks the model for the next step, executes (exec <command>)
    blocks locally, and feeds the tool output back into the history, until the
    model replies with (done <final answer>) or max_steps is exhausted.

    Returns the last raw OCI chatResponse; on (done ...) the first choice is
    rewritten into an OpenAI-style assistant message with finishReason "stop".
    """
    # OpenAI-style history.
    # NOTE(review): this system entry has no "content" key; it is harmless only
    # because build_generic_messages() drops system messages entirely — confirm
    # before routing this history anywhere else.
    messages = [{"role":"system"}]
    messages.extend(body.get("messages", []))
    last = None
    last_executed_command = None
    for _ in range(max_steps):
        last = call_oci_chat({**body, "messages": messages}, RUNNER_PROMPT)
        print('LLM Result', last)
        msg = last["choices"][0]["message"]
        text = _extract_generic_text(msg) or ""
        m_done = DONE_RE.search(text)
        print("DONE_RE", text)
        print("m_done", m_done)
        if m_done:
            final_text = m_done.group(1).strip()
            # Return in OpenAI shape at the end.
            return {
                **last,
                "choices": [{
                    **last["choices"][0],
                    "message": {"role":"assistant","content": final_text},
                    "finishReason": "stop"
                }]
            }
        m_exec = EXEC_RE.search(text)
        if m_exec:
            command = m_exec.group(1).strip()
            # Guard: block the model from re-running the exact same command twice in a row.
            if command == last_executed_command:
                print("⚠️ DUPLICATE COMMAND BLOCKED:", command)
                messages.append({"role":"assistant","content": text})
                messages.append({"role":"user","content": (
                    "Command already executed. You must proceed or finish with (done ...)."
                )})
                continue
            last_executed_command = command
            result = execute_exec_command(command)
            messages.append({"role":"assistant","content": text})
            messages.append({"role":"user","content": f"Tool result:\n{result}"})
            continue
        # The model broke the protocol: remind it of the output contract and retry.
        messages.append({"role":"assistant","content": text})
        messages.append({"role":"user","content": (
            "Protocol error. You MUST reply ONLY with (exec <command>) or (done <final answer>)."
        )})
        continue
    # Step budget exhausted: return the last response (better than hanging).
    return last
def verify_task_completion(original_task: str, assistant_output: str) -> bool:
    """
    Ask the model whether *original_task* is finished given *assistant_output*.

    Returns True when the validator answers DONE, False for CONTINUE (or any
    other reply).
    """
    validator_rules = (
        "You are a strict task completion validator.\n"
        "Answer ONLY with DONE or CONTINUE.\n"
        "DONE = the task is fully completed.\n"
        "CONTINUE = more steps are required.\n"
    )
    verifier_prompt = [
        {"role": "system", "content": validator_rules},
        {
            "role": "user",
            "content": f"""
Original task:
{original_task}
Last assistant output:
{assistant_output}
Is the task fully completed?
"""
        },
    ]
    response = call_oci_chat(
        {
            "model": "openai-compatible/gpt-5",
            "messages": verifier_prompt,
            "temperature": 0,
        },
        verifier_prompt[0]["content"],
    )
    verdict = _extract_generic_text(response["choices"][0]["message"]).strip().upper()
    return verdict == "DONE"
# ============================================================
# ENTERPRISE TOOLS
# Set the OPENCLAW_TOOLS_ACTIVE = True to automatize OpenClaw execution Tools
# Set the OPENCLAW_TOOLS_ACTIVE = False and implement your own Tools
# ============================================================
def get_weather_from_api(city: str) -> str:
    """
    Return the current weather for *city* using Open-Meteo (free, no API key).

    Two HTTP calls: geocode the city name to coordinates, then fetch the
    current weather. Every failure mode is reported as a returned string
    (never raised) so the agent loop can relay it to the model.
    NOTE: the user-facing result strings below are intentionally in Portuguese.
    """
    print("LOG: EXECUTE TOOL WEATHER")
    try:
        # Step 1: geocoding (city name -> latitude/longitude)
        geo_url = "https://geocoding-api.open-meteo.com/v1/search"
        geo_params = {
            "name": city,
            "count": 1,
            "language": "pt",
            "format": "json"
        }
        geo_response = requests.get(geo_url, params=geo_params, timeout=10)
        if geo_response.status_code != 200:
            return f"Erro geocoding: {geo_response.text}"
        geo_data = geo_response.json()
        if "results" not in geo_data or len(geo_data["results"]) == 0:
            return f"Cidade '{city}' não encontrada."
        location = geo_data["results"][0]
        latitude = location["latitude"]
        longitude = location["longitude"]
        resolved_name = location["name"]
        country = location.get("country", "")
        # Step 2: current weather for the resolved coordinates
        weather_url = "https://api.open-meteo.com/v1/forecast"
        weather_params = {
            "latitude": latitude,
            "longitude": longitude,
            "current_weather": True,
            "timezone": "auto"
        }
        weather_response = requests.get(weather_url, params=weather_params, timeout=10)
        if weather_response.status_code != 200:
            return f"Erro clima: {weather_response.text}"
        weather_data = weather_response.json()
        current = weather_data.get("current_weather")
        if not current:
            return "Dados de clima indisponíveis."
        temperature = current["temperature"]
        windspeed = current["windspeed"]
        return (
            f"Temperatura atual em {resolved_name}, {country}: {temperature}°C.\n"
            f"Velocidade do vento: {windspeed} km/h."
        )
    except Exception as e:
        # Broad catch is deliberate: tool errors must come back as text, not crash the loop.
        return f"Weather tool error: {str(e)}"
# ============================================================
# STREAMING ADAPTER
# ============================================================
def stream_openai_format(chat_response: dict, model: str):
    """
    Yield an OCI chat response as OpenAI-style SSE events.

    Each event is a "data: <json>" line followed by a blank line: one role
    chunk, then the content in 60-character slices, then the "[DONE]" sentinel.
    (Note: no finish_reason chunk is emitted by this helper.)
    """
    completion_id = f"chatcmpl-{uuid.uuid4().hex}"
    created = int(time.time())
    content = chat_response["choices"][0]["message"]["content"]

    def sse(delta: dict) -> str:
        # Build the event dict first, then serialize. The original embedded a
        # multi-line dict literal inside the f-string replacement field, which
        # is a SyntaxError before Python 3.12 (PEP 701).
        event = {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{
                "index": 0,
                "delta": delta,
                "finish_reason": None
            }],
        }
        return f"data: {json.dumps(event)}\n\n"

    yield sse({"role": "assistant"})
    for start in range(0, len(content), 60):
        yield sse({"content": content[start:start + 60]})
    yield "data: [DONE]\n\n"
# ============================================================
# ENDPOINTS
# ============================================================
@app.get("/health")
def health():
return {"status": "ok"}
@app.get("/v1/models")
def list_models():
return {
"object": "list",
"data": [
{"id": k, "object": "model", "owned_by": "oci"}
for k in MODEL_MAP.keys()
],
}
# ------------------------------------------------------------
# CHAT COMPLETIONS
# ------------------------------------------------------------
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
body = await request.json()
# chat_response = call_oci_chat(body)
# chat_response = agent_loop(body)
if OPENCLAW_TOOLS_ACTIVE:
chat_response = run_exec_loop(body, max_steps=10000)
else:
# 🔥 Modo enterprise → seu agent_loop controla tools
chat_response = agent_loop(body)
# print("FINAL RESPONSE:", json.dumps(chat_response, indent=2))
oci_choice = chat_response["choices"][0]
oci_message = oci_choice["message"]
# 🔥 SE É TOOL CALL → RETORNA DIRETO
if oci_message.get("tool_calls"):
return chat_response
content_text = ""
content = oci_message.get("content")
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get("type") == "TEXT":
content_text += item.get("text", "")
elif isinstance(content, str):
content_text = content
else:
content_text = str(content)
finish_reason = oci_choice.get("finishReason", "stop")
# 🔥 SE STREAMING
if body.get("stream"):
async def event_stream():
completion_id = f"chatcmpl-{uuid.uuid4().hex}"
created = int(time.time())
# role chunk
yield f"data: {json.dumps({
'id': completion_id,
'object': 'chat.completion.chunk',
'created': created,
'model': body['model'],
'choices': [{
'index': 0,
'delta': {'role': 'assistant'},
'finish_reason': None
}]
})}\n\n"
# content chunks
for i in range(0, len(content_text), 50):
chunk = content_text[i:i+50]
yield f"data: {json.dumps({
'id': completion_id,
'object': 'chat.completion.chunk',
'created': created,
'model': body['model'],
'choices': [{
'index': 0,
'delta': {'content': chunk},
'finish_reason': None
}]
})}\n\n"
# final chunk
yield f"data: {json.dumps({
'id': completion_id,
'object': 'chat.completion.chunk',
'created': created,
'model': body['model'],
'choices': [{
'index': 0,
'delta': {},
'finish_reason': finish_reason
}]
})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
event_stream(),
media_type="text/event-stream"
)
# 🔥 SE NÃO FOR STREAM
return {
"id": f"chatcmpl-{uuid.uuid4().hex}",
"object": "chat.completion",
"created": int(time.time()),
"model": body["model"],
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": content_text
},
"finish_reason": finish_reason
}],
"usage": {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
}
}
# ------------------------------------------------------------
# RESPONSES (OpenAI 2024 format)
# ------------------------------------------------------------
@app.post("/v1/responses")
async def responses(request: Request):
body = await request.json()
# chat_response = call_oci_chat(body)
chat_response = agent_loop(body)
oci_choice = chat_response["choices"][0]
oci_message = oci_choice["message"]
content_text = ""
content = oci_message.get("content")
if isinstance(content, list):
for item in content:
if item.get("type") == "TEXT":
content_text += item.get("text", "")
elif isinstance(content, str):
content_text = content
return {
"id": f"resp_{uuid.uuid4().hex}",
"object": "response",
"created": int(time.time()),
"model": body.get("model"),
"output": [
{
"type": "message",
"role": "assistant",
"content": [
{
"type": "output_text",
"text": content_text
}
]
}
],
"usage": {
"input_tokens": 0,
"output_tokens": 0,
"total_tokens": 0
}
}
@app.middleware("http")
async def log_requests(request: Request, call_next):
# print("\n>>> ENDPOINT:", request.method, request.url.path)
body = await request.body()
try:
body_json = json.loads(body.decode())
# print(">>> BODY:", json.dumps(body_json, indent=2))
except:
print(">>> BODY RAW:", body.decode())
response = await call_next(request)
# print(">>> STATUS:", response.status_code)
return response

89
project/openclaw.json Normal file
View File

@@ -0,0 +1,89 @@
{
"meta": {
"lastTouchedVersion": "2026.2.1",
"lastTouchedAt": "2026-02-14T03:24:55.922Z"
},
"wizard": {
"lastRunAt": "2026-02-14T03:24:55.917Z",
"lastRunVersion": "2026.2.1",
"lastRunCommand": "onboard",
"lastRunMode": "local"
},
"models": {
"providers": {
"openai-compatible": {
"baseUrl": "http://127.0.0.1:8050/v1",
"apiKey": "sk-test",
"api": "openai-completions",
"models": [
{
"id": "gpt-5",
"name": "gpt-5" ,
"reasoning": false,
"input": ["text"],
"cost": { "input": 0, "output": 0, "cacheRead": 0, "cacheWrite": 0 },
"contextWindow": 200000,
"maxTokens": 8192
}
]
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "openai-compatible/gpt-5"
},
"models": {
"openai-compatible/gpt-5": {}
},
"workspace": "/home/hoshikawa2/.openclaw/workspace",
"compaction": {
"mode": "safeguard"
},
"maxConcurrent": 4,
"subagents": {
"maxConcurrent": 8
}
}
},
"messages": {
"ackReactionScope": "group-mentions"
},
"commands": {
"native": "auto",
"nativeSkills": "auto"
},
"channels": {
"whatsapp": {
"dmPolicy": "allowlist",
"selfChatMode": true,
"allowFrom": [
"+5511999961711"
],
"groupPolicy": "allowlist",
"mediaMaxMb": 50,
"debounceMs": 0
}
},
"gateway": {
"port": 18789,
"mode": "local",
"bind": "loopback",
"auth": {
"mode": "token",
"token": "5459cc59afcb0a4de09e0ce23ef6409090059a7d35df1740"
},
"tailscale": {
"mode": "off",
"resetOnExit": false
}
},
"plugins": {
"entries": {
"whatsapp": {
"enabled": true
}
}
}
}

View File

@@ -0,0 +1,100 @@
Whenever the user requests PPTX generation with external material (link, file, or text):
----------------------------------------------
STEP 0 FIXED WORKING DIRECTORY (MANDATORY)
----------------------------------------------
All operations MUST occur inside:
$HOME/.openclaw/workspace/openclaw_folder
Execute:
cd $HOME/.openclaw/workspace/openclaw_folder
STEP 1 PREPARATION (MANDATORY)
The file generate_openclaw_ppt_template.py is located in $HOME/.openclaw/workspace/openclaw_folder
The file read_url is located in $HOME/.openclaw/workspace/openclaw_folder
The file read_file is located in $HOME/.openclaw/workspace/openclaw_folder
Required:
read_url for links
read_file for local files
🔄 GITHUB LINK HANDLING (REQUIRED)
If the link contains:
github.com/.../blob/...
Automatically convert to:
raw.githubusercontent.com/USER/REPO/BRANCH/PATH
BEFORE calling read_url.
Example:
Original:
https://github.com/user/repo/blob/main/app.py
Convert to:
https://raw.githubusercontent.com/user/repo/main/app.py
Then call:
read_url <raw_url>
If the returned content contains <html or <script>, extract only visible text, removing HTML tags.
* If the content cannot be read successfully → ABORT.
MANDATORY PIPELINE:
1) Save material to file:
(exec read_url <url> > $HOME/.openclaw/workspace/openclaw_folder/material_raw.txt)
2) Analyze material_raw.txt and generate content.json explicitly:
(exec cat > $HOME/.openclaw/workspace/openclaw_folder/content.json << 'EOF'
<valid JSON only>
EOF)
cover_title (string)
introduction, technologies, architecture, problems, demo, conclusion (objects)
- Each chapter object MUST have:
bullets: 3–6 bullets (short, objective)
keywords: 5–12 terms that appear literally in the material
evidence: 2–4 short excerpts (10–25 words) taken from the material, without HTML
- It is FORBIDDEN to use generic bullets without keywords from the material.
- VALIDATION: if it is not possible to extract at least 20 unique keywords from the total material → ABORT.
3) Validate JSON:
(exec python -m json.tool $HOME/.openclaw/workspace/openclaw_folder/content.json)
Only after successful validation:
(exec export OCI_LINK_DEMO="<url>")
(exec python generate_openclaw_ppt_template.py)
STEP 4 MODIFICATION VALIDATION [STRICT VERSION]
Before running:
- Verify that each chapter contains at least 1 literal keyword from the material.
- Verify that at least 8 keywords appear in 4 or more slides.
- Verify that each chapter contains at least 1 piece of evidence.
If it fails → ABORT.
STEP 5 EXECUTION
Only now execute:
SET THE ENVIRONMENT VARIABLE WITH THE URL PASSED AS A BASIS FOR DOCUMENTATION: `export OCI_LINK_DEMO=<link passed as documentation>`
SET THE ENVIRONMENT VARIABLE WITH THE FILE NAME GENERATED WITH CONTENT READ FROM THE LINK: `export OCI_CONTENT_FILE=<NAME OF THE GENERATED FILE>`
`python $HOME/.openclaw/workspace/openclaw_folder/generate_openclaw_ppt_template.py`
STEP 6 UPLOAD
First, delete the file in object storage: `openclaw_oci_presentation.pptx`
And only then upload it to Object Storage:
`oci os object put \
  --bucket-name hoshikawa_template \
  --file $HOME/.openclaw/workspace/openclaw_folder/openclaw_oci_presentation.pptx \
  --force`
STEP 7 GENERATE PRE-AUTH LINK
oci os preauth-request create ...

View File

@@ -0,0 +1,84 @@
#!/usr/bin/env bash
# Installer: writes the `read_url` and `read_file` helper tools (embedded below
# as quoted heredocs, so nothing is expanded) into the OpenClaw workspace
# folder and marks them executable.
set -euo pipefail
# Where to install scripts (default matches typical OpenClaw workspace folder)
TARGET_DIR="${OPENCLAW_WORKDIR:-$HOME/.openclaw/workspace/openclaw_folder}"
mkdir -p "$TARGET_DIR"
# --- read_url: fetch a URL (GitHub blob URLs rewritten to raw), strip HTML to text ---
cat > "$TARGET_DIR/read_url" << 'EOF'
#!/usr/bin/env python3
import sys
import requests
from bs4 import BeautifulSoup
def normalize_github_blob(url: str) -> str:
    # Convert github.com/.../blob/... to raw.githubusercontent.com/.../.../...
    if "github.com/" in url and "/blob/" in url:
        parts = url.split("github.com/", 1)[1].split("/blob/", 1)
        repo = parts[0].strip("/")
        rest = parts[1].lstrip("/")
        return f"https://raw.githubusercontent.com/{repo}/{rest}"
    return url
if len(sys.argv) < 2:
    print("Usage: read_url <url>", file=sys.stderr)
    sys.exit(1)
url = normalize_github_blob(sys.argv[1])
try:
    r = requests.get(url, timeout=30)
    r.raise_for_status()
    content = r.text
    # If HTML, extract visible text
    if "<html" in content.lower() or "<body" in content.lower():
        soup = BeautifulSoup(content, "html.parser")
        content = soup.get_text("\n")
    print(content)
except Exception as e:
    print(f"ERROR: {e}", file=sys.stderr)
    sys.exit(1)
EOF
# --- read_file: print a local file; PDFs are extracted with PyMuPDF ---
cat > "$TARGET_DIR/read_file" << 'EOF'
#!/usr/bin/env python3
import sys
from pathlib import Path
def read_pdf(path: Path) -> str:
    try:
        import fitz  # PyMuPDF
    except Exception:
        raise RuntimeError("PyMuPDF (fitz) not installed. Install with: pip install pymupdf")
    doc = fitz.open(str(path))
    out = []
    for i in range(doc.page_count):
        out.append(doc.load_page(i).get_text("text"))
    return "\n".join(out)
if len(sys.argv) < 2:
    print("Usage: read_file <path>", file=sys.stderr)
    sys.exit(1)
p = Path(sys.argv[1]).expanduser()
if not p.exists():
    print(f"ERROR: file not found: {p}", file=sys.stderr)
    sys.exit(1)
suffix = p.suffix.lower()
try:
    if suffix == ".pdf":
        print(read_pdf(p))
    else:
        print(p.read_text(encoding="utf-8", errors="replace"))
except Exception as e:
    print(f"ERROR: {e}", file=sys.stderr)
    sys.exit(1)
EOF
chmod +x "$TARGET_DIR/read_url" "$TARGET_DIR/read_file"
echo "✅ Installed: $TARGET_DIR/read_url and $TARGET_DIR/read_file"