import pandas as pd
import numpy as np
import streamlit as st
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from pathlib import Path
from typing import Optional

from utils.console_manager import console_manager
from config import EMBEDDINGS_DIR, EMBEDDING_MODEL_NAME
from data_pipeline.preprocess import PROCESSED_PARQUET, preprocess_and_save


def initialize_embedding_model(
    model_name: str = EMBEDDING_MODEL_NAME,
) -> HuggingFaceEmbeddings:
    """Load the sentence-embedding model used for both indexing and queries."""
    embeddings = HuggingFaceEmbeddings(model_name=model_name)
    console_manager.print_info(f"Initialized embeddings model: {model_name}")
    return embeddings


def initialize_chroma(
    embedding_model: HuggingFaceEmbeddings, chroma_path: Path = EMBEDDINGS_DIR
) -> Chroma:
    """Open an existing persistent Chroma store at chroma_path, or create one."""
    if chroma_path.exists() and any(chroma_path.iterdir()):
        console_manager.print_info(f"Loading existing ChromaDB from {chroma_path}")
    else:
        console_manager.print_info(f"Creating new ChromaDB at: {chroma_path}")
    vectordb = Chroma(
        persist_directory=str(chroma_path), embedding_function=embedding_model
    )
    return vectordb


def load_preprocessed_data() -> Optional[pd.DataFrame]:
    """Read the preprocessed parquet file and build one embeddable text per row."""
    if not PROCESSED_PARQUET.exists():
        console_manager.print_error(f"Processed file not found: {PROCESSED_PARQUET}")
        return None
    df = pd.read_parquet(PROCESSED_PARQUET)
    # Concatenate title, abstract, and categories into a single string so each
    # row embeds as one document.
    df["content"] = (
        "Title: "
        + df["title"]
        + ". Abstract: "
        + df["abstract"]
        + ". Categories: "
        + df["categories"].apply(
            lambda x: ", ".join(x) if isinstance(x, list) else str(x)
        )
    )
    return df


def prepare_documents(df: pd.DataFrame) -> list[dict]:
    """Convert DataFrame rows into Chroma-ready documents with metadata."""
    docs = [
        {
            "id": str(i),
            "content": row["content"],
            "metadata": {
                "id": str(i),
                "title": row["title"],
                "categories": row["categories"],
                "year": int(row["year"]),
            },
        }
        for i, row in df.iterrows()
    ]
    return docs


def add_embeddings_to_chroma(vectordb: Chroma, docs: list[dict]):
    """Embed the document texts and persist them, with metadata, in Chroma."""
    vectordb.add_texts(
        texts=[d["content"] for d in docs],
        metadatas=[d["metadata"] for d in docs],
    )
    console_manager.print_success("Embeddings generated and stored successfully!")


def embed_and_store():
    """Run the full embedding step; return the vector store, or None on failure."""
    try:
        with console_manager.status("Generating embeddings..."):
            embedding_model = initialize_embedding_model()
            vectordb = initialize_chroma(embedding_model, EMBEDDINGS_DIR)
            df = load_preprocessed_data()
            if df is None:
                return None
            docs = prepare_documents(df)
            add_embeddings_to_chroma(vectordb, docs)
            return vectordb
    except Exception as e:
        console_manager.print_error(f"Embedding generation failed: {e}")
        return None


def extract_embeddings(_vectordb):
    """Pull all stored vectors and metadata out of the underlying collection.

    The leading underscore on _vectordb keeps Streamlit's cache decorators
    from trying to hash the (unhashable) Chroma object.
    """
    collection = _vectordb._collection
    data = collection.get(include=["metadatas", "documents", "embeddings"])
    embeddings = np.array(data["embeddings"])
    metadata = data["metadatas"]
    return embeddings, metadata


def run_pipeline(force_run: bool = False):
    st.header("Ingestion & Embedding")
    if st.button("Run Ingestion & Embeddings Pipeline"):
        with st.spinner("Running full pipeline..."):
            preprocess_and_save()  # Step 1: extraction and basic cleaning
            embedding_model = initialize_embedding_model()
            vectordb = initialize_chroma(embedding_model, EMBEDDINGS_DIR)
            # Check whether the collection already holds any vectors; Chroma
            # may return either a numpy array or a plain list here.
            collection_data = vectordb._collection.get(include=["embeddings"])
            embeddings = collection_data["embeddings"]
            if embeddings is not None:
                if isinstance(embeddings, np.ndarray):
                    embeddings_exist = embeddings.size > 0
                else:
                    embeddings_exist = len(embeddings) > 0
            else:
                embeddings_exist = False
            if embeddings_exist and not force_run:
                st.warning("Embeddings already exist. Skipping embedding generation.")
                return vectordb
            vectordb = embed_and_store()
            if vectordb is not None:
                st.success("Pipeline finished successfully!")
            return vectordb
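

# Usage sketch: a minimal standalone entry point, assuming this module is run
# directly (outside `streamlit run`) so only the console-based path is
# exercised. This guard is an illustrative addition, not part of the original
# module; it simply generates embeddings and reports how many were stored.
if __name__ == "__main__":
    vectordb = embed_and_store()
    if vectordb is not None:
        vectors, metadata = extract_embeddings(vectordb)
        console_manager.print_info(
            f"Collection holds {vectors.shape[0]} vectors "
            f"({len(metadata)} metadata records)."
        )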