Dhruv's picture
Upload folder using huggingface_hub
52ce761 verified
import json
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
import gradio as gr
import plotly.graph_objects as go
from huggingface_hub import InferenceClient
from transformers import pipeline
# Title rendered as the top-level heading of the Gradio page.
APP_TITLE = "Cyber Vibe Lab – MCP in Action"
# Markdown blurb shown directly under the title. It describes the intent of
# the app and names the MCP servers a full deployment would call; nothing in
# this module actually calls them.
INTRO_MD = """
### Cyber Vibe Lab
This prototype Gradio 6 application is designed for the **MCP 1st Birthday** hackathon, co-hosted by **Anthropic** and **Gradio**.
It explores how AI agents and the Model Context Protocol (MCP) can be used to:
- Reflect on **AI-orchestrated cyber espionage** (as described in Anthropic's report).
- Perform structured **"vibe hacking"** simulations of attack paths.
- Always translate those simulations into **defensive guidance** for security teams.
In a full deployment, this app would call MCP servers such as:
- `mcp://perplexity-ask` for web-scale security research and summarization.
- `mcp://deepwiki` for deep dives into code and documentation of particular systems.
This local version keeps those calls conceptual so that the app runs without extra setup while still matching the intended architecture.
"""
@dataclass
class AttackStage:
    """A single ATT&CK-style stage that chat turns can be bucketed into."""

    # Stage key; the loader uses it as the key of the stage mapping.
    id: str
    # Human-readable label; used as the bar label in the attack-chain chart.
    name: str
    # Tactic identifier string carried alongside the stage.
    mitre_tactic_id: str
    # Name of the matrix the stage belongs to.
    matrix: str
    # Colour string used for this stage's bar in the attack-chain chart.
    color: str
def load_mitre_stages() -> Dict[str, AttackStage]:
    """Load a minimal set of MITRE ATT&CK-style stages from JSON.

    The file is expected at ``data/mitre/mitre_minimal.json`` relative to this
    script and should contain an object with a ``"stages"`` list whose entries
    are objects carrying at least an ``"id"``.

    Returns:
        A mapping of stage id to ``AttackStage``. The mapping is empty when the
        file is missing, unreadable, not valid JSON, or not shaped as described
        above; callers render a placeholder in that case. Individual entries
        that are not objects or lack an ``"id"`` are skipped.
    """
    # Function-scope import: this module does not import logging at file level.
    import logging

    path = Path(__file__).parent / "data" / "mitre" / "mitre_minimal.json"
    stages: Dict[str, AttackStage] = {}

    try:
        with path.open("r", encoding="utf-8") as f:
            raw = json.load(f)
    except FileNotFoundError:
        # Expected when the data file is not shipped; stay silent and keep
        # stages empty so the Studio panel renders a placeholder figure.
        return stages
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError. This
        # function runs at import time, so a corrupt file must not crash the
        # app; degrade to the same placeholder path but leave a trace.
        logging.getLogger(__name__).warning(
            "Could not load MITRE stages from %s: %s", path, exc
        )
        return stages

    entries = raw.get("stages") if isinstance(raw, dict) else None
    if not isinstance(entries, list):
        return stages

    for entry in entries:
        if not isinstance(entry, dict) or "id" not in entry:
            continue
        stage_id = entry["id"]
        stages[stage_id] = AttackStage(
            id=stage_id,
            name=entry.get("name", stage_id),
            mitre_tactic_id=entry.get("mitre_tactic_id", ""),
            matrix=entry.get("matrix", "ATTACK"),
            color=entry.get("color", "#888888"),
        )
    return stages
# Stage catalogue keyed by stage id, loaded once when the module is imported.
MITRE_STAGES: Dict[str, AttackStage] = load_mitre_stages()
# Optional model identifier read from the environment; None when unset.
HF_STAGE_MODEL_ID = os.getenv("HF_STAGE_MODEL_ID")
# Cache slot for the lazily-built classifier; see _get_stage_classifier().
_stage_clf = None
def _get_stage_classifier():
    """Lazily construct a Hugging Face text-classification pipeline, if configured.

    Returns:
        The cached pipeline, or ``None`` when ``HF_STAGE_MODEL_ID`` is not set
        or pipeline creation failed. In the ``None`` case callers fall back to
        keyword-based heuristics.

    A failed construction is remembered, so later calls return ``None``
    immediately instead of retrying the (potentially slow) model load on every
    chat message.
    """
    global _stage_clf
    if _stage_clf is not None:
        return _stage_clf
    if not HF_STAGE_MODEL_ID:
        return None
    # The failure flag lives on the function object so this block does not
    # depend on any additional module-level variable.
    if getattr(_get_stage_classifier, "_load_failed", False):
        return None
    try:
        _stage_clf = pipeline("text-classification", model=HF_STAGE_MODEL_ID)
    except Exception:
        # Broad on purpose: model loading can fail in many unrelated ways
        # (network, missing weights, bad id) and the app must keep working.
        import logging

        logging.getLogger(__name__).warning(
            "Could not create stage classifier %r; using keyword heuristics.",
            HF_STAGE_MODEL_ID,
            exc_info=True,
        )
        _get_stage_classifier._load_failed = True
        _stage_clf = None
    return _stage_clf
