|
|
import json |
|
|
import os |
|
|
from dataclasses import dataclass |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, List, Optional |
|
|
|
|
|
import gradio as gr |
|
|
import plotly.graph_objects as go |
|
|
from huggingface_hub import InferenceClient |
|
|
from transformers import pipeline |
|
|
|
|
|
APP_TITLE = "Cyber Vibe Lab – MCP in Action" |
|
|
|
|
|
INTRO_MD = """ |
|
|
### Cyber Vibe Lab |
|
|
|
|
|
This prototype Gradio 6 application is designed for the **MCP 1st Birthday** hackathon, co-hosted by **Anthropic** and **Gradio**. |
|
|
|
|
|
It explores how AI agents and the Model Context Protocol (MCP) can be used to: |
|
|
- Reflect on **AI-orchestrated cyber espionage** (as described in Anthropic's report). |
|
|
- Perform structured **"vibe hacking"** simulations of attack paths. |
|
|
- Always translate those simulations into **defensive guidance** for security teams. |
|
|
|
|
|
In a full deployment, this app would call MCP servers such as: |
|
|
- `mcp://perplexity-ask` for web-scale security research and summarization. |
|
|
- `mcp://deepwiki` for deep dives into code and documentation of particular systems. |
|
|
|
|
|
This local version keeps those calls conceptual so that the app runs without extra setup while still matching the intended architecture. |
|
|
""" |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class AttackStage: |
|
|
id: str |
|
|
name: str |
|
|
mitre_tactic_id: str |
|
|
matrix: str |
|
|
color: str |
|
|
|
|
|
|
|
|
def load_mitre_stages() -> Dict[str, AttackStage]: |
|
|
"""Load a minimal set of MITRE ATT&CK-style stages from JSON. |
|
|
|
|
|
The file is expected at data/mitre/mitre_minimal.json relative to this script. |
|
|
If it is missing, we fall back to an empty dict and show placeholders. |
|
|
""" |
|
|
|
|
|
base = Path(__file__).parent |
|
|
path = base / "data" / "mitre" / "mitre_minimal.json" |
|
|
stages: Dict[str, AttackStage] = {} |
|
|
|
|
|
try: |
|
|
with path.open("r", encoding="utf-8") as f: |
|
|
raw = json.load(f) |
|
|
for s in raw.get("stages", []): |
|
|
try: |
|
|
stages[s["id"]] = AttackStage( |
|
|
id=s["id"], |
|
|
name=s.get("name", s["id"]), |
|
|
mitre_tactic_id=s.get("mitre_tactic_id", ""), |
|
|
matrix=s.get("matrix", "ATTACK"), |
|
|
color=s.get("color", "#888888"), |
|
|
) |
|
|
except KeyError: |
|
|
continue |
|
|
except FileNotFoundError: |
|
|
|
|
|
pass |
|
|
|
|
|
return stages |
|
|
|
|
|
|
|
|
MITRE_STAGES: Dict[str, AttackStage] = load_mitre_stages() |
|
|
|
|
|
HF_STAGE_MODEL_ID = os.getenv("HF_STAGE_MODEL_ID") |
|
|
_stage_clf = None |
|
|
|
|
|
|
|
|
def _get_stage_classifier(): |
|
|
"""Lazily construct a Hugging Face text-classification pipeline, if configured. |
|
|
|
|
|
If HF_STAGE_MODEL_ID is not set or pipeline creation fails, returns None and |
|
|
the app falls back to keyword-based heuristics. |
|
|
""" |
|
|
|
|
|
global _stage_clf |
|
|
if _stage_clf is not None: |
|
|
return _stage_clf |
|
|
|
|
|
if not HF_STAGE_MODEL_ID: |
|
|
return None |
|
|
|
|
|
try: |
|
|
_stage_clf = pipeline("text-classification", model=HF_STAGE_MODEL_ID) |
|
|
except Exception: |
|
|
_stage_clf = None |
|
|
|
|
|
return _stage_clf |
|
|
|
|
|
|
|
|
def classify_stage(text: str) -> str: |
|
|
"""Classify a message into a coarse attack stage. |
|
|
|
|
|
1. Try a configured HF text-classification model (if available). |
|
|
2. Fall back to simple keyword heuristics that map text to stage IDs from |
|
|
MITRE_STAGES (e.g., "recon", "initial_access"). |
|
|
""" |
|
|
|
|
|
txt = (text or "").lower() |
|
|
|
|
|
clf = _get_stage_classifier() |
|
|
if clf is not None: |
|
|
try: |
|
|
out = clf(txt, truncation=True, max_length=256) |
|
|
label = str(out[0]["label"]).lower() |
|
|
if label in MITRE_STAGES: |
|
|
return label |
|
|
except Exception: |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
if any(k in txt for k in ["recon", "scan", "enumerat", "footprint"]): |
|
|
return "recon" |
|
|
if any(k in txt for k in ["login", "credential", "password", "phish", "initial access"]): |
|
|
return "initial_access" |
|
|
if any(k in txt for k in ["execute", "payload", "command", "run code"]): |
|
|
return "execution" |
|
|
if any(k in txt for k in ["persist", "backdoor", "autorun", "startup"]): |
|
|
return "persistence" |
|
|
if any(k in txt for k in ["exfil", "leak", "download", "expose data"]): |
|
|
return "exfiltration" |
|
|
if any(k in txt for k in ["destroy", "wipe", "ransom", "impact"]): |
|
|
return "impact" |
|
|
|
|
|
|
|
|
return "execution" |
|
|
|
|
|
|
|
|
def build_attack_chain_figure(turns: List[Dict[str, Any]]) -> go.Figure: |
|
|
"""Aggregate turns into a simple bar chart of stages touched in this session.""" |
|
|
|
|
|
if not MITRE_STAGES: |
|
|
fig = go.Figure() |
|
|
fig.add_annotation( |
|
|
text="MITRE stages not loaded yet.", |
|
|
showarrow=False, |
|
|
x=0.5, |
|
|
y=0.5, |
|
|
xref="paper", |
|
|
yref="paper", |
|
|
) |
|
|
fig.update_xaxes(visible=False) |
|
|
fig.update_yaxes(visible=False) |
|
|
return fig |
|
|
|
|
|
if not turns: |
|
|
fig = go.Figure() |
|
|
fig.add_annotation( |
|
|
text="No classified turns yet.", |
|
|
showarrow=False, |
|
|
x=0.5, |
|
|
y=0.5, |
|
|
xref="paper", |
|
|
yref="paper", |
|
|
) |
|
|
fig.update_xaxes(visible=False) |
|
|
fig.update_yaxes(visible=False) |
|
|
return fig |
|
|
|
|
|
counts: Dict[str, int] = {stage_id: 0 for stage_id in MITRE_STAGES.keys()} |
|
|
for t in turns: |
|
|
sid = t.get("stage_id") |
|
|
if sid in counts: |
|
|
counts[sid] += 1 |
|
|
|
|
|
stage_ids = list(MITRE_STAGES.keys()) |
|
|
names = [MITRE_STAGES[s].name for s in stage_ids] |
|
|
values = [counts.get(s, 0) for s in stage_ids] |
|
|
colors = [MITRE_STAGES[s].color for s in stage_ids] |
|
|
|
|
|
fig = go.Figure( |
|
|
data=[ |
|
|
go.Bar( |
|
|
x=names, |
|
|
y=values, |
|
|
marker_color=colors, |
|
|
) |
|
|
] |
|
|
) |
|
|
fig.update_layout( |
|
|
title="ATT&CK-style stages touched in this session", |
|
|
xaxis_title="Stage", |
|
|
yaxis_title="Number of turns", |
|
|
) |
|
|
return fig |
|
|
|
|
|
|
|
|
def cyber_vibe_agent(message: str, history, target: str, mode: str, detail_level: str) -> str: |
|
|
"""Core reasoning function for the Cyber Vibe Lab. |
|
|
|
|
|
This is intentionally defensive: it never returns exploit code or |
|
|
actionable credentials. Instead, it frames outputs as: |
|
|
- Attacker "vibes" and likely phases, inspired by Anthropic's report. |
|
|
- Concrete defensive recommendations, logging, and hardening steps. |
|
|
|
|
|
In a full MCP-enabled version, this function would orchestrate calls to |
|
|
MCP tools such as `perplexity-ask` and `deepwiki` to pull in: |
|
|
- Relevant threat intelligence and best practices. |
|
|
- Implementation-specific details for the selected target system. |
|
|
""" |
|
|
|
|
|
target_clean = (target or "").strip() or "your system or program" |
|
|
mode_clean = mode or "Mixed" |
|
|
detail_clean = detail_level or "High-level summary" |
|
|
|
|
|
|
|
|
turns_so_far = len(history) if isinstance(history, list) else 0 |
|
|
|
|
|
attack_narrative_header = "## Conceptual attack narrative (for red-team simulation)" |
|
|
defense_header = "## Defensive guidance (blue-team focus)" |
|
|
|
|
|
|
|
|
attack_lines = [ |
|
|
f"- **Phase 1 Recon & target scoping**: An AI agent profiles `{target_clean}` using public and internal metadata, searching for entry points (web apps, APIs, cloud services, CI/CD, identity providers).", |
|
|
"- **Phase 2 Access & foothold**: The agent chains small, seemingly-benign tasks (e.g., \"test this endpoint\", \"scan this range\") to probe for weak auth, misconfigurations, or exposed secrets.", |
|
|
"- **Phase 3 Privilege escalation & lateral movement**: Once a weak point is identified, the agent iteratively refines exploit ideas, tests them, and expands access within the environment.", |
|
|
"- **Phase 4 Persistence & exfiltration**: The agent catalogs high-value data stores, automates data collection, and prepares exfiltration channels all while documenting its steps for future reuse.", |
|
|
] |
|
|
|
|
|
if detail_clean == "Step-by-step plan": |
|
|
attack_lines.append( |
|
|
"- **Phase 5 Automation & scaling**: The framework replays successful chains of actions across many similar assets (e.g., multiple subdomains or tenants), approaching the kind of scaled automation described in Anthropic's AI-espionage report." |
|
|
) |
|
|
|
|
|
if mode_clean == "Blue-team defense": |
|
|
mode_note = ( |
|
|
"_Mode: blue-team only. The attack narrative is kept abstract and is used strictly " |
|
|
"to structure defensive thinking._" |
|
|
) |
|
|
elif mode_clean == "Red-team simulation": |
|
|
mode_note = ( |
|
|
"_Mode: red-team simulation. The narrative focuses on attacker behavior but omits " |
|
|
"specific exploit code or instructions._" |
|
|
) |
|
|
else: |
|
|
mode_note = ( |
|
|
"_Mode: mixed. We balance attacker perspective (red) and defender response (blue), " |
|
|
"always biasing outputs toward defense._" |
|
|
) |
|
|
|
|
|
defense_lines = [ |
|
|
f"- **Scope management**: Maintain an up-to-date asset inventory for `{target_clean}` (domains, APIs, cloud resources, data stores). Use it to bound what automated agents can touch.", |
|
|
"- **Guardrails on tools and agents**: Enforce strong safety and auditability for any internal AI tooling (e.g., MCP-based agents) so they cannot be repurposed as covert red-team frameworks.", |
|
|
"- **Detection engineering**: Instrument logs and alerts for patterns Anthropic highlighted: many small, tool-like requests in succession; repeated reconnaissance on the same surface; iterative attempts around auth boundaries.", |
|
|
"- **Least privilege & segmentation**: Assume an AI agent will eventually find a weak link. Design IAM, network segmentation, and blast-radius limits so that a single foothold remains contained.", |
|
|
"- **Incident response playbooks**: Prepare playbooks specifically for AI-orchestrated attacks (sudden high-volume but semi-random probing, large-scale code generation, mass credential testing).", |
|
|
] |
|
|
|
|
|
if detail_clean == "Step-by-step plan": |
|
|
defense_lines.extend( |
|
|
[ |
|
|
"- **Red/blue rehearsal with agents**: Use the Cyber Vibe Lab to stage hypothetical campaigns and then codify new detections and controls after each simulated run.", |
|
|
"- **MCP-aware hardening**: For each MCP tool you expose (perplexity-style research, code repo analysis, internal APIs), document its abuse potential and add explicit rate limits, scopes, and safety filters.", |
|
|
] |
|
|
) |
|
|
|
|
|
mcp_note = ( |
|
|
"\n> In a full setup, this analysis would be enriched by MCP calls to `perplexity-ask` " |
|
|
"(for live threat intel and standards) and `deepwiki` (for code/config insights about the selected target)." |
|
|
) |
|
|
|
|
|
user_hint = "\n\n> Tip: refine the vibe by asking follow-ups like \"focus on identity\" or \"assume a multi-cloud target\". Each turn can tighten the scenario." |
|
|
|
|
|
response = ( |
|
|
f"{mode_note}\n\n" |
|
|
f"{attack_narrative_header}\n" + "\n".join(attack_lines) + "\n\n" + defense_header + "\n" + "\n".join(defense_lines) |
|
|
) |
|
|
|
|
|
if turns_so_far == 0: |
|
|
response += mcp_note |
|
|
|
|
|
response += user_hint |
|
|
|
|
|
return response |
|
|
|
|
|
|
|
|
def register_sources(files, url, current_sources): |
|
|
"""Update the in-app list of sources (files/URLs) for the left panel. |
|
|
|
|
|
This is a lightweight placeholder; later we can extend it to track types, |
|
|
tags, and whether a source is used for retrieval vs attack-target testing. |
|
|
""" |
|
|
|
|
|
sources = list(current_sources or []) |
|
|
|
|
|
if files: |
|
|
for f in files: |
|
|
name = getattr(f, "name", "uploaded") |
|
|
sources.append({"type": "file", "name": name}) |
|
|
|
|
|
if url and url.strip(): |
|
|
sources.append({"type": "url", "name": url.strip()}) |
|
|
|
|
|
|
|
|
return sources, None, "", sources |
|
|
|
|
|
|
|
|
with gr.Blocks(fill_height=True) as demo: |
|
|
gr.Markdown(f"# {APP_TITLE}") |
|
|
gr.Markdown(INTRO_MD) |
|
|
|
|
|
with gr.Row(equal_height=True): |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("## Sources\nManage uploaded files, URLs, MITRE docs, and Hugging Face assets.") |
|
|
source_files = gr.File(label="Upload sources", file_count="multiple") |
|
|
source_url = gr.Textbox(label="Add URL source", placeholder="https://attack.mitre.org/...") |
|
|
add_source = gr.Button("Add source") |
|
|
sources_state = gr.State([]) |
|
|
sources_view = gr.JSON(label="Current sources (preview)", value=[]) |
|
|
|
|
|
|
|
|
with gr.Column(scale=2): |
|
|
gr.Markdown("## Chat") |
|
|
with gr.Row(): |
|
|
target_input = gr.Textbox( |
|
|
label="Target / system name (optional)", |
|
|
placeholder="e.g., airbnb (HackerOne), internal CRM, DeFi dapp", |
|
|
scale=2, |
|
|
) |
|
|
mode_input = gr.Dropdown( |
|
|
["Mixed", "Red-team simulation", "Blue-team defense"], |
|
|
value="Mixed", |
|
|
label="Mode", |
|
|
scale=1, |
|
|
) |
|
|
|
|
|
detail_level_input = gr.Radio( |
|
|
["High-level summary", "Step-by-step plan"], |
|
|
value="High-level summary", |
|
|
label="Detail level", |
|
|
) |
|
|
|
|
|
chatbot = gr.Chatbot(label="Cyber Vibe dialogue") |
|
|
msg = gr.Textbox( |
|
|
label="Describe your scenario or question", |
|
|
placeholder=( |
|
|
"Describe a system and what you want to explore from a cyber 'vibe hacking' " |
|
|
"perspective..." |
|
|
), |
|
|
) |
|
|
clear = gr.Button("Clear conversation") |
|
|
|
|
|
|
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("## Studio\nAttack-chain mind map, timeline, and reports.") |
|
|
studio_plot = gr.Plot( |
|
|
label="Attack chain overview", |
|
|
value=build_attack_chain_figure([]), |
|
|
) |
|
|
|
|
|
attack_turns_state = gr.State([]) |
|
|
|
|
|
def respond(user_message, chat_history, target, mode, detail_level, attack_turns): |
|
|
if chat_history is None: |
|
|
chat_history = [] |
|
|
reply = cyber_vibe_agent(user_message, chat_history, target, mode, detail_level) |
|
|
chat_history = chat_history + [(user_message, reply)] |
|
|
|
|
|
turns = list(attack_turns or []) |
|
|
stage_id = classify_stage(user_message) |
|
|
turns.append({"text": user_message, "stage_id": stage_id}) |
|
|
fig = build_attack_chain_figure(turns) |
|
|
|
|
|
return "", chat_history, turns, fig |
|
|
|
|
|
msg.submit( |
|
|
respond, |
|
|
inputs=[msg, chatbot, target_input, mode_input, detail_level_input, attack_turns_state], |
|
|
outputs=[msg, chatbot, attack_turns_state, studio_plot], |
|
|
) |
|
|
|
|
|
clear.click( |
|
|
lambda: ([], "", [], build_attack_chain_figure([])), |
|
|
inputs=None, |
|
|
outputs=[chatbot, msg, attack_turns_state, studio_plot], |
|
|
) |
|
|
|
|
|
add_source.click( |
|
|
register_sources, |
|
|
inputs=[source_files, source_url, sources_state], |
|
|
outputs=[sources_state, source_files, source_url, sources_view], |
|
|
) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.launch() |
|
|
|