# app.py — MCP POC updated to use Kimi (Moonshot) tool_calls flow (HTTP-based)
# IMPORTANT:
# - Put keys in config.py (do NOT paste keys in chat).
# - requirements.txt should include: fastmcp, gradio, requests

import inspect
import json
import os
import time
import traceback
import uuid
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
import requests
from mcp.server.fastmcp import FastMCP

# ----------------------------
# Load secrets/config - edit config.py accordingly
# ----------------------------
try:
    import config as _config
    from config import (
        CLIENT_ID,
        CLIENT_SECRET,
        REFRESH_TOKEN,
        API_BASE,
        KIMI_API_KEY,  # Moonshot Kimi API key (put it in config.py)
    )
except Exception as exc:
    raise SystemExit(
        "Make sure config.py exists with CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, "
        "API_BASE, KIMI_API_KEY. Optionally set KIMI_MODEL."
    ) from exc

KIMI_BASE_URL = "https://api.moonshot.ai/v1"
# KIMI_MODEL is optional: read it off the config module so that a config.py
# without it does not abort startup.
KIMI_MODEL = getattr(_config, "KIMI_MODEL", "moonshot-v1-8k")

# ----------------------------
# Initialize FastMCP
# ----------------------------
mcp = FastMCP("ZohoCRMAgent")

# ----------------------------
# Analytics / KPI logging (simple local JSON file)
# ----------------------------
ANALYTICS_PATH = "mcp_analytics.json"


def _default_analytics() -> Dict[str, Any]:
    """Return a fresh, empty analytics document."""
    return {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}


