"""Gradio app for human evaluation of AI-generated action videos.

Participants rate each video on action consistency and physical
plausibility. Ratings, reward codes and final feedback are appended to CSV
files stored in a Hugging Face dataset repository.
"""

import csv
import hashlib
import io
import json
import os
import pathlib
import random
import secrets
import string
import subprocess
import time
from datetime import datetime

import gradio as gr
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError, LocalEntryNotFoundError

# -------------------- Config --------------------
REPO_ID = os.getenv("RESULTS_REPO", "dghadiya/video_eval")  # must match the upload repo
HF_TOKEN = os.getenv("HF_TOKEN")

RESULTS_FILE = "results.csv"
REWARD_FILE = "reward_codes.csv"  # reward-code log (stored in the HF dataset)
FEEDBACK_FILE = "final_feedback.csv"

TOTAL_PER_PARTICIPANT = 30  # target number of ratings per session

ANCHOR_IDX = 0  # the first video in videos.json is the anchor
ANCHOR_REPEATS = 5  # the anchor is shown this many times per session
MIN_GAP = 1  # anchors may not be adjacent

RESULTS_HEADER = [
    "ts_iso",
    "participant_id",
    "video_id",
    "action_consistency",
    "physical_plausibility",
    "notes",
]
REWARD_HEADER = ["ts_iso", "participant_id", "reward_code", "total_done"]
FEEDBACK_HEADER = ["ts_iso", "participant_id", "final_comment"]

MUTED_CACHE_DIR = "/tmp/hf_video_cache_muted"
pathlib.Path(MUTED_CACHE_DIR).mkdir(parents=True, exist_ok=True)

# videos.json example entry:
# {"url": "...mp4", "id": "BodyWeightSquats__XXXX.mp4", "action": "BodyWeightSquats"}
with open("videos.json", "r", encoding="utf-8") as f:
    V = json.load(f)

api = HfApi()

INSTRUCTION_MD = """
**Task:** You will watch a series of **AI-generated videos**. For each video, your job is to rate:
1. **Action Consistency** - how well the person’s action matches the action specified as the "**Expected action**".
2. **Physical Plausibility** - how natural and physically possible the motion looks.

Some things to keep in mind:
- Try to **focus only** on the expected action and motion quality, and do **not** judge **video quality**, **attractiveness**, **background**, **camera motion**, or **objects**.
- For **physical plausibility**, look for **smooth and realistic** motion without impossible poses, missing limbs, or extreme stretching.
- Action consistency and physical plausibility are **not mutually exclusive** with each other.
- **Physically plausible motion does not imply correct depiction of action.**
- **A video cannot portray action consistency if it has physically impossible movements.**
- **0: indicates really poor depiction, 10: represents perfect, realistic depiction**
- The **Save & Next** button will be enabled **only after you have clicked or adjusted both sliders at least once**.
- You will be **paid** once **all the videos are viewed and rated**.
"""


# -------------------- Hub CSV helpers --------------------
def _download_bytes(filename):
    """Return the bytes of ``filename`` in the dataset repo, or None if absent.

    Only a genuinely missing file maps to None. Any other failure (network,
    auth, ...) propagates: callers append to the returned bytes and re-upload
    the whole file, so treating a transient error as "file does not exist"
    would overwrite every previously stored row.
    """
    try:
        path = hf_hub_download(
            repo_id=REPO_ID,
            filename=filename,
            repo_type="dataset",
            token=HF_TOKEN,
            local_dir="/tmp",
            local_dir_use_symlinks=False,
        )
    except LocalEntryNotFoundError:
        # Subclass of EntryNotFoundError raised when the Hub is unreachable
        # and nothing is cached; this does not mean the file is absent.
        raise
    except EntryNotFoundError:
        return None
    with open(path, "rb") as fh:
        return fh.read()


