Spaces:
Sleeping
Sleeping
| import os, io, csv, json, random | |
| from datetime import datetime | |
| import gradio as gr | |
| from huggingface_hub import HfApi, hf_hub_download | |
| import subprocess, pathlib, hashlib | |
| import secrets, time, string | |
# Values the user has finished selecting on each slider (initially None).
# NOTE(review): these two States are created at module level, outside any
# gr.Blocks context, and the same names are re-bound inside the Blocks UI
# further down this file — they look unused; confirm before removing.
selected_action = gr.State(None)
selected_phys = gr.State(None)
def _recompute_save(pid_text: str, sel_a, sel_p):
    """Build the update that enables or disables the "Save & Next" button.

    The button becomes interactive (and "primary") only when the participant
    ID is non-blank and both slider selections are not None; otherwise it is
    non-interactive and "secondary".
    """
    has_pid = bool(pid_text and pid_text.strip())
    ready = has_pid and sel_a is not None and sel_p is not None
    if ready:
        return gr.update(interactive=True, variant="primary")
    return gr.update(interactive=False, variant="secondary")
# CSV file used to record issued reward codes (stored in the HF dataset repo).
REWARD_FILE = "reward_codes.csv"
def _read_codes_bytes():
    """Download REWARD_FILE from the dataset repo and return its raw bytes.

    Returns:
        The file content as bytes, or None when the download or the read
        fails for any reason (best effort: e.g. the file does not exist yet).
    """
    try:
        p = hf_hub_download(
            repo_id=REPO_ID, filename=REWARD_FILE, repo_type="dataset",
            token=HF_TOKEN, local_dir="/tmp", local_dir_use_symlinks=False
        )
        # Context manager so the file handle is closed deterministically.
        with open(p, "rb") as fh:
            return fh.read()
    except Exception:
        return None
| def _append_code(old_bytes, row): | |
| s = io.StringIO() | |
| w = csv.writer(s) | |
| if not old_bytes: | |
| # ์ ํค๋ | |
| w.writerow(["ts_iso", "participant_id", "reward_code", "total_done"]) | |
| else: | |
| s.write(old_bytes.decode("utf-8", errors="ignore")) | |
| w.writerow(row) | |
| return s.getvalue().encode("utf-8") | |
def _persist_reward_code(pid: str, code: str, total_done: int):
    """Append one reward-code record to REWARD_FILE in the HF dataset repo.

    Downloads the current file (if any), appends a row of
    (UTC ISO timestamp, stripped participant id, code, completed count) and
    uploads the result back. Exceptions from the upload propagate to the caller.
    """
    existing = _read_codes_bytes()
    record = [
        datetime.utcnow().isoformat(),
        pid.strip(),
        code,
        int(total_done),
    ]
    payload = _append_code(existing, record)
    upload_kwargs = dict(
        path_or_fileobj=io.BytesIO(payload),
        path_in_repo=REWARD_FILE,
        repo_id=REPO_ID,
        repo_type="dataset",
        token=HF_TOKEN,
        commit_message="append reward code",
    )
    api.upload_file(**upload_kwargs)
| def _gen_reward_code(pid: str, length: int = 10, forbid_ambiguous: bool=True) -> str: | |
| """ | |
| ์ฐธ๊ฐ์์๊ฒ ๋ณด์ฌ์ค ๋๋ค ์ฝ๋. ์ถฉ๋ ์ํ ๋งค์ฐ ๋ฎ์. | |
| - PID + ์๊ฐ + ๋ณด์๋๋ค์ผ๋ก ์๋ | |
| - ๋๋ฌธ์/์ซ์ ์กฐํฉ | |
| """ | |
| alphabet = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" if forbid_ambiguous else string.ascii_uppercase + string.digits | |
| # secrets ๊ธฐ๋ฐ ๋์ + pid/time ์์ด์ ํด์ ๋น์ทํ ํจ๊ณผ | |
| rnd = secrets.token_hex(8) + pid + str(time.time_ns()) | |
| # ์๊ณ ๋ฝ๊ธฐ | |
| rng = secrets.SystemRandom() | |
| return "".join(rng.choice(alphabet) for _ in range(length)) | |
# Directory holding audio-stripped copies of videos; created at import time.
MUTED_CACHE_DIR = "/tmp/hf_video_cache_muted"
os.makedirs(MUTED_CACHE_DIR, exist_ok=True)
| def _sha1_8(s: str) -> str: | |
| return hashlib.sha1(s.encode("utf-8")).hexdigest()[:8] | |
def ensure_muted_copy(src_path: str) -> str:
    """Return the path of a silent copy of ``src_path``, creating it if needed.

    The copy is produced with ffmpeg (video stream copied, audio dropped via
    ``-an``) and stored in MUTED_CACHE_DIR under a name derived from the
    source path.

    Args:
        src_path: Path of the source mp4.

    Returns:
        The cached muted file path when it already exists or was created;
        otherwise ``src_path`` unchanged (missing/empty source, or any failure
        while running ffmpeg).
    """
    if not src_path or not os.path.exists(src_path):
        return src_path
    key = _sha1_8(src_path)
    out = os.path.join(MUTED_CACHE_DIR, key + ".mp4")
    if os.path.exists(out):
        return out
    # Write to a temporary name first: if ffmpeg fails midway, a truncated
    # file must never sit at the cache path, where it would be served as a hit.
    # The temp name keeps the .mp4 suffix so ffmpeg can infer the container.
    tmp = os.path.join(MUTED_CACHE_DIR, f"{key}.{os.getpid()}.tmp.mp4")
    try:
        subprocess.run(
            ["ffmpeg", "-y", "-i", src_path, "-c:v", "copy", "-an", tmp],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
        )
        os.replace(tmp, out)  # atomic publish into the cache
        return out
    except Exception:
        # Best effort by contract: fall back to the original file.
        return src_path
    finally:
        # Remove any leftover partial output.
        if os.path.exists(tmp):
            try:
                os.remove(tmp)
            except OSError:
                pass
