import base64 import io import gradio as gr from fastmcp import Client from fastmcp.client import StreamableHttpTransport import asyncio import ast import json import os # ------------------------------- # MCP server info # ------------------------------- ROBOT_ID = "CV_MCP_Client" HF_TOKEN = os.environ.get("HF_TOKEN") if not HF_TOKEN: print("Warning: HF_TOKEN not found. API calls may fail.") HF_TOKEN = "missing_token_placeholder" MCP_SERVER_URL = "https://mcp-1st-birthday-cv-mcp-server.hf.space/gradio_api/mcp/" SERVER_NAME = "CV_MCP_Server" TOOL_NAME = "CV_MCP_Server_robot_watch" # ------------------------------- # Initialize MCP client globally # ------------------------------- HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL) MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME) # ------------------------------- # Async function using user's HF token # ------------------------------- async def process_webcam_stream_async(image): if image is None: return "", "", "", "", "", "", "", "" if HF_TOKEN == "missing_token_placeholder": return "Error: HF_TOKEN not set locally.", "", "", "", "", "", "", "" # Convert image to Base64 buffered = io.BytesIO() image.save(buffered, format="JPEG") b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8") payload = { "hf_token_input": HF_TOKEN, "robot_id_input": ROBOT_ID, "image_b64_input": b64_img } try: async with MCP_CLIENT: response = await MCP_CLIENT.call_tool(TOOL_NAME, payload) if response.is_error: # Handle error content safely error_msg = "Unknown Error" if hasattr(response, 'content') and isinstance(response.content, list): error_msg = " ".join([getattr(item, 'text', '') for item in response.content]) raise Exception(f"MCP Tool Error: {error_msg}") # --------------------------------------------------------- # FIX: Handle List Content # The 'content' is a list of objects (e.g., TextContent). # We iterate through the list and join the text parts. # --------------------------------------------------------- raw_text = "" if hasattr(response, 'content') and isinstance(response.content, list): for item in response.content: # Check if the item has a 'text' attribute if hasattr(item, 'text'): raw_text += item.text else: # Fallback for unexpected structure raw_text = str(response) # 6. PARSE RESPONSE try: response_dict = json.loads(raw_text) except json.JSONDecodeError: try: response_dict = ast.literal_eval(raw_text) except Exception: # If parsing fails completely, return the raw text in description return f"Parsing Error. Raw output: {raw_text}", "", "", "", "", "", "", "" vlm_result = response_dict.get("result", {}) # 7. EXTRACT DATA description_out = vlm_result.get("description", "") environment_out = vlm_result.get("environment", "") indoor_outdoor_out = vlm_result.get("indoor_or_outdoor", "") lighting_condition_out = vlm_result.get("lighting_condition", "") human_out = vlm_result.get("human", "") animals_out = vlm_result.get("animals", "") objects_list = vlm_result.get("objects", []) hazards_out = vlm_result.get("hazards", "") objects_str = ", ".join(objects_list) if isinstance(objects_list, list) else str(objects_list) return ( description_out, environment_out, indoor_outdoor_out, lighting_condition_out, human_out, animals_out, objects_str, hazards_out ) except Exception as e: print(f"Error calling MCP API: {e}") return f"Error: {e}", "", "", "", "", "", "", "" # ------------------------------- # Gradio UI # ------------------------------- with gr.Blocks() as demo: gr.Markdown("## 🎥 Robot Vision Webcam Stream (MCP Client)") gr.Markdown( """ This interface captures a live webcam feed and sends each frame to the MCP Client for analysis. The system extracts detailed information from the scene — including descriptions, detected objects, humans, animals, environmental context, lighting conditions, and potential hazards. Use this dashboard to observe how the robot interprets the world in real time. """ ) with gr.Row(): webcam_input = gr.Image( label="Captured from Web-Cam", sources=["webcam"], type="pil" ) with gr.Column(): description_out = gr.Textbox(label="Description", lines=5) environment_out = gr.Textbox(label="Environment", lines=3) indoor_outdoor_out = gr.Textbox(label="Indoor/Outdoor", lines=1) lighting_condition_out = gr.Textbox(label="Lighting Condition", lines=1) human_out = gr.Textbox(label="Human Detected", lines=3) animals_out = gr.Textbox(label="Animals Detected", lines=2) objects_out = gr.Textbox(label="Objects Detected", lines=2) hazards_out = gr.Textbox(label="Hazards Identified", lines=2) webcam_input.stream( process_webcam_stream_async, inputs=[webcam_input], outputs=[ description_out, environment_out, indoor_outdoor_out, lighting_condition_out, human_out, animals_out, objects_out, hazards_out ], stream_every=1.0 ) if __name__ == "__main__": demo.launch(ssr_mode=False)