import gradio as gr
from gradio.events import SelectData
import pandas as pd
import plotly.graph_objects as go
import os
from agenteval.leaderboard.view import LeaderboardViewer
from huggingface_hub import HfApi
from leaderboard_transformer import (
    DataTransformer,
    transform_raw_dataframe,
    create_pretty_tag_map,
    INFORMAL_TO_FORMAL_NAME_MAP,
    _plot_scatter_plotly,
    format_cost_column,
    format_score_column,
)
from content import (
    SCATTER_DISCLAIMER,
    format_error,
    format_log,
    format_warning,
    hf_uri_to_web_url,
    hyperlink,
)

# --- Constants and Configuration ---
# Running locally unless the HF Spaces runtime sets system=spaces.
LOCAL_DEBUG = not (os.environ.get("system") == "spaces")
CONFIG_NAME = "1.0.0-dev1"  # This corresponds to 'config' in LeaderboardViewer
IS_INTERNAL = os.environ.get("IS_INTERNAL", "false").lower() == "true"

OWNER = "allenai"
PROJECT_NAME = "asta-bench" + ("-internal" if IS_INTERNAL else "")
SUBMISSION_DATASET = f"{OWNER}/{PROJECT_NAME}-submissions"
SUBMISSION_DATASET_PUBLIC = f"{OWNER}/{PROJECT_NAME}-submissions-public"
CONTACT_DATASET = f"{OWNER}/{PROJECT_NAME}-contact-info"
RESULTS_DATASET = f"{OWNER}/{PROJECT_NAME}-results"  # repo_id for LeaderboardViewer
LEADERBOARD_PATH = f"{OWNER}/{PROJECT_NAME}-leaderboard"

if LOCAL_DEBUG:
    DATA_DIR = os.path.join(os.path.dirname(__file__), "data", CONFIG_NAME)
else:
    DATA_DIR = "/home/user/data/" + CONFIG_NAME
EXTRACTED_DATA_DIR = os.path.join(DATA_DIR, "extracted")

api = HfApi()
MAX_UPLOAD_BYTES = 100 * 1024**2  # 100 MiB cap on uploaded submissions
AGENTEVAL_MANIFEST_NAME = "agenteval.json"

os.makedirs(EXTRACTED_DATA_DIR, exist_ok=True)

# --- Global State for Viewers (simple caching) ---
# Keyed by split name; populated lazily by get_leaderboard_viewer_instance.
CACHED_VIEWERS = {}
CACHED_TAG_MAPS = {}


# --- New Helper Class to Solve the Type Mismatch Bug ---
class DummyViewer:
    """A mock viewer to be cached on error.

    It has a ._load() method to ensure it behaves like the real
    LeaderboardViewer, so downstream code can treat both uniformly.
    """

    def __init__(self, error_df):
        # DataFrame holding a single human-readable error message.
        self._error_df = error_df

    def _load(self):
        # The _load method returns the error DataFrame and an empty tag map,
        # mirroring LeaderboardViewer._load()'s (df, tag_map) contract.
        return self._error_df, {}


def get_leaderboard_viewer_instance(split: str):
    """
    Fetches the LeaderboardViewer for a split, using a cache to avoid
    re-downloading data. On error, returns a stable DummyViewer object.

    Args:
        split: Dataset split name (e.g. "test" or "validation").

    Returns:
        (viewer, pretty_tag_map) where viewer is a LeaderboardViewer or a
        DummyViewer, and pretty_tag_map maps display category names to
        benchmark lists.
    """
    global CACHED_VIEWERS, CACHED_TAG_MAPS

    if split in CACHED_VIEWERS:
        # Cache hit: return the cached viewer and tag map.
        return CACHED_VIEWERS[split], CACHED_TAG_MAPS.get(split, {"Overall": []})

    # --- Cache miss: try to load data from the source ---
    try:
        print(f"Using Hugging Face dataset for split '{split}': {RESULTS_DATASET}/{CONFIG_NAME}")
        viewer = LeaderboardViewer(
            repo_id=RESULTS_DATASET,
            config=CONFIG_NAME,
            split=split,
            is_internal=IS_INTERNAL,
        )
        # Simplify tag map creation: convert informal tag names to display names.
        pretty_tag_map = create_pretty_tag_map(viewer.tag_map, INFORMAL_TO_FORMAL_NAME_MAP)

        # Cache the results for next time.
        CACHED_VIEWERS[split] = viewer
        CACHED_TAG_MAPS[split] = pretty_tag_map  # Cache the pretty map directly

        return viewer, pretty_tag_map

    except Exception as e:
        # On ANY error, create a consistent error message and cache a
        # DummyViewer so we don't try to fetch again on this run.
        error_message = f"Error loading data for split '{split}': {e}"
        print(format_error(error_message))

        dummy_df = pd.DataFrame({"Message": [error_message]})
        dummy_viewer = DummyViewer(dummy_df)
        dummy_tag_map = {"Overall": []}

        CACHED_VIEWERS[split] = dummy_viewer
        CACHED_TAG_MAPS[split] = dummy_tag_map

        return dummy_viewer, dummy_tag_map


