|
|
|
|
|
import os |
|
|
import json |
|
|
from typing import List, Dict, Any |
|
|
|
|
|
from huggingface_hub import hf_hub_download, HfApi |
|
|
# Hugging Face repository ("owner/name") passed as repo_id, with
# repo_type="dataset", by the sync_* functions in this module.
DATASET_REPO_ID = "Heng2004/lao-science-qa-store"
# File name of the manual Q&A file inside that repository.
DATASET_FILENAME = "manual_qa.jsonl"
|
|
|
|
|
import qa_store |
|
|
|
|
|
|
|
|
# Absolute directory of this module; all data files are resolved relative to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, "data")

# Textbook JSONL read by load_curriculum().
CURRICULUM_PATH = os.path.join(DATA_DIR, "M_1_U_1.jsonl")
# Local manual Q&A JSONL: read by load_manual_qa(), written by
# save_manual_qa_file(), and synced with the dataset repo by the sync_* functions.
MANUAL_QA_PATH = os.path.join(DATA_DIR, "manual_qa.jsonl")

# Glossary JSONL read by load_glossary().
GLOSSARY_PATH = os.path.join(DATA_DIR, "glossary.jsonl")
|
|
|
|
|
|
|
|
def sync_upload_manual_qa() -> str:
    """Push the local manual Q&A file to the Hugging Face dataset repo.

    Returns:
        A short status string meant for display in the UI: a "skipped"
        notice when the repo ID is empty or still contains the
        ``YOUR_USERNAME`` placeholder, a success notice once the upload
        call has returned, or a failure notice embedding the exception text.
    """
    repo_configured = bool(DATASET_REPO_ID) and "YOUR_USERNAME" not in DATASET_REPO_ID
    if not repo_configured:
        return "⚠️ Upload Skipped (Repo ID not set)"

    print(f"[INFO] Uploading {DATASET_FILENAME} to {DATASET_REPO_ID}...")

    upload_args = {
        "path_or_fileobj": MANUAL_QA_PATH,
        "path_in_repo": DATASET_FILENAME,
        "repo_id": DATASET_REPO_ID,
        "repo_type": "dataset",
        "commit_message": "Teacher Panel: Updated Q&A data",
    }
    try:
        # Imported inside the try so that an import problem is reported
        # through the same failure message as an upload problem.
        from huggingface_hub import HfApi

        HfApi().upload_file(**upload_args)
        print("[INFO] Upload success!")
        return "☁️ Cloud Upload Success"
    except Exception as e:
        # Best-effort sync: report the problem to the caller instead of raising.
        print(f"[ERROR] Could not upload manual_qa.jsonl: {e}")
        return f"⚠️ Cloud Upload Failed: {e}"
|
|
|
|
|
def sync_download_manual_qa() -> None:
    """Fetch the latest manual Q&A file from the Hugging Face dataset repo.

    The file is downloaded with ``hf_hub_download`` (using the ``HF_TOKEN``
    environment variable as the token, if set) and then copied to
    ``MANUAL_QA_PATH``. The download is skipped when the repo ID is empty or
    still contains the ``YOUR_USERNAME`` placeholder.

    Any failure is logged and swallowed on purpose: the caller then continues
    with whatever local file already exists (or none).
    """
    if not DATASET_REPO_ID or "YOUR_USERNAME" in DATASET_REPO_ID:
        print("[WARN] DATASET_REPO_ID is not set. Skipping download.")
        return

    print(f"[INFO] Downloading {DATASET_FILENAME} from {DATASET_REPO_ID}...")
    try:
        import shutil

        from huggingface_hub import hf_hub_download

        downloaded_path = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=DATASET_FILENAME,
            repo_type="dataset",
            token=os.environ.get("HF_TOKEN"),
        )

        # Make sure the destination directory exists; without this the copy
        # fails on a fresh checkout and the downloaded data is discarded.
        os.makedirs(os.path.dirname(MANUAL_QA_PATH), exist_ok=True)
        shutil.copy(downloaded_path, MANUAL_QA_PATH)
        print("[INFO] Download success!")

    except Exception as e:
        # Best-effort sync: never block startup on a failed download.
        print(f"[WARN] Could not download manual_qa.jsonl: {e}")
        print("[INFO] Starting with empty or local manual_qa.jsonl instead.")