# CSV file in the dataset repo that collects the optional end-of-study comments.
FEEDBACK_FILE = "final_feedback.csv"
def _read_feedback_bytes():
    """Download FEEDBACK_FILE from the dataset repo and return its raw bytes.

    Returns:
        The file content as bytes, or None when the download or the read
        fails for any reason (best effort: e.g. the file does not exist yet).
    """
    try:
        p = hf_hub_download(
            repo_id=REPO_ID, filename=FEEDBACK_FILE, repo_type="dataset",
            token=HF_TOKEN, local_dir="/tmp", local_dir_use_symlinks=False
        )
        # Context manager so the file handle is closed deterministically.
        with open(p, "rb") as fh:
            return fh.read()
    except Exception:
        return None
| def _append_feedback(old_bytes, row): | |
| s = io.StringIO() | |
| w = csv.writer(s) | |
| if not old_bytes: | |
| # ์ต์ข ์ฝ๋ฉํธ ์ ์ฉ CSV ํค๋ | |
| w.writerow(["ts_iso", "participant_id", "final_comment"]) | |
| else: | |
| s.write(old_bytes.decode("utf-8", errors="ignore")) | |
| w.writerow(row) | |
| return s.getvalue().encode("utf-8") | |
def push_final_feedback(participant_id: str, comment: str):
    """Save the free-form comment collected at the end into FEEDBACK_FILE.

    Args:
        participant_id: Participant identifier; must be non-blank.
        comment: Free-text feedback; nothing is saved when it is empty/blank.

    Returns:
        A ``gr.update`` for the feedback status component carrying a success,
        informational or error message. Exceptions raised while reading or
        uploading are caught and reported in that message.
    """
    if not participant_id or not participant_id.strip():
        return gr.update(visible=True, value="โ Missing participant ID.")
    # Normalise once so non-string values are handled the same way they are validated.
    comment_text = "" if comment is None else str(comment).strip()
    if not comment_text:
        # Empty comment: do not save, only inform the user.
        return gr.update(visible=True, value="โน๏ธ No comment entered โ nothing to submit.")
    # Validate configuration before any network round-trip.
    if not REPO_ID:
        return gr.update(visible=True, value="โ RESULTS_REPO is not set.")
    if not HF_TOKEN:
        return gr.update(visible=True, value="โ HF_TOKEN is missing. Set a write token for the dataset repo.")
    try:
        old = _read_feedback_bytes()
        row = [datetime.utcnow().isoformat(), participant_id.strip(), comment_text]
        newb = _append_feedback(old, row)
        api.upload_file(
            path_or_fileobj=io.BytesIO(newb),
            path_in_repo=FEEDBACK_FILE,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="append final feedback"
        )
        return gr.update(visible=True, value="โ Thanks! Your feedback was submitted.")
    except Exception as e:
        return gr.update(
            visible=True,
            value=f"โ Feedback save failed: {type(e).__name__}: {e}"
        )
| # -------------------- Config -------------------- | |
# Dataset repo that stores results; should match the repo the files were
# uploaded to. Falls back to "dghadiya/video_eval" when RESULTS_REPO is unset.
REPO_ID = os.getenv("RESULTS_REPO", "dghadiya/video_eval")
# Hugging Face access token; None when the environment variable is unset.
HF_TOKEN = os.getenv("HF_TOKEN")
RESULTS_FILE = "results.csv"
TOTAL_PER_PARTICIPANT = 30  # target number of ratings (per session)
# videos.json example entry:
# {"url": "...mp4", "id": "BodyWeightSquats__XXXX.mp4", "action": "BodyWeightSquats"}
with open("videos.json", "r", encoding="utf-8") as f:
    V = json.load(f)
# Shared Hub client used for every upload in this file.
api = HfApi()
# Professor's guidelines (verbatim, including the bold formatting)
| # INSTRUCTION_MD = """ | |
| # **Task:** You will watch a series of **AI-generated videos**. For each video, your job is to rate how well the personโs action in the AI-generated video matches the action specified as "**expected action**". Some things to keep in mind: | |
| # - The generated video should **capture** the expected action **throughout the video**. | |
| # - Try to **focus only** on the expected action and do **not** judge **video quality**, **attractiveness**, **background**, **camera motion**, or **objects**. | |
| # - You will be **paid** once **all the videos are viewed and rated**. | |
| # """ | |
# Markdown instructions for raters; rendered on the intro page and again next
# to the video on the evaluation page.
INSTRUCTION_MD = """
**Task:** You will watch a series of **AI-generated videos**.
For each video, your job is to rate:
1. **Action Consistency** - how well the personโs action matches the action specified as the "**Expected action**".
2. **Physical Plausibility** - how natural and physically possible the motion looks.
Some things to keep in mind:
- Try to **focus only** on the expected action and motion quality, and do **not** judge **video quality**, **attractiveness**, **background**, **camera motion**, or **objects**.
- For **physical plausibility**, look for **smooth and realistic** motion without impossible poses, missing limbs, or extreme stretching.
- Action consistency and physical plausibility are **not mutually exclusive** with each other.
- **Physically plausible motion does not imply correct depiction of action.**
- **A video cannot portray action consistency if it has physically impossible movements.**
- **0: indicates really poor depiction, 10: represents perfect, realistic depiction**
- The **Save & Next** button will be enabled **only after you have clicked or adjusted both sliders at least once**.
- You will be **paid** once **all the videos are viewed and rated**.
"""
| # -------------------- Helper funcs -------------------- | |
def _load_eval_counts():
    """Count how many times each video in V has already been rated.

    Reads results.csv from the Hugging Face dataset and returns a dict mapping
    video id -> number of rows referring to it. Every id in V starts at 0, so
    a missing or empty results file yields all zeros.
    """
    tally = {_get_video_id(entry): 0 for entry in V}

    raw = _read_csv_bytes()
    if not raw:
        return tally

    text = raw.decode("utf-8", errors="ignore")
    records = list(csv.reader(io.StringIO(text)))
    if not records:
        return tally

    # Header detection: the first row counts as a header when it names a
    # known column; otherwise every row is treated as data.
    first = records[0]
    has_vid_header = "video_id" in first
    looks_like_header = has_vid_header or "overall" in first
    data_rows = records[1:] if looks_like_header else records

    # Without a "video_id" header, fall back to the default layout:
    # ts, pid, video_id, overall, notes.
    col = first.index("video_id") if has_vid_header else 2

    for rec in data_rows:
        if col >= len(rec):
            continue  # short/malformed row
        key = rec[col]
        if key in tally:
            tally[key] += 1
    return tally