def _append_csv_row(old_bytes, header, row):
    """Return CSV bytes consisting of ``old_bytes`` (or ``header``) plus ``row``."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    if not old_bytes:
        writer.writerow(header)
    else:
        text = old_bytes.decode("utf-8", errors="ignore")
        buf.write(text)
        # Guard against a hand-edited file without a trailing newline, which
        # would glue the new row onto the last existing one.
        if text and not text.endswith(("\n", "\r")):
            buf.write("\r\n")
    writer.writerow(row)
    return buf.getvalue().encode("utf-8")


def _upload_bytes(data, filename, commit_message):
    """Upload ``data`` as ``filename`` to the dataset repo.

    NOTE: read-modify-write against the Hub is not atomic; two participants
    saving at the same moment can still overwrite each other's row.
    """
    api.upload_file(
        path_or_fileobj=io.BytesIO(data),
        path_in_repo=filename,
        repo_id=REPO_ID,
        repo_type="dataset",
        token=HF_TOKEN,
        commit_message=commit_message,
    )


def _read_codes_bytes():
    """Return the current reward-code CSV bytes, or None if it does not exist."""
    return _download_bytes(REWARD_FILE)


def _read_feedback_bytes():
    """Return the current feedback CSV bytes, or None if it does not exist."""
    return _download_bytes(FEEDBACK_FILE)


def _read_csv_bytes():
    """Return the current results CSV bytes, or None if it does not exist."""
    return _download_bytes(RESULTS_FILE)


def _append_code(old_bytes, row):
    """Append a reward-code row, writing the header when starting a new file."""
    return _append_csv_row(old_bytes, REWARD_HEADER, row)


def _append_feedback(old_bytes, row):
    """Append a final-comment row, writing the header when starting a new file."""
    return _append_csv_row(old_bytes, FEEDBACK_HEADER, row)


def _append(old_bytes, row):
    """Append a rating row (two scores), writing the header for a new file."""
    return _append_csv_row(old_bytes, RESULTS_HEADER, row)


# -------------------- Reward codes --------------------
def _persist_reward_code(pid: str, code: str, total_done: int):
    """Append the reward code to reward_codes.csv in the HF dataset.

    Raises whatever the download or upload raises; the caller decides whether
    a failure is fatal.
    """
    old = _read_codes_bytes()
    row = [datetime.utcnow().isoformat(), pid.strip(), code, int(total_done)]
    _upload_bytes(_append_code(old, row), REWARD_FILE, "append reward code")


def _gen_reward_code(pid: str, length: int = 10, forbid_ambiguous: bool = True) -> str:
    """Return a random code of uppercase letters and digits for the participant.

    The code is drawn from the OS CSPRNG, so collisions are very unlikely.
    ``pid`` is accepted for call-site compatibility but does not influence
    the result. With ``forbid_ambiguous`` the look-alike characters
    I, O, 0 and 1 are excluded.
    """
    if forbid_ambiguous:
        alphabet = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
    else:
        alphabet = string.ascii_uppercase + string.digits
    rng = secrets.SystemRandom()
    return "".join(rng.choice(alphabet) for _ in range(length))


# -------------------- Muted video cache --------------------
def _sha1_8(s: str) -> str:
    """Return the first 8 hex characters of the SHA-1 of ``s``."""
    return hashlib.sha1(s.encode("utf-8")).hexdigest()[:8]


def ensure_muted_copy(src_path: str) -> str:
    """Return the path of an audio-free copy of the mp4 at ``src_path``.

    The copy is created with ffmpeg (``-an``) in MUTED_CACHE_DIR and reused on
    later calls. If the source is missing or the conversion fails, the
    original path is returned.
    """
    if not src_path or not os.path.exists(src_path):
        return src_path

    key = _sha1_8(src_path)
    out = os.path.join(MUTED_CACHE_DIR, key + ".mp4")
    if os.path.exists(out):
        return out

    # Convert into a temp file and rename on success, so a failed or
    # interrupted ffmpeg run can never leave a partial file at the cache path.
    tmp = os.path.join(MUTED_CACHE_DIR, f"{key}.{os.getpid()}.tmp.mp4")
    try:
        subprocess.run(
            ["ffmpeg", "-y", "-i", src_path, "-c:v", "copy", "-an", tmp],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
        os.replace(tmp, out)
        return out
    except (OSError, subprocess.SubprocessError):
        return src_path
    finally:
        if os.path.exists(tmp):
            try:
                os.remove(tmp)
            except OSError:
                pass


# -------------------- Saving --------------------
def push_final_feedback(participant_id: str, comment: str):
    """Store the optional free-text comment in FEEDBACK_FILE.

    Returns a ``gr.update`` for the feedback status Markdown.
    """
    if not participant_id or not participant_id.strip():
        return gr.update(visible=True, value="❗ Missing participant ID.")
    if comment is None or not str(comment).strip():
        # Nothing to save; just inform the user.
        return gr.update(visible=True, value="ℹ️ No comment entered — nothing to submit.")
    if not REPO_ID:
        return gr.update(visible=True, value="❗ RESULTS_REPO is not set.")
    if not HF_TOKEN:
        return gr.update(
            visible=True,
            value="❗ HF_TOKEN is missing. Set a write token for the dataset repo.",
        )
    try:
        old = _read_feedback_bytes()
        row = [datetime.utcnow().isoformat(), participant_id.strip(), comment.strip()]
        _upload_bytes(_append_feedback(old, row), FEEDBACK_FILE, "append final feedback")
        return gr.update(visible=True, value="✅ Thanks! Your feedback was submitted.")
    except Exception as e:
        return gr.update(
            visible=True,
            value=f"❌ Feedback save failed: {type(e).__name__}: {e}",
        )


def _save_rating(participant_id, video_id, action_score, phys_score, notes=""):
    """Append one rating row to RESULTS_FILE.

    Returns ``(ok, message)`` where ``message`` is the user-facing status text.
    """
    if not participant_id or not participant_id.strip():
        return False, "❗ Please enter your Participant ID before proceeding."
    if not video_id or action_score is None or phys_score is None:
        return False, "❗ Fill out all fields."
    if not REPO_ID:
        return False, "❗ RESULTS_REPO is not set."
    if not HF_TOKEN:
        return False, "❗ HF_TOKEN is missing. Set a write token for the dataset repo."
    try:
        old = _read_csv_bytes()
        row = [
            datetime.utcnow().isoformat(),
            participant_id.strip(),
            video_id,
            float(action_score),
            float(phys_score),
            notes or "",
        ]
        _upload_bytes(_append(old, row), RESULTS_FILE, "append")
        return True, "✅ Saved successfully!"
    except Exception as e:
        return False, (
            f"❌ Save failed: {type(e).__name__}: {e}\n"
            f"- Check HF_TOKEN permission\n- Check REPO_ID\n- Create dataset repo if missing"
        )


def push(participant_id, video_id, action_score, phys_score, notes=""):
    """Save one rating and return a ``gr.update`` carrying the status message."""
    _, message = _save_rating(participant_id, video_id, action_score, phys_score, notes)
    return gr.update(visible=True, value=message)


# -------------------- Helper funcs --------------------
def _recompute_save(pid_text: str, sel_a, sel_p):
    """Enable Save & Next only when a PID is entered and both sliders were set."""
    ok = bool(pid_text and pid_text.strip()) and sel_a is not None and sel_p is not None
    return gr.update(interactive=ok, variant=("primary" if ok else "secondary"))


def _get_video_id(v: dict) -> str:
    """Return the entry's ``id``, falling back to the file name of its URL."""
    if "id" in v and v["id"]:
        return v["id"]
    return os.path.basename(v.get("url", ""))


