import os

# Set before torch is imported so the restriction is already in place when
# CUDA gets initialised.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

import gc
import logging
import time
from collections import defaultdict

import gradio as gr
import cv2
import numpy as np
from PIL import Image
import torch

try:
    import spaces
    SPACES_AVAILABLE = True
except ImportError:
    SPACES_AVAILABLE = False
    print("⚠️ spaces not available, GPU decorator disabled")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global instances - lazy loading
detector = None
translator = None
medical_agent = None
speech_processor = None

# Number of consecutive frames a letter must be seen before it is appended
# to the current word (~1 second at the 0.5s stream interval).
_STABLE_FRAMES = 2


def _new_session():
    """Return a fresh state dict for one conversation session."""
    return {
        'letters': [],              # Accumulated letters of the current word
        'words': [],                # Words completed so far
        'question_count': 0,        # Last question count reported by the agent
        'history': [],              # Patient/doctor transcript lines
        'last_letter': None,        # Letter seen in the previous frame
        'last_letter_time': 0,      # time.time() of the last detection
        'letter_stable_count': 0,   # Consecutive frames showing last_letter
        # True while an LLM question is pending; the stream handler then shows
        # the question next to the letters being signed.
        'waiting_for_answer': False,
        'current_question': '',     # Formatted text of the pending question
    }


# Session tracking: letters, words, question count, history, last detection time
sessions = defaultdict(_new_session)


def setup_environment():
    """Log whether CUDA is available and return the device name.

    Returns:
        'cuda' when torch reports an available GPU, otherwise 'cpu'.
    """
    if torch.cuda.is_available():
        device = 'cuda'
        logger.info("✅ GPU available - using CUDA")
    else:
        device = 'cpu'
        logger.info("⚠️ GPU not available - using CPU")
    return device


def initialize_models():
    """Load the models needed at startup (sign detector, speech processor).

    The translator and medical agent are not loaded here; they are created on
    first use by get_translator() / get_medical_agent().

    Raises:
        Exception: whatever the model constructors raise; it is logged and
            re-raised unchanged.
    """
    global detector, translator, medical_agent, speech_processor

    logger.info("🔄 Initializing essential models...")

    try:
        # Load YOLO detector
        from utils.detector import ArabicSignDetector
        detector = ArabicSignDetector()
        logger.info("✅ YOLO Detector loaded")

        # Clear memory
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        # Load lightweight models
        from utils.speech import SpeechProcessor
        speech_processor = SpeechProcessor()
        logger.info("✅ Speech Processor loaded")

        logger.info("🎉 Essential models loaded!")
    except Exception as e:
        logger.error(f"❌ Model loading failed: {e}")
        raise


def get_translator():
    """Return the shared translator, creating it on first call.

    If MedicalTranslator cannot be imported or constructed, a pass-through
    fallback is installed whose ar_to_en / en_to_ar return the text unchanged.
    """
    global translator
    if translator is None:
        try:
            from utils.translator import MedicalTranslator
            translator = MedicalTranslator()
            logger.info("✅ Translator loaded")
        except Exception as e:
            logger.error(f"❌ Translator loading failed: {e}")

            class FallbackTranslator:
                """Identity translator used when the real one is unavailable."""

                def ar_to_en(self, text):
                    return text

                def en_to_ar(self, text):
                    return text

            translator = FallbackTranslator()
    return translator


def get_medical_agent():
    """Return the shared medical agent, creating it on first call.

    Tries HuatuoMedicalAgent first and falls back to LiteMedicalAgent when
    that fails. A failure of the fallback import/constructor propagates.
    """
    global medical_agent
    if medical_agent is None:
        try:
            from utils.medical_agent import HuatuoMedicalAgent
            medical_agent = HuatuoMedicalAgent(max_questions=3, max_words_per_question=5)
            logger.info("✅ HuatuoGPT Medical Agent loaded")
        except Exception as e:
            logger.error(f"❌ HuatuoGPT failed, using lite: {e}")
            from utils.medical_agent_lite import LiteMedicalAgent
            medical_agent = LiteMedicalAgent()
    return medical_agent


# GPU decorator - only apply if spaces is available
def gpu_decorator(duration=30):
    """Return spaces.GPU(duration=...) when available, else an identity decorator."""
    if SPACES_AVAILABLE:
        return spaces.GPU(duration=duration)
    # Return identity decorator if spaces not available
    return lambda f: f