|
|
|
|
|
def load_curriculum() -> None:
    """Load the textbook JSONL file into ``qa_store``.

    Effects on ``qa_store``:
        * ``ENTRIES`` is cleared and refilled with every JSON object that has
          a string ``"text"`` field.
        * ``AUTO_QA_KNOWLEDGE`` is cleared and refilled with one record per
          ``"qa"`` pair whose ``"q"`` and ``"a"`` are both non-empty after
          stripping. Each record carries the normalized question, the raw
          question and answer, ``source="auto"`` and the parent entry's id.
        * ``RAW_KNOWLEDGE`` becomes the entry texts joined by blank lines, or
          a placeholder message when the file is missing or yields no entries.

    Malformed lines are skipped rather than aborting the whole load. This
    covers invalid JSON, JSON values that are not objects, objects whose
    ``"text"`` is missing or not a string, a ``"qa"`` value that is not a
    list, and pairs that are not objects.
    """
    qa_store.ENTRIES.clear()
    qa_store.AUTO_QA_KNOWLEDGE.clear()

    if not os.path.exists(CURRICULUM_PATH):
        print(f"[WARN] Curriculum file not found: {CURRICULUM_PATH}")
        qa_store.RAW_KNOWLEDGE = "ຍັງບໍ່ມີຂໍ້ມູນປະຫວັດສາດຖືກໂຫຼດ."
        return

    with open(CURRICULUM_PATH, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj: Any = json.loads(line)
            except json.JSONDecodeError:
                print("[WARN] Skipping invalid JSON line in curriculum file.")
                continue

            # json.loads may return a list/str/number; only objects are entries.
            if not isinstance(obj, dict):
                print("[WARN] Skipping non-object JSON line in curriculum file.")
                continue

            # The text is joined into RAW_KNOWLEDGE below, so it must be a str.
            if not isinstance(obj.get("text"), str):
                continue

            qa_store.ENTRIES.append(obj)

            # Tolerate '"qa": null' and other non-list values.
            pairs = obj.get("qa")
            if not isinstance(pairs, list):
                continue

            for pair in pairs:
                if not isinstance(pair, dict):
                    continue
                q = (pair.get("q") or "").strip()
                a = (pair.get("a") or "").strip()
                if not q or not a:
                    continue
                qa_store.AUTO_QA_KNOWLEDGE.append(
                    {
                        "norm_q": qa_store.normalize_question(q),
                        "q": q,
                        "a": a,
                        "source": "auto",
                        "id": obj.get("id", ""),
                    }
                )

    if qa_store.ENTRIES:
        qa_store.RAW_KNOWLEDGE = "\n\n".join(e["text"] for e in qa_store.ENTRIES)
    else:
        qa_store.RAW_KNOWLEDGE = "ຍັງບໍ່ມີຂໍ້ມູນປະຫວັດສາດທີ່ອ່ານໄດ້."
|
|
|
|
|
|
|
|
def load_glossary() -> None:
    """Refill ``qa_store.GLOSSARY`` from the glossary JSONL file.

    The list is emptied first. When the file is missing, a warning is printed
    and the list stays empty. Otherwise every non-blank line that parses as
    JSON is appended as-is; unparsable lines are reported and skipped. A
    summary line with the number of loaded terms is printed at the end.
    """
    glossary = qa_store.GLOSSARY
    glossary.clear()

    if not os.path.exists(GLOSSARY_PATH):
        print(f"[WARN] Glossary file not found: {GLOSSARY_PATH}")
        return

    with open(GLOSSARY_PATH, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            payload = raw_line.strip()
            if payload:
                try:
                    term = json.loads(payload)
                except json.JSONDecodeError:
                    print("[WARN] Skipping invalid glossary JSON line")
                else:
                    glossary.append(term)

    print(f"[INFO] Loaded {len(qa_store.GLOSSARY)} glossary terms.")
|
|
|
|
|
|
|
|
def load_manual_qa() -> None:
    """Load manual_qa.jsonl into ``qa_store.MANUAL_QA_LIST`` and ``MANUAL_QA_INDEX``.

    Both containers are cleared first. When the file is missing, a warning is
    printed and ``qa_store.NEXT_MANUAL_ID`` is set to 1.

    Otherwise the file is processed in two passes:

    1. Every non-blank line is parsed. Lines that are invalid JSON, are not
       JSON objects, or lack a non-empty ``"q"`` or ``"a"`` are skipped. The
       highest numeric suffix among the explicit ids is recorded.
    2. Entries are stored in file order. An entry without an id receives
       ``manual_NNNN``, numbered after the highest explicit suffix, so a
       generated id can never collide with an id that appears later in the
       file.

    ``MANUAL_QA_INDEX`` is keyed by the normalized question; a later entry
    with the same normalized question replaces the earlier one in the index.
    ``NEXT_MANUAL_ID`` ends up one past the highest numeric suffix in use.
    """
    import re as _re

    qa_store.MANUAL_QA_LIST.clear()
    qa_store.MANUAL_QA_INDEX.clear()

    if not os.path.exists(MANUAL_QA_PATH):
        print(f"[WARN] Manual QA file not found: {MANUAL_QA_PATH}")
        qa_store.NEXT_MANUAL_ID = 1
        return

    # Compiled once instead of re-importing and re-looking-up per line.
    trailing_digits = _re.compile(r"(\d+)$")

    max_num = 0
    pending = []  # (entry_id or "", q, a) in file order

    # Pass 1: parse, validate, and find the highest explicit numeric suffix.
    with open(MANUAL_QA_PATH, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                print("[WARN] Skipping invalid JSON line in manual QA file.")
                continue

            # json.loads may return a list/str/number; only objects are entries.
            if not isinstance(obj, dict):
                print("[WARN] Skipping non-object JSON line in manual QA file.")
                continue

            q = (obj.get("q") or "").strip()
            a = (obj.get("a") or "").strip()
            if not q or not a:
                continue

            entry_id = str(obj.get("id") or "")
            m = trailing_digits.search(entry_id)
            if m:
                max_num = max(max_num, int(m.group(1)))

            pending.append((entry_id, q, a))

    # Pass 2: assign ids to id-less entries only now that every explicit id
    # is known, then publish the entries.
    for entry_id, q, a in pending:
        if not entry_id:
            max_num += 1
            entry_id = f"manual_{max_num:04d}"

        norm_q = qa_store.normalize_question(q)
        entry = {
            "id": entry_id,
            "q": q,
            "a": a,
            "norm_q": norm_q,
        }
        qa_store.MANUAL_QA_LIST.append(entry)
        qa_store.MANUAL_QA_INDEX[norm_q] = entry

    # max_num is never negative, so this is 1 when no numbered id was seen.
    qa_store.NEXT_MANUAL_ID = max_num + 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_new_manual_id() -> str:
    """Return the lowest unused ``manual_XXXX`` id.

    The numeric suffixes of all ids currently in ``qa_store.MANUAL_QA_LIST``
    are collected, and the smallest positive integer not among them is
    chosen, so a number freed by a deletion is handed out again. As a side
    effect ``qa_store.NEXT_MANUAL_ID`` is set to the chosen number plus one.
    """
    import re as _re

    suffix_hits = (
        _re.search(r"(\d+)$", str(row.get("id") or ""))
        for row in qa_store.MANUAL_QA_LIST
    )
    taken = {int(hit.group(1)) for hit in suffix_hits if hit}

    # Among 1 .. len(taken) + 1 at least one number must be free, so next()
    # always finds a candidate.
    free_num = next(n for n in range(1, len(taken) + 2) if n not in taken)

    qa_store.NEXT_MANUAL_ID = free_num + 1
    return f"manual_{free_num:04d}"
|
|
|
|
|
|
|
|
|
|
|
def save_manual_qa_file() -> None:
    """Write ``qa_store.MANUAL_QA_LIST`` to ``MANUAL_QA_PATH`` as JSONL.

    The parent directory is created if needed. The file is overwritten with
    one JSON object per line holding only ``id``, ``q`` and ``a``; non-ASCII
    characters are written as-is (UTF-8) rather than escaped.
    """
    target_dir = os.path.dirname(MANUAL_QA_PATH)
    os.makedirs(target_dir, exist_ok=True)

    with open(MANUAL_QA_PATH, "w", encoding="utf-8") as out:
        out.writelines(
            json.dumps(
                {"id": row["id"], "q": row["q"], "a": row["a"]},
                ensure_ascii=False,
            )
            + "\n"
            for row in qa_store.MANUAL_QA_LIST
        )
|
|
|
|
|
|
|
|
def rebuild_combined_qa() -> None:
    """Rebuild ``qa_store.QA_INDEX`` and ``qa_store.ALL_QA_KNOWLEDGE``.

    Both containers are emptied, then filled from the auto-generated items
    followed by the manual entries. Because manual entries are written to
    the index last, a manual answer replaces an auto answer that has the
    same normalized question. ``ALL_QA_KNOWLEDGE`` keeps every item from
    both sources.
    """
    index = qa_store.QA_INDEX
    combined = qa_store.ALL_QA_KNOWLEDGE
    index.clear()
    combined.clear()

    # Auto items first; the existing dict objects are reused.
    for auto_item in qa_store.AUTO_QA_KNOWLEDGE:
        key, answer = auto_item["norm_q"], auto_item["a"]
        index[key] = answer
        combined.append(auto_item)

    # Manual entries second so they win on key collisions in the index.
    for manual in qa_store.MANUAL_QA_LIST:
        record = dict(
            norm_q=manual["norm_q"],
            q=manual["q"],
            a=manual["a"],
            source="manual",
            id=manual["id"],
        )
        index[record["norm_q"]] = record["a"]
        combined.append(record)
|
|
|
|
|
|
|
|
def manual_qa_table_data() -> List[List[str]]:
    """Build the Teacher Panel table rows.

    Returns:
        One ``[id, question, answer]`` list per entry in
        ``qa_store.MANUAL_QA_LIST``, in list order.
    """
    columns = ("id", "q", "a")
    rows: List[List[str]] = []
    for entry in qa_store.MANUAL_QA_LIST:
        rows.append([entry[col] for col in columns])
    return rows
|
|
|