File size: 6,508 Bytes
fe6264c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cf716c3
 
 
fe6264c
 
 
cf716c3
 
 
 
fe6264c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42ccf80
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fe6264c
42ccf80
 
fe6264c
 
42ccf80
fe6264c
 
 
 
 
 
 
42ccf80
 
cf716c3
fe6264c
 
 
 
 
 
 
 
ccd9fcb
 
42ccf80
ccd9fcb
 
 
42ccf80
fe6264c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1406a1d
 
 
 
fe6264c
 
 
 
 
 
 
 
 
 
 
 
 
 
42ccf80
fe6264c
 
42ccf80
 
fe6264c
 
 
42ccf80
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# model_utils.py
from typing import List, Optional
import re

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

import qa_store
from loader import load_curriculum, load_manual_qa, rebuild_combined_qa

# -----------------------------
# Model
# -----------------------------
# Hugging Face model identifier handed to both the tokenizer and model loaders below.
MODEL_NAME = "SeaLLMs/SeaLLMs-v3-1.5B-Chat"

# The tokenizer and model are created at import time, so importing this module
# runs both from_pretrained calls.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# Use the GPU when torch reports CUDA is available; otherwise run on CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Weights are requested in float32 and the model is moved to the selected device.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float32,
).to(device)

# Switch the model to evaluation mode; this module only calls generate().
model.eval()


# Load data once at import time.
# NOTE(review): these loader helpers presumably populate the qa_store module
# (ENTRIES, QA_INDEX, ALL_QA_KNOWLEDGE, ...) that the functions below read --
# confirm against loader.py.
load_curriculum()
load_manual_qa()
rebuild_combined_qa()

# Instruction text (in Lao) that build_prompt places at the very top of every
# prompt. Rough English gloss: "You are a Lao-history assistant for grade M.1
# students. Answer only in Lao, briefly (2-3 sentences) and simply. Rely only on
# the information given below. If it is insufficient or unclear, say you are
# not sure."
SYSTEM_PROMPT = (
    "ທ່ານແມ່ນຜູ້ຊ່ວຍເຫຼືອດ້ານປະຫວັດສາດຂອງປະເທດລາວ "
    "ສໍາລັບນັກຮຽນຊັ້ນ ມ.1. "
    "ຕອບແຕ່ພາສາລາວ ໃຫ້ຕອບສັ້ນໆ 2–3 ປະໂຫຍກ ແລະເຂົ້າໃຈງ່າຍ. "
    "ໃຫ້ອີງຈາກຂໍ້ມູນຂ້າງລຸ່ມນີ້ເທົ່ານັ້ນ. "
    "ຖ້າຂໍ້ມູນບໍ່ພຽງພໍ ຫຼືບໍ່ຊັດເຈນ ໃຫ້ບອກວ່າບໍ່ແນ່ໃຈ."
)


def _format_entry(entry: dict) -> str:
    """Render one textbook entry as a bracketed source header plus its text."""
    header = (
        f"[ຊັ້ນ {entry.get('grade','')}, "
        f"ບົດ {entry.get('chapter','')}, "
        f"ຫົວຂໍ້ {entry.get('section','')}{entry.get('title','')}]"
    )
    # .get() rather than ["text"] so an entry without text cannot raise KeyError.
    return f"{header}\n{entry.get('text', '')}"


