import gradio as gr import anthropic import json import os from typing import Dict, List, Any from mcp.server import Server from mcp.types import Tool, TextContent import asyncio # LlamaIndex imports for RAG from llama_index.core import VectorStoreIndex, Document, Settings from llama_index.embeddings.huggingface import HuggingFaceEmbedding from llama_index.vector_stores.chroma import ChromaVectorStore import chromadb # Initialize Anthropic client client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY")) # ============== VECTOR DATABASE SETUP ============== # Initialize embedding model (using HuggingFace for sponsor recognition!) print("🔄 Loading embedding model...") embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2") Settings.embed_model = embed_model Settings.llm = None # Disable LLM for LlamaIndex (we use Claude directly via MCP) Settings.chunk_size = 512 # Initialize ChromaDB chroma_client = chromadb.Client() # Create collections for workers and gigs workers_collection = chroma_client.get_or_create_collection("gig_workers") gigs_collection = chroma_client.get_or_create_collection("gig_posts") print("✅ Vector database ready!") # ============== LOAD AND INDEX DATA ============== def load_and_index_data(): """Load JSON data and create vector indices""" # Load workers try: with open("workers_data.json", "r") as f: workers_data = json.load(f) except FileNotFoundError: workers_data = [] print("⚠️ workers_data.json not found, using empty list") # Load gigs try: with open("gigs_data.json", "r") as f: gigs_data = json.load(f) except FileNotFoundError: gigs_data = [] print("⚠️ gigs_data.json not found, using empty list") # Create documents for workers worker_documents = [] for worker in workers_data: # Create rich text representation for better semantic search text = f""" Name: {worker['name']} Title: {worker['title']} Skills: {', '.join(worker['skills'])} Experience: {worker['experience']} Location: {worker['location']} Rate: {worker['hourly_rate']} Availability: {worker['availability']} Bio: {worker['bio']} """ doc = Document( text=text, metadata=worker ) worker_documents.append(doc) # Create documents for gigs gig_documents = [] for gig in gigs_data: text = f""" Title: {gig['title']} Company: {gig['company']} Required Skills: {', '.join(gig['required_skills'])} Experience Level: {gig['experience_level']} Location: {gig['location']} Budget: {gig['budget']} Duration: {gig['duration']} Description: {gig['description']} """ doc = Document( text=text, metadata=gig ) gig_documents.append(doc) # Create vector store and index for workers workers_vector_store = ChromaVectorStore(chroma_collection=workers_collection) workers_index = VectorStoreIndex.from_documents( worker_documents, vector_store=workers_vector_store ) # Create vector store and index for gigs gigs_vector_store = ChromaVectorStore(chroma_collection=gigs_collection) gigs_index = VectorStoreIndex.from_documents( gig_documents, vector_store=gigs_vector_store ) print(f"✅ Indexed {len(worker_documents)} workers and {len(gig_documents)} gigs") return workers_index, gigs_index, workers_data, gigs_data # Load and index data at startup print("🔄 Loading and indexing data...") workers_index, gigs_index, workers_db, gigs_db = load_and_index_data() print("✅ Data loaded and indexed!") # ============== MCP SERVER IMPLEMENTATION ============== mcp_server = Server("gig-market-mcp-rag") @mcp_server.list_tools() async def list_tools() -> List[Tool]: """List all available MCP tools with RAG capabilities""" return [ Tool( name="create_worker_profile", description="Transform user's unstructured text into a professional, structured gig worker profile using AI", inputSchema={ "type": "object", "properties": { "raw_text": { "type": "string", "description": "User's description of their skills, experience, and preferences" } }, "required": ["raw_text"] } ), Tool( name="create_gig_post", description="Transform user's unstructured text into a clear, structured gig job post using AI", inputSchema={ "type": "object", "properties": { "raw_text": { "type": "string", "description": "User's description of the job requirements and project details" } }, "required": ["raw_text"] } ), Tool( name="find_matching_gigs_rag", description="Find the best matching gig posts using SEMANTIC SEARCH with vector embeddings and RAG. Returns top matches based on skills, experience, and location similarity.", inputSchema={ "type": "object", "properties": { "worker_profile": { "type": "object", "description": "The structured worker profile to match" }, "top_n": { "type": "integer", "description": "Number of top matches to return", "default": 5 } }, "required": ["worker_profile"] } ), Tool( name="find_matching_workers_rag", description="Find the best matching workers using SEMANTIC SEARCH with vector embeddings and RAG. Returns top matches based on required skills, experience, and location similarity.", inputSchema={ "type": "object", "properties": { "gig_post": { "type": "object", "description": "The structured gig post to match" }, "top_n": { "type": "integer", "description": "Number of top matches to return", "default": 5 } }, "required": ["gig_post"] } ) ] @mcp_server.call_tool() async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: """Handle MCP tool calls with RAG-enhanced matching""" if name == "create_worker_profile": raw_text = arguments["raw_text"] message = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=1500, messages=[{ "role": "user", "content": f"""You are a professional career consultant. Transform this person's description into an attractive gig worker profile. USER INPUT: {raw_text} Create a professional profile with these fields. Return ONLY valid JSON (no markdown, no explanation): {{ "name": "full name", "title": "professional title/role", "skills": ["skill1", "skill2", "skill3", ...], "experience": "X years", "location": "city, country", "hourly_rate": "€X/hour or price range", "availability": "full-time/part-time/freelance/weekends/flexible", "bio": "compelling 1-2 sentence professional summary" }} Make it professional and appealing. If information is missing, infer reasonable values.""" }] ) response_text = message.content[0].text.strip() if response_text.startswith("```"): response_text = response_text.split("```")[1] if response_text.startswith("json"): response_text = response_text[4:] response_text = response_text.strip() profile_data = json.loads(response_text) return [TextContent(type="text", text=json.dumps(profile_data))] elif name == "create_gig_post": raw_text = arguments["raw_text"] message = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=1500, messages=[{ "role": "user", "content": f"""You are a hiring manager. Transform this job description into a clear gig post. USER INPUT: {raw_text} Create a professional gig post with these fields. Return ONLY valid JSON (no markdown, no explanation): {{ "title": "clear job title", "company": "company name or 'Private Client'", "required_skills": ["skill1", "skill2", "skill3", ...], "experience_level": "Junior/Mid-level/Senior (X years) or X+ years", "location": "location or Remote", "budget": "€X-Y or budget range", "duration": "time period", "description": "clear 1-2 sentence project description" }} Make it clear and professional. If information is missing, insert Unknown.""" }] ) response_text = message.content[0].text.strip() if response_text.startswith("```"): response_text = response_text.split("```")[1] if response_text.startswith("json"): response_text = response_text[4:] response_text = response_text.strip() gig_data = json.loads(response_text) return [TextContent(type="text", text=json.dumps(gig_data))] elif name == "find_matching_gigs_rag": worker_profile = arguments["worker_profile"] top_n = arguments.get("top_n", 5) # Create semantic search query from worker profile query = f""" Looking for gig opportunities for: Skills: {', '.join(worker_profile.get('skills', []))} Experience: {worker_profile.get('experience', '')} Location: {worker_profile.get('location', '')} Availability: {worker_profile.get('availability', '')} """ # Perform semantic search using LlamaIndex query_engine = gigs_index.as_query_engine(similarity_top_k=top_n) response = query_engine.query(query) # Extract matches from response matches = [] for node in response.source_nodes: gig = node.metadata score = int(node.score * 100) # Convert to 0-100 scale # Calculate skill overlap worker_skills = set(s.lower() for s in worker_profile.get('skills', [])) gig_skills = set(s.lower() for s in gig.get('required_skills', [])) matched_skills = list(worker_skills.intersection(gig_skills)) matches.append({ "gig": gig, "score": score, "matched_skills": matched_skills, "semantic_similarity": node.score }) return [TextContent(type="text", text=json.dumps(matches))] elif name == "find_matching_workers_rag": gig_post = arguments["gig_post"] top_n = arguments.get("top_n", 5) # Create semantic search query from gig post query = f""" Looking for workers for this gig: Required Skills: {', '.join(gig_post.get('required_skills', []))} Experience Level: {gig_post.get('experience_level', '')} Location: {gig_post.get('location', '')} Project: {gig_post.get('description', '')} """ # Perform semantic search using LlamaIndex query_engine = workers_index.as_query_engine(similarity_top_k=top_n) response = query_engine.query(query) # Extract matches from response matches = [] for node in response.source_nodes: worker = node.metadata score = int(node.score * 100) # Convert to 0-100 scale # Calculate skill overlap worker_skills = set(s.lower() for s in worker.get('skills', [])) gig_skills = set(s.lower() for s in gig_post.get('required_skills', [])) matched_skills = list(gig_skills.intersection(worker_skills)) matches.append({ "worker": worker, "score": score, "matched_skills": matched_skills, "semantic_similarity": node.score }) return [TextContent(type="text", text=json.dumps(matches))] return [TextContent(type="text", text=json.dumps({"error": "Tool not found"}))] # ============== AGENTIC WORKFLOW ============== def format_tools_for_claude(tools: List[Tool]) -> List[Dict]: """Convert MCP tools to Anthropic API format""" return [ { "name": tool.name, "description": tool.description, "input_schema": tool.inputSchema } for tool in tools ] async def worker_agent_workflow(user_description: str) -> tuple[str, str]: """Agent workflow: Create worker profile → Find matching gigs with RAG""" tools_list = await list_tools() tools_for_api = format_tools_for_claude(tools_list) conversation_history = [{ "role": "user", "content": f"""I need help with my gig worker profile and finding opportunities. Here's my background: {user_description} Please: 1. Create a professional profile for me 2. Find the top 5 matching gig opportunities using semantic search 3. Explain why each match is good, highlighting semantic similarity and matched skills Use the available tools to help me.""" }] system_prompt = """You are a career advisor with access to a RAG system. The find_matching_gigs_rag tool uses VECTOR EMBEDDINGS and SEMANTIC SEARCH to find the best matches. Explain that matches are found using advanced AI semantic matching, not just keyword matching. Be enthusiastic about the semantic similarity scores!""" profile_created = None for _ in range(5): response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=4000, system=system_prompt, tools=tools_for_api, messages=conversation_history ) if response.stop_reason == "end_turn": final_text = "" for content in response.content: if content.type == "text": final_text += content.text return profile_created or "Profile created", final_text elif response.stop_reason == "tool_use": tool_results = [] for content in response.content: if content.type == "tool_use": result = await call_tool(content.name, content.input) result_text = result[0].text if content.name == "create_worker_profile": profile_created = result_text tool_results.append({ "type": "tool_result", "tool_use_id": content.id, "content": result_text }) conversation_history.append({"role": "assistant", "content": response.content}) conversation_history.append({"role": "user", "content": tool_results}) return profile_created or "{}", "Agent completed" async def employer_agent_workflow(job_description: str) -> tuple[str, str]: """Agent workflow: Create gig post → Find matching workers with RAG""" tools_list = await list_tools() tools_for_api = format_tools_for_claude(tools_list) conversation_history = [{ "role": "user", "content": f"""I need to create a gig post and find qualified workers. Here's what I'm looking for: {job_description} Please: 1. Create a clear gig post 2. Find the top 5 best matching workers using semantic search 3. Explain why each candidate is a good fit, highlighting semantic similarity Use the available tools to help me.""" }] system_prompt = """You are a hiring consultant with access to a RAG system. The find_matching_workers_rag tool uses VECTOR EMBEDDINGS and SEMANTIC SEARCH to find the best matches. Explain that matches are found using advanced AI semantic matching powered by HuggingFace embeddings. Be enthusiastic about the semantic similarity scores!""" gig_created = None for _ in range(5): response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=4000, system=system_prompt, tools=tools_for_api, messages=conversation_history ) if response.stop_reason == "end_turn": final_text = "" for content in response.content: if content.type == "text": final_text += content.text return gig_created or "Gig post created", final_text elif response.stop_reason == "tool_use": tool_results = [] for content in response.content: if content.type == "tool_use": result = await call_tool(content.name, content.input) result_text = result[0].text if content.name == "create_gig_post": gig_created = result_text tool_results.append({ "type": "tool_result", "tool_use_id": content.id, "content": result_text }) conversation_history.append({"role": "assistant", "content": response.content}) conversation_history.append({"role": "user", "content": tool_results}) return gig_created or "{}", "Agent completed" # ============== GRADIO UI ============== def run_worker_flow(description: str) -> tuple[str, str]: """Worker flow with RAG""" try: profile_json, analysis = asyncio.run(worker_agent_workflow(description)) profile = json.loads(profile_json) profile_display = f"""## ✅ Your Professional Profile **{profile.get('name', 'N/A')}** *{profile.get('title', 'N/A')}* 📍 **Location:** {profile.get('location', 'N/A')} 💼 **Experience:** {profile.get('experience', 'N/A')} 💰 **Rate:** {profile.get('hourly_rate', 'N/A')} ⏰ **Availability:** {profile.get('availability', 'N/A')} **🎯 Skills:** {', '.join(profile.get('skills', []))} **📝 Bio:** {profile.get('bio', 'N/A')} """ return profile_display, analysis except Exception as e: return f"❌ Error: {str(e)}", "" def run_employer_flow(description: str) -> tuple[str, str]: """Employer flow with RAG""" try: gig_json, analysis = asyncio.run(employer_agent_workflow(description)) gig = json.loads(gig_json) gig_display = f"""## ✅ Your Gig Post **{gig.get('title', 'N/A')}** *{gig.get('company', 'N/A')}* 📍 **Location:** {gig.get('location', 'N/A')} 👔 **Experience Level:** {gig.get('experience_level', 'N/A')} 💰 **Budget:** {gig.get('budget', 'N/A')} ⏱️ **Duration:** {gig.get('duration', 'N/A')} **🎯 Required Skills:** {', '.join(gig.get('required_skills', []))} **📝 Description:** {gig.get('description', 'N/A')} """ return gig_display, analysis except Exception as e: return f"❌ Error: {str(e)}", "" # ============== GRADIO INTERFACE ============== with gr.Blocks(title="🤖 Jobly - Transforming Gig Market with AI") as app: # BANNER gr.HTML("""