def _extract_action(v):
    """Return the entry's ``action``, or derive it from the id prefix before ``__``."""
    if "action" in v and v["action"]:
        return v["action"]
    raw = v.get("id", "")
    return raw.split("__")[0].split(".")[0]


def pick_one():
    """Return ``(url, action)`` of a uniformly random video."""
    v = random.choice(V)
    return v["url"], _extract_action(v)


def _progress_html(done, total):
    """Return HTML showing a progress bar and the ``done / total`` counter."""
    pct = max(0, min(100, int(100 * done / max(1, total))))
    return f"""
<div style="width:100%;height:10px;background:#e5e7eb;border-radius:6px;overflow:hidden;">
  <div style="width:{pct}%;height:100%;background:#3b82f6;"></div>
</div>
<div style="text-align:right;">{done} / {total}</div>
"""


def _load_eval_counts():
    """Return ``{video_id: number of ratings}`` from results.csv on the Hub.

    Every video in V starts at 0. If the results file is missing or cannot be
    read, the zero counts are returned so a session can still start.
    """
    counts = {_get_video_id(v): 0 for v in V}

    try:
        raw = _read_csv_bytes()
    except Exception as e:
        print(f"[WARN] could not read {RESULTS_FILE}; assuming zero counts: {e}")
        return counts
    if not raw:
        return counts

    rows = list(csv.reader(io.StringIO(raw.decode("utf-8", errors="ignore"))))
    if not rows:
        return counts

    header = rows[0]
    has_header = bool(header) and ("video_id" in header or "overall" in header)
    body = rows[1:] if has_header else rows
    # Without a video_id column, assume the layout: ts, pid, video_id, ...
    vid_col = header.index("video_id") if "video_id" in header else 2

    for row in body:
        if len(row) > vid_col and row[vid_col] in counts:
            counts[row[vid_col]] += 1
    return counts