| def _get_video_id(v: dict) -> str: | |
| if "id" in v and v["id"]: | |
| return v["id"] | |
| # id๊ฐ ์์ผ๋ฉด URL ํ์ผ๋ช ์ผ๋ก ๋์ฒด | |
| return os.path.basename(v.get("url", "")) | |
def _read_csv_bytes():
    """Download RESULTS_FILE from the dataset repo and return its raw bytes.

    Returns:
        The file content as bytes, or None when the download or the read
        fails for any reason (best effort: e.g. the file does not exist yet).
    """
    try:
        p = hf_hub_download(
            repo_id=REPO_ID, filename=RESULTS_FILE, repo_type="dataset",
            token=HF_TOKEN, local_dir="/tmp", local_dir_use_symlinks=False
        )
        # Context manager so the file handle is closed deterministically.
        with open(p, "rb") as fh:
            return fh.read()
    except Exception:
        return None
| # def _append(old_bytes, row): | |
| # s = io.StringIO() | |
| # w = csv.writer(s) | |
| # if not old_bytes: | |
| # # โ ์ ํค๋ | |
| # w.writerow(["ts_iso", "participant_id", "video_id", "overall", "notes"]) | |
| # else: | |
| # s.write(old_bytes.decode("utf-8", errors="ignore")) | |
| # w.writerow(row) | |
| # return s.getvalue().encode("utf-8") | |
| def _append(old_bytes, row): | |
| s = io.StringIO() | |
| w = csv.writer(s) | |
| if not old_bytes: | |
| # โ new header with two scores | |
| w.writerow(["ts_iso", "participant_id", "video_id", | |
| "action_consistency", "physical_plausibility", "notes"]) | |
| else: | |
| s.write(old_bytes.decode("utf-8", errors="ignore")) | |
| w.writerow(row) | |
| return s.getvalue().encode("utf-8") | |
| # def push(participant_id, video_id, score, notes=""): | |
| # if not participant_id or not participant_id.strip(): | |
| # return gr.update(visible=True, value="โ Please enter your Participant ID before proceeding.") | |
| # if not video_id or score is None: | |
| # return gr.update(visible=True, value="โ Fill out all fields.") | |
| # try: | |
| # old = _read_csv_bytes() | |
| # row = [ | |
| # datetime.utcnow().isoformat(), | |
| # participant_id.strip(), | |
| # video_id, # โ action ๋์ video_id ์ ์ฅ | |
| # float(score), # overall | |
| # notes or "" | |
| # ] | |
| # newb = _append(old, row) | |
| # if not REPO_ID: | |
| # return gr.update(visible=True, value="โ RESULTS_REPO is not set.") | |
| # if not HF_TOKEN: | |
| # return gr.update(visible=True, value="โ HF_TOKEN is missing. Set a write token for the dataset repo.") | |
| # api.upload_file( | |
| # path_or_fileobj=io.BytesIO(newb), | |
| # path_in_repo=RESULTS_FILE, | |
| # repo_id=REPO_ID, | |
| # repo_type="dataset", | |
| # token=HF_TOKEN, | |
| # commit_message="append" | |
| # ) | |
| # return gr.update(visible=True, value=f"โ Saved successfully!") | |
| # except Exception as e: | |
| # return gr.update( | |
| # visible=True, | |
| # value=f"โ Save failed: {type(e).__name__}: {e}\n" | |
| # f"- Check HF_TOKEN permission\n- Check REPO_ID\n- Create dataset repo if missing" | |
| # ) | |
def push(participant_id, video_id, action_score, phys_score, notes=""):
    """Append one rating row to RESULTS_FILE in the dataset repo.

    Args:
        participant_id: Participant identifier; must be non-blank.
        video_id: Identifier of the rated video.
        action_score: Action-consistency score (converted with ``float``).
        phys_score: Physical-plausibility score (converted with ``float``).
        notes: Optional free-text note stored in the last column.

    Returns:
        A ``gr.update`` for the status component carrying a success or error
        message. Exceptions raised while building the row, reading or
        uploading are caught and reported in that message.
    """
    if not participant_id or not participant_id.strip():
        return gr.update(visible=True, value="โ Please enter your Participant ID before proceeding.")
    if not video_id or action_score is None or phys_score is None:
        return gr.update(visible=True, value="โ Fill out all fields.")
    # Validate configuration before any network round-trip.
    if not REPO_ID:
        return gr.update(visible=True, value="โ RESULTS_REPO is not set.")
    if not HF_TOKEN:
        return gr.update(visible=True, value="โ HF_TOKEN is missing. Set a write token for the dataset repo.")
    try:
        old = _read_csv_bytes()
        row = [
            datetime.utcnow().isoformat(),
            participant_id.strip(),
            video_id,
            float(action_score),
            float(phys_score),
            notes or ""
        ]
        newb = _append(old, row)
        api.upload_file(
            path_or_fileobj=io.BytesIO(newb),
            path_in_repo=RESULTS_FILE,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="append"
        )
        return gr.update(visible=True, value="โ Saved successfully!")
    except Exception as e:
        return gr.update(
            visible=True,
            value=f"โ Save failed: {type(e).__name__}: {e}\n"
                  "- Check HF_TOKEN permission\n- Check REPO_ID\n- Create dataset repo if missing"
        )
