import argparse import json import math import os import sys import numpy as np from pathlib import Path from typing import Callable, Dict, List, Optional, Tuple import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from contextlib import nullcontext from torch.nn.utils.rnn import pad_sequence from torch.utils.data import DataLoader, Dataset try: from torch.amp import autocast as _autocast, GradScaler as _GradScaler def betina_autocast(device_type: str, enabled: bool = True): if not enabled or device_type != "cuda": return nullcontext() return _autocast(device_type=device_type, enabled=enabled) def betina_grad_scaler(device_type: str, enabled: bool = True): if not enabled or device_type != "cuda": return _NoOpGradScaler() return _GradScaler(device_type=device_type, enabled=enabled) except ImportError: # pragma: no cover from torch.cuda.amp import autocast as _autocast, GradScaler as _GradScaler def betina_autocast(device_type: str, enabled: bool = True): if not enabled or device_type != "cuda": return nullcontext() return _autocast(enabled=enabled) def betina_grad_scaler(device_type: str, enabled: bool = True): if not enabled or device_type != "cuda": return _NoOpGradScaler() return _GradScaler(enabled=enabled) try: from sentence_transformers import SentenceTransformer except ImportError as exc: # pragma: no cover raise ImportError("Install sentence-transformers to run the Betina pipeline") from exc try: from transformers import AutoModelForMaskedLM, AutoTokenizer except ImportError as exc: # pragma: no cover raise ImportError("Install transformers to run the Betina pipeline") from exc try: from datasets import load_dataset # type: ignore[import-not-found] except ImportError: # pragma: no cover load_dataset = None def _safe_load_dataset( path: str, name: Optional[str], *, split: str, hf_token: Optional[str], trust_remote_code: bool, ): if load_dataset is None: raise ImportError("Install the 'datasets' package to use Hugging Face corpora") base_kwargs = {"split": split, "trust_remote_code": trust_remote_code} attempts: List[Dict[str, Optional[str]]] = [] if hf_token: attempts.append({"token": hf_token}) attempts.append({"use_auth_token": hf_token}) attempts.append({}) last_error: Optional[Exception] = None for extra in attempts: try: return load_dataset(path, name, **base_kwargs, **extra) except TypeError as err: last_error = err continue except ValueError as err: if "use_auth_token" in str(err).lower(): last_error = err continue raise if last_error: raise last_error raise RuntimeError(f"Falha ao carregar dataset {path}/{name}") def _read_hf_token_file(path: Path) -> Optional[str]: try: content = path.read_text(encoding="utf-8").strip() except OSError: return None if not content: return None first_line = content.splitlines()[0].strip() return first_line or None def resolve_hf_token(explicit_token: Optional[str], token_file: Optional[str]) -> Tuple[Optional[str], Optional[str]]: """Resolve o token HF preferindo argumento, env vars e arquivo do huggingface-cli.""" if explicit_token and explicit_token.strip(): return explicit_token.strip(), "--hf-token" env_token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN") if env_token and env_token.strip(): return env_token.strip(), "env" file_candidates: List[Path] = [] if token_file: file_candidates.append(Path(token_file).expanduser()) else: hf_home = os.getenv("HF_HOME") if hf_home: file_candidates.append(Path(hf_home).expanduser() / "token") file_candidates.append(Path.home() / ".cache" / "huggingface" / "token") file_candidates.append(Path.home() / ".huggingface" / "token") for candidate in file_candidates: token = _read_hf_token_file(candidate) if token: return token, str(candidate) return None, None class _NoOpGradScaler: def __init__(self): pass def scale(self, loss): return loss def step(self, optimizer): optimizer.step() def update(self): pass def unscale_(self, optimizer): pass def state_dict(self): return {} def load_state_dict(self, state): pass class CallableAgentAdapter(nn.Module): def __init__(self, fn: Callable[[torch.Tensor], torch.Tensor], name: str): super().__init__() self.fn = fn self.agent_name = name or getattr(fn, "__name__", "callable_agent") def forward(self, tensor: torch.Tensor) -> torch.Tensor: # noqa: D401 return self.fn(tensor) class MultiIntelligenceRouter(nn.Module): """Despacha cada estágio do fluxo matriz para IAs distintas por stream/etapa.""" def __init__( self, num_streams: int, *, stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None, stream_aliases: Optional[List[str]] = None, ): super().__init__() self.num_streams = num_streams self.stream_aliases = stream_aliases or [f"S{idx}" for idx in range(num_streams)] self.stage_modules = nn.ModuleDict() self.stage_logs: Dict[str, List[Dict[str, str]]] = {} if stage_config: self.apply_stage_config(stage_config) def apply_stage_config(self, stage_config: Dict[str, Dict[str, nn.Module]]) -> None: for stage_name, mapping in stage_config.items(): module_dict = nn.ModuleDict() for key, module in mapping.items(): module_dict[str(key)] = self._wrap_module(stage_name, key, module) self.stage_modules[stage_name] = module_dict def register_stage(self, stage_name: str, mapping: Dict[str, nn.Module]) -> None: module_dict = nn.ModuleDict() for key, module in mapping.items(): module_dict[str(key)] = self._wrap_module(stage_name, key, module) self.stage_modules[stage_name] = module_dict def _wrap_module(self, stage: str, key: str | int, module: nn.Module | Callable) -> nn.Module: if isinstance(module, list): wrapped = [self._wrap_module(stage, f"{key}_{idx}", item) for idx, item in enumerate(module)] seq = nn.Sequential(*wrapped) if not hasattr(seq, "agent_name"): seq.agent_name = f"seq_{stage}_{key}" return seq if isinstance(module, nn.Module): if not hasattr(module, "agent_name"): module.agent_name = module.__class__.__name__ return module if callable(module): name = getattr(module, "agent_name", None) or getattr(module, "__name__", f"{stage}_{key}_fn") return CallableAgentAdapter(module, name) raise TypeError(f"Módulo IA inválido para estágio {stage}/{key}: {type(module)}") def _select_module(self, stage_dict: nn.ModuleDict, key: str | int | None) -> Optional[nn.Module]: if key is not None: candidate_key = str(key) if candidate_key in stage_dict: return stage_dict[candidate_key] for fallback in ("*", "default", "-1"): if fallback in stage_dict: return stage_dict[fallback] return None def apply(self, stage: str, tensor: torch.Tensor, *, cycle_idx: Optional[int] = None) -> torch.Tensor: stage_dict = self.stage_modules[stage] if stage in self.stage_modules else None log: List[Dict[str, str]] = [] if stage_dict is None: self.stage_logs[stage] = log return tensor if tensor.dim() == 3: outputs = [] for stream_idx in range(tensor.size(1)): module = self._select_module(stage_dict, stream_idx) if module is None and stream_idx < len(self.stream_aliases): module = self._select_module(stage_dict, self.stream_aliases[stream_idx]) chunk = tensor[:, stream_idx, :] if module is not None: chunk = module(chunk) log.append( { "stream": self.stream_aliases[stream_idx] if stream_idx < len(self.stream_aliases) else str(stream_idx), "agent": getattr(module, "agent_name", module.__class__.__name__), } ) outputs.append(chunk) stacked = torch.stack(outputs, dim=1) self.stage_logs[stage] = log return stacked module = None if cycle_idx is not None: module = self._select_module(stage_dict, cycle_idx) if module is None: module = self._select_module(stage_dict, "global") if module is None: module = self._select_module(stage_dict, None) if module is None: self.stage_logs[stage] = log return tensor updated = module(tensor) alias = f"cycle_{cycle_idx}" if cycle_idx is not None else "global" log.append({"stream": alias, "agent": getattr(module, "agent_name", module.__class__.__name__)}) self.stage_logs[stage] = log return updated def stage_signature(self, stage: str) -> str: logs = self.stage_logs.get(stage, []) if not logs: return "identity" return " | ".join(f"{entry['stream']}→{entry['agent']}" for entry in logs) def describe_all_stages(self) -> Dict[str, List[Dict[str, str]]]: return {stage: list(entries) for stage, entries in self.stage_logs.items()} class SyntheticNeuronTriangle(nn.Module): """Refina deltas considerando uma base triangular (X,Y,contrabase).""" def __init__( self, embed_dim: int, num_streams: int, *, hidden_dim: int = 512, max_iters: int = 5, tol: float = 1e-4, delta_gain: float = 1.0, ) -> None: super().__init__() self.embed_dim = embed_dim self.num_streams = num_streams self.max_iters = max(0, max_iters) self.tol = max(1e-6, tol) self.delta_gain = float(delta_gain) seed_in = embed_dim * 4 # X, Y, contrabase média, diagonal refine_in = embed_dim * 3 # delta médio, eixo integrado, diagonal self.seed_proj = nn.Sequential( nn.LayerNorm(seed_in), nn.Linear(seed_in, hidden_dim), nn.GELU(), nn.Linear(hidden_dim, embed_dim), ) self.refine_proj = nn.Sequential( nn.LayerNorm(refine_in), nn.Linear(refine_in, hidden_dim), nn.SiLU(), nn.Linear(hidden_dim, embed_dim), ) def forward( self, mat_primary: torch.Tensor, mat_secondary: torch.Tensor, base_core: torch.Tensor, ) -> Tuple[torch.Tensor, Dict[str, object]]: batch, streams, dim = mat_primary.shape x_stream = mat_primary[:, 0, :] y_stream = mat_primary[:, 1, :] if streams > 1 else mat_secondary[:, 0, :] contra_stream = mat_secondary[:, 0, :] contra_mean = mat_secondary.mean(dim=1) base_center = torch.stack([x_stream, y_stream], dim=1).mean(dim=1) diag = torch.stack([x_stream - contra_stream, y_stream - contra_mean], dim=1).mean(dim=1) seed_features = torch.cat([base_center, contra_mean, base_core.squeeze(1), diag], dim=-1) axis_vector = torch.tanh(self.seed_proj(seed_features)) axis_norm = F.normalize(axis_vector, dim=-1) base_delta = mat_primary - base_core stream_align = torch.sum( F.normalize(mat_primary, dim=-1) * axis_norm.unsqueeze(1), dim=-1, keepdim=True, ) axis_component = axis_norm.unsqueeze(1) * stream_align delta = base_delta + self.delta_gain * axis_component iterations = 0 last_change = torch.zeros(1, device=mat_primary.device) if self.max_iters > 0: for idx in range(self.max_iters): delta_mean = delta.mean(dim=1) refine_inp = torch.cat([delta_mean, axis_vector, diag], dim=-1) refine = torch.tanh(self.refine_proj(refine_inp)).unsqueeze(1) delta = delta + refine last_change = refine.norm(dim=-1).mean() iterations = idx + 1 if last_change.item() < self.tol: break debug = { "axis": axis_vector.detach(), "diag": diag.detach(), "iterations": iterations, "residual": float(last_change.detach().item()), } return delta, debug def build_builtin_agent(name: str, embed_dim: int) -> nn.Module: normalized = name.strip().lower() if normalized in {"brock", "brockman", "brock ia"}: module = nn.Sequential( nn.LayerNorm(embed_dim), nn.Linear(embed_dim, embed_dim * 2), nn.GELU(), nn.Linear(embed_dim * 2, embed_dim), ) elif normalized in {"chatgpt", "chatgpt 5.1", "chatgpt5.1", "gpt51"}: module = nn.Sequential( nn.LayerNorm(embed_dim), nn.Linear(embed_dim, embed_dim), nn.SiLU(), nn.Linear(embed_dim, embed_dim), ) elif normalized in {"code", "code ia", "coder"}: module = nn.Sequential( nn.LayerNorm(embed_dim), nn.Linear(embed_dim, embed_dim), ) elif normalized in {"critic", "mirror", "refiner"}: module = nn.Sequential( nn.LayerNorm(embed_dim), nn.Linear(embed_dim, embed_dim * 3 // 2), nn.GELU(), nn.Linear(embed_dim * 3 // 2, embed_dim), nn.LayerNorm(embed_dim), ) elif normalized in {"identity", "none"}: module = nn.Identity() else: raise ValueError(f"Agente IA desconhecido: {name}") module.agent_name = name return module def _build_custom_agent(agent_def: Dict[str, object], embed_dim: int) -> nn.Module: if "style" in agent_def: module = build_builtin_agent(str(agent_def["style"]), embed_dim) module.agent_name = str(agent_def.get("name", agent_def["style"])) return module agent_type = str(agent_def.get("type", "mlp")).lower() hidden = int(agent_def.get("hidden", embed_dim * 2)) dropout = float(agent_def.get("dropout", 0.0)) if agent_type == "mlp": layers: List[nn.Module] = [ nn.LayerNorm(embed_dim), nn.Linear(embed_dim, hidden), nn.GELU(), ] if dropout > 0: layers.append(nn.Dropout(dropout)) layers.append(nn.Linear(hidden, embed_dim)) module = nn.Sequential(*layers) elif agent_type == "linear": module = nn.Sequential(nn.LayerNorm(embed_dim), nn.Linear(embed_dim, embed_dim)) else: raise ValueError(f"Tipo de agente custom '{agent_type}' não suportado") module.agent_name = str(agent_def.get("name", agent_type)) return module def load_ia_config_file(path: str, embed_dim: int) -> Dict[str, object]: data = json.loads(Path(path).read_text(encoding="utf-8")) stage_entries = data.get("stages", {}) if not isinstance(stage_entries, dict): raise ValueError("Campo 'stages' do arquivo IA precisa ser um objeto mapeando estágio→streams") stage_config: Dict[str, Dict[str, nn.Module]] = {} for stage, mapping in stage_entries.items(): if not isinstance(mapping, dict): raise ValueError(f"Estágio '{stage}' precisa mapear streams para agentes") stage_config[stage] = {} for stream_key, agent_def in mapping.items(): if isinstance(agent_def, str): module = build_builtin_agent(agent_def, embed_dim) elif isinstance(agent_def, dict): module = _build_custom_agent(agent_def, embed_dim) else: raise ValueError(f"Agente inválido para estágio '{stage}' stream '{stream_key}'") stage_config[stage][str(stream_key)] = module stream_aliases = data.get("stream_aliases") if stream_aliases is not None and (not isinstance(stream_aliases, list) or not all(isinstance(x, str) for x in stream_aliases)): raise ValueError("'stream_aliases' deve ser uma lista de strings") return { "stream_aliases": stream_aliases, "stage_config": stage_config, "refinement_cycles": int(data.get("refinement_cycles", 0)), "cycle_stage_name": str(data.get("cycle_stage_name", "cycle")), } # -------------------------- # Lorenz attractor generator (discrete) # -------------------------- def lorenz_step(x, y, z, sigma=10.0, rho=28.0, beta=8/3, dt=0.01): dx = sigma * (y - x) dy = x * (rho - z) - y dz = x * y - beta * z xn = x + dx * dt yn = y + dy * dt zn = z + dz * dt return xn, yn, zn def lorenz_sequence(length, init=(0.1, 0.0, 0.0), sigma=10.0, rho=28.0, beta=8/3, dt=0.01): x, y, z = init seq = [] for _ in range(length): x, y, z = lorenz_step(x, y, z, sigma, rho, beta, dt) seq.append((x, y, z)) return np.array(seq) # shape (length, 3) # -------------------------- # Rössler attractor (alternative chaos source) # -------------------------- def rossler_step(x, y, z, a=0.2, b=0.2, c=5.7, dt=0.01): dx = -y - z dy = x + a * y dz = b + z * (x - c) xn = x + dx * dt yn = y + dy * dt zn = z + dz * dt return xn, yn, zn def rossler_sequence(length, init=(0.1, 0.0, 0.0), a=0.2, b=0.2, c=5.7, dt=0.01): x, y, z = init seq = [] for _ in range(length): x, y, z = rossler_step(x, y, z, a, b, c, dt) seq.append((x, y, z)) return np.array(seq) # -------------------------- # Terminal Velocity Matching (Flow Matching inspired) # -------------------------- def compute_terminal_velocity(embeddings: torch.Tensor, target_distribution: str = "gaussian") -> torch.Tensor: """ Calcula a velocidade terminal para mover embeddings em direção a uma distribuição alvo. Inspirado em Flow Matching / Rectified Flow. """ batch_size, dim = embeddings.shape if target_distribution == "gaussian": # Alvo: Gaussiana isotrópica padrão target = torch.randn_like(embeddings) * 0.1 elif target_distribution == "uniform_sphere": # Alvo: superfície de esfera unitária target = F.normalize(torch.randn_like(embeddings), dim=-1) else: target = torch.zeros_like(embeddings) # Velocidade = direção do alvo - posição atual (normalizada) velocity = target - embeddings velocity = F.normalize(velocity, dim=-1) * embeddings.norm(dim=-1, keepdim=True) * 0.1 return velocity # -------------------------- # Spectral Energy Regularization # -------------------------- def spectral_energy_loss(embeddings: torch.Tensor, target_rank: int = 32) -> torch.Tensor: """ Penaliza energia concentrada em poucos componentes principais. Força distribuição mais uniforme do espectro singular. """ if embeddings.shape[0] < 2: return torch.tensor(0.0, device=embeddings.device) centered = embeddings - embeddings.mean(dim=0, keepdim=True) # SVD para obter valores singulares try: _, s, _ = torch.linalg.svd(centered, full_matrices=False) except RuntimeError: return torch.tensor(0.0, device=embeddings.device) # Normaliza para distribuição de energia s_normalized = s / (s.sum() + 1e-8) # Entropia do espectro (queremos maximizar = distribuição uniforme) spectral_entropy = -(s_normalized * (s_normalized + 1e-8).log()).sum() # Penalidade: quanto menor a entropia, maior a penalidade max_entropy = math.log(min(embeddings.shape[0], embeddings.shape[1])) return (max_entropy - spectral_entropy) / max_entropy # -------------------------- # Semantic Divergence Metrics # -------------------------- def compute_angular_divergence(original: torch.Tensor, perturbed: torch.Tensor) -> float: """Calcula a divergência angular média entre vetores originais e perturbados.""" cos_sim = F.cosine_similarity(original, perturbed, dim=-1) # Clamp para evitar problemas numéricos com acos cos_sim = cos_sim.clamp(-1.0, 1.0) angles = torch.acos(cos_sim) return angles.mean().item() def compute_semantic_entropy(logits: torch.Tensor, top_k: int = 10) -> float: """Calcula a entropia semântica da distribuição de probabilidade.""" probs = F.softmax(logits, dim=-1) top_probs, _ = torch.topk(probs, min(top_k, probs.shape[-1]), dim=-1) # Renormaliza top-k top_probs = top_probs / (top_probs.sum(dim=-1, keepdim=True) + 1e-8) entropy = -(top_probs * (top_probs + 1e-8).log()).sum(dim=-1) return entropy.mean().item() # -------------------------- # SIGReg-like regularizer (encourage isotropic Gaussian embeddings) # -------------------------- def sigreg_loss(embeddings): # embeddings: (B, D) mu = embeddings.mean(dim=0) # (D,) centered = embeddings - mu.unsqueeze(0) # (B, D) # empirical covariance (D x D) approx via (centered^T center) B = embeddings.shape[0] cov = (centered.t() @ centered) / (B - 1 + 1e-8) # (D, D) # penalty = norm(cov - I) + norm(mu) D = embeddings.shape[1] eye = torch.eye(D, device=embeddings.device) cov_pen = F.mse_loss(cov, eye) mu_pen = (mu.pow(2)).mean() return cov_pen + mu_pen class VortexBetinaAntiHalluc(nn.Module): def __init__( self, embed_dim: int = 256, vortex_steps: int = 10, vortex_dt: float = 0.02, num_streams: int = 3, enable_rotation: bool = True, rotation_angle: float = math.pi / 4, rotation_threshold: float = 1e-4, rotation_clockwise: bool = False, enable_quadratic_reflection: bool = False, quadratic_boundary: float = 0.3, quadratic_strength: float = 0.5, boost_small_deltas: bool = True, delta_gain: float = 1.5, reflection_push: float = 0.25, stream_aliases: Optional[List[str]] = None, ia_stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None, refinement_cycles: int = 0, cycle_stage_name: str = "cycle", enforce_square_geometry: bool = True, square_rotation_degrees: float = 180.0, square_leak_ratio: float = 0.05, square_jitter_std_degrees: float = 0.0, enable_lorentz_transform: bool = False, lorentz_beta: float = 0.6, lorentz_axis_stream: int = 0, enable_triangle: bool = True, triangle_hidden_dim: int = 512, triangle_max_iters: int = 5, triangle_tol: float = 1e-4, triangle_delta_gain: float = 1.0, ): super().__init__() self.embed_dim = embed_dim self.vortex_steps = vortex_steps self.vortex_dt = vortex_dt self.num_streams = max(1, num_streams) self.enable_rotation = enable_rotation self.rotation_angle = rotation_angle self.rotation_threshold = rotation_threshold self.rotation_clockwise = rotation_clockwise self.enable_quadratic_reflection = enable_quadratic_reflection self.quadratic_boundary = nn.Parameter(torch.tensor(float(quadratic_boundary))) self.quadratic_strength = float(min(1.0, max(0.0, quadratic_strength))) self.boost_small_deltas = boost_small_deltas self.delta_gain = max(1.0, delta_gain) self.reflection_push = max(0.0, reflection_push) self.stream_aliases = self._prepare_stream_aliases(stream_aliases) self.refinement_cycles = max(0, refinement_cycles) self.cycle_stage_name = cycle_stage_name or "cycle" self.enforce_square_geometry = enforce_square_geometry self.square_rotation_radians = math.radians(square_rotation_degrees) self.square_leak_ratio = float(min(1.0, max(0.0, square_leak_ratio))) self.square_jitter_std = math.radians(max(0.0, square_jitter_std_degrees)) beta = max(-0.999, min(0.999, lorentz_beta)) self.enable_lorentz_transform = enable_lorentz_transform self.lorentz_beta = beta self.lorentz_axis_stream = max(0, lorentz_axis_stream) self.enable_triangle = enable_triangle self.ia_router = MultiIntelligenceRouter( num_streams=self.num_streams, stage_config=ia_stage_config, stream_aliases=self.stream_aliases, ) self.triangle_module = ( SyntheticNeuronTriangle( embed_dim, self.num_streams, hidden_dim=triangle_hidden_dim, max_iters=triangle_max_iters, tol=triangle_tol, delta_gain=triangle_delta_gain, ) if self.enable_triangle else None ) self.projector = nn.Sequential( nn.LayerNorm(embed_dim), nn.Linear(embed_dim, embed_dim), ) nn.init.kaiming_normal_(self.projector[1].weight, mode="fan_out", nonlinearity="relu") nn.init.constant_(self.projector[1].bias, 0.1) # Vortex Dynamics Parameters (Chaos Injection) self.vortex_linear = nn.Parameter(torch.randn(3, embed_dim) * 0.1) self.vortex_scale = nn.Parameter(torch.ones(embed_dim)) # Terminal Velocity Matching parameters self.velocity_gate = nn.Parameter(torch.zeros(embed_dim)) self.velocity_bias = nn.Parameter(torch.zeros(embed_dim)) # Adaptive chaos parameters self.chaos_temperature = nn.Parameter(torch.tensor(1.0)) self.chaos_gate = nn.Parameter(torch.tensor(0.0)) self.attractor_selector = nn.Parameter(torch.tensor([0.7, 0.3])) # [lorenz, rossler] self.last_flow_states: Dict[str, object] = {} def _prepare_stream_aliases(self, provided: Optional[List[str]]) -> List[str]: if provided: aliases = list(provided) else: aliases = ["X", "Y", "Z", "W", "V", "U", "T", "S"] if len(aliases) < self.num_streams: aliases.extend(f"S{idx}" for idx in range(len(aliases), self.num_streams)) return aliases[: self.num_streams] def _rotate_matrix_plane(self, tensor: torch.Tensor, radians: float) -> torch.Tensor: if tensor.shape[-1] < 2: return tensor angle = radians % (2 * math.pi) if abs(angle) < 1e-9: return tensor rotated = tensor.clone() cos_theta = math.cos(angle) sin_theta = math.sin(angle) x = tensor[..., 0] y = tensor[..., 1] rotated[..., 0] = cos_theta * x - sin_theta * y rotated[..., 1] = sin_theta * x + cos_theta * y return rotated def _rotate_matrix_square(self, tensor: torch.Tensor) -> Tuple[torch.Tensor, float]: base_angle = abs(self.square_rotation_radians) % (2 * math.pi) if base_angle > math.pi: base_angle = 2 * math.pi - base_angle jitter = 0.0 if self.square_jitter_std > 0: jitter = torch.randn(1, device=tensor.device).item() * self.square_jitter_std angle = max(0.0, min(math.pi, base_angle + jitter)) if math.isclose(angle, 0.0, rel_tol=1e-6, abs_tol=1e-6): mirrored = tensor.clone() else: intensity = angle / math.pi mirrored = torch.lerp(tensor, -tensor, intensity) if self.square_leak_ratio > 0: mirrored = torch.lerp(mirrored, tensor, self.square_leak_ratio) return mirrored, angle def _select_axis_stream(self, tensor: torch.Tensor) -> torch.Tensor: stream_idx = min(self.lorentz_axis_stream, tensor.size(1) - 1) return tensor[:, stream_idx, :] def _lorentz_boost( self, spatial: torch.Tensor, reference_stream: torch.Tensor, time_like: torch.Tensor, ) -> Tuple[torch.Tensor, float, torch.Tensor]: beta = self.lorentz_beta gamma = 1.0 / math.sqrt(max(1e-8, 1.0 - beta**2)) axis = F.normalize(reference_stream, dim=-1, eps=1e-6) parallel_scalar = torch.sum(spatial * axis, dim=-1, keepdim=True) parallel_vec = parallel_scalar * axis perpendicular_vec = spatial - parallel_vec t_prime = gamma * (time_like - beta * parallel_scalar) parallel_prime = gamma * (parallel_scalar - beta * time_like) * axis updated_spatial = parallel_prime + perpendicular_vec return updated_spatial, gamma, t_prime.abs() def invert(self, vec: torch.Tensor) -> torch.Tensor: return -vec def intersection_knowledge(self, vec: torch.Tensor, base_k: torch.Tensor) -> torch.Tensor: dot = torch.sum(vec * base_k, dim=-1, keepdim=True) norm_k = torch.norm(base_k, dim=-1, keepdim=True).pow(2).clamp_min(1e-8) return (dot / norm_k) * base_k def euler_vortex(self, state: torch.Tensor) -> torch.Tensor: sigma, rho, beta, gamma = 10.0, 28.0, 8.0 / 3.0, 1.0 feature_dim = state.shape[-1] - 1 base = state[..., :-1] w = state[..., -1:] updated_base = torch.zeros_like(base) for i in range(0, feature_dim, 3): chunk = base[..., i : i + 3] if chunk.shape[-1] < 3: chunk = F.pad(chunk, (0, 3 - chunk.shape[-1])) chunk = chunk.clone() for _ in range(self.vortex_steps): x = chunk[..., 0] y = chunk[..., 1] z = chunk[..., 2] dx = sigma * (y - x) * self.vortex_dt dy = (x * (rho - z) - y) * self.vortex_dt dz = (x * y - beta * z) * self.vortex_dt chunk = chunk + torch.stack([dx, dy, dz], dim=-1) span = min(3, feature_dim - i) updated_base[..., i : i + span] = chunk[..., :span] energy = torch.norm(base, dim=-1, keepdim=True).pow(2) w_iter = w.clone() for _ in range(self.vortex_steps): dw = (gamma * energy - w_iter) * self.vortex_dt w_iter = w_iter + dw updated = state.clone() updated[..., :-1] = updated_base updated[..., -1:] = w_iter return updated def rotate_difference(self, delta: torch.Tensor, anchor: torch.Tensor) -> torch.Tensor: # noqa: ARG002 # Rotation now confined to the (x, y) plane while keeping z (and higher dims) untouched. if delta.shape[-1] < 2: return delta angle = -abs(self.rotation_angle) if self.rotation_clockwise else abs(self.rotation_angle) cos_theta = math.cos(angle) sin_theta = math.sin(angle) rotated = delta.clone() x = delta[..., 0] y = delta[..., 1] rotated[..., 0] = cos_theta * x - sin_theta * y rotated[..., 1] = sin_theta * x + cos_theta * y if delta.shape[-1] >= 3: rotated[..., 2] = delta[..., 2] # keep z static per vortex requirement return rotated def _get_quadratic_boundary(self) -> torch.Tensor: return self.quadratic_boundary.abs().clamp(0.05, 0.5) def quadratic_reflection(self, delta: torch.Tensor) -> Tuple[torch.Tensor, float]: """Reflect components between ±boundary and warp them quadratically (Pong-like bounce).""" if not self.enable_quadratic_reflection: return delta, 0.0 boundary = self._get_quadratic_boundary() period = 2.0 * boundary magnitude = delta.abs() direction = torch.sign(delta) wrapped = torch.remainder(magnitude, period) mirrored = torch.where(wrapped > boundary, period - wrapped, wrapped) normalized = (mirrored / boundary).clamp(0.0, 1.0) squared = normalized.pow(2) bounced = direction * boundary * squared blended = torch.lerp(delta, bounced, self.quadratic_strength) bounce_ratio = (magnitude > boundary).float().mean().item() return blended, bounce_ratio def forward(self, input_vec: torch.Tensor, chaos_factor: float = 1.0): """ chaos_factor: Multiplicador de agressividade (1.0 = normal, 5.0 = agressivo, 10.0 = insano) """ if input_vec.dim() == 2: batch = input_vec.size(0) mat_input = input_vec.unsqueeze(1).expand(batch, self.num_streams, self.embed_dim) elif input_vec.dim() == 3: batch, streams, dim = input_vec.shape if streams != self.num_streams or dim != self.embed_dim: raise ValueError( f"Esperava tensor (batch,{self.num_streams},{self.embed_dim}), recebi {input_vec.shape}" ) mat_input = input_vec else: raise ValueError("input_vec precisa ter 2 ou 3 dimensões") mat_flat = mat_input.reshape(-1, self.embed_dim) mat_primary = self.projector(mat_flat).reshape(-1, self.num_streams, self.embed_dim) stage_signatures: Dict[str, str] = {} mat_primary = self.ia_router.apply("base", mat_primary) stage_signatures["base"] = self.ia_router.stage_signature("base") mirrored_primary: Optional[torch.Tensor] = None square_tensor: Optional[torch.Tensor] = None square_angle_applied: Optional[float] = None if self.enforce_square_geometry: mirrored_primary, square_angle_applied = self._rotate_matrix_square(mat_primary) square_tensor = torch.stack([mat_primary, mirrored_primary], dim=2) mat_secondary = mirrored_primary else: mat_secondary = self.invert(mat_primary) mat_secondary = self.ia_router.apply("inversion", mat_secondary) stage_signatures["inversion"] = self.ia_router.stage_signature("inversion") base_k = mat_primary.mean(dim=1, keepdim=True) + 1e-4 * torch.randn_like(mat_primary[:, :1, :]) inter_primary = self.intersection_knowledge(mat_primary, base_k) inter_secondary = self.intersection_knowledge(mat_secondary, base_k) approx_inter_full = 0.5 * (inter_primary + inter_secondary) approx_inter = approx_inter_full.mean(dim=1) flow_states: Dict[str, object] = {} flow_states["matrix_primary"] = mat_primary flow_states["matrix_secondary"] = mat_secondary flow_states["base_core"] = base_k.squeeze(1) if mirrored_primary is not None: flow_states["matrix_rot_180"] = mirrored_primary if square_tensor is not None: flow_states["matrix_square"] = square_tensor flow_states["square_leak_ratio"] = self.square_leak_ratio if square_angle_applied is not None: flow_states["square_angle"] = square_angle_applied triangle_debug: Dict[str, object] = {} if self.triangle_module is not None: delta_green, triangle_debug = self.triangle_module(mat_primary, mat_secondary, base_k) else: delta_green = inter_primary - base_k flow_states["xy_bridge_matrix"] = delta_green if triangle_debug: flow_states["triangle_axis"] = triangle_debug.get("axis") flow_states["triangle_diag"] = triangle_debug.get("diag") flow_states["triangle_iterations"] = triangle_debug.get("iterations", 0) flow_states["triangle_residual"] = triangle_debug.get("residual", 0.0) delta_norm_stream = torch.norm(delta_green, dim=-1, keepdim=True) if self.boost_small_deltas: boost_mask = delta_norm_stream < self.rotation_threshold if boost_mask.any(): safe_norm = delta_norm_stream.clamp_min(1e-6) factor = (self.rotation_threshold / safe_norm) * self.delta_gain delta_green = torch.where(boost_mask, delta_green * factor, delta_green) boost_ratio = boost_mask.float().mean().item() else: boost_ratio = 0.0 flow_states["matrix_green_boost"] = delta_green if self.enable_rotation: delta_flat = delta_green.reshape(-1, self.embed_dim) base_flat = base_k.expand(-1, self.num_streams, -1).reshape(-1, self.embed_dim) delta_norm = torch.norm(delta_flat, dim=-1, keepdim=True) rotation_mask = delta_norm > self.rotation_threshold if rotation_mask.any(): rotated = self.rotate_difference(delta_flat, base_flat) delta_flat = torch.where(rotation_mask, rotated, delta_flat) delta_green = delta_flat.view(-1, self.num_streams, self.embed_dim) rotation_ratio = rotation_mask.float().mean().item() else: rotation_ratio = 0.0 flow_states["matrix_green_rot"] = delta_green boundary_val = self._get_quadratic_boundary() flow_states["quadratic_boundary"] = boundary_val.detach().item() if self.enable_quadratic_reflection and self.reflection_push > 0: norm_after_rot = torch.norm(delta_green, dim=-1, keepdim=True) push_mask = norm_after_rot < boundary_val if push_mask.any(): push_factor = 1.0 + self.reflection_push delta_green = torch.where(push_mask, delta_green * push_factor, delta_green) pre_reflect_push_ratio = push_mask.float().mean().item() else: pre_reflect_push_ratio = 0.0 if self.enable_quadratic_reflection: delta_green, reflection_ratio = self.quadratic_reflection(delta_green) else: reflection_ratio = 0.0 flow_states["matrix_green_reflect"] = delta_green mirror_reference = self.intersection_knowledge(mat_secondary, base_k) matrix_black = 0.5 * (2 * base_k - delta_green + mirror_reference) matrix_black = self.ia_router.apply("mirror", matrix_black) stage_signatures["mirror"] = self.ia_router.stage_signature("mirror") if self.enforce_square_geometry: x_stream = mat_primary[:, 0, :] x_output = mat_secondary[:, 0, :] matrix_black[:, 0, :] = x_output flow_states["square_input"] = x_stream flow_states["square_output"] = x_output flow_states["matrix_black_square"] = matrix_black flow_states["matrix_black"] = matrix_black delta_inter = matrix_black.mean(dim=1) cycle_signatures: List[str] = [] if self.refinement_cycles > 0: refined_delta = delta_inter for cycle_idx in range(self.refinement_cycles): refined_delta = self.ia_router.apply( self.cycle_stage_name, refined_delta, cycle_idx=cycle_idx, ) cycle_signatures.append(self.ia_router.stage_signature(self.cycle_stage_name)) delta_inter = refined_delta stage_signatures[self.cycle_stage_name] = cycle_signatures[-1] if cycle_signatures else "identity" flow_states["delta_pre_lorentz"] = delta_inter flow_states["ia_cycle_signatures"] = cycle_signatures w = torch.norm(delta_inter, dim=-1, keepdim=True) lorentz_gamma = 1.0 if self.enable_lorentz_transform: reference_stream = self._select_axis_stream(mat_primary) delta_inter, lorentz_gamma, w = self._lorentz_boost(delta_inter, reference_stream, w) flow_states["lorentz_reference"] = reference_stream flow_states["delta_lorentz"] = delta_inter flow_states["lorentz_gamma"] = lorentz_gamma flow_states["lorentz_time"] = w delta_before_chaos = delta_inter # APLICAÇÃO DO FATOR CAOS (OVERDRIVE) - VORTEX DYNAMICS V2 if chaos_factor != 1.0: batch_size = delta_inter.shape[0] # 1. Generate chaos sequences from both attractors lor_seq_np = lorenz_sequence(batch_size, init=(0.1, 0.0, 0.0)) ros_seq_np = rossler_sequence(batch_size, init=(0.1, 0.0, 0.0)) # Normalize sequences lor_seq_np = (lor_seq_np - lor_seq_np.mean(axis=0)) / (lor_seq_np.std(axis=0) + 1e-8) ros_seq_np = (ros_seq_np - ros_seq_np.mean(axis=0)) / (ros_seq_np.std(axis=0) + 1e-8) lor_tensor = torch.tensor(lor_seq_np, dtype=delta_inter.dtype, device=delta_inter.device) ros_tensor = torch.tensor(ros_seq_np, dtype=delta_inter.dtype, device=delta_inter.device) # 2. Adaptive attractor mixing (learnable weights) attractor_weights = F.softmax(self.attractor_selector, dim=0) mixed_chaos = attractor_weights[0] * lor_tensor + attractor_weights[1] * ros_tensor # 3. Map 3D Chaos -> Embedding Dimension perturb = mixed_chaos @ self.vortex_linear perturb = perturb * self.vortex_scale # 4. Terminal Velocity Matching (Flow-like correction) velocity = compute_terminal_velocity(delta_inter, target_distribution="gaussian") gated_velocity = velocity * torch.sigmoid(self.velocity_gate) + self.velocity_bias # 5. Normalize perturbation magnitude emb_norm = delta_inter.norm(dim=1, keepdim=True) + 1e-8 pert_norm = perturb.norm(dim=1, keepdim=True) + 1e-8 normalized_perturb = perturb * (emb_norm / pert_norm) # 6. Adaptive temperature scaling + learned gate chaos_scale = torch.sigmoid(self.chaos_temperature + self.chaos_gate) effective_chaos = chaos_factor * chaos_scale # Gate chaos intensity if semantic entropy explodes (uncertain corrections) delta_entropy = compute_semantic_entropy(delta_inter.unsqueeze(1)) chaos_entropy_gate = 1.0 if delta_entropy < 1.0 else 0.5 effective_chaos = effective_chaos * chaos_entropy_gate # 7. Apply combined perturbation: chaos + velocity flow delta_inter = delta_inter + effective_chaos * normalized_perturb + 0.1 * gated_velocity # Store chaos metrics flow_states["attractor_mix"] = attractor_weights.detach().cpu().tolist() flow_states["effective_chaos"] = effective_chaos.item() flow_states["chaos_scale"] = chaos_scale.item() flow_states["chaos_entropy_gate"] = chaos_entropy_gate flow_states["angular_divergence"] = compute_angular_divergence(delta_before_chaos, delta_inter) # Gain boost to increase boundary hits when chaos is controlled delta_inter = delta_inter * 1.5 flow_states["delta_output"] = delta_inter state = torch.cat([approx_inter, delta_inter, w], dim=-1) evolved = self.euler_vortex(state) flow_states["output_corner"] = evolved[..., :-1] flow_states["ia_stage_logs"] = self.ia_router.describe_all_stages() self.last_flow_states = flow_states overlap = F.cosine_similarity(inter_primary.view(-1, self.embed_dim), inter_secondary.view(-1, self.embed_dim), dim=-1) overlap = overlap.view(-1, self.num_streams).mean(dim=1) annulation = torch.norm(mat_primary - (-mat_secondary), dim=-1).pow(2).mean(dim=1) vortex_sink = evolved[..., -1] hall_penalty = (1 - overlap).clamp_min(0.0) approx_pull = ( torch.norm(mat_primary.mean(dim=1) - approx_inter, dim=-1).pow(2) + torch.norm(mat_secondary.mean(dim=1) - approx_inter, dim=-1).pow(2) ) # SIGReg Regularization (Isotropic Gaussian Enforcement) sig_loss = sigreg_loss(delta_inter) # Spectral Energy Regularization (distribui energia uniformemente) spectral_loss = spectral_energy_loss(delta_inter) delta_probs = F.softmax(delta_inter, dim=-1) semantic_entropy = -(delta_probs * (delta_probs + 1e-8).log()).sum(dim=-1).mean() semantic_entropy_val = semantic_entropy.item() boundary_val = self._get_quadratic_boundary() boundary_reg = (0.2 - boundary_val).clamp_min(0.0).pow(2) loss = ( annulation.mean() + 0.5 * hall_penalty.mean() + 0.25 * approx_pull.mean() - 0.1 * vortex_sink.mean() + 0.05 * sig_loss + 0.02 * spectral_loss # Força distribuição espectral uniforme + 0.02 * boundary_reg + 0.05 * semantic_entropy ) metrics = { "annulation": annulation.mean().item(), "cosine_overlap": overlap.mean().item(), "vortex_energy": vortex_sink.mean().item(), "sigreg_loss": sig_loss.item(), "spectral_loss": spectral_loss.item() if isinstance(spectral_loss, torch.Tensor) else spectral_loss, "boundary_reg": boundary_reg.mean().item() if isinstance(boundary_reg, torch.Tensor) else boundary_reg, "rotation_ratio": rotation_ratio, "approx_alignment": approx_pull.mean().item(), "reflection_ratio": reflection_ratio, "reflect_ratio": reflection_ratio, "boost_ratio": boost_ratio, "reflection_push_ratio": pre_reflect_push_ratio, "ia_base": stage_signatures.get("base", "identity"), "ia_inversion": stage_signatures.get("inversion", "identity"), "ia_mirror": stage_signatures.get("mirror", "identity"), "ia_cycle": " || ".join(cycle_signatures) if cycle_signatures else stage_signatures.get(self.cycle_stage_name, "identity"), "lorentz_gamma": lorentz_gamma, "square_angle_deg": math.degrees(square_angle_applied) if square_angle_applied is not None else 0.0, "square_leak_ratio": self.square_leak_ratio, "angular_divergence": flow_states.get("angular_divergence", 0.0), "attractor_mix": flow_states.get("attractor_mix", [1.0, 0.0]), "effective_chaos": flow_states.get("effective_chaos", 1.0), "semantic_entropy_approx": semantic_entropy_val, "triangle_iters": triangle_debug.get("iterations", 0) if triangle_debug else 0, "triangle_residual": triangle_debug.get("residual", 0.0) if triangle_debug else 0.0, } delta_norm = torch.norm(delta_inter, dim=-1) metrics["delta_norm_mean"] = delta_norm.mean().item() metrics["delta_norm_max"] = delta_norm.max().item() return evolved, loss, metrics, delta_inter class PortugueseSentenceDataset(Dataset): def __init__( self, sentences: List[str], tokenizer: AutoTokenizer, embedding_model: SentenceTransformer, mask_prob: float = 0.15, max_seq_length: int = 512, precompute_embeddings: bool = False, embedding_batch_size: int = 64, ): self.sentences = sentences self.tokenizer = tokenizer self.embedding_model = embedding_model self.mask_prob = mask_prob self.max_seq_length = max_seq_length self.precompute_embeddings = precompute_embeddings self.embedding_batch_size = max(1, embedding_batch_size) self.embeddings: Optional[torch.Tensor] = None self._embedding_cache: Dict[int, torch.Tensor] = {} if self.precompute_embeddings: self._precompute_all_embeddings() def __len__(self) -> int: return len(self.sentences) def _mask_tokens(self, input_ids: torch.Tensor, special_mask: torch.Tensor) -> Dict[str, torch.Tensor]: labels = input_ids.clone() probability_matrix = torch.full(labels.shape, self.mask_prob) probability_matrix.masked_fill_(special_mask.bool(), 0.0) masked_indices = torch.bernoulli(probability_matrix).bool() if not masked_indices.any(): candidate_positions = (~special_mask.bool()).nonzero(as_tuple=False).view(-1) choice = candidate_positions[torch.randint(0, candidate_positions.numel(), (1,)).item()] masked_indices[choice] = True labels[~masked_indices] = -100 input_ids = input_ids.clone() input_ids[masked_indices] = self.tokenizer.mask_token_id return {"input_ids": input_ids, "labels": labels} def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]: sentence = self.sentences[idx] encoding = self.tokenizer( sentence, return_tensors="pt", return_special_tokens_mask=True, truncation=True, max_length=self.max_seq_length, ) input_ids = encoding["input_ids"].squeeze(0) attention_mask = encoding["attention_mask"].squeeze(0) special_mask = encoding["special_tokens_mask"].squeeze(0) masked = self._mask_tokens(input_ids, special_mask) embedding = self._get_embedding(idx) return { "input_ids": masked["input_ids"], "attention_mask": attention_mask, "labels": masked["labels"], "embedding": embedding, } @torch.no_grad() def _precompute_all_embeddings(self) -> None: chunks: List[torch.Tensor] = [] total = len(self.sentences) for start in range(0, total, self.embedding_batch_size): batch = self.sentences[start : start + self.embedding_batch_size] batch_embeds = self.embedding_model.encode( batch, convert_to_tensor=True, show_progress_bar=False, batch_size=self.embedding_batch_size, ) chunks.append(batch_embeds.float().cpu()) self.embeddings = torch.cat(chunks, dim=0) if chunks else torch.empty(0) @torch.no_grad() def _compute_single_embedding(self, sentence: str) -> torch.Tensor: embed = self.embedding_model.encode( sentence, convert_to_tensor=True, show_progress_bar=False, batch_size=1, ) return embed.float().cpu() def _get_embedding(self, idx: int) -> torch.Tensor: if self.embeddings is not None: return self.embeddings[idx] if idx not in self._embedding_cache: self._embedding_cache[idx] = self._compute_single_embedding(self.sentences[idx]) return self._embedding_cache[idx] def build_collate_fn(tokenizer: AutoTokenizer): pad_id = tokenizer.pad_token_id def collate(batch: List[Dict[str, torch.Tensor]]) -> Dict[str, torch.Tensor]: input_ids = pad_sequence([item["input_ids"] for item in batch], batch_first=True, padding_value=pad_id) attention_mask = pad_sequence([item["attention_mask"] for item in batch], batch_first=True, padding_value=0) labels = pad_sequence([item["labels"] for item in batch], batch_first=True, padding_value=-100) embeddings = torch.stack([item["embedding"] for item in batch]) return { "input_ids": input_ids, "attention_mask": attention_mask, "labels": labels, "embedding": embeddings, } return collate class BetinaTrainer: def __init__( self, vortex: VortexBetinaAntiHalluc, tokenizer: AutoTokenizer, embedding_model: SentenceTransformer, mlm_model: AutoModelForMaskedLM, raw_embedding_dim: int, embed_dim: int, lambda_vortex: float = 0.5, learning_rate: float = 1e-4, mlm_learning_rate: float = 5e-5, freeze_mlm: bool = False, freeze_projectors: bool = False, correction_max_norm: float | None = None, chaos_factor: float = 1.0, eval_chaos_factor: float = 1.0, device: torch.device | None = None, max_seq_length: int = 512, ): self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu") self.vortex = vortex.to(self.device) self.tokenizer = tokenizer self.embedding_model = embedding_model self.mlm_model = mlm_model.to(self.device) self.lambda_vortex = lambda_vortex self.chaos_factor = chaos_factor self.eval_chaos_factor = eval_chaos_factor self.embedding_projector = nn.Linear(raw_embedding_dim, embed_dim).to(self.device) hidden_size = self.mlm_model.config.hidden_size self.correction_projector = nn.Linear(embed_dim, hidden_size).to(self.device) self.freeze_projectors = freeze_projectors self.correction_max_norm = correction_max_norm if correction_max_norm and correction_max_norm > 0 else None self.max_seq_length = max_seq_length mlm_params = list(self.mlm_model.parameters()) if freeze_mlm: for param in mlm_params: param.requires_grad = False projector_params = list(self.embedding_projector.parameters()) + list(self.correction_projector.parameters()) if freeze_projectors: for param in projector_params: param.requires_grad = False trainable_projectors = [] if freeze_projectors else projector_params vortex_params = list(self.vortex.parameters()) + trainable_projectors optimizer_groups = [ {"params": vortex_params, "lr": learning_rate}, ] if not freeze_mlm: optimizer_groups.append({"params": mlm_params, "lr": mlm_learning_rate}) self.optimizer = optim.AdamW(optimizer_groups) self.freeze_mlm = freeze_mlm self.scaler = betina_grad_scaler(self.device.type, enabled=self.device.type == "cuda") def _project_correction(self, delta: torch.Tensor) -> torch.Tensor: correction = self.correction_projector(delta) if self.correction_max_norm is not None: dim = correction.dim() - 1 if correction.dim() > 0 else 0 correction = correction.renorm(p=2, dim=dim, maxnorm=self.correction_max_norm) return correction def train(self, dataloader: DataLoader, epochs: int = 5, grad_clip: float = 1.0) -> List[Dict[str, float]]: history: List[Dict[str, float]] = [] self.vortex.train() self.mlm_model.train() for epoch in range(epochs): for step, batch in enumerate(dataloader): input_ids = batch["input_ids"].to(self.device) attention_mask = batch["attention_mask"].to(self.device) labels = batch["labels"].to(self.device) embeds = batch["embedding"].to(self.device) self.optimizer.zero_grad(set_to_none=True) with betina_autocast(self.device.type, enabled=self.device.type == "cuda"): projected = self.embedding_projector(embeds) _, vortex_loss, metrics, delta = self.vortex(projected, chaos_factor=self.chaos_factor) if self.freeze_mlm: with torch.no_grad(): outputs = self.mlm_model( input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=True, return_dict=True, ) else: outputs = self.mlm_model( input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=True, return_dict=True, ) hidden = outputs.hidden_states[-1] correction = self._project_correction(delta).unsqueeze(1) attention_mask_f = attention_mask.unsqueeze(-1).float() mask_focus = (input_ids == self.tokenizer.mask_token_id).unsqueeze(-1).float() weight_mask = 0.5 * attention_mask_f + 0.5 * mask_focus corrected_hidden = hidden + correction * weight_mask if hasattr(self.mlm_model, "cls"): logits = self.mlm_model.cls(corrected_hidden) else: logits = self.mlm_model.get_output_embeddings()(corrected_hidden) mask_positions = labels != -100 if mask_positions.any(): mlm_loss = F.cross_entropy(logits[mask_positions], labels[mask_positions]) else: mlm_loss = torch.zeros(1, device=self.device) total_loss = mlm_loss + self.lambda_vortex * vortex_loss self.scaler.scale(total_loss).backward() torch.nn.utils.clip_grad_norm_(self.parameters(), grad_clip) self.scaler.step(self.optimizer) self.scaler.update() perplexity = torch.exp(mlm_loss.detach()) record = { "epoch": epoch + 1, "step": step + 1, "mlm_loss": mlm_loss.detach().item(), "vortex_loss": vortex_loss.detach().item(), "total_loss": total_loss.detach().item(), "perplexity": perplexity.item(), "vortex_energy": metrics["vortex_energy"], "cosine_overlap": metrics["cosine_overlap"], "rotation_ratio": metrics["rotation_ratio"], "approx_alignment": metrics["approx_alignment"], "reflection_ratio": metrics["reflection_ratio"], "boost_ratio": metrics.get("boost_ratio", 0.0), "reflection_push_ratio": metrics.get("reflection_push_ratio", 0.0), } history.append(record) if step % 10 == 0: print( f"Epoch {record['epoch']:03d} Step {record['step']:04d} | " f"Total {record['total_loss']:.4f} | MLM {record['mlm_loss']:.4f} | " f"PPL {record['perplexity']:.4f} | " f"Vortex {record['vortex_loss']:.4f} | Overlap {record['cosine_overlap']:.4f} | " f"Energy {record['vortex_energy']:.4f} | Rotation {record['rotation_ratio']:.3f} | " f"Reflect {record['reflection_ratio']:.3f} | Boost {record['boost_ratio']:.3f} | " f"PreReflect {record['reflection_push_ratio']:.3f} | Approx {record['approx_alignment']:.4f}" ) return history def parameters(self): for module in (self.vortex, self.embedding_projector, self.correction_projector, self.mlm_model): for param in module.parameters(): if param.requires_grad: yield param @torch.no_grad() def evaluate_perplexity(self, dataloader: DataLoader, apply_correction: bool = True) -> float: self.vortex.eval() self.mlm_model.eval() total_loss = 0.0 total_tokens = 0 for batch in dataloader: input_ids = batch["input_ids"].to(self.device) attention_mask = batch["attention_mask"].to(self.device) labels = batch["labels"].to(self.device) embeds = batch["embedding"].to(self.device) outputs = self.mlm_model( input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=True, return_dict=True, ) hidden = outputs.hidden_states[-1] if apply_correction: projected = self.embedding_projector(embeds) _, _, _, delta = self.vortex(projected, chaos_factor=self.eval_chaos_factor) correction = self._project_correction(delta).unsqueeze(1) attention_mask_f = attention_mask.unsqueeze(-1).float() mask_focus = (input_ids == self.tokenizer.mask_token_id).unsqueeze(-1).float() weight_mask = 0.5 * attention_mask_f + 0.5 * mask_focus hidden = hidden + correction * weight_mask if hasattr(self.mlm_model, "cls"): logits = self.mlm_model.cls(hidden) else: logits = self.mlm_model.get_output_embeddings()(hidden) mask_positions = labels != -100 if mask_positions.any(): loss = F.cross_entropy(logits[mask_positions], labels[mask_positions], reduction="sum") total_loss += loss.item() total_tokens += mask_positions.sum().item() if total_tokens == 0: return float("inf") return math.exp(total_loss / total_tokens) @torch.no_grad() def save(self, output_dir: str) -> None: output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) torch.save(self.vortex.state_dict(), output_path / "vortex.pt") torch.save(self.embedding_projector.state_dict(), output_path / "embedding_projector.pt") torch.save(self.correction_projector.state_dict(), output_path / "correction_projector.pt") self.mlm_model.save_pretrained(output_path / "mlm") self.tokenizer.save_pretrained(output_path / "mlm") @torch.no_grad() def fill_masks( self, texts: List[str], top_k: int = 5, apply_correction: bool = True, ) -> List[List[List[Tuple[str, float]]]]: self.vortex.eval() self.mlm_model.eval() encodings = self.tokenizer( texts, return_tensors="pt", padding=True, truncation=True, max_length=self.max_seq_length, ) input_ids = encodings["input_ids"].to(self.device) attention_mask = encodings["attention_mask"].to(self.device) outputs = self.mlm_model( input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=True, return_dict=True, ) hidden = outputs.hidden_states[-1] if apply_correction: embeds = self.embedding_model.encode(texts, convert_to_tensor=True, show_progress_bar=False).to(self.device) projected = self.embedding_projector(embeds) _, _, _, delta = self.vortex(projected, chaos_factor=self.eval_chaos_factor) correction = self._project_correction(delta).unsqueeze(1) attention_mask_f = attention_mask.unsqueeze(-1).float() mask_focus = (input_ids == self.tokenizer.mask_token_id).unsqueeze(-1).float() weight_mask = 0.5 * attention_mask_f + 0.5 * mask_focus hidden = hidden + correction * weight_mask if hasattr(self.mlm_model, "cls"): logits = self.mlm_model.cls(hidden) else: logits = self.mlm_model.get_output_embeddings()(hidden) mask_positions = (input_ids == self.tokenizer.mask_token_id) results: List[List[List[Tuple[str, float]]]] = [] for batch_index in range(input_ids.size(0)): batch_tokens: List[List[Tuple[str, float]]] = [] positions = mask_positions[batch_index].nonzero(as_tuple=False).view(-1) for position in positions: token_logits = logits[batch_index, position] token_probs = F.softmax(token_logits, dim=-1) topk = torch.topk(token_probs, top_k) decoded: List[Tuple[str, float]] = [] for token_id, prob in zip(topk.indices, topk.values): token = self.tokenizer.decode([token_id]).strip() decoded.append((token, prob.item())) batch_tokens.append(decoded) results.append(batch_tokens) return results def run_demo(embed_dim: int = 4, batch_size: int = 3, seed: int = 123) -> None: torch.manual_seed(seed) model = VortexBetinaAntiHalluc(embed_dim=embed_dim) inputs = torch.randn(batch_size, embed_dim) evolved, loss, metrics, delta = model(inputs) print("Dimensão de entrada:", embed_dim) print("Inputs:", inputs) print("Estado evoluído shape:", evolved.shape) print("Delta shape:", delta.shape) print("Loss:", loss.item()) for key, value in metrics.items(): print(f"{key}: {value}") def sample_sentences() -> List[str]: return [ "O céu de Lisboa estava completamente claro naquela manhã.", "A inteligência coletiva da equipe resolveu o problema rapidamente.", "O gato preto dormia tranquilo sobre o sofá da sala.", "A orquestra executou a sinfonia com uma precisão impressionante.", "Os dados indicam uma redução consistente nas alucinações do modelo.", "A pesquisa científica requer paciência, rigor e curiosidade constante.", "A ponte antiga foi restaurada para preservar o patrimônio cultural.", "O sistema Betina ajusta embeddings para evitar distorções semânticas.", ] def load_sentences_from_args(args: argparse.Namespace) -> List[str]: if args.dataset_file: path = Path(args.dataset_file) if not path.exists(): raise FileNotFoundError(f"Dataset file not found: {path}") sentences = [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()] if not sentences: raise ValueError(f"Dataset file {path} is empty") return sentences if args.dataset_limit is None else sentences[: args.dataset_limit] if args.dataset_hf: if load_dataset is None: raise ImportError("Install the 'datasets' package to use --dataset-hf option") split = args.dataset_split if args.dataset_limit and ":" not in split: split = f"{split}[:{args.dataset_limit}]" dataset_name = args.dataset_hf config_name: Optional[str] = args.dataset_hf_config or None try: dataset = _safe_load_dataset( dataset_name, config_name, split=split, hf_token=args.hf_token, trust_remote_code=args.trust_remote_code, ) except Exception as exc: message = str(exc).lower() scripts_blocked = "dataset scripts are no longer supported" in message trust_flag_blocked = "trust_remote_code" in message if dataset_name == "wikipedia" and (scripts_blocked or trust_flag_blocked): fallback_name = "wikimedia/wikipedia" fallback_config = config_name or "20231101.pt" print( "Dataset 'wikipedia' agora usa snapshot parquet e não aceita mais scripts remotos." f" Alternando automaticamente para {fallback_name} ({fallback_config})." ) dataset = _safe_load_dataset( fallback_name, fallback_config, split=split, hf_token=args.hf_token, trust_remote_code=False, ) elif scripts_blocked and not args.trust_remote_code: hint = ( "O dataset solicita código remoto. Reexecute com --trust-remote-code para habilitar" " scripts do autor do dataset. Apenas use se confiar na fonte." ) raise RuntimeError(hint) from exc if "gated dataset" in message or "403" in message or "401" in message: hint = ( "Dataset protegido requer autenticação. Informe --hf-token , defina HF_TOKEN/HUGGINGFACE_TOKEN" " ou utilize --hf-token-file apontando para o token salvo pelo huggingface-cli. Também é possível" " executar 'huggingface-cli login' para gerar ~/.cache/huggingface/token.\nVocê pode obter o token em" " https://huggingface.co/settings/tokens." ) raise RuntimeError(hint) from exc raise sentences: List[str] = [] text_field = args.dataset_text_field limit = args.dataset_limit for item in dataset: text = item.get(text_field) if isinstance(text, str) and text.strip(): sentences.append(text.strip()) if limit is not None and len(sentences) >= limit: break if not sentences: raise ValueError("No sentences extracted from the specified dataset") return sentences return sample_sentences() def print_gain_summary( prompts: List[str], base_fills: List[List[List[Tuple[str, float]]]], betina_fills: List[List[List[Tuple[str, float]]]], ) -> None: print("\nResumo de ganhos top-1:") for prompt, base_masks, betina_masks in zip(prompts, base_fills, betina_fills): prompt_head = prompt if len(prompt) <= 60 else f"{prompt[:57]}..." for idx, (base_group, betina_group) in enumerate(zip(base_masks, betina_masks), start=1): if not base_group or not betina_group: continue base_token, base_prob = base_group[0] betina_token, betina_prob = betina_group[0] delta = betina_prob - base_prob if base_prob > 0: rel = delta / base_prob * 100.0 rel_text = f"{rel:+.2f}%" else: rel_text = "n/d" change_desc = "mantido" if betina_token == base_token else f"{base_token} -> {betina_token}" print( f" [{prompt_head}] máscara {idx}: {change_desc} | base {base_prob:.4f} -> betina {betina_prob:.4f} ({rel_text})" ) def _prepare_debug_value(value, max_examples: int): if isinstance(value, torch.Tensor): limited = value.detach().cpu() if limited.dim() >= 1: limited = limited[:max_examples] return limited.tolist() if isinstance(value, dict): return {key: _prepare_debug_value(val, max_examples) for key, val in value.items()} if isinstance(value, (list, tuple)): return [_prepare_debug_value(item, max_examples) for item in value] if isinstance(value, (float, int, str)) or value is None: return value return str(value) def dump_square_debug(flow_states: Dict[str, object], metrics: Dict[str, float], output_path: str, max_examples: int = 1) -> Path: output = Path(output_path).expanduser() payload = { "max_examples": max(1, max_examples), "metrics": {key: float(value) if isinstance(value, (int, float)) else value for key, value in metrics.items()}, "flow_states": {key: _prepare_debug_value(val, max_examples) for key, val in flow_states.items()}, } output.parent.mkdir(parents=True, exist_ok=True) output.write_text(json.dumps(payload, indent=2), encoding="utf-8") return output def main(argv: list[str] | None = None): parser = argparse.ArgumentParser(description="Treinamento do modelo Betina anti-hallucination") parser.add_argument("--train", action="store_true", help="Executa treinamento completo") parser.add_argument("--epochs", type=int, default=5, help="Número de épocas para treinar") parser.add_argument("--batch-size", type=int, default=4, help="Tamanho do batch") parser.add_argument("--embed-dim", type=int, default=256, choices=[128, 256], help="Dimensão interna do vórtice") parser.add_argument("--lambda-vortex", type=float, default=0.1, help="Peso da loss do vórtice") parser.add_argument("--chaos-factor", type=float, default=1.0, help="Fator de caos aplicado durante o treinamento") parser.add_argument( "--eval-chaos-factor", type=float, default=1.0, help="Fator de caos usado em avaliação e inferência (permite medir diferentes regimes)", ) parser.add_argument("--learning-rate", type=float, default=1e-4, help="Learning rate para o vórtice e projetores") parser.add_argument("--mlm-learning-rate", type=float, default=5e-5, help="Learning rate para o modelo de linguagem") parser.add_argument("--freeze-mlm", action="store_true", help="Congela os pesos do modelo de linguagem durante o treino") parser.add_argument("--freeze-projectors", action="store_true", help="Congela os projetores de embedding/correção (modo inferência)") parser.add_argument( "--correction-max-norm", type=float, default=None, help="Clampa o vetor de correção Betina a esta norma L2 (<=0 desativa)", ) parser.add_argument("--output-dir", type=str, default="outputs/betina_vortex", help="Diretório para salvar o modelo") parser.add_argument("--device", type=str, default=None, help="Força execução em cuda ou cpu") parser.add_argument("--top-k", type=int, default=5, help="Top-k para avaliação de máscara") parser.add_argument("--skip-eval", action="store_true", help="Pula avaliação pós-treino") parser.add_argument("--eval-prompts", nargs="*", default=None, help="Prompts personalizados contendo [MASK] para avaliação") parser.add_argument("--dataset-file", type=str, default=None, help="Arquivo de texto com uma sentença por linha") parser.add_argument("--dataset-hf", type=str, default=None, help="Nome do dataset Hugging Face, ex.: oscar") parser.add_argument("--dataset-hf-config", type=str, default=None, help="Config do dataset Hugging Face, ex.: unshuffled_deduplicated_pt") parser.add_argument("--dataset-split", type=str, default="train[:1000]", help="Split do dataset Hugging Face") parser.add_argument("--dataset-text-field", type=str, default="text", help="Campo de texto no dataset Hugging Face") parser.add_argument("--dataset-limit", type=int, default=None, help="Limite de sentenças carregadas") parser.add_argument("--hf-token", type=str, default=None, help="Token de autenticação da Hugging Face (ou defina HF_TOKEN)") parser.add_argument( "--hf-token-file", type=str, default=None, help="Arquivo contendo o token da Hugging Face (padrão: ~/.cache/huggingface/token)", ) parser.add_argument("--trust-remote-code", action="store_true", help="Permite datasets com script remoto (requer confiança no autor)") parser.add_argument("--force-download", action="store_true", help="Força novo download dos pesos do modelo de linguagem") parser.add_argument("--disable-rotation", action="store_true", help="Desativa a rotação do delta do vórtice") parser.add_argument("--rotation-angle", type=float, default=math.pi / 4, help="Ângulo (rad) para rotacionar o delta quando ativado") parser.add_argument("--rotation-threshold", type=float, default=1e-4, help="Norma mínima do delta para aplicar rotação") parser.add_argument("--rotation-clockwise", action="store_true", help="Força rotação horária (inverte o sinal do ângulo)") parser.add_argument( "--enable-quadratic-reflection", action="store_true", help="Ativa reflexão quadrática estilo bolinha/bastão no delta do vórtice", ) parser.add_argument( "--quadratic-boundary", type=float, default=1.0, help="Magnitudes acima desse valor são refletidas (parede virtual)", ) parser.add_argument( "--quadratic-strength", type=float, default=0.5, help="Mistura (0-1) entre o delta original e o refletido quadrático", ) parser.add_argument( "--disable-triangle", action="store_true", help="Desativa o neurônio sintético triangular que confronta X, Y e contrabase", ) parser.add_argument( "--triangle-hidden-dim", type=int, default=512, help="Dimensão oculta usada dentro do neurônio triangular", ) parser.add_argument( "--triangle-max-iters", type=int, default=5, help="Iterações máximas de refinamento (porquês) do triângulo", ) parser.add_argument( "--triangle-tol", type=float, default=1e-4, help="Tolerância para encerrar refinamento triangular (quanto menor, mais perguntas)", ) parser.add_argument( "--triangle-delta-gain", type=float, default=1.0, help="Ganho aplicado ao eixo integrador do triângulo", ) parser.add_argument( "--disable-square-geometry", action="store_true", help="Desativa o giro de 180° que forma o quadrado X↔X⁻¹", ) parser.add_argument( "--square-rotation-degrees", type=float, default=180.0, help="Ângulo aplicado ao girar a matriz completa (180° gera o quadrado perfeito)", ) parser.add_argument( "--square-leak-ratio", type=float, default=0.05, help="Mistura o quadrado com a matriz original (0 mantém oposição perfeita, 1 ignora o giro)", ) parser.add_argument( "--square-jitter-std-deg", type=float, default=0.0, help="Desvio padrão em graus para injetar ruído aleatório no giro quadrado", ) parser.add_argument( "--square-debug-json", type=str, default=None, help="Se definido, salva um dump JSON com as matrizes primária/secundária e métricas do vórtice", ) parser.add_argument( "--square-debug-max", type=int, default=1, help="Número máximo de exemplos incluídos no dump quadrado", ) parser.add_argument( "--enable-lorentz-transform", action="store_true", help="Aplica transformação de Lorentz no delta final para medir o resultado físico", ) parser.add_argument( "--lorentz-beta", type=float, default=0.6, help="Fração da velocidade da luz usada no boost de Lorentz", ) parser.add_argument( "--lorentz-axis-stream", type=int, default=0, help="Stream usado como eixo espacial (0=X, 1=Y, etc.) na transformação de Lorentz", ) parser.add_argument( "--ia-config", type=str, default=None, help="Arquivo JSON descrevendo quais IAs assumem cada estágio/stream do fluxo matriz", ) parser.add_argument( "--refinement-cycles", type=int, default=0, help="Quantidade de ciclos circundantes de refinamento IA aplicados ao delta final", ) parser.add_argument( "--cycle-stage-name", type=str, default="cycle", help="Nome do estágio IA usado durante cada ciclo circundante", ) parser.add_argument("--max-seq-length", type=int, default=512, help="Comprimento máximo de tokens por exemplo (padrão BERT)") parser.add_argument("--precompute-embeddings", action="store_true", help="Codifica todas as sentenças antes do treino (requer muita RAM)") parser.add_argument("--embedding-batch-size", type=int, default=64, help="Batch interno para geração de embeddings (encode)") if argv is None: argv_list = sys.argv[1:] if "ipykernel" in sys.modules: filtered: List[str] = [] skip_next = False for item in argv_list: if skip_next: skip_next = False continue if item == "-f": skip_next = True continue if item.startswith("-f="): continue filtered.append(item) argv_list = filtered else: argv_list = list(argv) parsed, unknown = parser.parse_known_args(argv_list) if unknown: print(f"Ignorando argumentos desconhecidos: {unknown}") args = parsed args.hf_token, token_source = resolve_hf_token(args.hf_token, args.hf_token_file) if args.hf_token and token_source: print(f"Token Hugging Face detectado via {token_source}.") device = torch.device(args.device) if args.device else torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"Usando dispositivo: {device}") embedding_model_name = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2" tokenizer_name = "neuralmind/bert-base-portuguese-cased" embedding_model = SentenceTransformer(embedding_model_name, device=str(device) if device.type == "cuda" else "cpu") tokenizer = AutoTokenizer.from_pretrained(tokenizer_name) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token or tokenizer.sep_token or tokenizer.cls_token try: mlm_model = AutoModelForMaskedLM.from_pretrained(tokenizer_name, force_download=args.force_download) except Exception as exc: print(f"Download failed: {exc}. Try checking internet or cache.") raise sentences = load_sentences_from_args(args) dataset = PortugueseSentenceDataset( sentences, tokenizer, embedding_model, max_seq_length=args.max_seq_length, precompute_embeddings=args.precompute_embeddings, embedding_batch_size=args.embedding_batch_size, ) collate_fn = build_collate_fn(tokenizer) dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True, collate_fn=collate_fn) eval_dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=False, collate_fn=collate_fn) stream_aliases: Optional[List[str]] = None ia_stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None refinement_cycles = args.refinement_cycles config_cycle_stage = args.cycle_stage_name if args.ia_config: ia_config_data = load_ia_config_file(args.ia_config, args.embed_dim) print(f"Config IA carregada de {args.ia_config}") stream_aliases = ia_config_data.get("stream_aliases") ia_stage_config = ia_config_data.get("stage_config") # type: ignore[assignment] config_cycles = ia_config_data.get("refinement_cycles") if isinstance(config_cycles, int): refinement_cycles = config_cycles config_cycle_stage = str(ia_config_data.get("cycle_stage_name", config_cycle_stage)) vortex = VortexBetinaAntiHalluc( embed_dim=args.embed_dim, enable_rotation=not args.disable_rotation, rotation_angle=args.rotation_angle, rotation_threshold=args.rotation_threshold, rotation_clockwise=args.rotation_clockwise, enable_quadratic_reflection=args.enable_quadratic_reflection, quadratic_boundary=args.quadratic_boundary, quadratic_strength=args.quadratic_strength, stream_aliases=stream_aliases, ia_stage_config=ia_stage_config, refinement_cycles=refinement_cycles, cycle_stage_name=config_cycle_stage, enforce_square_geometry=not args.disable_square_geometry, square_rotation_degrees=args.square_rotation_degrees, square_leak_ratio=args.square_leak_ratio, square_jitter_std_degrees=args.square_jitter_std_deg, enable_lorentz_transform=args.enable_lorentz_transform, lorentz_beta=args.lorentz_beta, lorentz_axis_stream=args.lorentz_axis_stream, enable_triangle=not args.disable_triangle, triangle_hidden_dim=args.triangle_hidden_dim, triangle_max_iters=args.triangle_max_iters, triangle_tol=args.triangle_tol, triangle_delta_gain=args.triangle_delta_gain, ) trainer = BetinaTrainer( vortex=vortex, tokenizer=tokenizer, embedding_model=embedding_model, mlm_model=mlm_model, raw_embedding_dim=embedding_model.get_sentence_embedding_dimension(), embed_dim=args.embed_dim, lambda_vortex=args.lambda_vortex, learning_rate=args.learning_rate, mlm_learning_rate=args.mlm_learning_rate, freeze_mlm=args.freeze_mlm, freeze_projectors=args.freeze_projectors, correction_max_norm=args.correction_max_norm, eval_chaos_factor=args.eval_chaos_factor, chaos_factor=args.chaos_factor, device=device, max_seq_length=args.max_seq_length, ) if args.square_debug_json: square_max = max(1, args.square_debug_max) try: sample_batch = next(iter(dataloader)) except StopIteration as exc: # pragma: no cover - dataset vazio raise RuntimeError("Não é possível gerar dump quadrado: dataset vazio") from exc sample_embeddings = sample_batch["embedding"][:square_max].to(device) with torch.no_grad(): projected = trainer.embedding_projector(sample_embeddings) _, _, metrics_debug, _ = trainer.vortex(projected, chaos_factor=trainer.eval_chaos_factor) dump_square_debug( trainer.vortex.last_flow_states, metrics_debug, args.square_debug_json, max_examples=min(square_max, sample_embeddings.size(0)), ) print(f"Dump quadrado salvo em {args.square_debug_json}") if args.train: print("Iniciando treinamento...") history = trainer.train(dataloader, epochs=args.epochs) print(f"Treinamento finalizado com {len(history)} passos") trainer.save(args.output_dir) print(f"Modelos salvos em {args.output_dir}") if not args.skip_eval: ppl_base = trainer.evaluate_perplexity(eval_dataloader, apply_correction=False) ppl_betina = trainer.evaluate_perplexity(eval_dataloader, apply_correction=True) print(f"\nPerplexity sem correção: {ppl_base:.4f}") print(f"Perplexity com correção Betina: {ppl_betina:.4f}") ppl_delta = ppl_base - ppl_betina if ppl_base > 0: ppl_rel = ppl_delta / ppl_base * 100.0 print(f"Ganho absoluto: {ppl_delta:+.4f} | Ganho relativo: {ppl_rel:+.2f}%") else: print(f"Ganho absoluto: {ppl_delta:+.4f} | Ganho relativo: n/d") eval_prompts = args.eval_prompts or [ "O modelo Betina evita [MASK] durante a geração.", "A capital de Portugal é [MASK].", "A IA Betina corrige [MASK] via vórtice.", "O vórtice no Betina filtra [MASK] para reduzir alucinações.", ] print("\nPreenchimento sem correção:") base_fills = trainer.fill_masks(eval_prompts, top_k=args.top_k, apply_correction=False) for prompt, tokens in zip(eval_prompts, base_fills): print(prompt) for idx, group in enumerate(tokens, start=1): formatted = [f"{token} ({prob:.4f})" for token, prob in group] print(f" Mascara {idx}: {formatted}") print("\nPreenchimento com correção Betina:") betina_fills = trainer.fill_masks(eval_prompts, top_k=args.top_k, apply_correction=True) for prompt, tokens in zip(eval_prompts, betina_fills): print(prompt) for idx, group in enumerate(tokens, start=1): formatted = [f"{token} ({prob:.4f})" for token, prob in group] print(f" Mascara {idx}: {formatted}") print_gain_summary(eval_prompts, base_fills, betina_fills) if __name__ == "__main__": main()