Spaces:
Runtime error
Runtime error
| import subprocess | |
| import sys | |
| import os | |
| import gradio as gr | |
| from transformers import Qwen2VLForConditionalGeneration, AutoProcessor | |
| from qwen_vl_utils import process_vision_info | |
| import torch | |
| import pandas as pd | |
| import pytesseract | |
| import cv2 | |
| import pymssql | |
# Runtime configuration.
# The Hugging Face token is read from the environment instead of being embedded
# in the source; it is None when neither variable is set.
HUGGINGFACE_API_KEY = os.environ.get("HUGGINGFACE_API_KEY") or os.environ.get("HF_TOKEN")
# SQL Server address; the historical hard-coded value remains the default.
SERVER_IP = os.environ.get("SQL_SERVER_IP", "35.227.148.156")
# Install Python dependencies one package per pip invocation to limit the
# memory used by any single install.
def install_dependencies():
    """Install the pinned Python packages with pip, one package at a time.

    Each package is installed by running ``pip install`` through the current
    interpreter. Packages pinned to a ``+cpu`` local version are resolved
    against the PyTorch CPU wheel index in addition to PyPI, because those
    builds are not published on PyPI.

    Raises:
        subprocess.CalledProcessError: if any pip invocation exits non-zero.
    """
    # NOTE(review): Qwen2VLForConditionalGeneration (imported at the top of
    # this file) appears to need a newer transformers release than the
    # 4.38.2 pinned below -- confirm before relying on this pin.
    dependency_groups = [
        ["pip==23.3.1", "setuptools", "wheel"],
        ["pytesseract"],
        ["torch==2.1.0+cpu", "torchvision==0.16.0+cpu", "torchaudio==2.1.0+cpu"],
        ["transformers==4.38.2", "auto-gptq==0.7.1", "autoawq==0.2.8"],
        ["qwen_vl_utils==0.0.8", "gradio==4.27.0"],
        ["pyodbc", "sqlalchemy", "azure-storage-blob", "pymssql", "pandas", "opencv-python"],
    ]
    torch_cpu_index = "https://download.pytorch.org/whl/cpu"
    for group in dependency_groups:
        for package in group:
            command = [sys.executable, "-m", "pip", "install", package]
            if "+cpu" in package:
                # CPU-only builds live on the PyTorch wheel index, not PyPI.
                command += ["--extra-index-url", torch_cpu_index]
            subprocess.check_call(command, stdout=sys.stdout, stderr=sys.stderr)
            print(f"Installed {package}")


# NOTE(review): this runs after the third-party imports at the top of the
# file, so it cannot satisfy those imports on a fresh environment.
install_dependencies()
# Install system dependencies (executed separately to avoid timeout issues)
def install_system_dependencies():
    """Install the apt packages needed for ODBC access and Tesseract OCR.

    Commands run sequentially without an intermediate shell; environment
    overrides (``ACCEPT_EULA=Y`` for the Microsoft ODBC driver) are passed
    through ``env`` rather than as a shell prefix.

    Raises:
        subprocess.CalledProcessError: if a command exits non-zero.
        FileNotFoundError: if ``apt-get`` is not available on PATH.
    """
    steps = [
        (["apt-get", "update"], {}),
        (["apt-get", "install", "-y", "unixodbc-dev", "tesseract-ocr"], {}),
        (["apt-get", "install", "-y", "msodbcsql17"], {"ACCEPT_EULA": "Y"}),
    ]
    for argv, extra_env in steps:
        subprocess.run(argv, check=True, env={**os.environ, **extra_env})
        # Rebuild the shell-style command line purely for the log message.
        env_prefix = [f"{key}={value}" for key, value in extra_env.items()]
        print(f"Executed: {' '.join(env_prefix + argv)}")