| def _extract_action(v): | |
| if "action" in v and v["action"]: | |
| return v["action"] | |
| raw = v.get("id", "") | |
| return raw.split("__")[0].split(".")[0] | |
def pick_one():
    """Pick a random entry from V and return its (url, action label)."""
    chosen = random.choice(V)
    url = chosen["url"]
    action = _extract_action(chosen)
    return url, action
| def _progress_html(done, total): | |
| pct = int(100 * done / max(1, total)) | |
| return f""" | |
| <div style="border:1px solid #ddd; height:20px; border-radius:6px; overflow:hidden; margin-top:6px;"> | |
| <div style="height:100%; width:{pct}%; background:#3b82f6;"></div> | |
| </div> | |
| <div style="font-size:12px; margin-top:4px;">{done} / {total}</div> | |
| """ | |
| def _build_order_with_anchor(total:int, anchor_idx:int, repeats:int, pool_size:int, min_gap:int=1): | |
| """ | |
| total: TOTAL_PER_PARTICIPANT (e.g., 30) | |
| anchor_idx: index of the anchor video in V (0 for first item) | |
| repeats: how many times to show anchor (e.g., 5) | |
| pool_size: len(V) | |
| min_gap: ์ต์ ๊ฐ๊ฒฉ(์ธ์ ๊ธ์ง => 1) | |
| return: list of indices (length=total) | |
| """ | |
| assert repeats <= total, "repeats must be <= total" | |
| assert pool_size >= 1, "videos pool must be non-empty" | |
| # 1) ๋ค๋ฅธ ๋น๋์ค 25๊ฐ(์ค๋ณต ์์ด) ๋ฝ๊ธฐ | |
| others_needed = total - repeats | |
| # anchor๋ฅผ ์ ์ธํ ํ๋ณด ์ธ๋ฑ์ค | |
| candidates = list(range(1, pool_size)) if anchor_idx == 0 else [i for i in range(pool_size) if i != anchor_idx] | |
| if len(candidates) < others_needed: | |
| raise ValueError("Not enough unique non-anchor videos to fill the schedule without duplication.") | |
| others = random.sample(candidates, k=others_needed) | |
| # 2) ๊ธฐ๋ณธ ์ํ์ค(others)๋ฅผ ๋ฌด์์๋ก ์๊ธฐ | |
| random.shuffle(others) | |
| # 3) ์ต์ปค๋ฅผ min_gap๋ฅผ ๋ง์กฑํ๋๋ก ์ฝ์ ํ ์์น ์ ์ | |
| # 30๊ฐ๋ฅผ 5๊ตฌ๊ฐ์ผ๋ก ๋๋ , ๊ฐ ๊ตฌ๊ฐ ๋ด์์ ์ถฉ๋ ๋ ๋๊ฒ ๋ฐฐ์น | |
| # (๊ฐ๋จํ๊ณ ์์ ์ ์ธ ๋ฐฉ์) | |
| seq = others[:] # ๊ธธ์ด=25 | |
| anchor_positions = [] | |
| segment = total // repeats # 30//5 = 6 | |
| for k in range(repeats): | |
| # ๊ฐ ๊ตฌ๊ฐ [k*segment, (k+1)*segment) ์์์ ํ๋ณด ์์น๋ฅผ ๊ณ ๋ฆ | |
| lo = k * segment | |
| hi = (k + 1) * segment if k < repeats - 1 else total # ๋ง์ง๋ง์ ๋๊น์ง | |
| # ๊ฒฝ๊ณ ๋ด ์์ ์คํ์ ์ ํ (์ฌ์ ๋ฅผ ๋๊ณ ์ถฉ๋์ ํผํจ) | |
| candidate_pos = random.randrange(lo, hi) | |
| # ์ธ์ ๊ธ์ง ๋ณด์ : ์ด๋ฏธ ๋ฐฐ์ ๋ anchor ์์น๋ค๊ณผ์ ๊ฑฐ๋ฆฌ๊ฐ min_gap ์ด์ ๋๋๋ก ์กฐ์ | |
| # ํ์ ์ ์ข์ฐ๋ก ๊ทผ์ ํ ๋น ์ฌ๋กฏ ํ์ | |
| def ok(pos): | |
| return all(abs(pos - p) >= min_gap + 1 for p in anchor_positions) # ์ฐ์๊ธ์ง => ๊ฑฐ๋ฆฌ >= 2 | |
| # ๊ทผ๋ฐฉ ํ์ ํญ | |
| found = None | |
| for delta in range(0, segment): # ๊ตฌ๊ฐ ํฌ๊ธฐ ๋ด์์ ํ์ | |
| # ์ข/์ฐ ๋ฒ๊ฐ์๊ฐ๋ฉฐ ํ๋ณด ์๋ | |
| for sign in (+1, -1): | |
| pos = candidate_pos + sign * delta | |
| if 0 <= pos < total and ok(pos): | |
| found = pos | |
| break | |
| if found is not None: | |
| break | |
| if found is None: | |
| # ์ตํ: 0..total-1 ๋ฒ์์์ ์๋ฌด ๋ฐ๋ ์ถฉ๋ ์๋ ๊ณณ ์ฐพ๊ธฐ | |
| for pos in range(total): | |
| if ok(pos): | |
| found = pos | |
| break | |
| if found is None: | |
| raise RuntimeError("Failed to place anchor without adjacency. Try different strategy or loosen min_gap.") | |
| anchor_positions.append(found) | |
| # 4) others๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๊ธธ์ด total์ ๋น ์ํ์ค๋ฅผ ๋ง๋ค๊ณ anchor๋ฅผ ์ฃผ์ | |
| # ์ฐ์ ๋น ๋ฆฌ์คํธ๋ฅผ ๋ง๋ค๊ณ anchor ์์น๋ฅผ ์ฑ์ด ํ, ๋๋จธ์ง๋ฅผ others๋ก ์ฑ์ | |
| result = [None] * total | |
| for pos in anchor_positions: | |
| result[pos] = anchor_idx | |
| # others ํฌ์ธํฐ | |
| j = 0 | |
| for i in range(total): | |
| if result[i] is None: | |
| result[i] = others[j] | |
| j += 1 | |
| # ์์ ์ฒดํฌ | |
| assert len(result) == total | |
| # ์ธ์ anchor ์๋์ง ํ์ธ | |
| for i in range(1, total): | |
| assert not (result[i] == anchor_idx and result[i-1] == anchor_idx), "Adjacent anchors found." | |
| # anchor ๊ฐ์ ํ์ธ | |
| assert sum(1 for x in result if x == anchor_idx) == repeats, "Anchor count mismatch." | |
| return result | |
| # -------------------- Example videos (download to local cache) -------------------- | |
# Example clips shown on the intro page: per action class, one "real"
# reference and one "bad" generation (paths inside the dataset repo).
EXAMPLES = {
    "BodyWeightSquats": {
        "real": "examples/BodyWeightSquats_real.mp4",
        "bad": "examples/BodyWeightSquats_bad.mp4",
    },
    "WallPushUps": {
        "real": "examples/WallPushUps_real.mp4",
        "bad": "examples/WallPushUps_bad.mp4",
    },
}

