video_eval / app.py
Youngsun Lim
first
f543cdc
raw
history blame
38.9 kB
import os, io, csv, json, random
from datetime import datetime
import gradio as gr
from huggingface_hub import HfApi, hf_hub_download
import subprocess, pathlib, hashlib
import secrets, time, string
# ์‚ฌ์šฉ์ž๊ฐ€ '์„ ํƒ'์„ ์™„๋ฃŒํ•œ ๊ฐ’ (์ดˆ๊ธฐ์—” None)
selected_action = gr.State(None)
selected_phys = gr.State(None)
def _recompute_save(pid_text: str, sel_a, sel_p):
ok = bool(pid_text and pid_text.strip()) and (sel_a is not None) and (sel_p is not None)
return gr.update(interactive=ok, variant=("primary" if ok else "secondary"))
REWARD_FILE = "reward_codes.csv" # ๋ฆฌ์›Œ๋“œ ์ฝ”๋“œ ๊ธฐ๋ก์šฉ ํŒŒ์ผ (HF dataset ์•ˆ์— ์ €์žฅ)
def _read_codes_bytes():
try:
p = hf_hub_download(
repo_id=REPO_ID, filename=REWARD_FILE, repo_type="dataset",
token=HF_TOKEN, local_dir="/tmp", local_dir_use_symlinks=False
)
return open(p, "rb").read()
except Exception:
return None
def _append_code(old_bytes, row):
s = io.StringIO()
w = csv.writer(s)
if not old_bytes:
# ์ƒˆ ํ—ค๋”
w.writerow(["ts_iso", "participant_id", "reward_code", "total_done"])
else:
s.write(old_bytes.decode("utf-8", errors="ignore"))
w.writerow(row)
return s.getvalue().encode("utf-8")
def _persist_reward_code(pid: str, code: str, total_done: int):
"""๋ฆฌ์›Œ๋“œ ์ฝ”๋“œ๋ฅผ HF์— reward_codes.csv๋กœ ๋ˆ„์  ์ €์žฅ(append)."""
old = _read_codes_bytes()
row = [datetime.utcnow().isoformat(), pid.strip(), code, int(total_done)]
newb = _append_code(old, row)
api.upload_file(
path_or_fileobj=io.BytesIO(newb),
path_in_repo=REWARD_FILE,
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN,
commit_message="append reward code"
)
def _gen_reward_code(pid: str, length: int = 10, forbid_ambiguous: bool=True) -> str:
"""
์ฐธ๊ฐ€์ž์—๊ฒŒ ๋ณด์—ฌ์ค„ ๋žœ๋ค ์ฝ”๋“œ. ์ถฉ๋Œ ์œ„ํ—˜ ๋งค์šฐ ๋‚ฎ์Œ.
- PID + ์‹œ๊ฐ + ๋ณด์•ˆ๋žœ๋ค์œผ๋กœ ์‹œ๋“œ
- ๋Œ€๋ฌธ์ž/์ˆซ์ž ์กฐํ•ฉ
"""
alphabet = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" if forbid_ambiguous else string.ascii_uppercase + string.digits
# secrets ๊ธฐ๋ฐ˜ ๋‚œ์ˆ˜ + pid/time ์„ž์–ด์„œ ํ•ด์‹œ ๋น„์Šทํ•œ ํšจ๊ณผ
rnd = secrets.token_hex(8) + pid + str(time.time_ns())
# ์„ž๊ณ  ๋ฝ‘๊ธฐ
rng = secrets.SystemRandom()
return "".join(rng.choice(alphabet) for _ in range(length))
MUTED_CACHE_DIR = "/tmp/hf_video_cache_muted"
pathlib.Path(MUTED_CACHE_DIR).mkdir(parents=True, exist_ok=True)
def _sha1_8(s: str) -> str:
return hashlib.sha1(s.encode("utf-8")).hexdigest()[:8]
def ensure_muted_copy(src_path: str) -> str:
"""
์ฃผ์–ด์ง„ mp4์—์„œ ์˜ค๋””์˜ค ํŠธ๋ž™์„ ์ œ๊ฑฐ(-an)ํ•œ ๋ฌด์Œ ๋ณต์‚ฌ๋ณธ์„ ์บ์‹œ์— ์ƒ์„ฑ.
์ด๋ฏธ ์žˆ์œผ๋ฉด ๊ทธ ํŒŒ์ผ ๊ฒฝ๋กœ ๋ฐ˜ํ™˜.
์‹คํŒจํ•˜๋ฉด ์›๋ณธ ๊ฒฝ๋กœ ๋ฐ˜ํ™˜.
"""
if not src_path or not os.path.exists(src_path):
return src_path
out = os.path.join(MUTED_CACHE_DIR, _sha1_8(src_path) + ".mp4")
if os.path.exists(out):
return out
try:
subprocess.run(
["ffmpeg", "-y", "-i", src_path, "-c:v", "copy", "-an", out],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
)
return out if os.path.exists(out) else src_path
except Exception:
return src_path
FEEDBACK_FILE = "final_feedback.csv"
def _read_feedback_bytes():
try:
p = hf_hub_download(
repo_id=REPO_ID, filename=FEEDBACK_FILE, repo_type="dataset",
token=HF_TOKEN, local_dir="/tmp", local_dir_use_symlinks=False
)
return open(p, "rb").read()
except Exception:
return None
def _append_feedback(old_bytes, row):
s = io.StringIO()
w = csv.writer(s)
if not old_bytes:
# ์ตœ์ข… ์ฝ”๋ฉ˜ํŠธ ์ „์šฉ CSV ํ—ค๋”
w.writerow(["ts_iso", "participant_id", "final_comment"])
else:
s.write(old_bytes.decode("utf-8", errors="ignore"))
w.writerow(row)
return s.getvalue().encode("utf-8")
def push_final_feedback(participant_id: str, comment: str):
"""๋งˆ์ง€๋ง‰์— ๋ฐ›๋Š” ์ž์œ  ์ฝ”๋ฉ˜ํŠธ๋ฅผ FEEDBACK_FILE๋กœ ์ €์žฅ."""
if not participant_id or not participant_id.strip():
return gr.update(visible=True, value="โ— Missing participant ID.")
if comment is None or not str(comment).strip():
# ๋น„์–ด์žˆ์œผ๋ฉด ์ €์žฅํ•˜์ง€ ์•Š๊ณ  ์•ˆ๋‚ด๋งŒ
return gr.update(visible=True, value="โ„น๏ธ No comment entered โ€” nothing to submit.")
try:
old = _read_feedback_bytes()
row = [datetime.utcnow().isoformat(), participant_id.strip(), comment.strip()]
newb = _append_feedback(old, row)
if not REPO_ID:
return gr.update(visible=True, value="โ— RESULTS_REPO is not set.")
if not HF_TOKEN:
return gr.update(visible=True, value="โ— HF_TOKEN is missing. Set a write token for the dataset repo.")
api.upload_file(
path_or_fileobj=io.BytesIO(newb),
path_in_repo=FEEDBACK_FILE,
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN,
commit_message="append final feedback"
)
return gr.update(visible=True, value="โœ… Thanks! Your feedback was submitted.")
except Exception as e:
return gr.update(
visible=True,
value=f"โŒ Feedback save failed: {type(e).__name__}: {e}"
)
# -------------------- Config --------------------
REPO_ID = os.getenv("RESULTS_REPO", "dghadiya/video_eval") # ์—…๋กœ๋“œํ•œ ๋ฆฌํฌ์™€ ์ผ์น˜
HF_TOKEN = os.getenv("HF_TOKEN")
RESULTS_FILE = "results.csv"
TOTAL_PER_PARTICIPANT = 30 # ๋ชฉํ‘œ ํ‰๊ฐ€ ๊ฐœ์ˆ˜(์„ธ์…˜ ๊ธฐ์ค€)
# videos.json ์˜ˆ์‹œ: {"url": "...mp4", "id": "BodyWeightSquats__XXXX.mp4", "action": "BodyWeightSquats"}
with open("videos.json", "r", encoding="utf-8") as f:
V = json.load(f)
api = HfApi()
# ๊ต์ˆ˜๋‹˜ ์ง€์นจ(๊ทธ๋Œ€๋กœ, ๊ตต๊ฒŒ ์ฒ˜๋ฆฌ ํฌํ•จ)
# INSTRUCTION_MD = """
# **Task:** You will watch a series of **AI-generated videos**. For each video, your job is to rate how well the personโ€™s action in the AI-generated video matches the action specified as "**expected action**". Some things to keep in mind:
# - The generated video should **capture** the expected action **throughout the video**.
# - Try to **focus only** on the expected action and do **not** judge **video quality**, **attractiveness**, **background**, **camera motion**, or **objects**.
# - You will be **paid** once **all the videos are viewed and rated**.
# """
INSTRUCTION_MD = """
**Task:** You will watch a series of **AI-generated videos**.
For each video, your job is to rate:
1. **Action Consistency** - how well the personโ€™s action matches the action specified as the "**Expected action**".
2. **Physical Plausibility** - how natural and physically possible the motion looks.
Some things to keep in mind:
- Try to **focus only** on the expected action and motion quality, and do **not** judge **video quality**, **attractiveness**, **background**, **camera motion**, or **objects**.
- For **physical plausibility**, look for **smooth and realistic** motion without impossible poses, missing limbs, or extreme stretching.
- Action consistency and physical plausibility are **not mutually exclusive** with each other.
- **Physically plausible motion does not imply correct depiction of action.**
- **A video cannot portray action consistency if it has physically impossible movements.**
- **0: indicates really poor depiction, 10: represents perfect, realistic depiction**
- The **Save & Next** button will be enabled **only after you have clicked or adjusted both sliders at least once**.
- You will be **paid** once **all the videos are viewed and rated**.
"""
# -------------------- Helper funcs --------------------
def _load_eval_counts():
"""
Hugging Face dataset์˜ results.csv๋ฅผ ์ฝ์–ด video_id๋ณ„ ํ‰๊ฐ€ ๊ฐœ์ˆ˜(dict)๋ฅผ ๋ฐ˜ํ™˜.
์—†์œผ๋ฉด 0์œผ๋กœ ์ดˆ๊ธฐํ™”.
"""
# ๋ชจ๋“  id๋ฅผ 0์œผ๋กœ ์ดˆ๊ธฐํ™”
counts = {}
for v in V:
vid = _get_video_id(v)
counts[vid] = 0
b = _read_csv_bytes()
if not b:
return counts
s = io.StringIO(b.decode("utf-8", errors="ignore"))
r = csv.reader(s)
rows = list(r)
if not rows:
return counts
# ํ—ค๋” ํŒŒ์•…
header = rows[0]
body = rows[1:] if header and ("video_id" in header or "overall" in header) else rows
vid_col = None
if header and "video_id" in header:
vid_col = header.index("video_id")
for row in body:
try:
vid = row[vid_col] if vid_col is not None else row[2] # ๊ธฐ๋ณธ ํฌ๋งท: ts, pid, video_id, overall, notes
if vid in counts:
counts[vid] += 1
except Exception:
continue
return counts
def _get_video_id(v: dict) -> str:
if "id" in v and v["id"]:
return v["id"]
# id๊ฐ€ ์—†์œผ๋ฉด URL ํŒŒ์ผ๋ช…์œผ๋กœ ๋Œ€์ฒด
return os.path.basename(v.get("url", ""))
def _read_csv_bytes():
try:
p = hf_hub_download(
repo_id=REPO_ID, filename=RESULTS_FILE, repo_type="dataset",
token=HF_TOKEN, local_dir="/tmp", local_dir_use_symlinks=False
)
return open(p, "rb").read()
except Exception:
return None
# def _append(old_bytes, row):
# s = io.StringIO()
# w = csv.writer(s)
# if not old_bytes:
# # โœ… ์ƒˆ ํ—ค๋”
# w.writerow(["ts_iso", "participant_id", "video_id", "overall", "notes"])
# else:
# s.write(old_bytes.decode("utf-8", errors="ignore"))
# w.writerow(row)
# return s.getvalue().encode("utf-8")
def _append(old_bytes, row):
s = io.StringIO()
w = csv.writer(s)
if not old_bytes:
# โœ… new header with two scores
w.writerow(["ts_iso", "participant_id", "video_id",
"action_consistency", "physical_plausibility", "notes"])
else:
s.write(old_bytes.decode("utf-8", errors="ignore"))
w.writerow(row)
return s.getvalue().encode("utf-8")
# def push(participant_id, video_id, score, notes=""):
# if not participant_id or not participant_id.strip():
# return gr.update(visible=True, value="โ— Please enter your Participant ID before proceeding.")
# if not video_id or score is None:
# return gr.update(visible=True, value="โ— Fill out all fields.")
# try:
# old = _read_csv_bytes()
# row = [
# datetime.utcnow().isoformat(),
# participant_id.strip(),
# video_id, # โœ… action ๋Œ€์‹  video_id ์ €์žฅ
# float(score), # overall
# notes or ""
# ]
# newb = _append(old, row)
# if not REPO_ID:
# return gr.update(visible=True, value="โ— RESULTS_REPO is not set.")
# if not HF_TOKEN:
# return gr.update(visible=True, value="โ— HF_TOKEN is missing. Set a write token for the dataset repo.")
# api.upload_file(
# path_or_fileobj=io.BytesIO(newb),
# path_in_repo=RESULTS_FILE,
# repo_id=REPO_ID,
# repo_type="dataset",
# token=HF_TOKEN,
# commit_message="append"
# )
# return gr.update(visible=True, value=f"โœ… Saved successfully!")
# except Exception as e:
# return gr.update(
# visible=True,
# value=f"โŒ Save failed: {type(e).__name__}: {e}\n"
# f"- Check HF_TOKEN permission\n- Check REPO_ID\n- Create dataset repo if missing"
# )
def push(participant_id, video_id, action_score, phys_score, notes=""):
if not participant_id or not participant_id.strip():
return gr.update(visible=True, value="โ— Please enter your Participant ID before proceeding.")
if not video_id or action_score is None or phys_score is None:
return gr.update(visible=True, value="โ— Fill out all fields.")
try:
old = _read_csv_bytes()
row = [
datetime.utcnow().isoformat(),
participant_id.strip(),
video_id,
float(action_score),
float(phys_score),
notes or ""
]
newb = _append(old, row)
if not REPO_ID:
return gr.update(visible=True, value="โ— RESULTS_REPO is not set.")
if not HF_TOKEN:
return gr.update(visible=True, value="โ— HF_TOKEN is missing. Set a write token for the dataset repo.")
api.upload_file(
path_or_fileobj=io.BytesIO(newb),
path_in_repo=RESULTS_FILE,
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN,
commit_message="append"
)
return gr.update(visible=True, value=f"โœ… Saved successfully!")
except Exception as e:
return gr.update(
visible=True,
value=f"โŒ Save failed: {type(e).__name__}: {e}\n"
f"- Check HF_TOKEN permission\n- Check REPO_ID\n- Create dataset repo if missing"
)
def _extract_action(v):
if "action" in v and v["action"]:
return v["action"]
raw = v.get("id", "")
return raw.split("__")[0].split(".")[0]
def pick_one():
v = random.choice(V)
return v["url"], _extract_action(v)
def _progress_html(done, total):
pct = int(100 * done / max(1, total))
return f"""
<div style="border:1px solid #ddd; height:20px; border-radius:6px; overflow:hidden; margin-top:6px;">
<div style="height:100%; width:{pct}%; background:#3b82f6;"></div>
</div>
<div style="font-size:12px; margin-top:4px;">{done} / {total}</div>
"""
def _build_order_with_anchor(total:int, anchor_idx:int, repeats:int, pool_size:int, min_gap:int=1):
"""
total: TOTAL_PER_PARTICIPANT (e.g., 30)
anchor_idx: index of the anchor video in V (0 for first item)
repeats: how many times to show anchor (e.g., 5)
pool_size: len(V)
min_gap: ์ตœ์†Œ ๊ฐ„๊ฒฉ(์ธ์ ‘ ๊ธˆ์ง€ => 1)
return: list of indices (length=total)
"""
assert repeats <= total, "repeats must be <= total"
assert pool_size >= 1, "videos pool must be non-empty"
# 1) ๋‹ค๋ฅธ ๋น„๋””์˜ค 25๊ฐœ(์ค‘๋ณต ์—†์ด) ๋ฝ‘๊ธฐ
others_needed = total - repeats
# anchor๋ฅผ ์ œ์™ธํ•œ ํ›„๋ณด ์ธ๋ฑ์Šค
candidates = list(range(1, pool_size)) if anchor_idx == 0 else [i for i in range(pool_size) if i != anchor_idx]
if len(candidates) < others_needed:
raise ValueError("Not enough unique non-anchor videos to fill the schedule without duplication.")
others = random.sample(candidates, k=others_needed)
# 2) ๊ธฐ๋ณธ ์‹œํ€€์Šค(others)๋ฅผ ๋ฌด์ž‘์œ„๋กœ ์„ž๊ธฐ
random.shuffle(others)
# 3) ์•ต์ปค๋ฅผ min_gap๋ฅผ ๋งŒ์กฑํ•˜๋„๋ก ์‚ฝ์ž…ํ•  ์œ„์น˜ ์„ ์ •
# 30๊ฐœ๋ฅผ 5๊ตฌ๊ฐ„์œผ๋กœ ๋‚˜๋ˆ , ๊ฐ ๊ตฌ๊ฐ„ ๋‚ด์—์„œ ์ถฉ๋Œ ๋œ ๋‚˜๊ฒŒ ๋ฐฐ์น˜
# (๊ฐ„๋‹จํ•˜๊ณ  ์•ˆ์ •์ ์ธ ๋ฐฉ์‹)
seq = others[:] # ๊ธธ์ด=25
anchor_positions = []
segment = total // repeats # 30//5 = 6
for k in range(repeats):
# ๊ฐ ๊ตฌ๊ฐ„ [k*segment, (k+1)*segment) ์•ˆ์—์„œ ํ›„๋ณด ์œ„์น˜๋ฅผ ๊ณ ๋ฆ„
lo = k * segment
hi = (k + 1) * segment if k < repeats - 1 else total # ๋งˆ์ง€๋ง‰์€ ๋๊นŒ์ง€
# ๊ฒฝ๊ณ„ ๋‚ด ์ž„์˜ ์˜คํ”„์…‹ ์„ ํƒ (์—ฌ์œ ๋ฅผ ๋‘๊ณ  ์ถฉ๋Œ์„ ํ”ผํ•จ)
candidate_pos = random.randrange(lo, hi)
# ์ธ์ ‘ ๊ธˆ์ง€ ๋ณด์ •: ์ด๋ฏธ ๋ฐฐ์ •๋œ anchor ์œ„์น˜๋“ค๊ณผ์˜ ๊ฑฐ๋ฆฌ๊ฐ€ min_gap ์ด์ƒ ๋˜๋„๋ก ์กฐ์ •
# ํ•„์š” ์‹œ ์ขŒ์šฐ๋กœ ๊ทผ์ ‘ํ•œ ๋นˆ ์Šฌ๋กฏ ํƒ์ƒ‰
def ok(pos):
return all(abs(pos - p) >= min_gap + 1 for p in anchor_positions) # ์—ฐ์†๊ธˆ์ง€ => ๊ฑฐ๋ฆฌ >= 2
# ๊ทผ๋ฐฉ ํƒ์ƒ‰ ํญ
found = None
for delta in range(0, segment): # ๊ตฌ๊ฐ„ ํฌ๊ธฐ ๋‚ด์—์„œ ํƒ์ƒ‰
# ์ขŒ/์šฐ ๋ฒˆ๊ฐˆ์•„๊ฐ€๋ฉฐ ํ›„๋ณด ์‹œ๋„
for sign in (+1, -1):
pos = candidate_pos + sign * delta
if 0 <= pos < total and ok(pos):
found = pos
break
if found is not None:
break
if found is None:
# ์ตœํ›„: 0..total-1 ๋ฒ”์œ„์—์„œ ์•„๋ฌด ๋ฐ๋‚˜ ์ถฉ๋Œ ์—†๋Š” ๊ณณ ์ฐพ๊ธฐ
for pos in range(total):
if ok(pos):
found = pos
break
if found is None:
raise RuntimeError("Failed to place anchor without adjacency. Try different strategy or loosen min_gap.")
anchor_positions.append(found)
# 4) others๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ๊ธธ์ด total์˜ ๋นˆ ์‹œํ€€์Šค๋ฅผ ๋งŒ๋“ค๊ณ  anchor๋ฅผ ์ฃผ์ž…
# ์šฐ์„  ๋นˆ ๋ฆฌ์ŠคํŠธ๋ฅผ ๋งŒ๋“ค๊ณ  anchor ์œ„์น˜๋ฅผ ์ฑ„์šด ํ›„, ๋‚˜๋จธ์ง€๋ฅผ others๋กœ ์ฑ„์›€
result = [None] * total
for pos in anchor_positions:
result[pos] = anchor_idx
# others ํฌ์ธํ„ฐ
j = 0
for i in range(total):
if result[i] is None:
result[i] = others[j]
j += 1
# ์•ˆ์ „์ฒดํฌ
assert len(result) == total
# ์ธ์ ‘ anchor ์—†๋Š”์ง€ ํ™•์ธ
for i in range(1, total):
assert not (result[i] == anchor_idx and result[i-1] == anchor_idx), "Adjacent anchors found."
# anchor ๊ฐœ์ˆ˜ ํ™•์ธ
assert sum(1 for x in result if x == anchor_idx) == repeats, "Anchor count mismatch."
return result
# -------------------- Example videos (download to local cache) --------------------
EXAMPLES = {
"BodyWeightSquats": {
"real": "examples/BodyWeightSquats_real.mp4",
"bad": "examples/BodyWeightSquats_bad.mp4",
},
"WallPushUps": {
"real": "examples/WallPushUps_real.mp4",
"bad": "examples/WallPushUps_bad.mp4",
},
}
EX_CACHE = {}
for cls, files in EXAMPLES.items():
EX_CACHE[cls] = {"real": None, "bad": None}
for kind, fname in files.items():
try:
EX_CACHE[cls][kind] = hf_hub_download(
repo_id=REPO_ID,
filename=fname,
repo_type="dataset",
token=HF_TOKEN,
local_dir="/tmp",
local_dir_use_symlinks=False,
)
except Exception as e:
print(f"[WARN] example missing: {cls} {kind} -> {fname}: {e}")
for cls in EX_CACHE:
for kind in EX_CACHE[cls]:
if EX_CACHE[cls][kind]:
EX_CACHE[cls][kind] = ensure_muted_copy(EX_CACHE[cls][kind])
CLEAN_BG_CSS = r"""
/* ์ „์ฒด ํŽ˜์ด์ง€ ๋ฐฐ๊ฒฝ์„ ํฐ์ƒ‰์œผ๋กœ (Spaces์˜ ํšŒ์ƒ‰ ๋ฐฐ๊ฒฝ ์ œ๊ฑฐ) */
html, body { background:#ffffff !important; }
/* Intro/Eval ์„น์…˜์˜ ๋ž˜ํผ ๋ฐ•์Šค๋งŒ ํˆฌ๋ช… ์ฒ˜๋ฆฌ (์ž…๋ ฅ์ฐฝ/์Šฌ๋ผ์ด๋” ๋“ฑ ์ปจํŠธ๋กค์€ ๋†”๋‘ ) */
#intro .gr-panel, #intro .gr-group, #intro .gr-box, #intro .gr-row, #intro .gr-column,
#eval .gr-panel, #eval .gr-group, #eval .gr-box, #eval .gr-row, #eval .gr-column {
background: transparent !important;
box-shadow: none !important;
border-color: transparent !important;
}
/* ๋น„๋””์˜ค ์นด๋“œ ์ฃผ๋ณ€ ํˆด๋ฐ”(์ž‘์€ ํšŒ์ƒ‰ ๋ฐ•์Šค)๋„ ํˆฌ๋ช…ํ•˜๊ฒŒ */
#intro [data-testid="block-video"] .prose,
#eval [data-testid="block-video"] .prose {
background: transparent !important;
box-shadow: none !important;
border-color: transparent !important;
}
"""
# -------------------- UI --------------------
with gr.Blocks(css=CLEAN_BG_CSS) as demo:
# Blocks ์•ˆ, ์–ด๋””์„œ๋“  ํ•œ ๋ฒˆ๋งŒ ์ถ”๊ฐ€(๊ถŒ์žฅ ์œ„์น˜: Blocks ์‹œ์ž‘ ์งํ›„)
gr.HTML("""
<script>
(function(){
// ๋ชจ๋“  ์ผ๋ฐ˜ DOM + Shadow DOM ์•ˆ์˜ <video>๊นŒ์ง€ ์ฐพ์•„ ์Œ์†Œ๊ฑฐ
function eachNodeWithShadow(root, fn){
const walker = document.createTreeWalker(root, NodeFilter.SHOW_ELEMENT);
fn(root);
while (walker.nextNode()){
const el = walker.currentNode;
fn(el);
if (el.shadowRoot){
eachNodeWithShadow(el.shadowRoot, fn);
}
}
}
function muteVideoEl(v){
try{
// ์žฌ์ƒ ์ „์— ๋ฐ˜๋“œ์‹œ muted ์†์„ฑ์ด ์žˆ์–ด์•ผ ๋ธŒ๋ผ์šฐ์ €๊ฐ€ ์†Œ๋ฆฌ ์ฐจ๋‹จ
v.muted = true;
v.volume = 0.0;
v.setAttribute('muted','');
v.setAttribute('playsinline','');
v.setAttribute('preload','metadata');
// ์‚ฌ์šฉ์ž๊ฐ€ ๋ณผ๋ฅจ/์Œ์†Œ๊ฑฐ๋ฅผ ๋ฐ”๊ฟ”๋„ ๋‹ค์‹œ 0์œผ๋กœ
if (!v._muteHooked){
v.addEventListener('volumechange', () => {
if (!v.muted || v.volume > 0) { v.muted = true; v.volume = 0.0; }
});
v.addEventListener('play', () => {
v.muted = true; v.volume = 0.0;
});
v._muteHooked = true;
}
}catch(e){}
}
function muteAll(root){
eachNodeWithShadow(root || document, (el)=>{
if (el.tagName && el.tagName.toLowerCase() === 'video'){
muteVideoEl(el);
}else if (el.querySelectorAll){
el.querySelectorAll('video').forEach(muteVideoEl);
}
});
}
// ์ดˆ๊ธฐ ๋ Œ๋”์—์„œ
muteAll(document);
// ์ดํ›„ DOM ๋ณ€ํ™” ๊ฐ์‹œ(Shadow DOM ๋‚ด๋ถ€ ๋ณ€ํ™”๋„ ํฌ์ฐฉ)
const obs = new MutationObserver(() => muteAll(document));
obs.observe(document, {subtree:true, childList:true, attributes:false});
})();
</script>
""")
order_state = gr.State(value=[]) # v4์—์„œ๋Š” value= ๊ถŒ์žฅ
ptr_state = gr.State(value=0)
cur_video_id = gr.State(value="")
reward_code_state = gr.State(value="") # ์™„๋ฃŒ ์‹œ ์ƒ์„ฑํ•œ ์ฝ”๋“œ ์ €์žฅ(์ค‘๋ณต ์ƒ์„ฑ ๋ฐฉ์ง€)
selected_action = gr.State(value=None) # ์•„์ง ์„ ํƒ ์—†์Œ
selected_phys = gr.State(value=None) # ์•„์ง ์„ ํƒ ์—†์Œ
# ------------------ PAGE 1: Intro + Examples ------------------
# page_intro = gr.Group(visible=True)
page_intro = gr.Group(visible=True, elem_id="intro")
with page_intro:
# gr.Markdown("## ๐ŸŽฏ Action Consistency Human Evaluation")
gr.Markdown("## ๐ŸŽฏ Human Evaluation: Action Consistency & Physical Plausibility")
gr.Markdown(INSTRUCTION_MD)
understood = gr.Checkbox(label="I have read and understand the task.", value=False)
start_btn = gr.Button("Yes, start", variant="secondary", interactive=False)
def _toggle_start(checked: bool):
return gr.update(interactive=checked, variant=("primary" if checked else "secondary"))
understood.change(_toggle_start, inputs=understood, outputs=start_btn)
# Examples: Squats
with gr.Group():
gr.Markdown("### Examples: BodyWeightSquats")
with gr.Row():
with gr.Column():
gr.Markdown("**Expected depiction of action**")
gr.Video(value=EX_CACHE["BodyWeightSquats"]["real"], height=240, autoplay=False, elem_id="ex_squats_real",)
with gr.Column():
gr.Markdown("**Poorly generated action**")
gr.Video(value=EX_CACHE["BodyWeightSquats"]["bad"], height=240, autoplay=False)
if not (EX_CACHE["BodyWeightSquats"]["real"] and EX_CACHE["BodyWeightSquats"]["bad"]):
gr.Markdown("> โš ๏ธ Upload `examples/BodyWeightSquats_real.mp4` and `_bad.mp4` to show both samples.")
# Examples: WallPushUps
with gr.Group():
gr.Markdown("### Examples: WallPushUps")
with gr.Row():
with gr.Column():
gr.Markdown("**Expected depiction of action**")
gr.Video(value=EX_CACHE["WallPushUps"]["real"], height=240, autoplay=False, elem_id="ex_wallpushups_real",)
with gr.Column():
gr.Markdown("**Poorly generated action**")
gr.Video(value=EX_CACHE["WallPushUps"]["bad"], height=240, autoplay=False)
if not (EX_CACHE["WallPushUps"]["real"] and EX_CACHE["WallPushUps"]["bad"]):
gr.Markdown("> โš ๏ธ Upload `examples/WallPushUps_real.mp4` and `_bad.mp4` to show both samples.")
gr.HTML("""
<script>
(function(){
const ids = [
"ex_squats_real",
"ex_wallpushups_real",
];
function muteById(id){
const host = document.getElementById(id);
if(!host) return;
// gr.Video๋Š” Shadow DOM ๋‚ด๋ถ€์— <video>๊ฐ€ ๋žœ๋”๋ง๋จ
const roots = [host, host.shadowRoot].filter(Boolean);
roots.forEach(root=>{
const vids = root.querySelectorAll('video');
vids.forEach(v=>{
try{
v.muted = true;
v.volume = 0.0;
v.setAttribute('muted','');
v.setAttribute('playsinline','');
// ์‚ฌ์šฉ์ž๊ฐ€ ๋ฐ”๊ฟ”๋„ ์ฆ‰์‹œ ์›๋ณต
if(!v._exMuteHooked){
v.addEventListener('volumechange', ()=>{
if(!v.muted || v.volume > 0){ v.muted = true; v.volume = 0.0; }
});
v.addEventListener('play', ()=>{
v.muted = true; v.volume = 0.0;
});
v._exMuteHooked = true;
}
}catch(e){}
});
});
}
function apply(){
ids.forEach(muteById);
}
// ์ดˆ๊ธฐ + DOM๋ณ€๊ฒฝ ์‹œ ์ ์šฉ
apply();
new MutationObserver(apply).observe(document, {subtree:true, childList:true});
})();
</script>
""")
# ------------------ PAGE 2: Evaluation ------------------
page_eval = gr.Group(visible=False, elem_id="eval")
with page_eval:
# PID ์ž…๋ ฅ
with gr.Row():
pid = gr.Textbox(label="Participant ID (required)", placeholder="e.g., Youngsun-2025/10/24")
# ์ง€์นจ(์›๋ฌธ) + ๋น„๋””์˜ค + ์ง„ํ–‰๋ฐ” / ์˜ค๋ฅธ์ชฝ์— ์Šฌ๋ผ์ด๋” + Save&Next
with gr.Row(): #equal_height=True
with gr.Column(scale=1):
gr.Markdown(INSTRUCTION_MD) # ๊ต์ˆ˜๋‹˜ ๋ฌธ๊ตฌ ๊ทธ๋Œ€๋กœ
video = gr.Video(label="Video", height=360)
progress = gr.HTML(_progress_html(0, TOTAL_PER_PARTICIPANT))
with gr.Column(scale=1):
action_tb = gr.Textbox(label="Expected action", interactive=False)
# NEW: two separate sliders
score_action = gr.Slider(
minimum=0.0, maximum=10.0, step=0.1, value=5.0,
label="Action Consistency (0.0 - 10.0)"
)
score_phys = gr.Slider(
minimum=0.0, maximum=10.0, step=0.1, value=5.0,
label="Physical Plausibility (0.0 - 10.0)"
)
save_next = gr.Button("๐Ÿ’พ Save & Next โ–ถ", variant="secondary", interactive=False)
status = gr.Markdown(visible=False)
done_state = gr.State(0)
# PID ์ž…๋ ฅ์— ๋”ฐ๋ผ Save&Next ํ† ๊ธ€
def _toggle_by_pid(pid_text: str):
enabled = bool(pid_text and pid_text.strip())
return gr.update(interactive=enabled, variant=("primary" if enabled else "secondary"))
# pid.change(_toggle_by_pid, inputs=pid, outputs=save_next)
def _on_action_release(val: float, pid_text: str, sel_p):
# ์‚ฌ์šฉ์ž๊ฐ€ ์•ก์…˜ ์Šฌ๋ผ์ด๋”๋ฅผ ํ•œ ๋ฒˆ์ด๋ผ๋„ ๋†“์œผ๋ฉด ์„ ํƒ ์™„๋ฃŒ
return val, _recompute_save(pid_text, val, sel_p)
def _on_phys_release(val: float, pid_text: str, sel_a):
return val, _recompute_save(pid_text, sel_a, val)
# release: ๋งˆ์šฐ์Šค๋ฅผ ๋†“์„ ๋•Œ 1ํšŒ ํ™•์ •
score_action.release(_on_action_release,
inputs=[score_action, pid, selected_phys],
outputs=[selected_action, save_next])
score_phys.release(_on_phys_release,
inputs=[score_phys, pid, selected_action],
outputs=[selected_phys, save_next])
# PID๊ฐ€ ๋ฐ”๋€Œ๋ฉด ํ˜„์žฌ ์„ ํƒ ์ƒํƒœ๋กœ Save ๋ฒ„ํŠผ ์žฌ๊ณ„์‚ฐ
pid.change(_recompute_save,
inputs=[pid, selected_action, selected_phys],
outputs=save_next)
page_thanks = gr.Group(visible=False)
with page_thanks:
reward_msg = gr.Markdown(visible=False)
reward_code_box = gr.Textbox(label="Your reward code (copy & paste)", interactive=False, visible=False)
reward_pid_box = gr.Textbox(label="Your participant ID", interactive=False, visible=False)
gr.Markdown("### Any comments (optional)")
feedback_tb = gr.Textbox(
label="Any comments (optional)",
placeholder="Leave any feedback for the study organizers (optional).",
lines=4
)
feedback_submit = gr.Button("Submit feedback")
feedback_status = gr.Markdown(visible=False)
# -------- ํŽ˜์ด์ง€ ์ „ํ™˜ & ์ฒซ ๋กœ๋“œ --------
ANCHOR_IDX = 0 # videos.json์˜ ๋งจ ์ฒซ ๋น„๋””์˜ค
ANCHOR_REPEATS = 5 # ์•ต์ปค 5ํšŒ
MIN_GAP = 1 # ์•ต์ปค ์—ฐ์† ๊ธˆ์ง€(์ธ์ ‘ ๊ธˆ์ง€)
def _build_order_least_first_with_anchor(total:int, anchor_idx:int, repeats:int, min_gap:int=1):
"""
- results.csv๋ฅผ ์ฝ์–ด video_id๋ณ„ ์นด์šดํŠธ๋ฅผ ๊ณ„์‚ฐ
- ์•ต์ปค(์ฒซ ๋น„๋””์˜ค) 5ํšŒ ํฌํ•จ, ์—ฐ์† ๊ธˆ์ง€
- ๋‚˜๋จธ์ง€๋Š” '๊ฐ€์žฅ ์ ๊ฒŒ ํ‰๊ฐ€๋œ ์ˆœ'์œผ๋กœ ์ค‘๋ณต ์—†์ด ์ฑ„์›€
"""
assert repeats <= total
N = len(V)
assert N >= 1
# 0) id ๋งคํ•‘
def vid_of(i): return _get_video_id(V[i])
# 1) ํ˜„์žฌ ๋ˆ„์  ์นด์šดํŠธ ๋กœ๋“œ
counts = _load_eval_counts()
# 2) ์•ต์ปค ์ œ์™ธ ํ›„๋ณด(์ค‘๋ณต ์—†์ด) ์ •๋ ฌ: ์นด์šดํŠธ ์˜ค๋ฆ„์ฐจ์ˆœ, ๋™๋ฅ ์€ ๋žœ๋ค ์…”ํ”Œ
anchor_vid = vid_of(anchor_idx)
candidates = [i for i in range(N) if i != anchor_idx]
# ๋™๋ฅ  ๋žœ๋คํ™”๋ฅผ ์œ„ํ•ด ์ผ๋‹จ ์…”ํ”Œ
random.shuffle(candidates)
candidates.sort(key=lambda i: counts.get(vid_of(i), 0))
others_needed = total - repeats
if len(candidates) < others_needed:
raise ValueError("Not enough unique non-anchor videos to fill the schedule without duplication.")
others = candidates[:others_needed] # ์ค‘๋ณต ์—†์ด ์„ ํƒ
# 3) others๋ฅผ ๋ฒ ์ด์Šค ์‹œํ€€์Šค๋กœ(๋žœ๋ค ์‚ด์ง ์„ž๊ธฐ)
random.shuffle(others)
# 4) ์•ต์ปค๋ฅผ ๊ตฌ๊ฐ„ ๋ฐฐ์น˜(์—ฐ์† ๊ธˆ์ง€)
seq = [None] * total
segment = total // repeats if repeats > 0 else total
anchor_positions = []
for k in range(repeats):
lo = k * segment
hi = (k + 1) * segment if k < repeats - 1 else total
cand = random.randrange(lo, hi)
def ok(pos):
return all(abs(pos - p) >= (min_gap + 1) for p in anchor_positions)
found = None
for d in range(0, max(1, segment)):
for sgn in (+1, -1):
pos = cand + sgn * d
if 0 <= pos < total and ok(pos):
found = pos
break
if found is not None:
break
if found is None:
# ๋งˆ์ง€๋ง‰ ์ˆ˜๋‹จ: ์ „์ฒด ํƒ์ƒ‰
for pos in range(total):
if ok(pos):
found = pos
break
if found is None:
raise RuntimeError("Failed to place anchor without adjacency.")
anchor_positions.append(found)
for pos in anchor_positions:
seq[pos] = anchor_idx
# 5) ๋นˆ ์ž๋ฆฌ๋ฅผ others๋กœ ์ฑ„์šฐ๊ธฐ
j = 0
for i in range(total):
if seq[i] is None:
seq[i] = others[j]
j += 1
# 6) ์•ˆ์ „ ์ฒดํฌ
assert sum(1 for x in seq if x == anchor_idx) == repeats
for i in range(1, total):
assert not (seq[i] == anchor_idx and seq[i-1] == anchor_idx), "Adjacent anchors found."
return seq
def _start_and_load_first():
total = TOTAL_PER_PARTICIPANT
order = _build_order_least_first_with_anchor(
total=total,
anchor_idx=ANCHOR_IDX,
repeats=ANCHOR_REPEATS,
min_gap=MIN_GAP
)
first_idx = order[0]
v0 = V[first_idx]
return (
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False),
v0["url"],
_extract_action(v0),
5.0, # score_action default
5.0, # score_phys default โœ… NEW
gr.update(visible=False, value=""),
0,
_progress_html(0, TOTAL_PER_PARTICIPANT),
order,
1,
_get_video_id(v0), # cur_video_id
"",
None, # โฌ…๏ธ selected_action ์ดˆ๊ธฐํ™”(= ์•„์ง ์„ ํƒ ์•ˆ๋จ)
None, # โฌ…๏ธ selected_phys ์ดˆ๊ธฐํ™”
gr.update(interactive=False, variant="secondary"), # โฌ…๏ธ Save ๋ฒ„ํŠผ ์ž ๊ธˆ
)
start_btn.click(
_start_and_load_first,
inputs=[],
outputs=[
page_intro, page_eval, page_thanks,
video, action_tb,
score_action, score_phys, # <-- two sliders
status,
done_state, progress, order_state, ptr_state, cur_video_id,
reward_code_state,
selected_action, selected_phys,
save_next,
]
)
def save_and_next(participant_id, cur_vid, action_score, phys_score,
done_cnt, order, ptr, reward_code):
try:
if not participant_id or not participant_id.strip():
return (
gr.update(visible=True),
gr.update(visible=False),
gr.update(visible=True, value="โ— Please enter your Participant ID."),
gr.update(), gr.update(),
done_cnt,
_progress_html(done_cnt, TOTAL_PER_PARTICIPANT),
5.0, # reset action slider
5.0, # reset phys slider โœ…
ptr,
cur_vid,
reward_code,
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
None, # โฌ…๏ธ selected_action reset
None, # โฌ…๏ธ selected_phys reset
gr.update(interactive=False, variant="secondary"), # โฌ…๏ธ lock button
)
# save both scores
status_msg = push(participant_id, cur_vid, action_score, phys_score, "")
new_done = int(done_cnt) + 1
if new_done >= TOTAL_PER_PARTICIPANT or ptr >= len(order):
code = reward_code.strip() or _gen_reward_code(participant_id, length=10)
try:
_persist_reward_code(participant_id, code, new_done)
except Exception:
pass
thanks_text = (
"## ๐ŸŽ‰ Thank you so much!\n"
"Your responses have been recorded. You may now close this window.\n\n"
"**Below are your reward code and ID.** Please copy them and submit them to **AMT** to receive your payment."
)
return (
gr.update(visible=False),
gr.update(visible=True),
status_msg,
None,
"",
TOTAL_PER_PARTICIPANT,
_progress_html(TOTAL_PER_PARTICIPANT, TOTAL_PER_PARTICIPANT),
5.0, # reset action
5.0, # reset phys
len(order),
cur_vid,
code,
gr.update(visible=True, value=thanks_text),
gr.update(visible=True, value=code),
gr.update(visible=True, value=participant_id.strip()),
None, # โฌ…๏ธ selected_action reset
None, # โฌ…๏ธ selected_phys reset
gr.update(interactive=False, variant="secondary"),
)
# next video
next_idx = order[ptr]
v = V[next_idx]
next_vid = _get_video_id(v)
return (
gr.update(visible=True),
gr.update(visible=False),
status_msg,
v["url"],
_extract_action(v),
new_done,
_progress_html(new_done, TOTAL_PER_PARTICIPANT),
5.0, # reset action
5.0, # reset phys
ptr + 1,
next_vid,
reward_code,
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
None, # โฌ…๏ธ selected_action reset
None, # โฌ…๏ธ selected_phys reset
gr.update(interactive=False, variant="secondary"),
)
except Exception as e:
return (
gr.update(visible=True),
gr.update(visible=False),
gr.update(visible=True, value=f"โŒ Error: {type(e).__name__}: {e}"),
gr.update(), gr.update(),
done_cnt,
_progress_html(done_cnt, TOTAL_PER_PARTICIPANT),
5.0,
5.0,
ptr,
cur_vid,
reward_code,
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
None, # โฌ…๏ธ reset on error too
None,
gr.update(interactive=False, variant="secondary"),
)
save_next.click(
save_and_next,
inputs=[pid, cur_video_id, score_action, score_phys, done_state, order_state, ptr_state, reward_code_state],
outputs=[
page_eval, page_thanks,
status, video, action_tb,
done_state, progress, score_action, score_phys,
ptr_state, cur_video_id,
reward_code_state,
reward_msg, reward_code_box, reward_pid_box,
selected_action, selected_phys, # โฌ…๏ธ ์ถ”๊ฐ€
save_next, # โฌ…๏ธ ๋ฒ„ํŠผ ์ƒํƒœ ๊ฐฑ์‹ 
]
)
feedback_submit.click(
push_final_feedback,
# ์™„๋ฃŒ ํ›„ page_thanks์—์„œ ๋ณด์—ฌ์ฃผ๋Š” participant ID ๋ฐ•์Šค๋ฅผ ์‚ฌ์šฉ (๊ฐ’์ด ์ฑ„์›Œ์ ธ ์žˆ์Œ)
inputs=[reward_pid_box, feedback_tb],
outputs=feedback_status
)
if __name__ == "__main__":
# demo.launch()
demo.launch(ssr_mode=False)