def retrieve_context(question: str, max_entries: int = 2) -> str:
    """
    Pick the textbook entries most relevant to *question* by keyword scoring.

    The question is lowercased and split on whitespace; terms of a single
    character are ignored. Each entry in ``qa_store.ENTRIES`` is scored as:

    * +1 for every occurrence of a term in the entry's text or title,
    * +2 for every (keyword, term) pair where the term occurs in the keyword,
    * +1 if any term occurs in the entry's topic.

    All comparisons are case-insensitive. Entries with a zero score are
    dropped; the remaining ones are ranked by score (ties keep their original
    order) and the top ``max_entries`` are returned.

    Args:
        question: The user's question.
        max_entries: Maximum number of entries to include in the context.

    Returns:
        The selected entries, each rendered as a header line followed by the
        entry text, separated by blank lines. If there are no entries at all,
        ``qa_store.RAW_KNOWLEDGE`` is returned unchanged. If the question has
        no usable terms, or nothing scores above zero, the first
        ``max_entries`` entries are used as a fallback.
    """
    entries = qa_store.ENTRIES
    if not entries:
        return qa_store.RAW_KNOWLEDGE

    normalized = (question or "").lower().strip()
    terms = [t for t in re.split(r"\s+", normalized) if len(t) > 1]

    top_entries = []
    if terms:
        scored = []
        for entry in entries:
            # "or" guards against fields that are present but set to None.
            haystack = f"{entry.get('text') or ''} {entry.get('title') or ''}".lower()
            keywords = [str(kw).lower() for kw in (entry.get("keywords") or [])]
            # Terms are lowercased, so the topic must be lowercased as well.
            topic = str(entry.get("topic") or "").lower()

            score = sum(haystack.count(t) for t in terms)
            score += 2 * sum(1 for kw in keywords for t in terms if t in kw)
            if topic and any(t in topic for t in terms):
                score += 1

            if score > 0:
                scored.append((score, entry))

        # list.sort is stable, so equally scored entries keep source order.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        top_entries = [entry for _, entry in scored[:max_entries]]

    if not top_entries:
        # No usable terms or no match: fall back to the first entries.
        top_entries = entries[:max_entries]

    return "\n\n".join(_format_entry(entry) for entry in top_entries)


def _format_history(history: Optional[List]) -> str:
    """
    Convert last few chat turns into a Lao conversation snippet
    to give the model context for follow-up questions.
    Gradio history format: [[user_msg, bot_msg], [user_msg, bot_msg], ...]
    """
    if not history:
        return ""

    # keep only the last 3 turns to avoid very long prompts
    recent = history[-3:]

    lines = []
    for turn in recent:
        if not isinstance(turn, (list, tuple)) or len(turn) != 2:
            continue
        user_msg, bot_msg = turn
        lines.append(f"ນັກຮຽນ: {user_msg}")
        lines.append(f"ອາຈານ AI: {bot_msg}")

    if not lines:
        return ""

    joined = "\n".join(lines)
    return f"ປະຫວັດການສົນທະນາກ່ອນໜ້າ:\n{joined}\n\n"


def build_prompt(question: str, history: Optional[List] = None) -> str:
    """
    Assemble the full text prompt sent to the model.

    Layout, top to bottom: the system instruction, a blank line, the optional
    conversation-history block, the retrieved reference text, the question,
    and finally the cue after which the model writes its answer.

    Args:
        question: The user's current question.
        history: Optional earlier chat turns, formatted by ``_format_history``.

    Returns:
        The prompt string.
    """
    reference_text = retrieve_context(question)
    previous_turns = _format_history(history)

    # The history block already ends with a blank line (or is empty), so it is
    # glued directly in front of the reference heading.
    prompt_lines = [
        SYSTEM_PROMPT,
        "",
        f"{previous_turns}ຂໍ້ມູນອ້າງອີງ:",
        reference_text,
        "",
        f"ຄຳຖາມ: {question}",
        "",
        "ຄຳຕອບດ້ວຍພາສາລາວ:",
    ]
    return "\n".join(prompt_lines)