@gpu_decorator(duration=30)
def process_sign_language_stream(image, session_id="default"):
    """Process one streamed camera frame and update the word being built.

    A detected letter is appended to the session's current word once it has
    been seen for _STABLE_FRAMES consecutive frames and differs from the last
    appended letter.

    Args:
        image: A PIL image or a numpy array; None while the camera is idle.
        session_id: Key into the module-level `sessions` mapping.

    Returns:
        A 4-tuple of display strings:
        (detection status, Arabic word, translation, response/hint).
        Any exception is caught and reported in the first element.
    """
    try:
        if image is None:
            return "⏳ انتظار الكاميرا...", "", "", ""

        # `sessions` is a defaultdict, so the first access creates the state.
        session = sessions[session_id]

        # Convert to numpy array
        image_np = np.array(image) if isinstance(image, Image.Image) else image

        # Convert RGB to BGR for OpenCV
        if len(image_np.shape) == 3 and image_np.shape[2] == 3:
            image_np = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)

        # Detect Arabic letters
        detection_result = detector.detect_letters(image_np)
        current_time = time.time()

        if not detection_result['success']:
            # No detection - keep showing current state, don't reset
            if session['letters']:
                current_word_ar = ''.join(session['letters'])
                current_word_en = get_translator().ar_to_en(current_word_ar)
                letters_joined = ' + '.join(session['letters'])

                word_display = f"🔤 الكلمة: {current_word_ar}\n📊 الحروف: {letters_joined}"
                translation_display = f"🌐 الترجمة: {current_word_en}"
                status_display = "⏸️ لا توجد إشارة - أظهر الحرف التالي أو اضغط 'إكمال الكلمة'"

                return (
                    status_display,
                    word_display,
                    translation_display,
                    "💡 نصيحة: اضغط 'إكمال الكلمة' للحصول على رد طبي",
                )
            return "🔍 جاهز للكشف - أظهر إشارة يد", "", "", "🎥 ابدأ بإظهار إشارات اليد"

        detected_letters = detection_result['letters']
        confidences = detection_result.get('confidences', [])

        if not detected_letters:
            return "🔍 جاهز للكشف...", "", "", ""

        # Use the first entry; presumably the detector orders results by
        # confidence - TODO confirm against ArabicSignDetector.
        current_letter = detected_letters[0]
        current_confidence = confidences[0] if confidences else 0.0

        # Letter stabilization: only add if held steady
        if current_letter == session['last_letter']:
            session['letter_stable_count'] += 1
        else:
            session['last_letter'] = current_letter
            session['letter_stable_count'] = 1
            session['last_letter_time'] = current_time

        # Add the letter after it has been stable long enough
        if session['letter_stable_count'] >= _STABLE_FRAMES:
            # Skip it when it duplicates the last added letter
            if not session['letters'] or session['letters'][-1] != current_letter:
                session['letters'].append(current_letter)
                session['letter_stable_count'] = 0  # Reset counter
                logger.info(f"📝 Added letter: {current_letter}")

        # Build current word from accumulated letters
        current_word_ar = ''.join(session['letters'])

        # Translate word to English
        translator_instance = get_translator()
        current_word_en = translator_instance.ar_to_en(current_word_ar) if current_word_ar else ""

        # Update last letter time when actively detecting
        session['last_letter_time'] = current_time

        # The counter keeps growing while an already-added letter is held, so
        # clamp what is shown to avoid "3/2" and a negative "⚪" repeat count.
        stable_shown = min(session['letter_stable_count'], _STABLE_FRAMES)
        stability_bar = "🟢" * stable_shown + "⚪" * (_STABLE_FRAMES - stable_shown)
        detected_info_ar = (
            f"🎯 الحرف الحالي: {current_letter} ({current_confidence:.0%})\n"
            f"{stability_bar} ثبات: {stable_shown}/{_STABLE_FRAMES} (~1 ثانية)"
        )

        # Format word display in Arabic
        word_display_ar = f"🔤 الكلمة: {current_word_ar if current_word_ar else '...'}"
        if session['letters']:
            letters_joined = ' + '.join(session['letters'])
            word_display_ar += f"\n📊 الحروف: {letters_joined}"

        # Format translation in Arabic
        if current_word_en:
            translation_display_ar = f"🌐 الترجمة: {current_word_en}"
        else:
            translation_display_ar = "⏳ أكمل الكلمة..."

        letter_count = len(session['letters'])
        if session['waiting_for_answer']:
            # Show the pending question while the answer word is being built
            response_display_ar = session['current_question']
            if letter_count >= 1:
                response_display_ar += f"\n\n✏️ إجابتك: {current_word_ar} ({letter_count} حروف)"
            else:
                response_display_ar += "\n\n👉 أظهر إجابتك بالإشارات"
        elif letter_count >= 3 and current_word_en:
            # Normal mode - hint to press the button when the word is ready
            response_display_ar = "✅ الكلمة جاهزة!\n💡 اضغط 'إرسال للذكاء الاصطناعي' للتحليل"
        elif session['letters']:
            response_display_ar = f"⏳ استمر في الإشارات... ({letter_count} حروف حتى الآن)"
        else:
            response_display_ar = "🎥 ابدأ بإظهار إشارات اليد"

        return detected_info_ar, word_display_ar, translation_display_ar, response_display_ar

    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("Error processing sign: %s", e)
        return f"❌ خطأ: {str(e)}", "", "", "الرجاء المحاولة مرة أخرى"