def classify_stage(text: str) -> str:
    """Map a chat message onto a coarse attack-stage id.

    The lower-cased message is first offered to the optional Hugging Face
    classifier; its label is accepted only when it is a key of MITRE_STAGES.
    Otherwise (no classifier, unknown label, or any classifier error) an
    ordered keyword table decides, and "execution" is the catch-all bucket.
    """
    lowered = (text or "").lower()

    model = _get_stage_classifier()
    if model is not None:
        try:
            prediction = model(lowered, truncation=True, max_length=256)
            predicted = str(prediction[0]["label"]).lower()
            if predicted in MITRE_STAGES:
                return predicted
        except Exception:
            # Any model failure simply drops through to the heuristics below.
            pass

    # Checked top to bottom; the first stage with a matching substring wins.
    keyword_table = (
        ("recon", ("recon", "scan", "enumerat", "footprint")),
        ("initial_access", ("login", "credential", "password", "phish", "initial access")),
        ("execution", ("execute", "payload", "command", "run code")),
        ("persistence", ("persist", "backdoor", "autorun", "startup")),
        ("exfiltration", ("exfil", "leak", "download", "expose data")),
        ("impact", ("destroy", "wipe", "ransom", "impact")),
    )
    for stage_id, keywords in keyword_table:
        for keyword in keywords:
            if keyword in lowered:
                return stage_id

    # Nothing matched: default bucket.
    return "execution"
def _placeholder_figure(message: str) -> go.Figure:
    """Return an axis-less figure showing only *message*, centred on the canvas."""
    fig = go.Figure()
    fig.add_annotation(
        text=message,
        showarrow=False,
        x=0.5,
        y=0.5,
        xref="paper",
        yref="paper",
    )
    fig.update_xaxes(visible=False)
    fig.update_yaxes(visible=False)
    return fig


def build_attack_chain_figure(turns: List[Dict[str, Any]]) -> go.Figure:
    """Aggregate turns into a simple bar chart of stages touched in this session.

    Args:
        turns: Dicts carrying a ``"stage_id"`` entry. Turns whose stage id is
            missing or is not a key of MITRE_STAGES are ignored.

    Returns:
        A bar chart with one bar per stage in MITRE_STAGES (in mapping order),
        or a text-only placeholder figure when no stages are loaded or no
        turns have been recorded yet.
    """
    if not MITRE_STAGES:
        return _placeholder_figure("MITRE stages not loaded yet.")
    if not turns:
        return _placeholder_figure("No classified turns yet.")

    # Start every known stage at zero so untouched stages still get a bar.
    counts: Dict[str, int] = dict.fromkeys(MITRE_STAGES, 0)
    for turn in turns:
        sid = turn.get("stage_id")
        if sid in counts:
            counts[sid] += 1

    stage_ids = list(MITRE_STAGES)
    fig = go.Figure(
        data=[
            go.Bar(
                x=[MITRE_STAGES[sid].name for sid in stage_ids],
                y=[counts[sid] for sid in stage_ids],
                marker_color=[MITRE_STAGES[sid].color for sid in stage_ids],
            )
        ]
    )
    fig.update_layout(
        title="ATT&CK-style stages touched in this session",
        xaxis_title="Stage",
        yaxis_title="Number of turns",
    )
    return fig
