import os, io, csv, json, random
from datetime import datetime
import gradio as gr
from huggingface_hub import HfApi, hf_hub_download
import subprocess, pathlib, hashlib
import secrets, time, string
# 사용자가 '선택'을 완료한 값 (초기엔 None)
selected_action = gr.State(None)
selected_phys = gr.State(None)
def _recompute_save(pid_text: str, sel_a, sel_p):
ok = bool(pid_text and pid_text.strip()) and (sel_a is not None) and (sel_p is not None)
return gr.update(interactive=ok, variant=("primary" if ok else "secondary"))
REWARD_FILE = "reward_codes.csv" # 리워드 코드 기록용 파일 (HF dataset 안에 저장)
def _read_codes_bytes():
try:
p = hf_hub_download(
repo_id=REPO_ID, filename=REWARD_FILE, repo_type="dataset",
token=HF_TOKEN, local_dir="/tmp", local_dir_use_symlinks=False
)
return open(p, "rb").read()
except Exception:
return None
def _append_code(old_bytes, row):
s = io.StringIO()
w = csv.writer(s)
if not old_bytes:
# 새 헤더
w.writerow(["ts_iso", "participant_id", "reward_code", "total_done"])
else:
s.write(old_bytes.decode("utf-8", errors="ignore"))
w.writerow(row)
return s.getvalue().encode("utf-8")
def _persist_reward_code(pid: str, code: str, total_done: int):
"""리워드 코드를 HF에 reward_codes.csv로 누적 저장(append)."""
old = _read_codes_bytes()
row = [datetime.utcnow().isoformat(), pid.strip(), code, int(total_done)]
newb = _append_code(old, row)
api.upload_file(
path_or_fileobj=io.BytesIO(newb),
path_in_repo=REWARD_FILE,
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN,
commit_message="append reward code"
)
def _gen_reward_code(pid: str, length: int = 10, forbid_ambiguous: bool=True) -> str:
"""
참가자에게 보여줄 랜덤 코드. 충돌 위험 매우 낮음.
- PID + 시각 + 보안랜덤으로 시드
- 대문자/숫자 조합
"""
alphabet = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" if forbid_ambiguous else string.ascii_uppercase + string.digits
# secrets 기반 난수 + pid/time 섞어서 해시 비슷한 효과
rnd = secrets.token_hex(8) + pid + str(time.time_ns())
# 섞고 뽑기
rng = secrets.SystemRandom()
return "".join(rng.choice(alphabet) for _ in range(length))
MUTED_CACHE_DIR = "/tmp/hf_video_cache_muted"
pathlib.Path(MUTED_CACHE_DIR).mkdir(parents=True, exist_ok=True)
def _sha1_8(s: str) -> str:
return hashlib.sha1(s.encode("utf-8")).hexdigest()[:8]
def ensure_muted_copy(src_path: str) -> str:
"""
주어진 mp4에서 오디오 트랙을 제거(-an)한 무음 복사본을 캐시에 생성.
이미 있으면 그 파일 경로 반환.
실패하면 원본 경로 반환.
"""
if not src_path or not os.path.exists(src_path):
return src_path
out = os.path.join(MUTED_CACHE_DIR, _sha1_8(src_path) + ".mp4")
if os.path.exists(out):
return out
try:
subprocess.run(
["ffmpeg", "-y", "-i", src_path, "-c:v", "copy", "-an", out],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
)
return out if os.path.exists(out) else src_path
except Exception:
return src_path
FEEDBACK_FILE = "final_feedback.csv"
def _read_feedback_bytes():
try:
p = hf_hub_download(
repo_id=REPO_ID, filename=FEEDBACK_FILE, repo_type="dataset",
token=HF_TOKEN, local_dir="/tmp", local_dir_use_symlinks=False
)
return open(p, "rb").read()
except Exception:
return None
def _append_feedback(old_bytes, row):
s = io.StringIO()
w = csv.writer(s)
if not old_bytes:
# 최종 코멘트 전용 CSV 헤더
w.writerow(["ts_iso", "participant_id", "final_comment"])
else:
s.write(old_bytes.decode("utf-8", errors="ignore"))
w.writerow(row)
return s.getvalue().encode("utf-8")
def push_final_feedback(participant_id: str, comment: str):
"""마지막에 받는 자유 코멘트를 FEEDBACK_FILE로 저장."""
if not participant_id or not participant_id.strip():
return gr.update(visible=True, value="❗ Missing participant ID.")
if comment is None or not str(comment).strip():
# 비어있으면 저장하지 않고 안내만
return gr.update(visible=True, value="ℹ️ No comment entered — nothing to submit.")
try:
old = _read_feedback_bytes()
row = [datetime.utcnow().isoformat(), participant_id.strip(), comment.strip()]
newb = _append_feedback(old, row)
if not REPO_ID:
return gr.update(visible=True, value="❗ RESULTS_REPO is not set.")
if not HF_TOKEN:
return gr.update(visible=True, value="❗ HF_TOKEN is missing. Set a write token for the dataset repo.")
api.upload_file(
path_or_fileobj=io.BytesIO(newb),
path_in_repo=FEEDBACK_FILE,
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN,
commit_message="append final feedback"
)
return gr.update(visible=True, value="✅ Thanks! Your feedback was submitted.")
except Exception as e:
return gr.update(
visible=True,
value=f"❌ Feedback save failed: {type(e).__name__}: {e}"
)
# -------------------- Config --------------------
REPO_ID = os.getenv("RESULTS_REPO", "dghadiya/video_eval") # 업로드한 리포와 일치
HF_TOKEN = os.getenv("HF_TOKEN")
RESULTS_FILE = "results.csv"
TOTAL_PER_PARTICIPANT = 30 # 목표 평가 개수(세션 기준)
# videos.json 예시: {"url": "...mp4", "id": "BodyWeightSquats__XXXX.mp4", "action": "BodyWeightSquats"}
with open("videos.json", "r", encoding="utf-8") as f:
V = json.load(f)
api = HfApi()
# 교수님 지침(그대로, 굵게 처리 포함)
# INSTRUCTION_MD = """
# **Task:** You will watch a series of **AI-generated videos**. For each video, your job is to rate how well the person’s action in the AI-generated video matches the action specified as "**expected action**". Some things to keep in mind:
# - The generated video should **capture** the expected action **throughout the video**.
# - Try to **focus only** on the expected action and do **not** judge **video quality**, **attractiveness**, **background**, **camera motion**, or **objects**.
# - You will be **paid** once **all the videos are viewed and rated**.
# """
INSTRUCTION_MD = """
**Task:** You will watch a series of **AI-generated videos**.
For each video, your job is to rate:
1. **Action Consistency** - how well the person’s action matches the action specified as the "**Expected action**".
2. **Physical Plausibility** - how natural and physically possible the motion looks.
Some things to keep in mind:
- Try to **focus only** on the expected action and motion quality, and do **not** judge **video quality**, **attractiveness**, **background**, **camera motion**, or **objects**.
- For **physical plausibility**, look for **smooth and realistic** motion without impossible poses, missing limbs, or extreme stretching.
- Action consistency and physical plausibility are **not mutually exclusive** with each other.
- **Physically plausible motion does not imply correct depiction of action.**
- **A video cannot portray action consistency if it has physically impossible movements.**
- **0: indicates really poor depiction, 10: represents perfect, realistic depiction**
- The **Save & Next** button will be enabled **only after you have clicked or adjusted both sliders at least once**.
- You will be **paid** once **all the videos are viewed and rated**.
"""
# -------------------- Helper funcs --------------------
def _load_eval_counts():
"""
Hugging Face dataset의 results.csv를 읽어 video_id별 평가 개수(dict)를 반환.
없으면 0으로 초기화.
"""
# 모든 id를 0으로 초기화
counts = {}
for v in V:
vid = _get_video_id(v)
counts[vid] = 0
b = _read_csv_bytes()
if not b:
return counts
s = io.StringIO(b.decode("utf-8", errors="ignore"))
r = csv.reader(s)
rows = list(r)
if not rows:
return counts
# 헤더 파악
header = rows[0]
body = rows[1:] if header and ("video_id" in header or "overall" in header) else rows
vid_col = None
if header and "video_id" in header:
vid_col = header.index("video_id")
for row in body:
try:
vid = row[vid_col] if vid_col is not None else row[2] # 기본 포맷: ts, pid, video_id, overall, notes
if vid in counts:
counts[vid] += 1
except Exception:
continue
return counts
def _get_video_id(v: dict) -> str:
if "id" in v and v["id"]:
return v["id"]
# id가 없으면 URL 파일명으로 대체
return os.path.basename(v.get("url", ""))
def _read_csv_bytes():
try:
p = hf_hub_download(
repo_id=REPO_ID, filename=RESULTS_FILE, repo_type="dataset",
token=HF_TOKEN, local_dir="/tmp", local_dir_use_symlinks=False
)
return open(p, "rb").read()
except Exception:
return None
# def _append(old_bytes, row):
# s = io.StringIO()
# w = csv.writer(s)
# if not old_bytes:
# # ✅ 새 헤더
# w.writerow(["ts_iso", "participant_id", "video_id", "overall", "notes"])
# else:
# s.write(old_bytes.decode("utf-8", errors="ignore"))
# w.writerow(row)
# return s.getvalue().encode("utf-8")
def _append(old_bytes, row):
s = io.StringIO()
w = csv.writer(s)
if not old_bytes:
# ✅ new header with two scores
w.writerow(["ts_iso", "participant_id", "video_id",
"action_consistency", "physical_plausibility", "notes"])
else:
s.write(old_bytes.decode("utf-8", errors="ignore"))
w.writerow(row)
return s.getvalue().encode("utf-8")
# def push(participant_id, video_id, score, notes=""):
# if not participant_id or not participant_id.strip():
# return gr.update(visible=True, value="❗ Please enter your Participant ID before proceeding.")
# if not video_id or score is None:
# return gr.update(visible=True, value="❗ Fill out all fields.")
# try:
# old = _read_csv_bytes()
# row = [
# datetime.utcnow().isoformat(),
# participant_id.strip(),
# video_id, # ✅ action 대신 video_id 저장
# float(score), # overall
# notes or ""
# ]
# newb = _append(old, row)
# if not REPO_ID:
# return gr.update(visible=True, value="❗ RESULTS_REPO is not set.")
# if not HF_TOKEN:
# return gr.update(visible=True, value="❗ HF_TOKEN is missing. Set a write token for the dataset repo.")
# api.upload_file(
# path_or_fileobj=io.BytesIO(newb),
# path_in_repo=RESULTS_FILE,
# repo_id=REPO_ID,
# repo_type="dataset",
# token=HF_TOKEN,
# commit_message="append"
# )
# return gr.update(visible=True, value=f"✅ Saved successfully!")
# except Exception as e:
# return gr.update(
# visible=True,
# value=f"❌ Save failed: {type(e).__name__}: {e}\n"
# f"- Check HF_TOKEN permission\n- Check REPO_ID\n- Create dataset repo if missing"
# )
def push(participant_id, video_id, action_score, phys_score, notes=""):
if not participant_id or not participant_id.strip():
return gr.update(visible=True, value="❗ Please enter your Participant ID before proceeding.")
if not video_id or action_score is None or phys_score is None:
return gr.update(visible=True, value="❗ Fill out all fields.")
try:
old = _read_csv_bytes()
row = [
datetime.utcnow().isoformat(),
participant_id.strip(),
video_id,
float(action_score),
float(phys_score),
notes or ""
]
newb = _append(old, row)
if not REPO_ID:
return gr.update(visible=True, value="❗ RESULTS_REPO is not set.")
if not HF_TOKEN:
return gr.update(visible=True, value="❗ HF_TOKEN is missing. Set a write token for the dataset repo.")
api.upload_file(
path_or_fileobj=io.BytesIO(newb),
path_in_repo=RESULTS_FILE,
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN,
commit_message="append"
)
return gr.update(visible=True, value=f"✅ Saved successfully!")
except Exception as e:
return gr.update(
visible=True,
value=f"❌ Save failed: {type(e).__name__}: {e}\n"
f"- Check HF_TOKEN permission\n- Check REPO_ID\n- Create dataset repo if missing"
)
def _extract_action(v):
if "action" in v and v["action"]:
return v["action"]
raw = v.get("id", "")
return raw.split("__")[0].split(".")[0]
def pick_one():
v = random.choice(V)
return v["url"], _extract_action(v)
def _progress_html(done, total):
pct = int(100 * done / max(1, total))
return f"""
{done} / {total}
"""
def _build_order_with_anchor(total:int, anchor_idx:int, repeats:int, pool_size:int, min_gap:int=1):
"""
total: TOTAL_PER_PARTICIPANT (e.g., 30)
anchor_idx: index of the anchor video in V (0 for first item)
repeats: how many times to show anchor (e.g., 5)
pool_size: len(V)
min_gap: 최소 간격(인접 금지 => 1)
return: list of indices (length=total)
"""
assert repeats <= total, "repeats must be <= total"
assert pool_size >= 1, "videos pool must be non-empty"
# 1) 다른 비디오 25개(중복 없이) 뽑기
others_needed = total - repeats
# anchor를 제외한 후보 인덱스
candidates = list(range(1, pool_size)) if anchor_idx == 0 else [i for i in range(pool_size) if i != anchor_idx]
if len(candidates) < others_needed:
raise ValueError("Not enough unique non-anchor videos to fill the schedule without duplication.")
others = random.sample(candidates, k=others_needed)
# 2) 기본 시퀀스(others)를 무작위로 섞기
random.shuffle(others)
# 3) 앵커를 min_gap를 만족하도록 삽입할 위치 선정
# 30개를 5구간으로 나눠, 각 구간 내에서 충돌 덜 나게 배치
# (간단하고 안정적인 방식)
seq = others[:] # 길이=25
anchor_positions = []
segment = total // repeats # 30//5 = 6
for k in range(repeats):
# 각 구간 [k*segment, (k+1)*segment) 안에서 후보 위치를 고름
lo = k * segment
hi = (k + 1) * segment if k < repeats - 1 else total # 마지막은 끝까지
# 경계 내 임의 오프셋 선택 (여유를 두고 충돌을 피함)
candidate_pos = random.randrange(lo, hi)
# 인접 금지 보정: 이미 배정된 anchor 위치들과의 거리가 min_gap 이상 되도록 조정
# 필요 시 좌우로 근접한 빈 슬롯 탐색
def ok(pos):
return all(abs(pos - p) >= min_gap + 1 for p in anchor_positions) # 연속금지 => 거리 >= 2
# 근방 탐색 폭
found = None
for delta in range(0, segment): # 구간 크기 내에서 탐색
# 좌/우 번갈아가며 후보 시도
for sign in (+1, -1):
pos = candidate_pos + sign * delta
if 0 <= pos < total and ok(pos):
found = pos
break
if found is not None:
break
if found is None:
# 최후: 0..total-1 범위에서 아무 데나 충돌 없는 곳 찾기
for pos in range(total):
if ok(pos):
found = pos
break
if found is None:
raise RuntimeError("Failed to place anchor without adjacency. Try different strategy or loosen min_gap.")
anchor_positions.append(found)
# 4) others를 기반으로 길이 total의 빈 시퀀스를 만들고 anchor를 주입
# 우선 빈 리스트를 만들고 anchor 위치를 채운 후, 나머지를 others로 채움
result = [None] * total
for pos in anchor_positions:
result[pos] = anchor_idx
# others 포인터
j = 0
for i in range(total):
if result[i] is None:
result[i] = others[j]
j += 1
# 안전체크
assert len(result) == total
# 인접 anchor 없는지 확인
for i in range(1, total):
assert not (result[i] == anchor_idx and result[i-1] == anchor_idx), "Adjacent anchors found."
# anchor 개수 확인
assert sum(1 for x in result if x == anchor_idx) == repeats, "Anchor count mismatch."
return result
# -------------------- Example videos (download to local cache) --------------------
EXAMPLES = {
"BodyWeightSquats": {
"real": "examples/BodyWeightSquats_real.mp4",
"bad": "examples/BodyWeightSquats_bad.mp4",
},
"WallPushUps": {
"real": "examples/WallPushUps_real.mp4",
"bad": "examples/WallPushUps_bad.mp4",
},
}
EX_CACHE = {}
for cls, files in EXAMPLES.items():
EX_CACHE[cls] = {"real": None, "bad": None}
for kind, fname in files.items():
try:
EX_CACHE[cls][kind] = hf_hub_download(
repo_id=REPO_ID,
filename=fname,
repo_type="dataset",
token=HF_TOKEN,
local_dir="/tmp",
local_dir_use_symlinks=False,
)
except Exception as e:
print(f"[WARN] example missing: {cls} {kind} -> {fname}: {e}")
for cls in EX_CACHE:
for kind in EX_CACHE[cls]:
if EX_CACHE[cls][kind]:
EX_CACHE[cls][kind] = ensure_muted_copy(EX_CACHE[cls][kind])
CLEAN_BG_CSS = r"""
/* 전체 페이지 배경을 흰색으로 (Spaces의 회색 배경 제거) */
html, body { background:#ffffff !important; }
/* Intro/Eval 섹션의 래퍼 박스만 투명 처리 (입력창/슬라이더 등 컨트롤은 놔둠) */
#intro .gr-panel, #intro .gr-group, #intro .gr-box, #intro .gr-row, #intro .gr-column,
#eval .gr-panel, #eval .gr-group, #eval .gr-box, #eval .gr-row, #eval .gr-column {
background: transparent !important;
box-shadow: none !important;
border-color: transparent !important;
}
/* 비디오 카드 주변 툴바(작은 회색 박스)도 투명하게 */
#intro [data-testid="block-video"] .prose,
#eval [data-testid="block-video"] .prose {
background: transparent !important;
box-shadow: none !important;
border-color: transparent !important;
}
"""
# -------------------- UI --------------------
with gr.Blocks(css=CLEAN_BG_CSS) as demo:
# Blocks 안, 어디서든 한 번만 추가(권장 위치: Blocks 시작 직후)
gr.HTML("""
""")
order_state = gr.State(value=[]) # v4에서는 value= 권장
ptr_state = gr.State(value=0)
cur_video_id = gr.State(value="")
reward_code_state = gr.State(value="") # 완료 시 생성한 코드 저장(중복 생성 방지)
selected_action = gr.State(value=None) # 아직 선택 없음
selected_phys = gr.State(value=None) # 아직 선택 없음
# ------------------ PAGE 1: Intro + Examples ------------------
# page_intro = gr.Group(visible=True)
page_intro = gr.Group(visible=True, elem_id="intro")
with page_intro:
# gr.Markdown("## 🎯 Action Consistency Human Evaluation")
gr.Markdown("## 🎯 Human Evaluation: Action Consistency & Physical Plausibility")
gr.Markdown(INSTRUCTION_MD)
understood = gr.Checkbox(label="I have read and understand the task.", value=False)
start_btn = gr.Button("Yes, start", variant="secondary", interactive=False)
def _toggle_start(checked: bool):
return gr.update(interactive=checked, variant=("primary" if checked else "secondary"))
understood.change(_toggle_start, inputs=understood, outputs=start_btn)
# Examples: Squats
with gr.Group():
gr.Markdown("### Examples: BodyWeightSquats")
with gr.Row():
with gr.Column():
gr.Markdown("**Expected depiction of action**")
gr.Video(value=EX_CACHE["BodyWeightSquats"]["real"], height=240, autoplay=False, elem_id="ex_squats_real",)
with gr.Column():
gr.Markdown("**Poorly generated action**")
gr.Video(value=EX_CACHE["BodyWeightSquats"]["bad"], height=240, autoplay=False)
if not (EX_CACHE["BodyWeightSquats"]["real"] and EX_CACHE["BodyWeightSquats"]["bad"]):
gr.Markdown("> ⚠️ Upload `examples/BodyWeightSquats_real.mp4` and `_bad.mp4` to show both samples.")
# Examples: WallPushUps
with gr.Group():
gr.Markdown("### Examples: WallPushUps")
with gr.Row():
with gr.Column():
gr.Markdown("**Expected depiction of action**")
gr.Video(value=EX_CACHE["WallPushUps"]["real"], height=240, autoplay=False, elem_id="ex_wallpushups_real",)
with gr.Column():
gr.Markdown("**Poorly generated action**")
gr.Video(value=EX_CACHE["WallPushUps"]["bad"], height=240, autoplay=False)
if not (EX_CACHE["WallPushUps"]["real"] and EX_CACHE["WallPushUps"]["bad"]):
gr.Markdown("> ⚠️ Upload `examples/WallPushUps_real.mp4` and `_bad.mp4` to show both samples.")
gr.HTML("""
""")
# ------------------ PAGE 2: Evaluation ------------------
page_eval = gr.Group(visible=False, elem_id="eval")
with page_eval:
# PID 입력
with gr.Row():
pid = gr.Textbox(label="Participant ID (required)", placeholder="e.g., Youngsun-2025/10/24")
# 지침(원문) + 비디오 + 진행바 / 오른쪽에 슬라이더 + Save&Next
with gr.Row(): #equal_height=True
with gr.Column(scale=1):
gr.Markdown(INSTRUCTION_MD) # 교수님 문구 그대로
video = gr.Video(label="Video", height=360)
progress = gr.HTML(_progress_html(0, TOTAL_PER_PARTICIPANT))
with gr.Column(scale=1):
action_tb = gr.Textbox(label="Expected action", interactive=False)
# NEW: two separate sliders
score_action = gr.Slider(
minimum=0.0, maximum=10.0, step=0.1, value=5.0,
label="Action Consistency (0.0 - 10.0)"
)
score_phys = gr.Slider(
minimum=0.0, maximum=10.0, step=0.1, value=5.0,
label="Physical Plausibility (0.0 - 10.0)"
)
save_next = gr.Button("💾 Save & Next ▶", variant="secondary", interactive=False)
status = gr.Markdown(visible=False)
done_state = gr.State(0)
# PID 입력에 따라 Save&Next 토글
def _toggle_by_pid(pid_text: str):
enabled = bool(pid_text and pid_text.strip())
return gr.update(interactive=enabled, variant=("primary" if enabled else "secondary"))
# pid.change(_toggle_by_pid, inputs=pid, outputs=save_next)
def _on_action_release(val: float, pid_text: str, sel_p):
# 사용자가 액션 슬라이더를 한 번이라도 놓으면 선택 완료
return val, _recompute_save(pid_text, val, sel_p)
def _on_phys_release(val: float, pid_text: str, sel_a):
return val, _recompute_save(pid_text, sel_a, val)
# release: 마우스를 놓을 때 1회 확정
score_action.release(_on_action_release,
inputs=[score_action, pid, selected_phys],
outputs=[selected_action, save_next])
score_phys.release(_on_phys_release,
inputs=[score_phys, pid, selected_action],
outputs=[selected_phys, save_next])
# PID가 바뀌면 현재 선택 상태로 Save 버튼 재계산
pid.change(_recompute_save,
inputs=[pid, selected_action, selected_phys],
outputs=save_next)
page_thanks = gr.Group(visible=False)
with page_thanks:
reward_msg = gr.Markdown(visible=False)
reward_code_box = gr.Textbox(label="Your reward code (copy & paste)", interactive=False, visible=False)
reward_pid_box = gr.Textbox(label="Your participant ID", interactive=False, visible=False)
gr.Markdown("### Any comments (optional)")
feedback_tb = gr.Textbox(
label="Any comments (optional)",
placeholder="Leave any feedback for the study organizers (optional).",
lines=4
)
feedback_submit = gr.Button("Submit feedback")
feedback_status = gr.Markdown(visible=False)
# -------- 페이지 전환 & 첫 로드 --------
ANCHOR_IDX = 0 # videos.json의 맨 첫 비디오
ANCHOR_REPEATS = 5 # 앵커 5회
MIN_GAP = 1 # 앵커 연속 금지(인접 금지)
def _build_order_least_first_with_anchor(total:int, anchor_idx:int, repeats:int, min_gap:int=1):
"""
- results.csv를 읽어 video_id별 카운트를 계산
- 앵커(첫 비디오) 5회 포함, 연속 금지
- 나머지는 '가장 적게 평가된 순'으로 중복 없이 채움
"""
assert repeats <= total
N = len(V)
assert N >= 1
# 0) id 매핑
def vid_of(i): return _get_video_id(V[i])
# 1) 현재 누적 카운트 로드
counts = _load_eval_counts()
# 2) 앵커 제외 후보(중복 없이) 정렬: 카운트 오름차순, 동률은 랜덤 셔플
anchor_vid = vid_of(anchor_idx)
candidates = [i for i in range(N) if i != anchor_idx]
# 동률 랜덤화를 위해 일단 셔플
random.shuffle(candidates)
candidates.sort(key=lambda i: counts.get(vid_of(i), 0))
others_needed = total - repeats
if len(candidates) < others_needed:
raise ValueError("Not enough unique non-anchor videos to fill the schedule without duplication.")
others = candidates[:others_needed] # 중복 없이 선택
# 3) others를 베이스 시퀀스로(랜덤 살짝 섞기)
random.shuffle(others)
# 4) 앵커를 구간 배치(연속 금지)
seq = [None] * total
segment = total // repeats if repeats > 0 else total
anchor_positions = []
for k in range(repeats):
lo = k * segment
hi = (k + 1) * segment if k < repeats - 1 else total
cand = random.randrange(lo, hi)
def ok(pos):
return all(abs(pos - p) >= (min_gap + 1) for p in anchor_positions)
found = None
for d in range(0, max(1, segment)):
for sgn in (+1, -1):
pos = cand + sgn * d
if 0 <= pos < total and ok(pos):
found = pos
break
if found is not None:
break
if found is None:
# 마지막 수단: 전체 탐색
for pos in range(total):
if ok(pos):
found = pos
break
if found is None:
raise RuntimeError("Failed to place anchor without adjacency.")
anchor_positions.append(found)
for pos in anchor_positions:
seq[pos] = anchor_idx
# 5) 빈 자리를 others로 채우기
j = 0
for i in range(total):
if seq[i] is None:
seq[i] = others[j]
j += 1
# 6) 안전 체크
assert sum(1 for x in seq if x == anchor_idx) == repeats
for i in range(1, total):
assert not (seq[i] == anchor_idx and seq[i-1] == anchor_idx), "Adjacent anchors found."
return seq
def _start_and_load_first():
total = TOTAL_PER_PARTICIPANT
order = _build_order_least_first_with_anchor(
total=total,
anchor_idx=ANCHOR_IDX,
repeats=ANCHOR_REPEATS,
min_gap=MIN_GAP
)
first_idx = order[0]
v0 = V[first_idx]
return (
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False),
v0["url"],
_extract_action(v0),
5.0, # score_action default
5.0, # score_phys default ✅ NEW
gr.update(visible=False, value=""),
0,
_progress_html(0, TOTAL_PER_PARTICIPANT),
order,
1,
_get_video_id(v0), # cur_video_id
"",
None, # ⬅️ selected_action 초기화(= 아직 선택 안됨)
None, # ⬅️ selected_phys 초기화
gr.update(interactive=False, variant="secondary"), # ⬅️ Save 버튼 잠금
)
start_btn.click(
_start_and_load_first,
inputs=[],
outputs=[
page_intro, page_eval, page_thanks,
video, action_tb,
score_action, score_phys, # <-- two sliders
status,
done_state, progress, order_state, ptr_state, cur_video_id,
reward_code_state,
selected_action, selected_phys,
save_next,
]
)
def save_and_next(participant_id, cur_vid, action_score, phys_score,
done_cnt, order, ptr, reward_code):
try:
if not participant_id or not participant_id.strip():
return (
gr.update(visible=True),
gr.update(visible=False),
gr.update(visible=True, value="❗ Please enter your Participant ID."),
gr.update(), gr.update(),
done_cnt,
_progress_html(done_cnt, TOTAL_PER_PARTICIPANT),
5.0, # reset action slider
5.0, # reset phys slider ✅
ptr,
cur_vid,
reward_code,
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
None, # ⬅️ selected_action reset
None, # ⬅️ selected_phys reset
gr.update(interactive=False, variant="secondary"), # ⬅️ lock button
)
# save both scores
status_msg = push(participant_id, cur_vid, action_score, phys_score, "")
new_done = int(done_cnt) + 1
if new_done >= TOTAL_PER_PARTICIPANT or ptr >= len(order):
code = reward_code.strip() or _gen_reward_code(participant_id, length=10)
try:
_persist_reward_code(participant_id, code, new_done)
except Exception:
pass
thanks_text = (
"## 🎉 Thank you so much!\n"
"Your responses have been recorded. You may now close this window.\n\n"
"**Below are your reward code and ID.** Please copy them and submit them to **AMT** to receive your payment."
)
return (
gr.update(visible=False),
gr.update(visible=True),
status_msg,
None,
"",
TOTAL_PER_PARTICIPANT,
_progress_html(TOTAL_PER_PARTICIPANT, TOTAL_PER_PARTICIPANT),
5.0, # reset action
5.0, # reset phys
len(order),
cur_vid,
code,
gr.update(visible=True, value=thanks_text),
gr.update(visible=True, value=code),
gr.update(visible=True, value=participant_id.strip()),
None, # ⬅️ selected_action reset
None, # ⬅️ selected_phys reset
gr.update(interactive=False, variant="secondary"),
)
# next video
next_idx = order[ptr]
v = V[next_idx]
next_vid = _get_video_id(v)
return (
gr.update(visible=True),
gr.update(visible=False),
status_msg,
v["url"],
_extract_action(v),
new_done,
_progress_html(new_done, TOTAL_PER_PARTICIPANT),
5.0, # reset action
5.0, # reset phys
ptr + 1,
next_vid,
reward_code,
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
None, # ⬅️ selected_action reset
None, # ⬅️ selected_phys reset
gr.update(interactive=False, variant="secondary"),
)
except Exception as e:
return (
gr.update(visible=True),
gr.update(visible=False),
gr.update(visible=True, value=f"❌ Error: {type(e).__name__}: {e}"),
gr.update(), gr.update(),
done_cnt,
_progress_html(done_cnt, TOTAL_PER_PARTICIPANT),
5.0,
5.0,
ptr,
cur_vid,
reward_code,
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
None, # ⬅️ reset on error too
None,
gr.update(interactive=False, variant="secondary"),
)
save_next.click(
save_and_next,
inputs=[pid, cur_video_id, score_action, score_phys, done_state, order_state, ptr_state, reward_code_state],
outputs=[
page_eval, page_thanks,
status, video, action_tb,
done_state, progress, score_action, score_phys,
ptr_state, cur_video_id,
reward_code_state,
reward_msg, reward_code_box, reward_pid_box,
selected_action, selected_phys, # ⬅️ 추가
save_next, # ⬅️ 버튼 상태 갱신
]
)
feedback_submit.click(
push_final_feedback,
# 완료 후 page_thanks에서 보여주는 participant ID 박스를 사용 (값이 채워져 있음)
inputs=[reward_pid_box, feedback_tb],
outputs=feedback_status
)
if __name__ == "__main__":
# demo.launch()
demo.launch(ssr_mode=False)