def process_doctor_audio(audio, session_id="default"):
    """Transcribe the doctor's audio and turn it into an Arabic question.

    Args:
        audio: Audio input passed to speech_processor.speech_to_text; None
            when nothing was recorded.
        session_id: Accepted for interface symmetry; not used here.

    Returns:
        A 2-tuple (transcription message, Arabic question message). On error
        the first element carries the error text and the second is empty.
    """
    try:
        if audio is None:
            return "❌ No audio provided", ""

        # Convert audio to text
        doctor_text = speech_processor.speech_to_text(audio)
        logger.info(f"🎤 Doctor said: {doctor_text}")

        # Get medical agent
        patient_question = get_medical_agent().process_doctor_input(doctor_text)

        # Translate to Arabic
        arabic_question = get_translator().en_to_ar(patient_question)

        return f"🎤 You said: {doctor_text}", f"❓ Question for patient: {arabic_question}"

    except Exception as e:
        logger.error(f"Error processing audio: {e}")
        return f"❌ Error: {str(e)}", ""


def reset_session(session_id="default"):
    """Drop the stored state for `session_id` and return a status message."""
    sessions.pop(session_id, None)
    return "🔄 تم إعادة تعيين الجلسة بنجاح!\n\n✅ Session reset - all letters and words cleared!"
def complete_word(session_id="default"):
    """Handle the "send to AI" button for the given session.

    The button has two roles depending on session state:

    * While a question is pending (``waiting_for_answer``), it stores the
      signed letters as the answer word, clears the letter buffer and the
      pending question, and returns a confirmation.
    * Otherwise it translates the current word, sends it to the medical
      agent, stores the agent's (translated) question in the session and
      marks the session as waiting for an answer.

    Args:
        session_id: Key into the module-level `sessions` mapping.

    Returns:
        A status string for the word-status textbox. Failures of the
        translator or medical agent are reported in that string and leave
        the session unchanged, so the same word can be sent again.
    """
    if session_id not in sessions:
        return "⚠️ لا توجد جلسة - ابدأ بإظهار إشارات"

    session = sessions[session_id]

    # Already waiting for an answer: this press saves the answer and resumes
    if session['waiting_for_answer']:
        if session['letters']:
            answer_word_ar = ''.join(session['letters'])
            session['words'].append(answer_word_ar)
            result_msg = f"✅ تم حفظ الإجابة: {answer_word_ar}"
        else:
            result_msg = "⚠️ لم يتم إدخال إجابة"

        # Clear and resume detection
        session['letters'] = []
        session['last_letter'] = None
        session['letter_stable_count'] = 0
        session['waiting_for_answer'] = False
        session['current_question'] = ''

        return f"{result_msg}\n\n🔄 الكشف مستأنف - جاهز للكلمة التالية!"

    # Check if we have letters to send
    if not session['letters']:
        return "⚠️ لا توجد أحرف - أظهر إشارات أولاً"

    # First press: send the word to the LLM
    current_word_ar = ''.join(session['letters'])

    # Do all external calls before touching the session, so a failure does
    # not leave a half-updated state (e.g. the word recorded twice on retry).
    try:
        translator_instance = get_translator()
        current_word_en = translator_instance.ar_to_en(current_word_ar)

        logger.info(f"📤 Sending to HuatuoGPT: {current_word_ar} → {current_word_en}")

        agent_response = get_medical_agent().process_input(
            current_word_en,
            session_id=session_id
        )
        question_count = agent_response['question_count']

        # Translate response to Arabic
        arabic_medical_response = translator_instance.en_to_ar(agent_response['response'])
    except Exception as e:
        logger.exception("Error sending word to medical agent: %s", e)
        return f"❌ خطأ: {str(e)}\n\nالرجاء المحاولة مرة أخرى"

    # Update session
    session['words'].append(current_word_ar)
    session['question_count'] = question_count
    session['history'].append(f"المريض: {current_word_ar} ({current_word_en})")
    session['history'].append(f"الطبيب: {arabic_medical_response}")

    # Store the question and mark the session as waiting for the answer
    session['waiting_for_answer'] = True
    session['current_question'] = (
        f"👨‍⚕️ الطبيب ({question_count}/3):\n{arabic_medical_response}"
        "\n\n⏸️ الكشف متوقف - أظهر إجابتك ثم اضغط الزر مرة أخرى"
    )

    # DON'T clear letters yet - keep them visible.
    # Only clear last_letter tracking for new word detection.
    # NOTE(review): because the letters are kept, letters signed as the answer
    # are appended to the word just sent, and the next press saves both as a
    # single answer string - confirm this is intended.
    session['last_letter'] = None
    session['letter_stable_count'] = 0

    return (
        f"✅ تم إرسال: {current_word_ar} → {current_word_en}"
        f"\n\n🤖 HuatuoGPT يحلل...\n\n{session['current_question']}"
    )