# Local file path per example (None when the download failed).
EX_CACHE = {}

# Phase 1: download every example; a missing file only produces a warning.
for cls, files in EXAMPLES.items():
    EX_CACHE[cls] = dict.fromkeys(("real", "bad"))
    for kind, fname in files.items():
        try:
            local_path = hf_hub_download(
                repo_id=REPO_ID,
                filename=fname,
                repo_type="dataset",
                token=HF_TOKEN,
                local_dir="/tmp",
                local_dir_use_symlinks=False,
            )
        except Exception as e:
            print(f"[WARN] example missing: {cls} {kind} -> {fname}: {e}")
        else:
            EX_CACHE[cls][kind] = local_path

# Phase 2: swap each downloaded file for its audio-stripped copy.
for cls in EX_CACHE:
    for kind in EX_CACHE[cls]:
        downloaded = EX_CACHE[cls][kind]
        if not downloaded:
            continue
        EX_CACHE[cls][kind] = ensure_muted_copy(downloaded)
# Custom CSS passed to gr.Blocks: white page background, and transparent
# wrapper boxes inside the #intro and #eval sections.
CLEAN_BG_CSS = r"""
/* ์ ์ฒด ํ์ด์ง ๋ฐฐ๊ฒฝ์ ํฐ์์ผ๋ก (Spaces์ ํ์ ๋ฐฐ๊ฒฝ ์ ๊ฑฐ) */
html, body { background:#ffffff !important; }
/* Intro/Eval ์น์ ์ ๋ํผ ๋ฐ์ค๋ง ํฌ๋ช ์ฒ๋ฆฌ (์ ๋ ฅ์ฐฝ/์ฌ๋ผ์ด๋ ๋ฑ ์ปจํธ๋กค์ ๋๋ ) */
#intro .gr-panel, #intro .gr-group, #intro .gr-box, #intro .gr-row, #intro .gr-column,
#eval .gr-panel, #eval .gr-group, #eval .gr-box, #eval .gr-row, #eval .gr-column {
background: transparent !important;
box-shadow: none !important;
border-color: transparent !important;
}
/* ๋น๋์ค ์นด๋ ์ฃผ๋ณ ํด๋ฐ(์์ ํ์ ๋ฐ์ค)๋ ํฌ๋ช ํ๊ฒ */
#intro [data-testid="block-video"] .prose,
#eval [data-testid="block-video"] .prose {
background: transparent !important;
box-shadow: none !important;
border-color: transparent !important;
}
"""
| # -------------------- UI -------------------- | |
# Three-page Gradio app: intro + examples -> evaluation -> thank-you / reward code.
with gr.Blocks(css=CLEAN_BG_CSS) as demo:
    # Global "mute every <video>" script; added once, right after Blocks starts.
    # NOTE(review): gr.HTML is documented as rendering static HTML only, so this
    # <script> (and the one in the intro page) may never execute — confirm, and
    # consider the `head=` / `js=` parameters of gr.Blocks instead.
    gr.HTML("""
<script>
(function(){
// ๋ชจ๋ ์ผ๋ฐ DOM + Shadow DOM ์์ <video>๊น์ง ์ฐพ์ ์์๊ฑฐ
function eachNodeWithShadow(root, fn){
const walker = document.createTreeWalker(root, NodeFilter.SHOW_ELEMENT);
fn(root);
while (walker.nextNode()){
const el = walker.currentNode;
fn(el);
if (el.shadowRoot){
eachNodeWithShadow(el.shadowRoot, fn);
}
}
}
function muteVideoEl(v){
try{
// ์ฌ์ ์ ์ ๋ฐ๋์ muted ์์ฑ์ด ์์ด์ผ ๋ธ๋ผ์ฐ์ ๊ฐ ์๋ฆฌ ์ฐจ๋จ
v.muted = true;
v.volume = 0.0;
v.setAttribute('muted','');
v.setAttribute('playsinline','');
v.setAttribute('preload','metadata');
// ์ฌ์ฉ์๊ฐ ๋ณผ๋ฅจ/์์๊ฑฐ๋ฅผ ๋ฐ๊ฟ๋ ๋ค์ 0์ผ๋ก
if (!v._muteHooked){
v.addEventListener('volumechange', () => {
if (!v.muted || v.volume > 0) { v.muted = true; v.volume = 0.0; }
});
v.addEventListener('play', () => {
v.muted = true; v.volume = 0.0;
});
v._muteHooked = true;
}
}catch(e){}
}
function muteAll(root){
eachNodeWithShadow(root || document, (el)=>{
if (el.tagName && el.tagName.toLowerCase() === 'video'){
muteVideoEl(el);
}else if (el.querySelectorAll){
el.querySelectorAll('video').forEach(muteVideoEl);
}
});
}
// ์ด๊ธฐ ๋ ๋์์
muteAll(document);
// ์ดํ DOM ๋ณํ ๊ฐ์(Shadow DOM ๋ด๋ถ ๋ณํ๋ ํฌ์ฐฉ)
const obs = new MutationObserver(() => muteAll(document));
obs.observe(document, {subtree:true, childList:true, attributes:false});
})();
</script>
""")

    # Per-session state (passing value= explicitly is the recommended form in Gradio v4).
    order_state = gr.State(value=[])        # presentation order (indices into V)
    ptr_state = gr.State(value=0)           # index in the order of the NEXT video to show
    cur_video_id = gr.State(value="")       # id of the video currently displayed
    reward_code_state = gr.State(value="")  # code generated on completion (avoids regenerating)
    selected_action = gr.State(value=None)  # None until the action slider is released once
    selected_phys = gr.State(value=None)    # None until the plausibility slider is released once

    # ------------------ PAGE 1: Intro + Examples ------------------
    # page_intro = gr.Group(visible=True)
    page_intro = gr.Group(visible=True, elem_id="intro")
    with page_intro:
        # gr.Markdown("## ๐ฏ Action Consistency Human Evaluation")
        gr.Markdown("## ๐ฏ Human Evaluation: Action Consistency & Physical Plausibility")
        gr.Markdown(INSTRUCTION_MD)
        understood = gr.Checkbox(label="I have read and understand the task.", value=False)
        start_btn = gr.Button("Yes, start", variant="secondary", interactive=False)

        def _toggle_start(checked: bool):
            """Enable the start button only while the acknowledgement box is ticked."""
            return gr.update(interactive=checked, variant=("primary" if checked else "secondary"))

        understood.change(_toggle_start, inputs=understood, outputs=start_btn)

        # Examples: Squats
        with gr.Group():
            gr.Markdown("### Examples: BodyWeightSquats")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("**Expected depiction of action**")
                    gr.Video(value=EX_CACHE["BodyWeightSquats"]["real"], height=240, autoplay=False, elem_id="ex_squats_real",)
                with gr.Column():
                    gr.Markdown("**Poorly generated action**")
                    gr.Video(value=EX_CACHE["BodyWeightSquats"]["bad"], height=240, autoplay=False)
            if not (EX_CACHE["BodyWeightSquats"]["real"] and EX_CACHE["BodyWeightSquats"]["bad"]):
                gr.Markdown("> โ ๏ธ Upload `examples/BodyWeightSquats_real.mp4` and `_bad.mp4` to show both samples.")

        # Examples: WallPushUps
        with gr.Group():
            gr.Markdown("### Examples: WallPushUps")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("**Expected depiction of action**")
                    gr.Video(value=EX_CACHE["WallPushUps"]["real"], height=240, autoplay=False, elem_id="ex_wallpushups_real",)
                with gr.Column():
                    gr.Markdown("**Poorly generated action**")
                    gr.Video(value=EX_CACHE["WallPushUps"]["bad"], height=240, autoplay=False)
            if not (EX_CACHE["WallPushUps"]["real"] and EX_CACHE["WallPushUps"]["bad"]):
                gr.Markdown("> โ ๏ธ Upload `examples/WallPushUps_real.mp4` and `_bad.mp4` to show both samples.")

        # Extra muting script targeting the two "real" example players by element id.
        gr.HTML("""
<script>
(function(){
const ids = [
"ex_squats_real",
"ex_wallpushups_real",
];
function muteById(id){
const host = document.getElementById(id);
if(!host) return;
// gr.Video๋ Shadow DOM ๋ด๋ถ์ <video>๊ฐ ๋๋๋ง๋จ
const roots = [host, host.shadowRoot].filter(Boolean);
roots.forEach(root=>{
const vids = root.querySelectorAll('video');
vids.forEach(v=>{
try{
v.muted = true;
v.volume = 0.0;
v.setAttribute('muted','');
v.setAttribute('playsinline','');
// ์ฌ์ฉ์๊ฐ ๋ฐ๊ฟ๋ ์ฆ์ ์๋ณต
if(!v._exMuteHooked){
v.addEventListener('volumechange', ()=>{
if(!v.muted || v.volume > 0){ v.muted = true; v.volume = 0.0; }
});
v.addEventListener('play', ()=>{
v.muted = true; v.volume = 0.0;
});
v._exMuteHooked = true;
}
}catch(e){}
});
});
}
function apply(){
ids.forEach(muteById);
}
// ์ด๊ธฐ + DOM๋ณ๊ฒฝ ์ ์ ์ฉ
apply();
new MutationObserver(apply).observe(document, {subtree:true, childList:true});
})();
</script>
""")

    # ------------------ PAGE 2: Evaluation ------------------
    page_eval = gr.Group(visible=False, elem_id="eval")
    with page_eval:
        # Participant ID input
        with gr.Row():
            pid = gr.Textbox(label="Participant ID (required)", placeholder="e.g., Youngsun-2025/10/24")
        # Left: instructions + video + progress bar / right: sliders + Save & Next
        with gr.Row():  # equal_height=True
            with gr.Column(scale=1):
                gr.Markdown(INSTRUCTION_MD)  # the professor's wording, verbatim
                video = gr.Video(label="Video", height=360)
                progress = gr.HTML(_progress_html(0, TOTAL_PER_PARTICIPANT))
            with gr.Column(scale=1):
                action_tb = gr.Textbox(label="Expected action", interactive=False)
                # Two separate sliders, one per criterion.
                score_action = gr.Slider(
                    minimum=0.0, maximum=10.0, step=0.1, value=5.0,
                    label="Action Consistency (0.0 - 10.0)"
                )
                score_phys = gr.Slider(
                    minimum=0.0, maximum=10.0, step=0.1, value=5.0,
                    label="Physical Plausibility (0.0 - 10.0)"
                )
                save_next = gr.Button("๐พ Save & Next โถ", variant="secondary", interactive=False)
        status = gr.Markdown(visible=False)
        done_state = gr.State(0)  # number of videos rated so far

        # Toggle Save & Next from the PID alone (currently not wired to any event).
        def _toggle_by_pid(pid_text: str):
            enabled = bool(pid_text and pid_text.strip())
            return gr.update(interactive=enabled, variant=("primary" if enabled else "secondary"))
        # pid.change(_toggle_by_pid, inputs=pid, outputs=save_next)

        def _on_action_release(val: float, pid_text: str, sel_p):
            """Record the action slider as "selected" once it has been released at least once."""
            return val, _recompute_save(pid_text, val, sel_p)

        def _on_phys_release(val: float, pid_text: str, sel_a):
            """Record the plausibility slider as "selected" once it has been released."""
            return val, _recompute_save(pid_text, sel_a, val)

        # release: fires once each time the mouse button is let go.
        score_action.release(_on_action_release,
                             inputs=[score_action, pid, selected_phys],
                             outputs=[selected_action, save_next])
        score_phys.release(_on_phys_release,
                           inputs=[score_phys, pid, selected_action],
                           outputs=[selected_phys, save_next])
        # When the PID changes, recompute the Save button from the current selections.
        pid.change(_recompute_save,
                   inputs=[pid, selected_action, selected_phys],
                   outputs=save_next)

    # ------------------ PAGE 3: Thank-you / reward code ------------------
    page_thanks = gr.Group(visible=False)
    with page_thanks:
        reward_msg = gr.Markdown(visible=False)
        reward_code_box = gr.Textbox(label="Your reward code (copy & paste)", interactive=False, visible=False)
        reward_pid_box = gr.Textbox(label="Your participant ID", interactive=False, visible=False)
        gr.Markdown("### Any comments (optional)")
        feedback_tb = gr.Textbox(
            label="Any comments (optional)",
            placeholder="Leave any feedback for the study organizers (optional).",
            lines=4
        )
        feedback_submit = gr.Button("Submit feedback")
        feedback_status = gr.Markdown(visible=False)

    # -------- Page switching & first load --------
    ANCHOR_IDX = 0      # the very first video in videos.json
    ANCHOR_REPEATS = 5  # anchor shown 5 times
    MIN_GAP = 1         # anchors never consecutive (no adjacency)

    def _build_order_least_first_with_anchor(total: int, anchor_idx: int, repeats: int, min_gap: int = 1):
        """Build a per-participant order favouring the least-rated videos.

        - Reads results.csv to obtain the per-video rating counts.
        - Includes the anchor video ``repeats`` times, never consecutively.
        - Fills the remaining slots, without duplicates, with the videos that
          have the fewest ratings (ties broken randomly).

        Returns:
            A list of ``total`` indices into V.

        Raises:
            ValueError: Not enough distinct non-anchor videos.
            RuntimeError: An anchor could not be placed without adjacency.
        """
        assert repeats <= total
        N = len(V)
        assert N >= 1

        # 0) index -> video id
        def vid_of(i):
            return _get_video_id(V[i])

        # 1) Load the cumulative counts.
        counts = _load_eval_counts()

        # 2) Non-anchor candidates ordered by ascending count. Shuffling first
        #    and then using a stable sort yields random tie-breaking.
        candidates = [i for i in range(N) if i != anchor_idx]
        random.shuffle(candidates)
        candidates.sort(key=lambda i: counts.get(vid_of(i), 0))
        others_needed = total - repeats
        if len(candidates) < others_needed:
            raise ValueError("Not enough unique non-anchor videos to fill the schedule without duplication.")
        others = candidates[:others_needed]  # selected without duplicates

        # 3) Shuffle the chosen videos so the order is not sorted by count.
        random.shuffle(others)

        # 4) Place one anchor per segment, never adjacent to another anchor.
        seq = [None] * total
        segment = total // repeats if repeats > 0 else total
        anchor_positions = []

        def ok(pos):
            return all(abs(pos - p) >= (min_gap + 1) for p in anchor_positions)

        for k in range(repeats):
            lo = k * segment
            hi = (k + 1) * segment if k < repeats - 1 else total
            cand = random.randrange(lo, hi)
            found = None
            for d in range(0, max(1, segment)):
                for sgn in (+1, -1):
                    pos = cand + sgn * d
                    if 0 <= pos < total and ok(pos):
                        found = pos
                        break
                if found is not None:
                    break
            if found is None:
                # Last resort: scan the whole schedule.
                for pos in range(total):
                    if ok(pos):
                        found = pos
                        break
            if found is None:
                raise RuntimeError("Failed to place anchor without adjacency.")
            anchor_positions.append(found)
        for pos in anchor_positions:
            seq[pos] = anchor_idx

        # 5) Fill the free slots with the other videos.
        j = 0
        for i in range(total):
            if seq[i] is None:
                seq[i] = others[j]
                j += 1

        # 6) Sanity checks.
        assert sum(1 for x in seq if x == anchor_idx) == repeats
        for i in range(1, total):
            assert not (seq[i] == anchor_idx and seq[i - 1] == anchor_idx), "Adjacent anchors found."
        return seq

    def _start_and_load_first():
        """Build the session order, switch to the evaluation page and load video #1.

        The returned tuple maps positionally onto the ``outputs`` list of
        ``start_btn.click`` below.
        """
        total = TOTAL_PER_PARTICIPANT
        order = _build_order_least_first_with_anchor(
            total=total,
            anchor_idx=ANCHOR_IDX,
            repeats=ANCHOR_REPEATS,
            min_gap=MIN_GAP
        )
        first_idx = order[0]
        v0 = V[first_idx]
        return (
            gr.update(visible=False),               # page_intro
            gr.update(visible=True),                # page_eval
            gr.update(visible=False),               # page_thanks
            v0["url"],                              # video
            _extract_action(v0),                    # action_tb
            5.0,                                    # score_action default
            5.0,                                    # score_phys default
            gr.update(visible=False, value=""),     # status
            0,                                      # done_state
            _progress_html(0, TOTAL_PER_PARTICIPANT),  # progress
            order,                                  # order_state
            1,                                      # ptr_state (next video to show)
            _get_video_id(v0),                      # cur_video_id
            "",                                     # reward_code_state
            None,                                   # selected_action (nothing selected yet)
            None,                                   # selected_phys
            gr.update(interactive=False, variant="secondary"),  # save_next locked
        )

    start_btn.click(
        _start_and_load_first,
        inputs=[],
        outputs=[
            page_intro, page_eval, page_thanks,
            video, action_tb,
            score_action, score_phys,  # two sliders
            status,
            done_state, progress, order_state, ptr_state, cur_video_id,
            reward_code_state,
            selected_action, selected_phys,
            save_next,
        ]
    )

    def _stay_on_current(message, done_cnt, ptr, cur_vid, reward_code):
        """Return the save_and_next outputs that keep the current video and show ``message``."""
        return (
            gr.update(visible=True),                 # page_eval
            gr.update(visible=False),                # page_thanks
            gr.update(visible=True, value=message),  # status
            gr.update(), gr.update(),                # video, action_tb unchanged
            done_cnt,                                # done_state
            _progress_html(done_cnt, TOTAL_PER_PARTICIPANT),  # progress
            5.0,                                     # reset action slider
            5.0,                                     # reset phys slider
            ptr,                                     # ptr_state
            cur_vid,                                 # cur_video_id
            reward_code,                             # reward_code_state
            gr.update(visible=False),                # reward_msg
            gr.update(visible=False),                # reward_code_box
            gr.update(visible=False),                # reward_pid_box
            None,                                    # selected_action reset
            None,                                    # selected_phys reset
            gr.update(interactive=False, variant="secondary"),  # save_next locked
        )

    def save_and_next(participant_id, cur_vid, action_score, phys_score,
                      done_cnt, order, ptr, reward_code):
        """Save the current rating, then show the next video or the thank-you page.

        The returned tuple maps positionally onto the ``outputs`` list of
        ``save_next.click`` below. Any exception is reported in the status
        component and leaves the participant on the current video.
        """
        try:
            if not participant_id or not participant_id.strip():
                return _stay_on_current(
                    "โ Please enter your Participant ID.",
                    done_cnt, ptr, cur_vid, reward_code,
                )

            # Save both scores.
            # NOTE(review): push() reports failures only through its status
            # message, so the session advances even when the save failed.
            status_msg = push(participant_id, cur_vid, action_score, phys_score, "")
            new_done = int(done_cnt) + 1

            if new_done >= TOTAL_PER_PARTICIPANT or ptr >= len(order):
                code = reward_code.strip() or _gen_reward_code(participant_id, length=10)
                try:
                    _persist_reward_code(participant_id, code, new_done)
                except Exception as e:
                    # Still show the code to the participant, but leave a
                    # trace that it was not recorded in the dataset repo.
                    print(f"[WARN] reward code not persisted for {participant_id.strip()}: {type(e).__name__}: {e}")
                thanks_text = (
                    "## ๐ Thank you so much!\n"
                    "Your responses have been recorded. You may now close this window.\n\n"
                    "**Below are your reward code and ID.** Please copy them and submit them to **AMT** to receive your payment."
                )
                return (
                    gr.update(visible=False),                # page_eval
                    gr.update(visible=True),                 # page_thanks
                    status_msg,                              # status
                    None,                                    # video
                    "",                                      # action_tb
                    TOTAL_PER_PARTICIPANT,                   # done_state
                    _progress_html(TOTAL_PER_PARTICIPANT, TOTAL_PER_PARTICIPANT),  # progress
                    5.0,                                     # reset action
                    5.0,                                     # reset phys
                    len(order),                              # ptr_state
                    cur_vid,                                 # cur_video_id
                    code,                                    # reward_code_state
                    gr.update(visible=True, value=thanks_text),             # reward_msg
                    gr.update(visible=True, value=code),                    # reward_code_box
                    gr.update(visible=True, value=participant_id.strip()),  # reward_pid_box
                    None,                                    # selected_action reset
                    None,                                    # selected_phys reset
                    gr.update(interactive=False, variant="secondary"),  # save_next
                )

            # Next video.
            next_idx = order[ptr]
            v = V[next_idx]
            next_vid = _get_video_id(v)
            return (
                gr.update(visible=True),                 # page_eval
                gr.update(visible=False),                # page_thanks
                status_msg,                              # status
                v["url"],                                # video
                _extract_action(v),                      # action_tb
                new_done,                                # done_state
                _progress_html(new_done, TOTAL_PER_PARTICIPANT),  # progress
                5.0,                                     # reset action
                5.0,                                     # reset phys
                ptr + 1,                                 # ptr_state
                next_vid,                                # cur_video_id
                reward_code,                             # reward_code_state
                gr.update(visible=False),                # reward_msg
                gr.update(visible=False),                # reward_code_box
                gr.update(visible=False),                # reward_pid_box
                None,                                    # selected_action reset
                None,                                    # selected_phys reset
                gr.update(interactive=False, variant="secondary"),  # save_next
            )
        except Exception as e:
            # Selections are reset on error too.
            return _stay_on_current(
                f"โ Error: {type(e).__name__}: {e}",
                done_cnt, ptr, cur_vid, reward_code,
            )

    save_next.click(
        save_and_next,
        inputs=[pid, cur_video_id, score_action, score_phys, done_state, order_state, ptr_state, reward_code_state],
        outputs=[
            page_eval, page_thanks,
            status, video, action_tb,
            done_state, progress, score_action, score_phys,
            ptr_state, cur_video_id,
            reward_code_state,
            reward_msg, reward_code_box, reward_pid_box,
            selected_action, selected_phys,  # selection flags
            save_next,                       # button state refresh
        ]
    )

    feedback_submit.click(
        push_final_feedback,
        # Uses the participant-ID box shown on page_thanks after completion
        # (save_and_next fills it in on the thank-you branch).
        inputs=[reward_pid_box, feedback_tb],
        outputs=feedback_status
    )
# Launch the app only when this file is executed as a script.
if __name__ == "__main__":
    # demo.launch()
    # NOTE(review): ssr_mode=False presumably disables Gradio's server-side
    # rendering — confirm against the installed Gradio version.
    demo.launch(ssr_mode=False)