def create_leaderboard_display(
    full_df: pd.DataFrame,
    tag_map: dict,
    category_name: str,
    split_name: str,
):
    """
    UI factory: renders the main DataFrame and Plot for a given category
    (e.g. "Overall" or "Literature Understanding") from pre-loaded data.

    Args:
        full_df: Complete "pretty" DataFrame for the split.
        tag_map: Pretty tag map (category -> benchmark names).
        category_name: Category to filter the view to.
        split_name: Split label (currently unused here; kept for interface
            compatibility with callers).

    Returns:
        (dataframe_component, plot_component) so callers can wire up events.
    """
    # 1. Instantiate the transformer and get the specific view for this
    # category. This function no longer loads data itself; it filters the
    # data it receives.
    transformer = DataTransformer(full_df, tag_map)
    df_view, plots_dict = transformer.view(tag=category_name, use_plotly=True)

    # 2. Format cost columns as display strings.
    for col in df_view.columns:
        if "Cost" in col:
            df_view = format_cost_column(df_view, col)

    # 3. Fill NaN scores with 0 (display formatting).
    for col in df_view.columns:
        if "Score" in col:
            df_view = format_score_column(df_view, col)

    scatter_plot = plots_dict.get('scatter_plot', go.Figure())

    # 4. Define the UI components with the filtered data.
    # Logs/Cost/Score columns carry HTML links or styled text, so they must
    # render as markdown; everything else is plain text.
    df_headers = df_view.columns.tolist()
    df_datatypes = [
        "markdown" if col == "Logs" or "Cost" in col or "Score" in col else "str"
        for col in df_headers
    ]

    dataframe_component = gr.DataFrame(
        headers=df_headers,
        value=df_view,
        datatype=df_datatypes,
        interactive=False,
        wrap=True,
        # NOTE(review): fixed 17-entry width list — assumes the Overall view's
        # column count; extra/missing columns fall back to Gradio defaults.
        column_widths=[100, 100, 70, 70, 70, 70, 70, 70, 70, 70, 70, 70, 70, 75, 75, 50, 50],
    )
    plot_component = gr.Plot(
        value=scatter_plot,
        label=f"Score vs. Cost ({category_name})",
    )
    gr.HTML(SCATTER_DISCLAIMER, elem_id="scatter-disclaimer")

    # Return the components so they can be referenced elsewhere.
    return dataframe_component, plot_component


def get_full_leaderboard_data(split: str) -> tuple[pd.DataFrame, dict]:
    """
    Loads and transforms the complete dataset for a given split.

    This function handles caching (via get_leaderboard_viewer_instance) and
    returns the final "pretty" DataFrame and tag map.

    Returns:
        (pretty_df, pretty_tag_map); both empty on load failure.
    """
    # This reuses the existing robust caching logic.
    viewer_or_data, raw_tag_map = get_leaderboard_viewer_instance(split)

    if isinstance(viewer_or_data, (LeaderboardViewer, DummyViewer)):
        raw_df, _ = viewer_or_data._load()
        if raw_df.empty:
            return pd.DataFrame(), {}

        pretty_df = transform_raw_dataframe(raw_df)
        pretty_tag_map = create_pretty_tag_map(raw_tag_map, INFORMAL_TO_FORMAL_NAME_MAP)

        if "Logs" in pretty_df.columns:
            def format_log_entry_to_html(raw_uri):
                # Missing/blank URIs render as an empty cell.
                if pd.isna(raw_uri) or raw_uri == "":
                    return ""
                web_url = hf_uri_to_web_url(str(raw_uri))
                return hyperlink(web_url, "🔗") if web_url else ""

            # Apply the function to the "Logs" column.
            pretty_df["Logs"] = pretty_df["Logs"].apply(format_log_entry_to_html)

        return pretty_df, pretty_tag_map

    # Fallback for unexpected types.
    return pd.DataFrame(), {}