def create_interface():
    """Build and return the Gradio Blocks app.

    The app has three tabs: live sign detection (streamed webcam frames plus
    send/clear buttons), the doctor's voice input, and system info with a
    session reset button.

    NOTE(review): the component nesting and the Markdown line breaks below
    were reconstructed from a whitespace-collapsed source - verify the layout
    against the running app.
    """
    with gr.Blocks(title="Arabic Sign Language Medical Interpreter") as app:
        gr.Markdown(
            """
            # 🏥 Arabic Sign Language Medical Interpreter

            This system helps deaf patients communicate with doctors using Arabic sign language.

            ## 🎯 How to use:
            1. **Patient**: Show Arabic sign language to the camera
            2. **System**: Detects signs, translates, and provides medical questions
            3. **Doctor**: Can also speak questions which will be converted for the patient
            """
        )

        session_id = gr.State(value="default_session")

        with gr.Tab("📹 Sign Language Detection"):
            gr.Markdown("""
            ### 🎥 Real-Time Sign Detection / الكشف في الوقت الفعلي

            **🔄 Workflow / سير العمل:**
            1. **YOLO detects** - أظهر إشارات لبناء كلمة (ثانية لكل حرف)
            2. **Press button** - اضغط الزر لإرسال الكلمة
            3. **YOLO pauses** - يتوقف الكشف تلقائياً
            4. **HuatuoGPT analyzes** - الذكاء يحلل ويسأل
            5. **Question shown** - السؤال يظهر (لا يختفي!)
            6. **Show answer signs** - أظهر إجابتك
            7. **Press button again** - للمتابعة

            🚨 **Detection pauses during LLM questions - question stays visible!**
            """)

            with gr.Row():
                with gr.Column():
                    image_input = gr.Image(
                        sources=["webcam"],
                        type="pil",
                        label="📹 Live Camera Feed / كاميرا مباشرة",
                        streaming=True
                    )
                    with gr.Row():
                        complete_word_btn = gr.Button(
                            "🤖 إرسال للذكاء (Send to AI)", variant="primary", size="lg"
                        )
                        clear_btn = gr.Button("🔄 مسح (Clear All)", variant="secondary")

                with gr.Column():
                    detected_output = gr.Textbox(
                        label="✅ نتائج الكشف / Detection Results",
                        lines=3,
                        placeholder="ستظهر الحروف المكتشفة هنا / Detected letters will appear here..."
                    )
                    arabic_output = gr.Textbox(
                        label="🔤 الكلمة العربية / Arabic Word",
                        lines=2,
                        placeholder="الكلمة المتراكمة / Accumulated word..."
                    )
                    english_output = gr.Textbox(
                        label="🌐 الترجمة / Translation",
                        lines=2,
                        placeholder="الترجمة الإنجليزية / English translation..."
                    )
                    response_output = gr.Textbox(
                        label="👨‍⚕️ استجابة الطبيب / Medical AI Response",
                        lines=5,
                        placeholder="ستظهر الأسئلة الطبية هنا / Medical questions will appear here..."
                    )
                    word_status = gr.Textbox(
                        label="📊 حالة الكلمة / Word Status",
                        lines=2,
                        placeholder="Word completion status..."
                    )

            gr.Markdown("""
            ### 💡 Tips for Better Detection / نصائح للكشف الأفضل:
            - **إضاءة جيدة / Good Lighting**: Ensure your hands are well-lit
            - **خلفية واضحة / Clear Background**: Use a plain background
            - **وضع اليد / Hand Position**: Keep hands centered in view
            - **وضوح الإشارة / Sign Clarity**: Make distinct, clear signs
            - **المسافة / Distance**: Comfortable distance from camera
            - **ثبات / Stability**: Hold each sign steady for ~1 second (wait for 🟢🟢)
            - **لا تعيد تعيين / Don't Reset**: Move hand away between letters - word stays!
            """)

            # Auto-streaming detection - no manual button click needed
            image_input.stream(
                fn=process_sign_language_stream,
                inputs=[image_input, session_id],
                outputs=[detected_output, arabic_output, english_output, response_output],
                stream_every=0.5  # Process every 0.5 seconds
            )

            complete_word_btn.click(
                fn=complete_word,
                inputs=[session_id],
                outputs=[word_status]
            )

            def clear_all(current_session_id):
                """Clear the letter buffer of a session and blank all five textboxes.

                Only the letter buffer and stabilisation tracking are reset;
                a pending question and the saved words/history are kept.
                """
                if current_session_id in sessions:
                    state = sessions[current_session_id]
                    state['letters'] = []
                    state['last_letter'] = None
                    state['letter_stable_count'] = 0
                return "", "", "", "", ""

            clear_btn.click(
                fn=clear_all,
                inputs=[session_id],
                outputs=[detected_output, arabic_output, english_output, response_output, word_status]
            )

        with gr.Tab("🎤 Doctor's Voice Input"):
            with gr.Row():
                with gr.Column():
                    audio_input = gr.Audio(
                        sources=["microphone"],
                        type="filepath",
                        label="Doctor's Voice"
                    )
                    audio_btn = gr.Button("🎤 Process Audio", variant="primary", size="lg")

                with gr.Column():
                    doctor_text_output = gr.Textbox(label="🎤 Transcribed Text", lines=3)
                    question_output = gr.Textbox(label="❓ Question for Patient (Arabic)", lines=3)

            audio_btn.click(
                fn=process_doctor_audio,
                inputs=[audio_input, session_id],
                outputs=[doctor_text_output, question_output]
            )

        with gr.Tab("ℹ️ System Info"):
            gr.Markdown(
                """
                ## 📊 System Features:
                - **YOLO-based** Arabic sign language detection
                - **Real-time** translation (Arabic ↔ English)
                - **Medical AI** for intelligent questioning
                - **ZeroGPU** optimization for efficient processing

                ## 🔧 Technical Stack:
                - YOLOv8 for sign detection
                - Helsinki-NLP for translation
                - Whisper for speech recognition
                - gTTS for text-to-speech

                ## 💡 Tips:
                - Ensure good lighting for better detection
                - Make clear, distinct sign gestures
                - Speak clearly into the microphone
                """
            )

            reset_btn = gr.Button("🔄 Reset Session", variant="secondary")
            reset_output = gr.Textbox(label="Status", lines=1)

            reset_btn.click(
                fn=reset_session,
                inputs=[session_id],
                outputs=[reset_output]
            )

        gr.Markdown(
            """
            ---
            Built with ❤️ for accessible healthcare communication
            """
        )

    return app


# Initialize and launch
if __name__ == "__main__":
    logger.info("🚀 Starting Arabic Sign Language Medical Interpreter...")

    # Setup environment
    setup_environment()

    # Initialize models
    initialize_models()

    # Create and launch interface
    app = create_interface()
    app.queue()
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )