#!/usr/bin/env python3 """ Enhanced Budget Proposals Chatbot API using LangChain with Memory and Agentic RAG """ from flask import Flask, request, jsonify from flask_cors import CORS import os import logging import json from datetime import datetime from typing import Dict, List, Any # LangChain imports from langchain_google_genai import ChatGoogleGenerativeAI from langchain.memory import ConversationBufferWindowMemory from langchain.schema import HumanMessage, AIMessage from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain.chains import LLMChain from langchain_community.chat_message_histories import RedisChatMessageHistory from langchain.tools import Tool from langchain.agents import AgentExecutor, create_openai_functions_agent from langchain.agents.openai_functions_agent.base import OpenAIFunctionsAgent from langchain.schema import BaseMessage # Vector database imports from pinecone import Pinecone from sentence_transformers import SentenceTransformer # Language detection and translation imports from googletrans import Translator import re import requests import json app = Flask(__name__) CORS(app) # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Configure Gemini GEMINI_API_KEY = os.getenv('GEMINI_API_KEY') if not GEMINI_API_KEY: logger.error("GEMINI_API_KEY not found in environment variables") raise ValueError("Please set GEMINI_API_KEY in your .env file") # Configure Pinecone PINECONE_API_KEY = os.getenv('PINECONE_API_KEY') if not PINECONE_API_KEY: logger.error("PINECONE_API_KEY not found in environment variables") raise ValueError("Please set PINECONE_API_KEY in your .env file") # Initialize Pinecone and embedding model pc = Pinecone(api_key=PINECONE_API_KEY) BUDGET_INDEX_NAME = "budget-proposals-index" embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") # Initialize LangChain components llm = ChatGoogleGenerativeAI( model="gemini-2.5-flash", google_api_key=GEMINI_API_KEY, temperature=0.7, max_tokens=1000 ) # Initialize translator translator = Translator() def detect_sinhala_content(text: str) -> bool: """Detect if text contains Sinhala characters""" # Sinhala Unicode range: U+0D80 to U+0DFF sinhala_pattern = re.compile(r'[\u0D80-\u0DFF]') return bool(sinhala_pattern.search(text)) def detect_singlish(text: str) -> bool: """Detect common Singlish patterns and words""" singlish_words = [ 'mokadda', 'kohomada', 'api', 'oya', 'mama', 'eka', 'meka', 'oya', 'dan', 'kiyala', 'budget', 'proposal', 'karan', 'karanna', 'gana', 'genna', 'danna', 'ahala', 'denna', 'mata', 'ape', 'wage', 'wenas', 'thiyenne', 'kiyanawa', 'balanawa', 'pennanna', 'sampura', 'mudal', 'pasal', 'vyaparayak', 'rajaye', 'arthikaya', 'sammandala', 'kara', 'karanna', 'giya', 'yanawa', 'enawa', 'gihin', 'awe', 'nane', 'inne', 'danna', 'kiyanna', 'balanna', 'ganna', 'denna', 'yanna', 'enna' ] # Convert to lowercase and check for common Singlish words text_lower = text.lower() singlish_word_count = sum(1 for word in singlish_words if word in text_lower) # Consider it Singlish if it has 2 or more Singlish words return singlish_word_count >= 2 def transliterate_singlish_to_sinhala(text: str) -> str: """Convert Romanized Sinhala (Singlish) to Sinhala script using Swabhasha API""" try: # Swabhasha transliterator API endpoint # Note: You may need to replace this with the actual Swabhasha API endpoint # For now, using a mock implementation that could be replaced with actual service # Alternative: Use a local transliteration library or service # For demo purposes, we'll use a simplified mapping approach # In production, integrate with actual Swabhasha transliterator # Common Singlish to Sinhala mappings (simplified) singlish_to_sinhala_map = { 'mokadda': 'මොකද්ද', 'kohomada': 'කොහොමද', 'api': 'අපි', 'oya': 'ඔයා', 'mama': 'මම', 'eka': 'එක', 'meka': 'මේක', 'dan': 'දැන්', 'kiyala': 'කියලා', 'gana': 'ගැන', 'genna': 'ගන්න', 'danna': 'දන්න', 'dennna': 'දෙන්න', 'mata': 'මට', 'ape': 'අපේ', 'thiyenne': 'තියෙන්නේ', 'kiyanawa': 'කියනවා', 'balanawa': 'බලනවා', 'pennanna': 'පෙන්නන්න', 'sampura': 'සම්පූර්ණ', 'mudal': 'මුදල්', 'pasal': 'පාසල්', 'rajaye': 'රජයේ', 'arthikaya': 'ආර්ථිකය', 'kara': 'කර', 'karanna': 'කරන්න', 'giya': 'ගිය', 'yanawa': 'යනවා', 'enawa': 'එනවා', 'inne': 'ඉන්නේ', 'yanna': 'යන්න', 'enna': 'එන්න' } # Simple word-by-word replacement words = text.lower().split() transliterated_words = [] for word in words: # Remove punctuation for mapping clean_word = re.sub(r'[^\w]', '', word) if clean_word in singlish_to_sinhala_map: transliterated_words.append(singlish_to_sinhala_map[clean_word]) else: transliterated_words.append(word) # Keep original if no mapping return ' '.join(transliterated_words) except Exception as e: logger.error(f"Transliteration error: {e}") return text # Return original text if transliteration fails def translate_text(text: str, target_language: str = 'en') -> str: """Translate text using Google Translate""" try: result = translator.translate(text, dest=target_language) return result.text except Exception as e: logger.error(f"Translation error: {e}") return text # Return original text if translation fails def process_multilingual_input(user_message: str) -> tuple: """ Process multilingual input using the improved pipeline: Romanized Sinhala -> Swabhasha Transliterator -> Sinhala -> Google Translate -> English """ original_language = 'en' needs_translation = False processed_message = user_message transliteration_used = False # Check if input contains Sinhala characters if detect_sinhala_content(user_message): logger.info("Detected Sinhala script input") original_language = 'si' needs_translation = True processed_message = translate_text(user_message, 'en') logger.info(f"Translated from Sinhala: '{user_message}' -> '{processed_message}'") # Check if input is likely Singlish (Romanized Sinhala) elif detect_singlish(user_message): logger.info("Detected Romanized Sinhala (Singlish) input") original_language = 'singlish' needs_translation = True transliteration_used = True try: # Step 1: Transliterate Romanized Sinhala to Sinhala script sinhala_text = transliterate_singlish_to_sinhala(user_message) logger.info(f"Transliterated Singlish to Sinhala: '{user_message}' -> '{sinhala_text}'") # Step 2: Translate Sinhala to English for search processed_message = translate_text(sinhala_text, 'en') logger.info(f"Translated Sinhala to English: '{sinhala_text}' -> '{processed_message}'") except Exception as e: logger.error(f"Error in Singlish processing pipeline: {e}") # Fallback: try direct translation or keep original try: processed_message = translate_text(user_message, 'en') logger.info(f"Fallback translation from Singlish: '{user_message}' -> '{processed_message}'") except: processed_message = user_message needs_translation = False transliteration_used = False logger.info("Using original Singlish text for search") return processed_message, original_language, needs_translation, transliteration_used def translate_response_if_needed(response: str, original_language: str) -> str: """Translate response back to original language if needed""" if original_language == 'si': # Translate back to Sinhala try: translated_response = translate_text(response, 'si') logger.info(f"Translated response to Sinhala: '{response[:100]}...' -> '{translated_response[:100]}...'") return translated_response except Exception as e: logger.error(f"Error translating response to Sinhala: {e}") return response elif original_language == 'singlish': # For Singlish, we can optionally provide a mixed response # For now, keep English response but could enhance later return response return response def get_pinecone_index(): """Get the budget proposals Pinecone index""" try: return pc.Index(BUDGET_INDEX_NAME) except Exception as e: logger.error(f"Error accessing Pinecone index: {e}") return None def search_budget_proposals(query: str) -> str: """Search budget proposals using the semantic search API""" try: import requests # Use the deployed semantic search API response = requests.post( f"https://danulr05-budget-proposals-search-api.hf.space/api/search", json={"query": query, "top_k": 5}, timeout=10 ) if response.status_code == 200: data = response.json() results = data.get("results", []) if not results: return "No relevant budget proposals found in the database." # Build context from search results context_parts = [] for result in results[:3]: # Limit to top 3 results file_path = result.get("file_path", "") category = result.get("category", "") summary = result.get("summary", "") cost = result.get("costLKR", "") title = result.get("title", "") content = result.get("content", "") # Get the actual content context_parts.append(f"From {file_path} ({category}): {title}") if content: context_parts.append(f"Content: {content}") elif summary: context_parts.append(f"Summary: {summary}") if cost and cost != "No Costing Available": context_parts.append(f"Cost: {cost}") return "\n\n".join(context_parts) else: return f"Error accessing semantic search API: {response.status_code}" except Exception as e: logger.error(f"Error searching budget proposals: {e}") return f"Error searching database: {str(e)}" # Create the RAG tool search_tool = Tool( name="search_budget_proposals", description="Search for relevant budget proposals in the vector database. Use this when you need specific information about budget proposals, costs, policies, or implementation details.", func=search_budget_proposals ) # Create the prompt template for the agent agent_prompt = ChatPromptTemplate.from_messages([ ("system", """You are a helpful assistant for budget proposals in Sri Lanka. You have access to a vector database containing detailed information about various budget proposals. You can communicate in English, Sinhala, and understand Singlish (Sinhala written in English letters). When a user asks about budget proposals, you should: 1. Use the search_budget_proposals tool to find relevant information 2. Provide accurate, detailed responses based on the retrieved information 3. Always cite the source documents when mentioning specific proposals 4. Be professional but approachable in any language 5. If the search doesn't return relevant results, acknowledge this and provide general guidance 6. Respond in the same language or style as the user's question when possible Guidelines: - Always use the search tool for specific questions about budget proposals - Include source citations for any mention of proposals, costs, policies, revenue, or implementation - Keep responses clear and informative in any language - Use a balanced tone - helpful but not overly casual - If asked about topics not covered, redirect to relevant topics professionally - Be culturally sensitive when discussing Sri Lankan policies and economic matters - When responding in Sinhala, use appropriate formal language for policy discussions"""), MessagesPlaceholder(variable_name="chat_history"), ("human", "{input}"), MessagesPlaceholder(variable_name="agent_scratchpad") ]) # Store conversation memories for different sessions conversation_memories: Dict[str, ConversationBufferWindowMemory] = {} def get_or_create_memory(session_id: str) -> ConversationBufferWindowMemory: """Get or create a memory instance for a session""" if session_id not in conversation_memories: # Create new memory with window of 10 messages (5 exchanges) conversation_memories[session_id] = ConversationBufferWindowMemory( k=10, # Remember last 10 messages return_messages=True, memory_key="chat_history" ) logger.info(f"Created new memory for session: {session_id}") return conversation_memories[session_id] def create_agent(session_id: str) -> AgentExecutor: """Create a LangChain agent with memory and RAG capabilities""" memory = get_or_create_memory(session_id) # Create the agent agent = create_openai_functions_agent( llm=llm, tools=[search_tool], prompt=agent_prompt ) # Create agent executor with memory agent_executor = AgentExecutor( agent=agent, tools=[search_tool], memory=memory, verbose=False, handle_parsing_errors=True ) return agent_executor def get_available_pdfs() -> List[str]: """Dynamically get list of available PDF files from assets directory""" try: import os pdf_dir = "assets/pdfs" if os.path.exists(pdf_dir): pdf_files = [f for f in os.listdir(pdf_dir) if f.lower().endswith('.pdf')] return pdf_files else: # Fallback to known PDFs if directory doesn't exist return ['MLB.pdf', 'Cigs.pdf', 'Elec.pdf', 'Audit_EPF.pdf', 'EPF.pdf', 'Discretion.pdf', '1750164001872.pdf'] except Exception as e: logger.error(f"Error getting available PDFs: {e}") # Fallback to known PDFs return ['MLB.pdf', 'Cigs.pdf', 'Elec.pdf', 'Audit_EPF.pdf', 'EPF.pdf', 'Discretion.pdf', '1750164001872.pdf'] def extract_sources_from_response(response: str) -> List[str]: """Extract source documents mentioned in the response""" sources = [] # Get dynamically available PDF files available_pdfs = get_available_pdfs() # Look for source patterns like "(Source: MLB.pdf)" or "(Sources: MLB.pdf, EPF.pdf)" for pdf in available_pdfs: if pdf in response: sources.append(pdf) return list(set(sources)) # Remove duplicates def generate_response_with_rag(user_message: str, session_id: str) -> Dict[str, Any]: """Generate response using RAG with memory and multilingual support""" try: # Process multilingual input processed_message, original_language, needs_translation, transliteration_used = process_multilingual_input(user_message) logger.info(f"Input processing: original='{user_message}', processed='{processed_message}', lang='{original_language}', transliteration='{transliteration_used}'") # Get or create memory for this session memory = get_or_create_memory(session_id) # Search for relevant context using processed (English) message search_context = search_budget_proposals(processed_message) # Get conversation history for context chat_history = memory.chat_memory.messages conversation_context = "" if chat_history: # Get last few messages for context recent_messages = chat_history[-6:] # Last 3 exchanges conversation_parts = [] for msg in recent_messages: if isinstance(msg, HumanMessage): conversation_parts.append(f"User: {msg.content}") elif isinstance(msg, AIMessage): conversation_parts.append(f"Assistant: {msg.content}") conversation_context = "\n".join(conversation_parts) # Create a prompt with conversation history and retrieved context language_instruction = "" if original_language == 'si': language_instruction = "\n\nIMPORTANT: The user asked in Sinhala. Please respond in Sinhala using proper Sinhala script and formal language appropriate for policy discussions." elif original_language == 'singlish': if transliteration_used: language_instruction = "\n\nNote: The user used Romanized Sinhala (transliterated via Swabhasha). Please respond in Sinhala using proper Sinhala script and formal language appropriate for policy discussions." else: language_instruction = "\n\nNote: The user used Singlish (Sinhala words in English letters). You may respond in English but consider using some familiar Sri Lankan terminology where appropriate." prompt = f"""You are a helpful assistant for budget proposals in Sri Lanka. You can communicate in English, Sinhala, and understand Singlish. Based on the following information from the budget proposals database: {search_context} {conversation_context} Current user question: {processed_message} Original user input: {user_message} {language_instruction} Guidelines: - Be professional but approachable in any language - Include specific details from the retrieved information - Cite the source documents when mentioning specific proposals - If the search doesn't return relevant results, acknowledge this and provide general guidance - Keep responses clear and informative - Reference previous conversation context when relevant - Maintain conversation continuity - Be culturally sensitive when discussing Sri Lankan policies - When responding in Sinhala, use appropriate formal language for policy discussions - DO NOT use asterisks (*) for formatting or emphasis - DO NOT use markdown formatting like **bold** or *italic* - Use plain text without any special formatting characters Please provide a helpful response:""" # Generate response using the LLM directly response = llm.invoke(prompt) response_text = response.content.strip() # Translate response back if needed if needs_translation and (original_language == 'si' or (original_language == 'singlish' and transliteration_used)): response_text = translate_response_if_needed(response_text, original_language) # Extract sources from response sources = extract_sources_from_response(response_text) # Add messages to memory (store original user message for context) memory.chat_memory.add_user_message(user_message) memory.chat_memory.add_ai_message(response_text) # Get updated conversation history for context chat_history = memory.chat_memory.messages return { "response": response_text, "confidence": "high", "session_id": session_id, "conversation_length": len(chat_history), "memory_used": True, "rag_used": True, "sources": sources, "language_detected": original_language, "translation_used": needs_translation, "transliteration_used": transliteration_used } except Exception as e: logger.error(f"Error generating response with RAG: {e}") # Provide error message in appropriate language error_message = "I'm sorry, I'm having trouble processing your request right now. Please try again later." if original_language == 'si': try: error_message = translate_text(error_message, 'si') except: pass # Keep English if translation fails return { "response": error_message, "confidence": "error", "session_id": session_id, "memory_used": False, "rag_used": False, "sources": [], "language_detected": original_language if 'original_language' in locals() else 'en', "translation_used": False, "transliteration_used": False } def clear_session_memory(session_id: str) -> bool: """Clear memory for a specific session""" try: if session_id in conversation_memories: del conversation_memories[session_id] logger.info(f"Cleared memory for session: {session_id}") return True return False except Exception as e: logger.error(f"Error clearing memory: {e}") return False @app.route('/api/chat', methods=['POST']) def chat(): """Enhanced chat endpoint with memory""" try: data = request.get_json() user_message = data.get('message', '').strip() session_id = data.get('session_id', 'default') if not user_message: return jsonify({ "error": "Message is required" }), 400 # Generate response with memory result = generate_response_with_rag(user_message, session_id) return jsonify({ "response": result["response"], "confidence": result["confidence"], "session_id": session_id, "conversation_length": result.get("conversation_length", 0), "memory_used": result.get("memory_used", False), "rag_used": result.get("rag_used", False), "sources": result.get("sources", []), "user_message": user_message, "language_detected": result.get("language_detected", "en"), "translation_used": result.get("translation_used", False), "transliteration_used": result.get("transliteration_used", False) }) except Exception as e: logger.error(f"Chat API error: {e}") return jsonify({"error": str(e)}), 500 @app.route('/api/chat/clear', methods=['POST']) def clear_chat(): """Clear chat memory for a session""" try: data = request.get_json() session_id = data.get('session_id', 'default') success = clear_session_memory(session_id) return jsonify({ "success": success, "session_id": session_id, "message": "Chat memory cleared successfully" if success else "Session not found" }) except Exception as e: logger.error(f"Clear chat error: {e}") return jsonify({"error": str(e)}), 500 @app.route('/api/chat/sessions', methods=['GET']) def list_sessions(): """List all active chat sessions""" try: sessions = [] for session_id, memory in conversation_memories.items(): messages = memory.chat_memory.messages sessions.append({ "session_id": session_id, "message_count": len(messages), "last_activity": datetime.now().isoformat() # Simplified for now }) return jsonify({ "sessions": sessions, "total_sessions": len(sessions) }) except Exception as e: logger.error(f"List sessions error: {e}") return jsonify({"error": str(e)}), 500 @app.route('/api/chat/history/', methods=['GET']) def get_chat_history(session_id: str): """Get chat history for a specific session""" try: if session_id not in conversation_memories: return jsonify({ "session_id": session_id, "history": [], "message_count": 0 }) memory = conversation_memories[session_id] messages = memory.chat_memory.messages history = [] for msg in messages: if isinstance(msg, HumanMessage): history.append({ "type": "human", "content": msg.content, "timestamp": datetime.now().isoformat() }) elif isinstance(msg, AIMessage): history.append({ "type": "ai", "content": msg.content, "timestamp": datetime.now().isoformat() }) return jsonify({ "session_id": session_id, "history": history, "message_count": len(history) }) except Exception as e: logger.error(f"Get chat history error: {e}") return jsonify({"error": str(e)}), 500 @app.route('/api/chat/health', methods=['GET']) def chat_health(): """Health check for the enhanced chatbot""" try: # Test LangChain connection and vector database test_agent = create_agent("health_check") test_response = test_agent.invoke({"input": "Hello"}) # Test vector database connection pc_index = get_pinecone_index() vector_db_status = "connected" if pc_index else "disconnected" return jsonify({ "status": "healthy", "message": "Enhanced budget proposals chatbot with RAG is running", "langchain_status": "connected" if test_response else "disconnected", "vector_db_status": vector_db_status, "rag_enabled": True, "active_sessions": len(conversation_memories), "memory_enabled": True }) except Exception as e: return jsonify({ "status": "unhealthy", "message": f"Error: {str(e)}" }), 500 @app.route('/api/chat/debug/', methods=['GET']) def debug_session(session_id: str): """Debug endpoint to check session memory""" try: memory_exists = session_id in conversation_memories memory_info = { "session_id": session_id, "memory_exists": memory_exists, "total_sessions": len(conversation_memories), "session_keys": list(conversation_memories.keys()) } if memory_exists: memory = conversation_memories[session_id] messages = memory.chat_memory.messages memory_info.update({ "message_count": len(messages), "messages": [ { "type": getattr(msg, 'type', 'unknown'), "content": getattr(msg, 'content', '')[:100] + "..." if len(getattr(msg, 'content', '')) > 100 else getattr(msg, 'content', '') } for msg in messages ] }) return jsonify(memory_info) except Exception as e: logger.error(f"Debug session error: {e}") return jsonify({"error": str(e)}), 500 @app.route('/api/chat/suggestions', methods=['GET']) def get_chat_suggestions(): """Get suggested questions for the chatbot with multilingual support""" suggestions = [ "What are the maternity leave benefits proposed? 🤱", "How do the cigarette tax proposals work? 💰", "What changes are proposed for electricity tariffs? ⚡", "Tell me about the EPF audit proposals 📊", "What tax reforms are being suggested? 🏛️", "How will these proposals affect the economy? 📈", "What is the cost of implementing these proposals? 💵", "Can you compare the costs of different proposals? ⚖️", "What are the main benefits of these proposals? ✨", "Budget proposals gana kiyanna 📋", "EPF eka gana mokadda thiyenne? 💰", "Electricity bill eka wenas wenawada? ⚡", "Maternity leave benefits kiyannako 🤱", "මේ budget proposals වල cost එක කීයද? 💵", "රජයේ ආර්థික ප්‍රතිපත්ති ගැන කියන්න 🏛️" ] return jsonify({ "suggestions": suggestions, "supported_languages": ["English", "Sinhala", "Singlish"] }) @app.route('/api/chat/available-pdfs', methods=['GET']) def get_available_pdfs_endpoint(): """Get list of available PDF files for debugging""" try: available_pdfs = get_available_pdfs() return jsonify({ "available_pdfs": available_pdfs, "count": len(available_pdfs), "pdf_directory": "assets/pdfs" }) except Exception as e: logger.error(f"Error getting available PDFs: {e}") return jsonify({"error": str(e)}), 500 @app.route('/api/chat/detect-language', methods=['POST']) def detect_language(): """Test language detection functionality""" try: data = request.get_json() text = data.get('text', '').strip() if not text: return jsonify({ "error": "Text is required" }), 400 processed_message, original_language, needs_translation, transliteration_used = process_multilingual_input(text) return jsonify({ "original_text": text, "processed_text": processed_message, "language_detected": original_language, "translation_needed": needs_translation, "transliteration_used": transliteration_used, "contains_sinhala": detect_sinhala_content(text), "is_singlish": detect_singlish(text) }) except Exception as e: logger.error(f"Language detection error: {e}") return jsonify({"error": str(e)}), 500 @app.route('/', methods=['GET']) def home(): """Home endpoint with API documentation""" return jsonify({ "message": "Multilingual Budget Proposals Chatbot API with Swabhasha Pipeline", "version": "2.1.0", "supported_languages": ["English", "Sinhala", "Romanized Sinhala (Singlish)"], "features": ["RAG", "Memory", "Swabhasha Transliteration", "Google Translation", "FAISS Vector Store"], "pipeline": "Romanized Sinhala → Swabhasha → Sinhala Script → Google Translate → English → LLM → Response", "endpoints": { "POST /api/chat": "Chat with memory, RAG, and multilingual support", "POST /api/chat/clear": "Clear chat memory", "GET /api/chat/sessions": "List active sessions", "GET /api/chat/history/": "Get chat history", "GET /api/chat/health": "Health check", "GET /api/chat/suggestions": "Get suggested questions (multilingual)", "GET /api/chat/available-pdfs": "Get available PDF files", "POST /api/chat/detect-language": "Test language detection" }, "status": "running" }) if __name__ == '__main__': app.run(debug=False, host='0.0.0.0', port=7860)