def cyber_vibe_agent(message: str, history, target: str, mode: str, detail_level: str) -> str:
    """Core reasoning function for the Cyber Vibe Lab.

    This is intentionally defensive: it never returns exploit code or
    actionable credentials. Instead, it frames outputs as:

    - Attacker "vibes" and likely phases, inspired by Anthropic's report.
    - Concrete defensive recommendations, logging, and hardening steps.

    In a full MCP-enabled version, this function would orchestrate calls to
    MCP tools such as `perplexity-ask` and `deepwiki`; here the text is
    assembled from fixed templates.

    Args:
        message: The user's message. Currently not used when building the reply.
        history: Prior chat history. Only its length is inspected, and only
            when it is a list; anything else counts as "no history".
        target: Name of the system under discussion. Blank or None falls back
            to "your system or program".
        mode: "Blue-team defense", "Red-team simulation", or anything else
            (including None), which is treated as mixed mode.
        detail_level: "Step-by-step plan" adds one attack phase and two
            defensive bullets; any other value yields the high-level summary.

    Returns:
        A Markdown string: mode note, attack narrative, defensive guidance,
        an MCP note on the first turn only, and a closing tip.
    """
    target_clean = (target or "").strip() or "your system or program"
    mode_clean = mode or "Mixed"
    detail_clean = detail_level or "High-level summary"
    step_by_step = detail_clean == "Step-by-step plan"

    # Very lightweight history awareness for now; could be extended later.
    turns_so_far = len(history) if isinstance(history, list) else 0

    attack_narrative_header = "## Conceptual attack narrative (for red-team simulation)"
    defense_header = "## Defensive guidance (blue-team focus)"

    # High-level narrative aligned with Anthropic's phases. The em dashes
    # restore separators that had been reduced to a double space.
    attack_lines = [
        f"- **Phase 1 — Recon & target scoping**: An AI agent profiles `{target_clean}` using public and internal metadata, searching for entry points (web apps, APIs, cloud services, CI/CD, identity providers).",
        "- **Phase 2 — Access & foothold**: The agent chains small, seemingly-benign tasks (e.g., \"test this endpoint\", \"scan this range\") to probe for weak auth, misconfigurations, or exposed secrets.",
        "- **Phase 3 — Privilege escalation & lateral movement**: Once a weak point is identified, the agent iteratively refines exploit ideas, tests them, and expands access within the environment.",
        "- **Phase 4 — Persistence & exfiltration**: The agent catalogs high-value data stores, automates data collection, and prepares exfiltration channels — all while documenting its steps for future reuse.",
    ]
    if step_by_step:
        attack_lines.append(
            "- **Phase 5 — Automation & scaling**: The framework replays successful chains of actions across many similar assets (e.g., multiple subdomains or tenants), approaching the kind of scaled automation described in Anthropic's AI-espionage report."
        )

    if mode_clean == "Blue-team defense":
        mode_note = (
            "_Mode: blue-team only. The attack narrative is kept abstract and is used strictly "
            "to structure defensive thinking._"
        )
    elif mode_clean == "Red-team simulation":
        mode_note = (
            "_Mode: red-team simulation. The narrative focuses on attacker behavior but omits "
            "specific exploit code or instructions._"
        )
    else:
        mode_note = (
            "_Mode: mixed. We balance attacker perspective (red) and defender response (blue), "
            "always biasing outputs toward defense._"
        )

    defense_lines = [
        f"- **Scope management**: Maintain an up-to-date asset inventory for `{target_clean}` (domains, APIs, cloud resources, data stores). Use it to bound what automated agents can touch.",
        "- **Guardrails on tools and agents**: Enforce strong safety and auditability for any internal AI tooling (e.g., MCP-based agents) so they cannot be repurposed as covert red-team frameworks.",
        "- **Detection engineering**: Instrument logs and alerts for patterns Anthropic highlighted: many small, tool-like requests in succession; repeated reconnaissance on the same surface; iterative attempts around auth boundaries.",
        "- **Least privilege & segmentation**: Assume an AI agent will eventually find a weak link. Design IAM, network segmentation, and blast-radius limits so that a single foothold remains contained.",
        "- **Incident response playbooks**: Prepare playbooks specifically for AI-orchestrated attacks (sudden high-volume but semi-random probing, large-scale code generation, mass credential testing).",
    ]
    if step_by_step:
        defense_lines.extend(
            [
                "- **Red/blue rehearsal with agents**: Use the Cyber Vibe Lab to stage hypothetical campaigns and then codify new detections and controls after each simulated run.",
                "- **MCP-aware hardening**: For each MCP tool you expose (perplexity-style research, code repo analysis, internal APIs), document its abuse potential and add explicit rate limits, scopes, and safety filters.",
            ]
        )

    mcp_note = (
        "\n> In a full setup, this analysis would be enriched by MCP calls to `perplexity-ask` "
        "(for live threat intel and standards) and `deepwiki` (for code/config insights about the selected target)."
    )
    user_hint = (
        "\n\n> Tip: refine the vibe by asking follow-ups like \"focus on identity\" or "
        "\"assume a multi-cloud target\". Each turn can tighten the scenario."
    )

    # Empty strings in the sequence produce the blank lines between sections.
    response = "\n".join(
        [
            mode_note,
            "",
            attack_narrative_header,
            *attack_lines,
            "",
            defense_header,
            *defense_lines,
        ]
    )
    # The MCP note is only shown on the first turn of a conversation.
    if turns_so_far == 0:
        response += mcp_note
    response += user_hint
    return response