# -------------------- Session order with anchor --------------------
def _pick_anchor_positions(total: int, repeats: int, min_gap: int = 1):
    """Choose ``repeats`` slots in ``range(total)`` at least ``min_gap + 1`` apart.

    The range is split into ``repeats`` segments; one random slot is tried per
    segment, then nearby slots, then any free slot.

    Raises:
        RuntimeError: if no slot satisfying the gap can be found.
    """
    positions = []
    if repeats <= 0:
        return positions

    def is_free(pos):
        return all(abs(pos - taken) >= min_gap + 1 for taken in positions)

    segment = total // repeats
    for k in range(repeats):
        lo = k * segment
        hi = (k + 1) * segment if k < repeats - 1 else total  # last segment runs to the end
        start = random.randrange(lo, hi)

        found = None
        for delta in range(max(1, segment)):
            for sign in (1, -1):
                pos = start + sign * delta
                if 0 <= pos < total and is_free(pos):
                    found = pos
                    break
            if found is not None:
                break
        if found is None:
            # Last resort: scan the whole range.
            found = next((pos for pos in range(total) if is_free(pos)), None)
        if found is None:
            raise RuntimeError(
                "Failed to place anchor without adjacency. "
                "Try different strategy or loosen min_gap."
            )
        positions.append(found)
    return positions


def _fill_schedule(total: int, anchor_idx: int, anchor_positions, others, min_gap: int = 1):
    """Return a length-``total`` list with the anchor at ``anchor_positions``.

    The remaining slots are filled with ``others`` in order.

    Raises:
        RuntimeError: if the slot counts do not add up, or anchors end up
            adjacent while ``min_gap >= 1``.
    """
    anchor_slots = set(anchor_positions)
    if len(anchor_slots) + len(others) != total:
        raise RuntimeError("Anchor count mismatch.")

    filler = iter(others)
    schedule = [anchor_idx if i in anchor_slots else next(filler) for i in range(total)]

    if min_gap >= 1:
        for prev, cur in zip(schedule, schedule[1:]):
            if prev == anchor_idx and cur == anchor_idx:
                raise RuntimeError("Adjacent anchors found.")
    return schedule


