#!/usr/bin/env python3
import os, re, csv, hashlib, shutil, subprocess
import time, glob
from pathlib import Path
from collections import defaultdict
from typing import List, Tuple
from huggingface_hub import (
    HfApi, list_repo_files, hf_hub_download, create_repo,
    whoami, snapshot_download, CommitOperationAdd,
)

# ========= CONFIG =========
SRC_REPO = "XThomasBU/video_evals_ucf101"
SRC_SUBDIRS = ["Hunyuan_videos", "Opensora_768", "RunwayGen4", "wan21_videos"]  # only the subdirs we need

# Also include the local Wan2.2 clips (60 files)
WAN22_LOCAL_ROOT = Path("/projectnb/ivc-ml/youngsun/Video_Eval/Datasets/Wan2p2/Generated_UCF")
WAN22_TARGET_SUBDIR = "Wan2.2"  # top-level folder name in the destination repo (keep the name we have been using)

DEST_REPO = "SGTLIM/ucf101_eval_unified"  # pinned to your account
DEST_REPO_IS_PRIVATE = False

# Working directory with plenty of space (/projectnb instead of /tmp)
WORKDIR = Path("/projectnb/ivc-ml/youngsun/tmp_ucf_unified")
STAGING = WORKDIR / "staging"    # staging root
DL_ROOT = WORKDIR / "downloads"  # raw downloads from the source repo

MAKE_SILENT = False
TOKEN = os.getenv("HF_TOKEN")
# =========================

# Use WORKDIR (not /tmp) for temp files and the HF cache
os.environ.setdefault("TMPDIR", str(WORKDIR / "_tmp"))
os.environ.setdefault("HF_HOME", "/projectnb/ivc-ml/youngsun/.cache/huggingface")

# Make sure the directories exist
(WORKDIR / "_tmp").mkdir(parents=True, exist_ok=True)
STAGING.mkdir(parents=True, exist_ok=True)
DL_ROOT.mkdir(parents=True, exist_ok=True)

# Action-name normalization (extend as needed)
ALIAS = {
    "bodyweightsquats": "body_weight_squats",
    "bodysquats": "body_weight_squats",
    "body_weight_squats": "body_weight_squats",
    "hulahoop": "hula_hoop",
    "jumpingjack": "jumping_jack",
    "pullups": "pull_ups",
    "pushups": "push_ups",
    "throwdiscus": "throw_discus",
    "wallpushups": "wall_pushups",
}


def camel_action(s: str) -> str:
    # 'body_weight_squats' -> 'BodyWeightSquats'
    parts = s.strip("_").split("_")
    return "".join(p.capitalize() for p in parts if p)


def extract_action_from_remote(rel_remote: str) -> str:
    """
    Extract the action from the *original* path in the HF repo only.
    e.g. Hunyuan_videos/v_BodyWeightSquats_g05_c01.mp4 -> body_weight_squats
    """
    base = os.path.basename(rel_remote)
    m = re.match(r"^v_([A-Za-z0-9]+)_", base)  # must match the v_<Action>_ pattern
    if m:
        return slugify_action(m.group(1))
    # Fallback for edge cases (rarely reached: the source repo uses the v_ pattern)
    return slugify_action(base.split("_", 1)[0])


def slugify_action(s: str) -> str:
    s = s.strip().lower().replace(" ", "_")
    s = re.sub(r"[^a-z0-9_]+", "_", s)
    s = re.sub(r"_+", "_", s).strip("_")
    return ALIAS.get(s, s)


def model_slug(s: str) -> str:
    s = s.strip().lower()
    s = s.replace(" ", "_")
    s = re.sub(r"[^a-z0-9_]+", "_", s)
    s = re.sub(r"_+", "_", s).strip("_")
    return s


def sha1_8(p: Path) -> str:
    h = hashlib.sha1()
    with open(p, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()[:8]


def ensure_ffmpeg():
    # Skip the ffmpeg check when MAKE_SILENT is False
    if MAKE_SILENT and not shutil.which("ffmpeg"):
        raise RuntimeError("ffmpeg not found, but MAKE_SILENT=True")


def mute_copy(src: Path, dst: Path):
    # (currently unused; kept in case the mute option is revived)
    cmd = ["ffmpeg", "-y", "-i", str(src), "-c:v", "copy", "-an", str(dst)]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        cmd = [
            "ffmpeg", "-y", "-i", str(src),
            "-vf", "format=yuv420p", "-movflags", "+faststart",
            "-c:v", "libx264", "-crf", "18", "-preset", "veryfast",
            "-an", str(dst),
        ]
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def extract_action_from_filename(fn: str) -> str:
    # e.g. v_BodyWeightSquats_g05_c01.mp4 -> BodyWeightSquats
    m = re.match(r"v_([A-Za-z0-9]+)", fn)
    if m:
        return slugify_action(m.group(1))
    parts = fn.split("/")
    if len(parts) >= 2:
        return slugify_action(parts[-2])
    stem = Path(fn).stem
    stem = re.sub(r"^\w+_", "", stem)
    return slugify_action(stem)


def stage_from_hf_model(api, model_dir, rows):
    # 1) From the source repo's file list, keep only this folder's mp4s
    all_remote = list_repo_files(repo_id=SRC_REPO, repo_type="dataset", token=TOKEN)
    remotes = [p for p in all_remote if p.startswith(model_dir + "/") and p.lower().endswith(".mp4")]
    print(f"[FETCH] {model_dir}: {len(remotes)} remote mp4 files")
    if not remotes:
        print(f"[WARN] no matches under {model_dir}")
        return

    # 2) Recreate a clean per-model download folder on every run
    dl_dir = DL_ROOT / model_dir
    if dl_dir.exists():
        shutil.rmtree(dl_dir)
    dl_dir.mkdir(parents=True, exist_ok=True)

    # 3) Keep (remote_path, local_path) pairs (action extraction uses remote_path!)
    pairs = []
    for rel_remote in remotes:
        local = hf_hub_download(
            repo_id=SRC_REPO,
            filename=rel_remote,  # keep the original repo path
            repo_type="dataset",
            token=TOKEN,
            local_dir=str(dl_dir),
            local_dir_use_symlinks=False,
        )
        pairs.append((rel_remote, local))

    # 4) Move into staging and rename with the final naming rule
    folder_name = model_dir  # e.g. 'Hunyuan_videos', 'RunwayGen4', ...
    dst_dir = STAGING / folder_name
    dst_dir.mkdir(parents=True, exist_ok=True)
    counters = defaultdict(int)
    moved = 0
    for rel_remote, local in sorted(pairs):
        # Extract the action from the original remote path only
        action_slug = extract_action_from_remote(rel_remote)  # 'body_weight_squats'
        action_camel = camel_action(action_slug)              # 'BodyWeightSquats'
        counters[action_slug] += 1
        idx = counters[action_slug]
        h8 = sha1_8(Path(local))
        # Final rule: Model_Action_<2-digit idx>_<8-char hash>.mp4
        new_name = f"{folder_name}_{action_camel}_{idx:02d}_{h8}.mp4"
        dst = dst_dir / new_name
        shutil.move(local, dst)
        moved += 1
        rows.append([f"hf://{SRC_REPO}/{rel_remote}", folder_name, action_slug, idx, h8, f"{folder_name}/{new_name}"])
    print(f"[STAGED] {model_dir}: moved {moved} files to {dst_dir}")


def stage_from_local_wan22(rows: List[List[str]]):
    pretty_model = WAN22_TARGET_SUBDIR  # e.g. 'Wan2.2'
    counters = defaultdict(int)
    for class_dir in sorted([p for p in WAN22_LOCAL_ROOT.iterdir() if p.is_dir()]):
        action = slugify_action(class_dir.name)
        for mp4 in sorted([p for p in class_dir.iterdir() if p.suffix.lower() == ".mp4"]):
            counters[action] += 1
            idx = counters[action]
            h8 = sha1_8(mp4)
            camel = camel_action(action)  # e.g. 'PushUps'
            new_name = f"{pretty_model}_{camel}_{idx:02d}_{h8}.mp4"
            dst_dir = STAGING / pretty_model
            dst_dir.mkdir(parents=True, exist_ok=True)
            dst = dst_dir / new_name
            if MAKE_SILENT:
                mute_copy(mp4, dst)
            else:
                shutil.copy2(mp4, dst)  # copy as-is, audio kept
            rows.append([str(mp4), pretty_model, action, idx, h8, f"{pretty_model}/{new_name}"])


def main():
    if not TOKEN:
        raise SystemExit("Set HF_TOKEN with write permission.")
    ensure_ffmpeg()

    # Reset the working directories
    if STAGING.exists():
        shutil.rmtree(STAGING)
    if DL_ROOT.exists():
        shutil.rmtree(DL_ROOT)
    STAGING.mkdir(parents=True, exist_ok=True)
    DL_ROOT.mkdir(parents=True, exist_ok=True)

    api = HfApi()
    try:
        create_repo(repo_id=DEST_REPO, repo_type="dataset", private=DEST_REPO_IS_PRIVATE, token=TOKEN, exist_ok=True)
    except Exception:
        pass

    rows: List[List[str]] = []

    # 1) Stage only the four model folders from the source repo
    for sub in SRC_SUBDIRS:
        stage_from_hf_model(api, sub, rows)

    # 2) Add the local Wan2.2 clips (60 files)
    stage_from_local_wan22(rows)

    # 3) Save the mapping csv
    with open(STAGING / "mapping.csv", "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["source", "model", "action", "idx", "hash8", "dest_path"])
        w.writerows(rows)

    # 4) Commit each folder explicitly (bypasses change detection)
    subdirs_to_push = ["Hunyuan_videos", "Opensora_768", "RunwayGen4", "wan21_videos"]  # , "Wan2.2"
    for sd in subdirs_to_push:
        sd_path = STAGING / sd
        if not sd_path.exists():
            print(f"[WARN] skip missing subdir: {sd}")
            continue
        files = sorted(str(p) for p in sd_path.rglob("*.mp4"))
        if not files:
            print(f"[WARN] skip empty subdir: {sd}")
            continue
        print(f"[PUSH] {sd} ... ({len(files)} files)")
        ops = []
        for fp in files:
            rel_in_repo = os.path.relpath(fp, start=STAGING)  # e.g. Hunyuan_videos/xxx.mp4
            ops.append(CommitOperationAdd(path_in_repo=rel_in_repo, path_or_fileobj=fp))
        api.create_commit(
            repo_id=DEST_REPO,
            repo_type="dataset",
            operations=ops,
            commit_message=f"Add {sd} ({len(files)} files)",
            token=TOKEN,
        )

    # Upload mapping.csv
    map_csv = STAGING / "mapping.csv"
    if map_csv.exists():
        api.upload_file(
            path_or_fileobj=str(map_csv),
            path_in_repo="mapping.csv",
            repo_id=DEST_REPO,
            repo_type="dataset",
            token=TOKEN,
            commit_message="Add mapping.csv",
        )

    print(f"[DONE] Pushed to https://huggingface.co/datasets/{DEST_REPO}")


if __name__ == "__main__":
    main()
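

# ---------------------------------------------------------------------------
# Optional verification sketch (an addition, not part of the original flow):
# after the push, the destination repo can be sanity-checked by listing its
# files and counting mp4s per top-level folder, using the same
# list_repo_files call the script already relies on. The helper name
# `verify_dest_repo` is an assumption; it is not called by main() and can be
# invoked manually (e.g. from a REPL) once the upload finishes.
# ---------------------------------------------------------------------------
def verify_dest_repo():
    counts = defaultdict(int)
    for path in list_repo_files(repo_id=DEST_REPO, repo_type="dataset", token=TOKEN):
        if path.lower().endswith(".mp4"):
            counts[path.split("/", 1)[0]] += 1  # bucket by top-level folder
    for folder, n in sorted(counts.items()):
        print(f"[VERIFY] {folder}: {n} mp4 files")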