# --- Detailed Benchmark Display ---
def create_benchmark_details_display(
    full_df: pd.DataFrame,
    tag_map: dict,
    category_name: str,
):
    """
    Generates a detailed breakdown for each benchmark within a given category.

    For each benchmark, it creates a title, a filtered table, and a scatter
    plot.

    Args:
        full_df (pd.DataFrame): The complete, "pretty" dataframe for the
            entire split.
        tag_map (dict): The "pretty" tag map to find the list of benchmarks.
        category_name (str): The main category to display details for
            (e.g., "Literature Understanding").
    """
    # 1. Get the list of benchmarks for the selected category.
    benchmark_names = tag_map.get(category_name, [])
    if not benchmark_names:
        gr.Markdown(f"No detailed benchmarks found for the category: {category_name}")
        return

    gr.Markdown("---")
    gr.Markdown("## Detailed Benchmark Results")

    # 2. Loop through each benchmark and create its UI components.
    for benchmark_name in benchmark_names:
        # FIX: was `with gr.Blocks():` — nesting a new Blocks app inside an
        # already-rendering Blocks context does not attach components to the
        # outer app; gr.Group is the supported grouping container.
        with gr.Group():
            gr.Markdown(f"### {benchmark_name}")

            # 3. Prepare the data for this specific benchmark's table and plot.
            benchmark_score_col = f"{benchmark_name} Score"
            benchmark_cost_col = f"{benchmark_name} Cost"

            # Define the columns needed for the detailed table.
            table_cols = ['Agent', 'Submitter', 'Date', benchmark_score_col, benchmark_cost_col, 'Logs']
            # Filter to only columns that actually exist in the full dataframe.
            existing_table_cols = [col for col in table_cols if col in full_df.columns]

            if benchmark_score_col not in existing_table_cols:
                gr.Markdown(f"Score data for {benchmark_name} not available.")
                continue  # Skip to the next benchmark if score is missing

            # Create a specific DataFrame for the table view.
            benchmark_table_df = full_df[existing_table_cols].copy()

            # Calculate and add the "Attempted Benchmark" status column:
            # ✅ = score and cost present, ⚠️ = only one present, 🚫 = neither.
            def check_benchmark_status(row):
                has_score = pd.notna(row.get(benchmark_score_col))
                has_cost = pd.notna(row.get(benchmark_cost_col))
                if has_score and has_cost:
                    return "✅"
                if has_score or has_cost:
                    return "⚠️"
                # FIX: dropped accidental trailing space ("🚫 ") for
                # consistency with the other status strings.
                return "🚫"

            benchmark_table_df['Attempted Benchmark'] = benchmark_table_df.apply(
                check_benchmark_status, axis=1
            )

            # Sort rows by score, best first; missing scores go last.
            if benchmark_score_col in benchmark_table_df.columns:
                benchmark_table_df = benchmark_table_df.sort_values(
                    by=benchmark_score_col, ascending=False, na_position='last'
                )

            # 4. Format the cost and score columns for display.
            benchmark_table_df = format_cost_column(benchmark_table_df, benchmark_cost_col)
            benchmark_table_df = format_score_column(benchmark_table_df, benchmark_score_col)

            desired_cols_in_order = [
                'Agent', 'Submitter', 'Attempted Benchmark',
                benchmark_score_col, benchmark_cost_col,
                'Openness', 'Degree of Control', 'Date', 'Logs',
            ]
            # Guarantee every desired column exists so reindexing can't raise.
            for col in desired_cols_in_order:
                if col not in benchmark_table_df.columns:
                    benchmark_table_df[col] = pd.NA  # Add as an empty column
            benchmark_table_df = benchmark_table_df[desired_cols_in_order]

            # Rename columns for a cleaner table display, as requested.
            benchmark_table_df.rename(columns={
                benchmark_score_col: 'Score',
                benchmark_cost_col: 'Cost',
            }, inplace=True)

            # Ensure the 'Logs' column is formatted correctly.
            table_headers = benchmark_table_df.columns.tolist()
            # Logs/Cost/Score hold HTML/styled text -> markdown; otherwise str.
            df_datatypes = [
                "markdown" if col in ["Logs", "Cost", "Score"] else "str"
                for col in table_headers
            ]

            # Create the Gradio component, now with the correct datatypes.
            gr.DataFrame(
                value=benchmark_table_df,
                datatype=df_datatypes,
                interactive=False,
                wrap=True,
            )

            # Create the scatter plot using the full data for context, but
            # plotting benchmark metrics. This shows all agents on the same
            # axis for better comparison.
            benchmark_plot = _plot_scatter_plotly(
                data=full_df,
                x=benchmark_cost_col,
                y=benchmark_score_col,
                agent_col="Agent",
            )
            gr.Plot(value=benchmark_plot)
            gr.HTML(SCATTER_DISCLAIMER, elem_id="scatter-disclaimer")