install_system_dependencies()
# Initialize model and processor with CPU mode
MODEL_ID = "Qwen/Qwen2-VL-2B-Instruct-AWQ"
model = Qwen2VLForConditionalGeneration.from_pretrained(
    MODEL_ID,
    torch_dtype="auto",
    # `token` replaces the deprecated `use_auth_token` keyword.
    token=HUGGINGFACE_API_KEY,
)
# Force model to use CPU to avoid memory issues on Hugging Face Spaces
model.to("cpu")
processor = AutoProcessor.from_pretrained(MODEL_ID, token=HUGGINGFACE_API_KEY)
# The executable path is configured on the inner `pytesseract.pytesseract`
# module; assigning `pytesseract.pytesseract_cmd` only creates an unused
# attribute and leaves the default lookup in place.
pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'
# Function to preprocess the image for OCR
def preprocess_image(image_path):
    """Load an image and return a binarized grayscale copy for OCR.

    Args:
        image_path: Path of the image file to read with OpenCV.

    Returns:
        The thresholded image: pixels above 150 become 255, the rest 0.

    Raises:
        ValueError: if OpenCV cannot read the file at ``image_path``.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None rather than raising;
        # fail here with a clear message instead of inside cvtColor.
        raise ValueError(f"Could not read image: {image_path}")
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY)
    return binary
# Function to extract text using OCR
def ocr_extract_text(image_path):
    """Run Tesseract on the preprocessed image and return the recognized text.

    Args:
        image_path: Path of the image file to OCR.

    Returns:
        The string produced by ``pytesseract.image_to_string``.
    """
    return pytesseract.image_to_string(preprocess_image(image_path))
# Function to process image and extract details
def process_image(image_path):
    """Extract invoice fields from an image, falling back to OCR on failure.

    The image and an extraction prompt are sent to the vision-language model;
    the generated answer is handed to ``parse_details``. If any step of the
    model path raises, the error is printed and the text obtained from
    ``ocr_extract_text`` is parsed instead.

    Args:
        image_path: Path of the invoice image.

    Returns:
        The dictionary produced by ``parse_details``.
    """
    messages = [{
        "role": "user",
        "content": [
            {"type": "image", "image": image_path},
            {"type": "text", "text": (
                "Extract the following details from the invoice:\n"
                "- 'invoice_number'\n"
                "- 'date'\n"
                "- 'place'\n"
                "- 'amount' (monetary value in the relevant currency)\n"
                "- 'category' (based on the invoice type)"
            )},
        ],
    }]
    try:
        text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to(model.device)
        generated_ids = model.generate(**inputs, max_new_tokens=128)
        # generate() returns prompt + completion; drop the prompt tokens so
        # the echoed instructions are not parsed as invoice fields.
        completion_ids = [
            output_ids[len(input_ids):]
            for input_ids, output_ids in zip(inputs.input_ids, generated_ids)
        ]
        output_text = processor.batch_decode(
            completion_ids,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )
        return parse_details(output_text[0])
    except Exception as e:
        # Best-effort boundary: any model failure degrades to plain OCR.
        print(f"Model failed, falling back to OCR: {e}")
        ocr_text = ocr_extract_text(image_path)
        return parse_details(ocr_text)
# Function to parse details from extracted text
def parse_details(details):
    """Parse ``label: value`` lines of extracted text into invoice fields.

    Each line is split at its first colon. The label (text before the colon)
    is matched case-insensitively against field keywords, and the text after
    the colon is stored as the value, so values that themselves contain
    colons (for example times) are kept intact. A line without a colon is
    matched and stored as a whole. When several lines match the same field,
    the last one wins.

    Args:
        details: Extracted text, one candidate field per line. ``None`` is
            treated as empty text.

    Returns:
        A dict with the keys "Invoice Number", "Date", "Place", "Amount" and
        "Category"; fields that were not found are ``None``. "Category"
        becomes "General" when no category line was parsed and at least one
        line matched no other field.
    """
    parsed_data = {
        "Invoice Number": None,
        "Date": None,
        "Place": None,
        "Amount": None,
        "Category": None,
    }
    for line in (details or "").split("\n"):
        label, separator, value = line.partition(":")
        if not separator:
            # No colon: keep the whole line as both label and value.
            label, value = line, line
        label = label.lower()
        value = value.strip()

        if separator and "category" in label:
            parsed_data["Category"] = value
        elif "date" in label:
            # Checked before "invoice" so "Invoice Date" does not overwrite
            # the invoice number.
            parsed_data["Date"] = value
        elif "invoice" in label:
            parsed_data["Invoice Number"] = value
        elif "place" in label:
            parsed_data["Place"] = value
        elif any(keyword in label for keyword in ("total", "amount", "cost")):
            parsed_data["Amount"] = value
        elif parsed_data["Category"] is None:
            # Default only; never clobber a category that was parsed.
            parsed_data["Category"] = "General"
    return parsed_data
# Store extracted data in Azure SQL Database
def store_to_azure_sql(dataframe):
    """Insert the extracted invoice rows into the ``Invoices`` table.

    Connects with pymssql, creates the table when it does not exist, inserts
    every row of ``dataframe`` and commits. Failures are reported with
    ``print`` and not re-raised, so a database outage does not break the
    caller.

    Args:
        dataframe: DataFrame with the columns "Invoice Number", "Date",
            "Place", "Amount" and "Category". An empty frame inserts nothing.
    """
    # NOTE(review): the literal credentials remain only as backward-compatible
    # fallbacks; supply SQL_USER / SQL_PASSWORD via the environment and remove
    # the literals from source control.
    user = os.environ.get("SQL_USER", "pio-admin")
    password = os.environ.get("SQL_PASSWORD", "Poctest123#")
    database = os.environ.get("SQL_DATABASE", "Invoices")

    create_table_query = """
        IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Invoices' AND xtype='U')
        CREATE TABLE Invoices (
            InvoiceNumber NVARCHAR(255),
            Date NVARCHAR(255),
            Place NVARCHAR(255),
            Amount NVARCHAR(255),
            Category NVARCHAR(255)
        )
    """
    insert_query = """
        INSERT INTO Invoices (InvoiceNumber, Date, Place, Amount, Category)
        VALUES (%s, %s, %s, %s, %s)
    """
    columns = ["Invoice Number", "Date", "Place", "Amount", "Category"]
    # Missing values (None/NaN) are sent as SQL NULL.
    rows = [
        tuple(None if pd.isna(row[column]) else row[column] for column in columns)
        for _, row in dataframe.iterrows()
    ]

    try:
        with pymssql.connect(
            server=SERVER_IP, user=user, password=password, database=database
        ) as conn:
            with conn.cursor() as cursor:
                cursor.execute(create_table_query)
                if rows:
                    cursor.executemany(insert_query, rows)
            conn.commit()
            print("Data successfully stored in Azure SQL Database.")
    except Exception as e:
        # Deliberate best-effort: report and continue.
        print(f"Error storing data to database: {e}")
# Gradio interface for invoice processing
def gradio_interface(image_files):
    """Process uploaded invoice images and return the extracted fields.

    Args:
        image_files: Uploaded files as delivered by the upload component:
            a list of file paths or of objects exposing the path as
            ``.name``. May be ``None`` or empty when nothing was uploaded.

    Returns:
        A DataFrame with one row per image and the columns "Invoice Number",
        "Date", "Place", "Amount" and "Category". With no uploads an empty
        frame with those columns is returned and nothing is written to the
        database.
    """
    columns = ["Invoice Number", "Date", "Place", "Amount", "Category"]
    if not image_files:
        # Nothing uploaded: skip the model and the database entirely.
        return pd.DataFrame(columns=columns)

    results = [
        process_image(getattr(image_file, "name", image_file))
        for image_file in image_files
    ]
    df = pd.DataFrame(results, columns=columns)
    store_to_azure_sql(df)
    return df
# Build the Gradio UI: a multi-file upload feeding gradio_interface, with the
# extracted fields shown in an editable table.
_invoice_upload = gr.Files(label="Upload Invoice Images")
_results_table = gr.Dataframe(interactive=True)
grpc_interface = gr.Interface(
    title="Invoice Extraction System",
    fn=gradio_interface,
    inputs=_invoice_upload,
    outputs=_results_table,
)

# Start the web server only when executed as a script.
if __name__ == "__main__":
    grpc_interface.launch(share=True)