def _build_order_with_anchor(total: int, anchor_idx: int, repeats: int, pool_size: int, min_gap: int = 1):
    """Build a random session order containing the anchor ``repeats`` times.

    Args:
        total: number of slots in the session (e.g. TOTAL_PER_PARTICIPANT).
        anchor_idx: index of the anchor video in V.
        repeats: how many times the anchor is shown.
        pool_size: number of available videos (``len(V)``).
        min_gap: minimum number of other videos between two anchors.

    Returns:
        A list of ``total`` indices into V; non-anchor videos are unique.

    Raises:
        ValueError: on inconsistent arguments or too few non-anchor videos.
    """
    if repeats > total:
        raise ValueError("repeats must be <= total")
    if pool_size < 1:
        raise ValueError("videos pool must be non-empty")

    candidates = [i for i in range(pool_size) if i != anchor_idx]
    others_needed = total - repeats
    if len(candidates) < others_needed:
        raise ValueError(
            "Not enough unique non-anchor videos to fill the schedule without duplication."
        )
    others = random.sample(candidates, k=others_needed)  # already in random order

    positions = _pick_anchor_positions(total, repeats, min_gap)
    return _fill_schedule(total, anchor_idx, positions, others, min_gap)


def _build_order_least_first_with_anchor(total: int, anchor_idx: int, repeats: int, min_gap: int = 1):
    """Build a session order that favours the least-rated videos.

    Rating counts are read from results.csv. The anchor appears ``repeats``
    times, never adjacent for ``min_gap >= 1``; the other slots hold the
    least-rated non-anchor videos (ties broken randomly) without duplicates,
    in random order.

    Raises:
        ValueError: on inconsistent arguments or too few non-anchor videos.
    """
    if repeats > total:
        raise ValueError("repeats must be <= total")
    if not V:
        raise ValueError("videos pool must be non-empty")

    counts = _load_eval_counts()

    candidates = [i for i in range(len(V)) if i != anchor_idx]
    # Shuffle before the stable sort so that ties are ordered randomly.
    random.shuffle(candidates)
    candidates.sort(key=lambda i: counts.get(_get_video_id(V[i]), 0))

    others_needed = total - repeats
    if len(candidates) < others_needed:
        raise ValueError(
            "Not enough unique non-anchor videos to fill the schedule without duplication."
        )
    others = candidates[:others_needed]
    random.shuffle(others)

    positions = _pick_anchor_positions(total, repeats, min_gap)
    return _fill_schedule(total, anchor_idx, positions, others, min_gap)


# -------------------- Example videos (download to local cache) --------------------
EXAMPLES = {
    "BodyWeightSquats": {
        "real": "examples/BodyWeightSquats_real.mp4",
        "bad": "examples/BodyWeightSquats_bad.mp4",
    },
    "WallPushUps": {
        "real": "examples/WallPushUps_real.mp4",
        "bad": "examples/WallPushUps_bad.mp4",
    },
}

EX_CACHE = {}
for cls, files in EXAMPLES.items():
    EX_CACHE[cls] = {"real": None, "bad": None}
    for kind, fname in files.items():
        try:
            local_path = hf_hub_download(
                repo_id=REPO_ID,
                filename=fname,
                repo_type="dataset",
                token=HF_TOKEN,
                local_dir="/tmp",
                local_dir_use_symlinks=False,
            )
        except Exception as e:
            print(f"[WARN] example missing: {cls} {kind} -> {fname}: {e}")
            continue
        EX_CACHE[cls][kind] = ensure_muted_copy(local_path)

CLEAN_BG_CSS = r"""
/* 전체 페이지 배경을 흰색으로 (Spaces의 회색 배경 제거) */
html, body { background:#ffffff !important; }

/* Intro/Eval 섹션의 래퍼 박스만 투명 처리 (입력창/슬라이더 등 컨트롤은 놔둠) */
#intro .gr-panel, #intro .gr-group, #intro .gr-box, #intro .gr-row, #intro .gr-column,
#eval .gr-panel, #eval .gr-group, #eval .gr-box, #eval .gr-row, #eval .gr-column {
  background: transparent !important;
  box-shadow: none !important;
  border-color: transparent !important;
}

/* 비디오 카드 주변 툴바(작은 회색 박스)도 투명하게 */
#intro [data-testid="block-video"] .prose,
#eval [data-testid="block-video"] .prose {
  background: transparent !important;
  box-shadow: none !important;
  border-color: transparent !important;
}
"""