def generate_answer(question: str, history: Optional[List] = None) -> str:
    """
    Generate an answer with the language model and trim it to a few sentences.

    Args:
        question: The user's current question.
        history: Optional earlier chat turns, passed on to ``build_prompt``.

    Returns:
        The decoded continuation (prompt tokens removed), cut down to its
        first three sentence-like pieces. If trimming leaves nothing, the
        untrimmed answer is returned.
    """
    encoded = tokenizer(
        build_prompt(question, history),
        return_tensors="pt",
    ).to(model.device)
    prompt_length = encoded["input_ids"].shape[1]

    # Gradients are not needed for inference; sampling is switched off.
    with torch.no_grad():
        generated = model.generate(
            **encoded,
            max_new_tokens=160,
            do_sample=False,
        )

    # The output sequence starts with the prompt tokens; keep only what follows.
    continuation_ids = generated[0][prompt_length:]
    answer = tokenizer.decode(continuation_ids, skip_special_tokens=True).strip()

    # Split after '.', '?', '!' or '…' when followed by whitespace, and keep
    # at most the first three pieces.
    leading_pieces = re.split(r"(?<=[\.?!…])\s+", answer)[:3]
    trimmed = " ".join(leading_pieces).strip()
    return trimmed or answer
    

def answer_from_qa(question: str) -> Optional[str]:
    """
    Look up a stored answer for *question* without calling the model.

    Lookup order:
      1) exact match of the normalized question in ``qa_store.QA_INDEX``;
      2) fuzzy match: the entry of ``qa_store.ALL_QA_KNOWLEDGE`` whose
         normalized question shares the most distinct words with the
         normalized input, accepted only if at least 2 words are shared.

    Words of a single character are ignored. Overlap is counted over distinct
    words, so repeating a word in the question does not raise the score.
    When several stored questions tie, the first one encountered wins.

    Args:
        question: The user's question.

    Returns:
        The stored answer, or ``None`` when nothing matches well enough.
    """
    norm_q = qa_store.normalize_question(question)
    if not norm_q:
        return None

    if norm_q in qa_store.QA_INDEX:
        return qa_store.QA_INDEX[norm_q]

    # A set gives distinct words and O(1) membership tests.
    q_terms = {t for t in norm_q.split(" ") if len(t) > 1}
    if not q_terms:
        return None

    best_score = 0
    best_answer: Optional[str] = None

    for item in qa_store.ALL_QA_KNOWLEDGE:
        stored_terms = {t for t in item["norm_q"].split(" ") if len(t) > 1}
        overlap = len(q_terms & stored_terms)
        # Strict ">" keeps the earliest entry among equally good candidates.
        if overlap > best_score:
            best_score = overlap
            best_answer = item["a"]

    # Require at least 2 overlapping words to accept a fuzzy match.
    if best_score >= 2 and best_answer is not None:
        # Log when a fuzzy match is used; str() keeps the preview from
        # failing if a stored answer is not a string.
        print(f"[FUZZY MATCH] score={best_score} -> {str(best_answer)[:50]!r}")
        return best_answer

    return None


def laos_history_bot(message: str, history: List) -> str:
    """
    Main chatbot function for the Student tab.

    Flow:
      1) reject empty input with a short "please type a question" message;
      2) return a stored answer if ``answer_from_qa`` finds one;
      3) otherwise generate an answer with the model, passing the chat
         history so follow-up questions have context.

    Args:
        message: The text the user just sent. ``None``, a non-string value,
            or a whitespace-only string is treated as empty input.
        history: Earlier chat turns, forwarded to ``generate_answer``.

    Returns:
        The reply to show to the user. If generation raises, an error message
        containing the exception text is returned instead of propagating.
    """
    # isinstance guard: calling .strip() on None would raise before any reply.
    if not isinstance(message, str) or not message.strip():
        return "ກະລຸນາພິມຄໍາຖາມກ່ອນ."

    direct = answer_from_qa(message)
    if direct:
        return direct

    try:
        # Pass history to let the LLM understand follow-up questions.
        return generate_answer(message, history)
    except Exception as e:  # noqa: BLE001
        # UI boundary: report the failure in the chat instead of crashing.
        return f"ລະບົບມີບັນຫາ: {e}"