def register_sources(files, url, current_sources):
    """Update the in-app list of sources (files/URLs) for the left panel.

    This is a lightweight placeholder; later we can extend it to track types,
    tags, and whether a source is used for retrieval vs attack-target testing.

    Args:
        files: Uploaded files, or None. Entries may be path strings /
            ``os.PathLike`` objects or objects exposing a ``.name`` attribute;
            a single value instead of a list is accepted as well.
        url: URL text typed by the user; blank or None adds nothing.
        current_sources: The existing list of source dicts, or None. It is
            copied, never mutated.

    Returns:
        ``(sources, None, "", sources)``: the updated list for the state, reset
        values for the file and URL inputs, and the list again for the preview.
    """
    import os  # function-scope so the block is self-contained

    sources = list(current_sources or [])

    if files:
        # A lone path must not be iterated character by character.
        if isinstance(files, (str, os.PathLike)) or not isinstance(files, (list, tuple)):
            files = [files]
        for f in files:
            if isinstance(f, (str, os.PathLike)):
                # Plain path entries have no `.name`; use the path itself
                # rather than labelling every upload "uploaded".
                name = os.fspath(f)
            else:
                name = getattr(f, "name", "uploaded")
            sources.append({"type": "file", "name": name})

    if url and url.strip():
        sources.append({"type": "url", "name": url.strip()})

    # Return updated state, and reset file + URL inputs
    return sources, None, "", sources
# Three-panel layout: sources (left), chat (centre), studio chart (right).
# NOTE(review): the nesting below was reconstructed from a copy of the file
# that had lost its indentation; in particular, confirm whether the
# detail-level Radio belongs inside or below the target/mode row.
with gr.Blocks(fill_height=True) as demo:
    gr.Markdown(f"# {APP_TITLE}")
    gr.Markdown(INTRO_MD)

    with gr.Row(equal_height=True):
        # Sources / NotebookLM-style left panel
        with gr.Column(scale=1):
            gr.Markdown("## Sources\nManage uploaded files, URLs, MITRE docs, and Hugging Face assets.")
            source_files = gr.File(label="Upload sources", file_count="multiple")
            source_url = gr.Textbox(label="Add URL source", placeholder="https://attack.mitre.org/...")
            add_source = gr.Button("Add source")
            sources_state = gr.State([])
            sources_view = gr.JSON(label="Current sources (preview)", value=[])

        # Center chat panel
        with gr.Column(scale=2):
            gr.Markdown("## Chat")
            with gr.Row():
                target_input = gr.Textbox(
                    label="Target / system name (optional)",
                    placeholder="e.g., airbnb (HackerOne), internal CRM, DeFi dapp",
                    scale=2,
                )
                mode_input = gr.Dropdown(
                    ["Mixed", "Red-team simulation", "Blue-team defense"],
                    value="Mixed",
                    label="Mode",
                    scale=1,
                )
            detail_level_input = gr.Radio(
                ["High-level summary", "Step-by-step plan"],
                value="High-level summary",
                label="Detail level",
            )
            chatbot = gr.Chatbot(label="Cyber Vibe dialogue")
            msg = gr.Textbox(
                label="Describe your scenario or question",
                placeholder=(
                    "Describe a system and what you want to explore from a cyber 'vibe hacking' "
                    "perspective..."
                ),
            )
            clear = gr.Button("Clear conversation")

        # Right Studio panel
        with gr.Column(scale=1):
            gr.Markdown("## Studio\nAttack-chain mind map, timeline, and reports.")
            studio_plot = gr.Plot(
                label="Attack chain overview",
                value=build_attack_chain_figure([]),
            )
            attack_turns_state = gr.State([])

    def respond(user_message, chat_history, target, mode, detail_level, attack_turns):
        """Handle one chat submission.

        Returns the values for (message box, chatbot, turn state, studio plot).
        Blank submissions clear the box and change nothing else, so they
        neither produce a canned reply nor count as a classified turn.
        """
        chat_history = list(chat_history or [])
        turns = list(attack_turns or [])

        if (user_message or "").strip():
            # The agent sees the history as it was before this turn.
            reply = cyber_vibe_agent(user_message, chat_history, target, mode, detail_level)
            # NOTE(review): history entries are (user, assistant) tuples; newer
            # Gradio releases expect role/content dict messages instead.
            # Confirm against the installed Gradio version.
            chat_history.append((user_message, reply))
            turns.append({"text": user_message, "stage_id": classify_stage(user_message)})

        return "", chat_history, turns, build_attack_chain_figure(turns)

    msg.submit(
        respond,
        inputs=[msg, chatbot, target_input, mode_input, detail_level_input, attack_turns_state],
        outputs=[msg, chatbot, attack_turns_state, studio_plot],
    )

    # Reset chat, message box, recorded turns and the chart in one go.
    clear.click(
        lambda: ([], "", [], build_attack_chain_figure([])),
        inputs=None,
        outputs=[chatbot, msg, attack_turns_state, studio_plot],
    )

    add_source.click(
        register_sources,
        inputs=[source_files, source_url, sources_state],
        outputs=[sources_state, source_files, source_url, sources_view],
    )

if __name__ == "__main__":
    demo.launch()