# -------------------- UI --------------------
with gr.Blocks(css=CLEAN_BG_CSS) as demo:
    # Added once, right after Blocks starts.
    # NOTE(review): the payload is whitespace-only here; confirm whether
    # style/script markup is expected.
    gr.HTML(""" """)

    order_state = gr.State(value=[])
    ptr_state = gr.State(value=0)  # index into order_state of the NEXT video
    cur_video_id = gr.State(value="")
    reward_code_state = gr.State(value="")  # code generated on completion (avoid regenerating)
    selected_action = gr.State(value=None)  # None until the slider is released once
    selected_phys = gr.State(value=None)  # None until the slider is released once

    # ------------------ PAGE 1: Intro + Examples ------------------
    page_intro = gr.Group(visible=True, elem_id="intro")
    with page_intro:
        gr.Markdown("## 🎯 Human Evaluation: Action Consistency & Physical Plausibility")
        gr.Markdown(INSTRUCTION_MD)

        understood = gr.Checkbox(label="I have read and understand the task.", value=False)
        start_btn = gr.Button("Yes, start", variant="secondary", interactive=False)

        def _toggle_start(checked: bool):
            """Enable the start button once the checkbox is ticked."""
            return gr.update(interactive=checked, variant=("primary" if checked else "secondary"))

        understood.change(_toggle_start, inputs=understood, outputs=start_btn)

        for ex_cls, real_elem_id in (
            ("BodyWeightSquats", "ex_squats_real"),
            ("WallPushUps", "ex_wallpushups_real"),
        ):
            with gr.Group():
                gr.Markdown(f"### Examples: {ex_cls}")
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("**Expected depiction of action**")
                        gr.Video(
                            value=EX_CACHE[ex_cls]["real"],
                            height=240,
                            autoplay=False,
                            elem_id=real_elem_id,
                        )
                    with gr.Column():
                        gr.Markdown("**Poorly generated action**")
                        gr.Video(value=EX_CACHE[ex_cls]["bad"], height=240, autoplay=False)
                if not (EX_CACHE[ex_cls]["real"] and EX_CACHE[ex_cls]["bad"]):
                    gr.Markdown(
                        f"> ⚠️ Upload `examples/{ex_cls}_real.mp4` and `_bad.mp4` to show both samples."
                    )

        # NOTE(review): whitespace-only payload; confirm whether markup is expected.
        gr.HTML(""" """)

    # ------------------ PAGE 2: Evaluation ------------------
    page_eval = gr.Group(visible=False, elem_id="eval")
    with page_eval:
        with gr.Row():
            pid = gr.Textbox(
                label="Participant ID (required)",
                placeholder="e.g., Youngsun-2025/10/24",
            )

        # Left: instructions + video + progress. Right: sliders + Save & Next.
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown(INSTRUCTION_MD)
                video = gr.Video(label="Video", height=360)
                progress = gr.HTML(_progress_html(0, TOTAL_PER_PARTICIPANT))
            with gr.Column(scale=1):
                action_tb = gr.Textbox(label="Expected action", interactive=False)
                score_action = gr.Slider(
                    minimum=0.0,
                    maximum=10.0,
                    step=0.1,
                    value=5.0,
                    label="Action Consistency (0.0 - 10.0)",
                )
                score_phys = gr.Slider(
                    minimum=0.0,
                    maximum=10.0,
                    step=0.1,
                    value=5.0,
                    label="Physical Plausibility (0.0 - 10.0)",
                )
                save_next = gr.Button("💾 Save & Next ▶", variant="secondary", interactive=False)
                status = gr.Markdown(visible=False)
                done_state = gr.State(0)

        def _toggle_by_pid(pid_text: str):
            """Enable a button based on the PID alone (not wired to any event)."""
            enabled = bool(pid_text and pid_text.strip())
            return gr.update(interactive=enabled, variant=("primary" if enabled else "secondary"))

        def _on_action_release(val: float, pid_text: str, sel_p):
            """Record the action score as chosen and refresh the Save button."""
            return val, _recompute_save(pid_text, val, sel_p)

        def _on_phys_release(val: float, pid_text: str, sel_a):
            """Record the plausibility score as chosen and refresh the Save button."""
            return val, _recompute_save(pid_text, sel_a, val)

        # release fires once when the pointer is let go, confirming the choice.
        score_action.release(
            _on_action_release,
            inputs=[score_action, pid, selected_phys],
            outputs=[selected_action, save_next],
        )
        score_phys.release(
            _on_phys_release,
            inputs=[score_phys, pid, selected_action],
            outputs=[selected_phys, save_next],
        )
        # Re-evaluate the Save button whenever the PID changes.
        pid.change(_recompute_save, inputs=[pid, selected_action, selected_phys], outputs=save_next)

    # ------------------ PAGE 3: Thanks ------------------
    page_thanks = gr.Group(visible=False)
    with page_thanks:
        reward_msg = gr.Markdown(visible=False)
        reward_code_box = gr.Textbox(
            label="Your reward code (copy & paste)", interactive=False, visible=False
        )
        reward_pid_box = gr.Textbox(label="Your participant ID", interactive=False, visible=False)
        gr.Markdown("### Any comments (optional)")
        feedback_tb = gr.Textbox(
            label="Any comments (optional)",
            placeholder="Leave any feedback for the study organizers (optional).",
            lines=4,
        )
        feedback_submit = gr.Button("Submit feedback")
        feedback_status = gr.Markdown(visible=False)

    # -------- Page switching & first load --------
    def _start_and_load_first():
        """Build the session order and show the first video on the eval page."""
        order = _build_order_least_first_with_anchor(
            total=TOTAL_PER_PARTICIPANT,
            anchor_idx=ANCHOR_IDX,
            repeats=ANCHOR_REPEATS,
            min_gap=MIN_GAP,
        )
        v0 = V[order[0]]
        return (
            gr.update(visible=False),  # page_intro
            gr.update(visible=True),  # page_eval
            gr.update(visible=False),  # page_thanks
            v0["url"],  # video
            _extract_action(v0),  # action_tb
            5.0,  # score_action default
            5.0,  # score_phys default
            gr.update(visible=False, value=""),  # status
            0,  # done_state
            _progress_html(0, TOTAL_PER_PARTICIPANT),  # progress
            order,  # order_state
            1,  # ptr_state: next index to show
            _get_video_id(v0),  # cur_video_id
            "",  # reward_code_state
            None,  # selected_action: nothing chosen yet
            None,  # selected_phys: nothing chosen yet
            gr.update(interactive=False, variant="secondary"),  # lock Save
        )

    start_btn.click(
        _start_and_load_first,
        inputs=[],
        outputs=[
            page_intro,
            page_eval,
            page_thanks,
            video,
            action_tb,
            score_action,
            score_phys,
            status,
            done_state,
            progress,
            order_state,
            ptr_state,
            cur_video_id,
            reward_code_state,
            selected_action,
            selected_phys,
            save_next,
        ],
    )

    def _hold_position(message, done_cnt, ptr, cur_vid, reward_code, scores=None):
        """Build save_and_next outputs that keep the current video on screen.

        With ``scores=(action, phys)`` the sliders keep those values and Save
        stays enabled so the participant can retry; otherwise the sliders
        reset to 5.0 and Save is locked until both are adjusted again.
        """
        if scores is None or None in scores:
            action, phys = 5.0, 5.0
            sel_a = sel_p = None
            button = gr.update(interactive=False, variant="secondary")
        else:
            action, phys = scores
            sel_a, sel_p = scores
            button = gr.update(interactive=True, variant="primary")
        return (
            gr.update(visible=True),  # page_eval
            gr.update(visible=False),  # page_thanks
            gr.update(visible=True, value=message),  # status
            gr.update(),  # video unchanged
            gr.update(),  # action_tb unchanged
            done_cnt,
            _progress_html(done_cnt, TOTAL_PER_PARTICIPANT),
            action,
            phys,
            ptr,
            cur_vid,
            reward_code,
            gr.update(visible=False),  # reward_msg
            gr.update(visible=False),  # reward_code_box
            gr.update(visible=False),  # reward_pid_box
            sel_a,
            sel_p,
            button,
        )

    def save_and_next(participant_id, cur_vid, action_score, phys_score, done_cnt, order, ptr, reward_code):
        """Save the current rating, then show the next video or the thanks page.

        If the rating cannot be stored, the current video stays on screen so
        that progress and reward codes are only granted for saved ratings.
        """
        try:
            if not participant_id or not participant_id.strip():
                return _hold_position(
                    "❗ Please enter your Participant ID.", done_cnt, ptr, cur_vid, reward_code
                )

            saved, message = _save_rating(participant_id, cur_vid, action_score, phys_score, "")
            if not saved:
                return _hold_position(
                    message, done_cnt, ptr, cur_vid, reward_code,
                    scores=(action_score, phys_score),
                )
            status_msg = gr.update(visible=True, value=message)

            new_done = int(done_cnt) + 1
            locked = gr.update(interactive=False, variant="secondary")

            if new_done >= TOTAL_PER_PARTICIPANT or ptr >= len(order):
                code = (reward_code or "").strip() or _gen_reward_code(participant_id, length=10)
                try:
                    _persist_reward_code(participant_id, code, new_done)
                except Exception as e:
                    # Best effort: the participant still receives the code.
                    print(f"[WARN] could not persist reward code for {participant_id.strip()}: {e}")
                thanks_text = (
                    "## 🎉 Thank you so much!\n"
                    "Your responses have been recorded. You may now close this window.\n\n"
                    "**Below are your reward code and ID.** Please copy them and submit them to **AMT** to receive your payment."
                )
                return (
                    gr.update(visible=False),  # page_eval
                    gr.update(visible=True),  # page_thanks
                    status_msg,
                    None,  # video
                    "",  # action_tb
                    TOTAL_PER_PARTICIPANT,
                    _progress_html(TOTAL_PER_PARTICIPANT, TOTAL_PER_PARTICIPANT),
                    5.0,
                    5.0,
                    len(order),
                    cur_vid,
                    code,
                    gr.update(visible=True, value=thanks_text),
                    gr.update(visible=True, value=code),
                    gr.update(visible=True, value=participant_id.strip()),
                    None,  # selected_action reset
                    None,  # selected_phys reset
                    locked,
                )

            nxt = V[order[ptr]]
            return (
                gr.update(visible=True),  # page_eval
                gr.update(visible=False),  # page_thanks
                status_msg,
                nxt["url"],
                _extract_action(nxt),
                new_done,
                _progress_html(new_done, TOTAL_PER_PARTICIPANT),
                5.0,
                5.0,
                ptr + 1,
                _get_video_id(nxt),
                reward_code,
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(visible=False),
                None,  # selected_action reset
                None,  # selected_phys reset
                locked,
            )
        except Exception as e:
            return _hold_position(
                f"❌ Error: {type(e).__name__}: {e}", done_cnt, ptr, cur_vid, reward_code
            )

    save_next.click(
        save_and_next,
        inputs=[
            pid,
            cur_video_id,
            score_action,
            score_phys,
            done_state,
            order_state,
            ptr_state,
            reward_code_state,
        ],
        outputs=[
            page_eval,
            page_thanks,
            status,
            video,
            action_tb,
            done_state,
            progress,
            score_action,
            score_phys,
            ptr_state,
            cur_video_id,
            reward_code_state,
            reward_msg,
            reward_code_box,
            reward_pid_box,
            selected_action,
            selected_phys,
            save_next,
        ],
    )

    feedback_submit.click(
        push_final_feedback,
        # The participant-ID box on the thanks page is filled on completion.
        inputs=[reward_pid_box, feedback_tb],
        outputs=feedback_status,
    )

if __name__ == "__main__":
    demo.launch(ssr_mode=False)