def _load_analytics() -> Dict[str, Any]:
    """Read the analytics file, falling back to an empty document.

    A missing, unreadable or malformed file is not an error: counters simply
    restart from the default document.
    """
    try:
        with open(ANALYTICS_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        return _default_analytics()
    return data if isinstance(data, dict) else _default_analytics()


def _save_analytics(data: Dict[str, Any]) -> None:
    """Write the analytics document; failures are reported but never raised.

    Analytics are best-effort. Raising here would turn a successful tool call
    into a reported failure, because the tools log from inside their own
    try/except blocks.
    """
    try:
        with open(ANALYTICS_PATH, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
    except OSError as exc:
        print(f"[analytics] could not write {ANALYTICS_PATH}: {exc}")


def _init_analytics():
    """Create the analytics file with zeroed counters if it does not exist."""
    if os.path.exists(ANALYTICS_PATH):
        return
    base = _default_analytics()
    base["created_at"] = time.time()
    _save_analytics(base)


def _log_tool_call(tool_name: str, success: bool = True):
    """Increment the count and success/fail counters for ``tool_name``."""
    data = _load_analytics()
    # setdefault on both levels: an older or hand-edited file may lack the keys.
    stats = data.setdefault("tool_calls", {}).setdefault(
        tool_name, {"count": 0, "success": 0, "fail": 0}
    )
    outcome = "success" if success else "fail"
    stats["count"] = stats.get("count", 0) + 1
    stats[outcome] = stats.get(outcome, 0) + 1
    _save_analytics(data)


def _log_llm_call(confidence: Optional[float] = None):
    """Increment the LLM request counter and optionally record a confidence."""
    data = _load_analytics()
    data["llm_calls"] = data.get("llm_calls", 0) + 1
    if confidence is not None:
        data["last_llm_confidence"] = confidence
    _save_analytics(data)


_init_analytics()


# ----------------------------
# Kimi HTTP helpers (calls Moonshot Kimi API)
# ----------------------------
def _kimi_headers():
    """Return the HTTP headers (bearer auth + JSON) for Kimi requests."""
    return {"Authorization": f"Bearer {KIMI_API_KEY}", "Content-Type": "application/json"}


def _kimi_chat_completion(
    messages: List[Dict],
    tools: Optional[List[Dict]] = None,
    model: str = KIMI_MODEL,
):
    """Send a single chat/completion request to Kimi.

    Args:
        messages: Chat messages to send.
        tools: Optional tool declarations (JSON Schema); omitted from the
            request body when empty.
        model: Model name to request.

    Returns:
        The full parsed JSON response.

    Raises:
        RuntimeError: If the API answers with a status other than 200/201.
    """
    body = {"model": model, "messages": messages}
    # include tools if present (tools should be JSON Schema declarations)
    if tools:
        body["tools"] = tools
    url = f"{KIMI_BASE_URL}/chat/completions"
    resp = requests.post(url, headers=_kimi_headers(), json=body, timeout=60)
    if resp.status_code not in (200, 201):
        raise RuntimeError(f"Kimi API error: {resp.status_code} {resp.text}")
    return resp.json()


# ----------------------------
# Zoho token refresh & headers
# ----------------------------
def _get_valid_token_headers() -> dict:
    """Exchange the refresh token for an access token and return auth headers.

    Returns:
        A dict with the ``Authorization`` header for Zoho API calls.

    Raises:
        RuntimeError: If the token endpoint answers with a non-200 status, or
            answers 200 without an ``access_token`` in the body.
    """
    # NOTE: the accounts host is hard-coded; confirm it matches the data
    # center of the Zoho account in use.
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token",
    }
    r = requests.post(token_url, params=params, timeout=20)
    if r.status_code != 200:
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")
    try:
        payload = r.json()
    except ValueError:
        payload = None
    token = payload.get("access_token") if isinstance(payload, dict) else None
    if not token:
        # A 200 response without a token would otherwise yield the header
        # "Zoho-oauthtoken None" and fail later with a confusing API error.
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")
    return {"Authorization": f"Zoho-oauthtoken {token}"}


# ----------------------------
# MCP tools: Zoho CRM and Books (CRUD + documents)
# (same as earlier; use these function names as tool names in the Kimi tools definitions)
# ----------------------------
def _zoho_json_call(
    tool_name: str,
    method: str,
    url: str,
    ok_statuses: Tuple[int, ...],
    error_prefix: str,
    payload: Optional[dict] = None,
) -> str:
    """Run one authenticated Zoho request and return its outcome as a string.

    Shared body of the write-style tools. Never raises: any failure is logged
    under ``tool_name`` and returned as text so the caller can relay it.

    Args:
        tool_name: Name used for analytics logging.
        method: HTTP method ("POST", "PUT", "DELETE", ...).
        url: Full request URL.
        ok_statuses: Status codes treated as success.
        error_prefix: Text placed before the status code on an error response.
        payload: Optional JSON body.

    Returns:
        The response JSON serialised as a string on success, otherwise
        ``"<error_prefix>: <status> <body>"`` or ``"Exception: <message>"``.
    """
    try:
        headers = _get_valid_token_headers()
        r = requests.request(method, url, headers=headers, json=payload, timeout=20)
        if r.status_code in ok_statuses:
            # Serialise before logging so an unparsable body is counted once,
            # as a failure, rather than as a success and then a failure.
            body = json.dumps(r.json(), ensure_ascii=False)
            _log_tool_call(tool_name, True)
            return body
        _log_tool_call(tool_name, False)
        return f"{error_prefix}: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call(tool_name, False)
        return f"Exception: {e}"


@mcp.tool()
def authenticate_zoho() -> str:
    """Refresh the Zoho access token and report whether it worked."""
    try:
        _ = _get_valid_token_headers()
        _log_tool_call("authenticate_zoho", True)
        return "Zoho token refreshed (ok)."
    except Exception as e:
        _log_tool_call("authenticate_zoho", False)
        return f"Failed to authenticate: {e}"


@mcp.tool()
def create_record(module_name: str, record_data: dict) -> str:
    """Create one record in ``module_name``; returns the response JSON or an error string."""
    return _zoho_json_call(
        "create_record",
        "POST",
        f"{API_BASE}/{module_name}",
        (200, 201),
        "Error creating record",
        payload={"data": [record_data]},
    )


@mcp.tool()
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """Fetch one page of records from ``module_name``.

    Returns:
        The list under the response's ``data`` key, an empty list when the
        API answers 204 (no content), or a one-element list holding an error
        string on failure.
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        r = requests.get(
            url,
            headers=headers,
            params={"page": page, "per_page": per_page},
            timeout=20,
        )
        if r.status_code == 204:
            # No content: the module/page is empty, which is not an error.
            _log_tool_call("get_records", True)
            return []
        if r.status_code == 200:
            records = r.json().get("data", [])
            _log_tool_call("get_records", True)
            return records
        _log_tool_call("get_records", False)
        return [f"Error retrieving {module_name}: {r.status_code} {r.text}"]
    except Exception as e:
        _log_tool_call("get_records", False)
        return [f"Exception: {e}"]


@mcp.tool()
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """Update record ``record_id`` in ``module_name``; returns the response JSON or an error string."""
    return _zoho_json_call(
        "update_record",
        "PUT",
        f"{API_BASE}/{module_name}/{record_id}",
        (200,),
        "Error updating",
        payload={"data": [data]},
    )


@mcp.tool()
def delete_record(module_name: str, record_id: str) -> str:
    """Delete record ``record_id`` from ``module_name``; returns the response JSON or an error string."""
    return _zoho_json_call(
        "delete_record",
        "DELETE",
        f"{API_BASE}/{module_name}/{record_id}",
        (200,),
        "Error deleting",
    )


@mcp.tool()
def create_invoice(data: dict) -> str:
    """Create an invoice via ``{API_BASE}/invoices``; returns the response JSON or an error string."""
    return _zoho_json_call(
        "create_invoice",
        "POST",
        f"{API_BASE}/invoices",
        (200, 201),
        "Error creating invoice",
        payload={"data": [data]},
    )


@mcp.tool()
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """Return (simulated) data extracted from a document.

    For the POC no real OCR happens: an existing local file yields a fixed
    sample payload, any other path yields a "file not found" note.

    Returns:
        A dict with ``status``, ``file``, ``source_path``, ``target_module``
        and ``extracted_data``; or ``{"status": "error", "error": ...}``.
    """
    try:
        if os.path.exists(file_path):
            # For POC: simulated extraction; replace with real OCR and parsing
            extracted = {
                "Name": "ACME Corp (simulated)",
                "Email": "contact@acme.example",
                "Phone": "+91-99999-00000",
                "Total": "1234.00",
                "Confidence": 0.87,
            }
        else:
            extracted = {
                "note": "file not found locally; treat as URL in production",
                "path": file_path,
            }
        _log_tool_call("process_document", True)
        return {
            "status": "success",
            "file": os.path.basename(file_path),
            "source_path": file_path,
            "target_module": target_module,
            "extracted_data": extracted,
        }
    except Exception as e:
        _log_tool_call("process_document", False)
        return {"status": "error", "error": str(e)}


# ----------------------------
# Tool map for local execution (used to run tool_calls returned by Kimi)
# ----------------------------
# Keys should match the "name" you place in the tools JSON you send to Kimi
tool_map = {
    "authenticate_zoho": authenticate_zoho,
    "create_record": create_record,
    "get_records": get_records,
    "update_record": update_record,
    "delete_record": delete_record,
    "create_invoice": create_invoice,
    "process_document": process_document,
}


# ----------------------------
# Build the "tools" JSON to send to Kimi (simple schema per doc)
# For the POC, declare only a subset or declare all tools. Each tool is a JSON schema.
# Below is an example declaration for create_record; expand as needed.
# ----------------------------
def build_tool_definitions():
    """Return the tool declarations (JSON Schema) advertised to Kimi.

    Only ``create_record`` and ``process_document`` are declared; the model
    can only request tools listed here. Keep definitions concise to avoid
    token blowup.
    """
    tools = [
        {
            "type": "function",
            "function": {
                "name": "create_record",
                "description": "Create a record in a Zoho CRM module. Args: module_name (str), record_data (json).",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "module_name": {"type": "string"},
                        "record_data": {"type": "object"},
                    },
                    "required": ["module_name", "record_data"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "process_document",
                "description": "Process an uploaded document (local path or URL). Args: file_path, target_module.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "file_path": {"type": "string"},
                        "target_module": {"type": "string"},
                    },
                    "required": ["file_path"],
                },
            },
        },
        # Add more tool definitions (get_records, update_record, create_invoice, etc.) similarly if needed
    ]
    return tools


# ----------------------------
# Kimi tool_calls orchestration loop (follows Moonshot docs)
# ----------------------------
def _history_to_messages(history: Optional[List[Any]]) -> List[Dict]:
    """Convert chat history into a list of role/content messages.

    Accepts (user, assistant) pairs, ``{"user": ..., "assistant": ...}``
    dicts, and ``{"role": ..., "content": ...}`` dicts. Entries of any other
    shape are skipped.
    """
    messages: List[Dict] = []
    for item in history or []:
        if isinstance(item, dict):
            role = item.get("role")
            if role in ("user", "assistant"):
                content = item.get("content")
                # Only plain text is forwarded; other content types are skipped.
                if isinstance(content, str) and content:
                    messages.append({"role": role, "content": content})
                continue
            user_turn = item.get("user", "")
            assistant_turn = item.get("assistant", "")
        else:
            try:
                user_turn, assistant_turn = item[0], item[1]
            except (TypeError, IndexError, KeyError):
                user_turn, assistant_turn = "", ""
        if user_turn:
            messages.append({"role": "user", "content": user_turn})
        if assistant_turn:
            messages.append({"role": "assistant", "content": assistant_turn})
    return messages


def _execute_tool_call(tool_call: Dict[str, Any]) -> Dict[str, Any]:
    """Run one tool_call locally and build the matching ``role="tool"`` message.

    Never raises for a bad call: unknown tools, malformed arguments and tool
    exceptions are all reported back to the model as ``{"error": ...}``.
    """
    func_meta = tool_call.get("function") or {}
    tool_name = func_meta.get("name")
    # arguments arrive as a serialized JSON string; empty means "no arguments"
    raw_args = func_meta.get("arguments") or "{}"

    tool_fn = tool_map.get(tool_name)
    if not callable(tool_fn):
        result: Any = {"error": f"tool '{tool_name}' not found locally."}
    else:
        try:
            parsed_args = json.loads(raw_args)
        except (TypeError, ValueError) as exc:
            result = {"error": f"invalid JSON arguments for tool '{tool_name}': {exc}"}
        else:
            if isinstance(parsed_args, dict):
                try:
                    result = tool_fn(**parsed_args)
                except Exception as e:
                    result = {"error": str(e)}
            else:
                result = {"error": f"arguments for tool '{tool_name}' must be a JSON object."}

    # Per docs: a role=tool message with tool_call_id and name so Kimi can match it
    return {
        "role": "tool",
        "tool_call_id": tool_call.get("id") or str(uuid.uuid4()),
        "name": tool_name,
        "content": json.dumps(result, ensure_ascii=False, default=str),
    }


def kimi_chat_with_tools(
    user_message: str,
    history: Optional[List[Dict]] = None,
    max_tool_rounds: int = 8,
):
    """Orchestrate the chat + tool_calls flow with Kimi.

    The loop:
      1. call Kimi with messages + tools
      2. if finish_reason == "tool_calls", execute each tool_call locally,
         append a role=tool message per call, and call Kimi again
      3. otherwise return the assistant content

    Args:
        user_message: The new user message.
        history: Previous turns (see ``_history_to_messages`` for accepted shapes).
        max_tool_rounds: Upper bound on Kimi requests for this message, so a
            model that keeps asking for tools cannot loop forever.

    Returns:
        The assistant's reply text, ``"(no content)"`` when the reply is
        empty, or a notice when the round limit is reached.

    Raises:
        RuntimeError: Propagated from ``_kimi_chat_completion`` on API errors.
    """
    system_prompt = (
        "You are Zoho Assistant. Use available tools when needed. "
        "When you want to perform an action, return tool_calls. Otherwise, return normal assistant text."
    )
    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(_history_to_messages(history))
    messages.append({"role": "user", "content": user_message})

    tools = build_tool_definitions()

    for _ in range(max(1, max_tool_rounds)):
        resp_json = _kimi_chat_completion(messages, tools=tools, model=KIMI_MODEL)
        # Count every request (no explicit confidence field in this response shape).
        _log_llm_call(None)

        # `or [{}]` also covers an explicit empty "choices" list.
        choice = (resp_json.get("choices") or [{}])[0]
        message = choice.get("message") or {}

        if choice.get("finish_reason") != "tool_calls":
            return message.get("content") or "(no content)"

        # Append the assistant message as-is (it carries tool_calls) so the
        # next request has the proper context, then answer each call in order.
        messages.append(message)
        for tc in message.get("tool_calls") or []:
            messages.append(_execute_tool_call(tc))

    return "(stopped: too many consecutive tool-call rounds)"


# ----------------------------
# Chat handler + Gradio UI
# ----------------------------
def chat_handler(message, history):
    """Gradio chat callback: handle dev file paths locally, otherwise ask Kimi.

    A message starting with ``/mnt/data/`` is passed straight to
    ``process_document``; everything else goes through the tool_calls loop.
    Errors are returned as text rather than raised.
    """
    history = history or []
    trimmed = (message or "").strip()

    DEV_TEST_PREFIX = "/mnt/data/"
    if trimmed.startswith(DEV_TEST_PREFIX):
        try:
            doc = process_document(trimmed)
            return f"Processed file {doc.get('file')}. Extracted: {json.dumps(doc.get('extracted_data'), ensure_ascii=False)}"
        except Exception as e:
            return f"Error processing document: {e}"

    # Otherwise call Kimi with tool_calls loop
    try:
        return kimi_chat_with_tools(trimmed, history)
    except Exception as e:
        return f"(Kimi error) {e}"


def chat_interface():
    """Build the Gradio chat UI bound to ``chat_handler``."""
    return gr.ChatInterface(
        fn=chat_handler,
        textbox=gr.Textbox(
            placeholder="Ask me to create contacts, invoices, upload docs (or paste /mnt/data/... for dev)."
        ),
    )


# ----------------------------
# Launch
# ----------------------------
if __name__ == "__main__":
    print("[startup] Launching Gradio UI + FastMCP server (Kimi tool_calls integration).")
    demo = chat_interface()
    demo.launch(server_name="0.0.0.0", server_port=7860)