import argparse
import json
import math
import os
import sys

import numpy as np
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from contextlib import nullcontext
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset

try:
    from torch.amp import autocast as _autocast, GradScaler as _GradScaler

    def betina_autocast(device_type: str, enabled: bool = True):
        if not enabled or device_type != "cuda":
            return nullcontext()
        return _autocast(device_type=device_type, enabled=enabled)

    def betina_grad_scaler(device_type: str, enabled: bool = True):
        if not enabled or device_type != "cuda":
            return _NoOpGradScaler()
        # torch.amp.GradScaler takes the device as `device`, not `device_type`.
        return _GradScaler(device=device_type, enabled=enabled)
except ImportError:  # pragma: no cover
    from torch.cuda.amp import autocast as _autocast, GradScaler as _GradScaler

    def betina_autocast(device_type: str, enabled: bool = True):
        if not enabled or device_type != "cuda":
            return nullcontext()
        return _autocast(enabled=enabled)

    def betina_grad_scaler(device_type: str, enabled: bool = True):
        if not enabled or device_type != "cuda":
            return _NoOpGradScaler()
        return _GradScaler(enabled=enabled)
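
# Usage sketch (illustrative, mirrors how BetinaTrainer.train uses the shim below):
# the same call site works whether the new torch.amp API or the legacy
# torch.cuda.amp API is available, and degrades to no-ops on CPU.
#
#     scaler = betina_grad_scaler(device.type, enabled=device.type == "cuda")
#     with betina_autocast(device.type, enabled=device.type == "cuda"):
#         loss = model(batch).sum()
#     scaler.scale(loss).backward()
#     scaler.step(optimizer)
#     scaler.update()
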
try:
    from sentence_transformers import SentenceTransformer
except ImportError as exc:  # pragma: no cover
    raise ImportError("Install sentence-transformers to run the Betina pipeline") from exc

try:
    from transformers import AutoModelForMaskedLM, AutoTokenizer
except ImportError as exc:  # pragma: no cover
    raise ImportError("Install transformers to run the Betina pipeline") from exc

try:
    from datasets import load_dataset  # type: ignore[import-not-found]
except ImportError:  # pragma: no cover
    load_dataset = None

def _safe_load_dataset(
    path: str,
    name: Optional[str],
    *,
    split: str,
    hf_token: Optional[str],
    trust_remote_code: bool,
):
    if load_dataset is None:
        raise ImportError("Install the 'datasets' package to use Hugging Face corpora")
    base_kwargs = {"split": split, "trust_remote_code": trust_remote_code}
    attempts: List[Dict[str, Optional[str]]] = []
    if hf_token:
        attempts.append({"token": hf_token})
        attempts.append({"use_auth_token": hf_token})
    attempts.append({})
    last_error: Optional[Exception] = None
    for extra in attempts:
        try:
            return load_dataset(path, name, **base_kwargs, **extra)
        except TypeError as err:
            last_error = err
            continue
        except ValueError as err:
            if "use_auth_token" in str(err).lower():
                last_error = err
                continue
            raise
    if last_error:
        raise last_error
    raise RuntimeError(f"Failed to load dataset {path}/{name}")

def _read_hf_token_file(path: Path) -> Optional[str]:
    try:
        content = path.read_text(encoding="utf-8").strip()
    except OSError:
        return None
    if not content:
        return None
    first_line = content.splitlines()[0].strip()
    return first_line or None


def resolve_hf_token(explicit_token: Optional[str], token_file: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """Resolve the HF token, preferring the CLI argument, then env vars, then the huggingface-cli token file."""
    if explicit_token and explicit_token.strip():
        return explicit_token.strip(), "--hf-token"
    env_token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
    if env_token and env_token.strip():
        return env_token.strip(), "env"
    file_candidates: List[Path] = []
    if token_file:
        file_candidates.append(Path(token_file).expanduser())
    else:
        hf_home = os.getenv("HF_HOME")
        if hf_home:
            file_candidates.append(Path(hf_home).expanduser() / "token")
        file_candidates.append(Path.home() / ".cache" / "huggingface" / "token")
        file_candidates.append(Path.home() / ".huggingface" / "token")
    for candidate in file_candidates:
        token = _read_hf_token_file(candidate)
        if token:
            return token, str(candidate)
    return None, None
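
# Resolution order sketch: resolve_hf_token("hf_abc", None) -> ("hf_abc", "--hf-token");
# with no argument it falls back to HF_TOKEN / HUGGINGFACE_TOKEN, then to the
# huggingface-cli token file (e.g. ~/.cache/huggingface/token). The second element
# of the returned tuple reports which source supplied the token, or (None, None)
# when nothing is configured.
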
class _NoOpGradScaler:
    def __init__(self):
        pass

    def scale(self, loss):
        return loss

    def step(self, optimizer):
        optimizer.step()

    def update(self):
        pass

    def unscale_(self, optimizer):
        pass

    def state_dict(self):
        return {}

    def load_state_dict(self, state):
        pass

class CallableAgentAdapter(nn.Module):
    def __init__(self, fn: Callable[[torch.Tensor], torch.Tensor], name: str):
        super().__init__()
        self.fn = fn
        self.agent_name = name or getattr(fn, "__name__", "callable_agent")

    def forward(self, tensor: torch.Tensor) -> torch.Tensor:  # noqa: D401
        return self.fn(tensor)

class MultiIntelligenceRouter(nn.Module):
    """Dispatches each stage of the matrix flow to a distinct AI agent per stream/stage."""

    def __init__(
        self,
        num_streams: int,
        *,
        stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None,
        stream_aliases: Optional[List[str]] = None,
    ):
        super().__init__()
        self.num_streams = num_streams
        self.stream_aliases = stream_aliases or [f"S{idx}" for idx in range(num_streams)]
        self.stage_modules = nn.ModuleDict()
        self.stage_logs: Dict[str, List[Dict[str, str]]] = {}
        if stage_config:
            self.apply_stage_config(stage_config)

    def apply_stage_config(self, stage_config: Dict[str, Dict[str, nn.Module]]) -> None:
        for stage_name, mapping in stage_config.items():
            module_dict = nn.ModuleDict()
            for key, module in mapping.items():
                module_dict[str(key)] = self._wrap_module(stage_name, key, module)
            self.stage_modules[stage_name] = module_dict

    def register_stage(self, stage_name: str, mapping: Dict[str, nn.Module]) -> None:
        module_dict = nn.ModuleDict()
        for key, module in mapping.items():
            module_dict[str(key)] = self._wrap_module(stage_name, key, module)
        self.stage_modules[stage_name] = module_dict

    def _wrap_module(self, stage: str, key: str | int, module: nn.Module | Callable) -> nn.Module:
        if isinstance(module, list):
            wrapped = [self._wrap_module(stage, f"{key}_{idx}", item) for idx, item in enumerate(module)]
            seq = nn.Sequential(*wrapped)
            if not hasattr(seq, "agent_name"):
                seq.agent_name = f"seq_{stage}_{key}"
            return seq
        if isinstance(module, nn.Module):
            if not hasattr(module, "agent_name"):
                module.agent_name = module.__class__.__name__
            return module
        if callable(module):
            name = getattr(module, "agent_name", None) or getattr(module, "__name__", f"{stage}_{key}_fn")
            return CallableAgentAdapter(module, name)
        raise TypeError(f"Invalid AI module for stage {stage}/{key}: {type(module)}")

    def _select_module(self, stage_dict: nn.ModuleDict, key: str | int | None) -> Optional[nn.Module]:
        if key is not None:
            candidate_key = str(key)
            if candidate_key in stage_dict:
                return stage_dict[candidate_key]
        for fallback in ("*", "default", "-1"):
            if fallback in stage_dict:
                return stage_dict[fallback]
        return None

    def apply(self, stage: str, tensor: torch.Tensor, *, cycle_idx: Optional[int] = None) -> torch.Tensor:
        stage_dict = self.stage_modules[stage] if stage in self.stage_modules else None
        log: List[Dict[str, str]] = []
        if stage_dict is None:
            self.stage_logs[stage] = log
            return tensor
        if tensor.dim() == 3:
            outputs = []
            for stream_idx in range(tensor.size(1)):
                module = self._select_module(stage_dict, stream_idx)
                if module is None and stream_idx < len(self.stream_aliases):
                    module = self._select_module(stage_dict, self.stream_aliases[stream_idx])
                chunk = tensor[:, stream_idx, :]
                if module is not None:
                    chunk = module(chunk)
                    log.append(
                        {
                            "stream": self.stream_aliases[stream_idx] if stream_idx < len(self.stream_aliases) else str(stream_idx),
                            "agent": getattr(module, "agent_name", module.__class__.__name__),
                        }
                    )
                outputs.append(chunk)
            stacked = torch.stack(outputs, dim=1)
            self.stage_logs[stage] = log
            return stacked
        module = None
        if cycle_idx is not None:
            module = self._select_module(stage_dict, cycle_idx)
        if module is None:
            module = self._select_module(stage_dict, "global")
        if module is None:
            module = self._select_module(stage_dict, None)
        if module is None:
            self.stage_logs[stage] = log
            return tensor
        updated = module(tensor)
        alias = f"cycle_{cycle_idx}" if cycle_idx is not None else "global"
        log.append({"stream": alias, "agent": getattr(module, "agent_name", module.__class__.__name__)})
        self.stage_logs[stage] = log
        return updated

    def stage_signature(self, stage: str) -> str:
        logs = self.stage_logs.get(stage, [])
        if not logs:
            return "identity"
        return " | ".join(f"{entry['stream']}→{entry['agent']}" for entry in logs)

    def describe_all_stages(self) -> Dict[str, List[Dict[str, str]]]:
        return {stage: list(entries) for stage, entries in self.stage_logs.items()}
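
# Construction sketch (using the builtin agent styles defined further below;
# the stage and stream keys here are illustrative):
#
#     router = MultiIntelligenceRouter(
#         num_streams=3,
#         stage_config={
#             "base": {"0": build_builtin_agent("brock", 256), "*": build_builtin_agent("identity", 256)},
#             "mirror": {"global": build_builtin_agent("critic", 256)},
#         },
#         stream_aliases=["X", "Y", "Z"],
#     )
#     out = router.apply("base", torch.randn(4, 3, 256))  # 3-D input -> per-stream dispatch
#     router.stage_signature("base")  # e.g. "X→brock | Y→identity | Z→identity"
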
class SyntheticNeuronTriangle(nn.Module):
    """Refines deltas against a triangular base (X, Y, counter-base)."""

    def __init__(
        self,
        embed_dim: int,
        num_streams: int,
        *,
        hidden_dim: int = 512,
        max_iters: int = 5,
        tol: float = 1e-4,
        delta_gain: float = 1.0,
    ) -> None:
        super().__init__()
        self.embed_dim = embed_dim
        self.num_streams = num_streams
        self.max_iters = max(0, max_iters)
        self.tol = max(1e-6, tol)
        self.delta_gain = float(delta_gain)
        seed_in = embed_dim * 4  # X, Y, mean counter-base, diagonal
        refine_in = embed_dim * 3  # mean delta, integrated axis, diagonal
        self.seed_proj = nn.Sequential(
            nn.LayerNorm(seed_in),
            nn.Linear(seed_in, hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, embed_dim),
        )
        self.refine_proj = nn.Sequential(
            nn.LayerNorm(refine_in),
            nn.Linear(refine_in, hidden_dim),
            nn.SiLU(),
            nn.Linear(hidden_dim, embed_dim),
        )

    def forward(
        self,
        mat_primary: torch.Tensor,
        mat_secondary: torch.Tensor,
        base_core: torch.Tensor,
    ) -> Tuple[torch.Tensor, Dict[str, object]]:
        batch, streams, dim = mat_primary.shape
        x_stream = mat_primary[:, 0, :]
        y_stream = mat_primary[:, 1, :] if streams > 1 else mat_secondary[:, 0, :]
        contra_stream = mat_secondary[:, 0, :]
        contra_mean = mat_secondary.mean(dim=1)
        base_center = torch.stack([x_stream, y_stream], dim=1).mean(dim=1)
        diag = torch.stack([x_stream - contra_stream, y_stream - contra_mean], dim=1).mean(dim=1)
        seed_features = torch.cat([base_center, contra_mean, base_core.squeeze(1), diag], dim=-1)
        axis_vector = torch.tanh(self.seed_proj(seed_features))
        axis_norm = F.normalize(axis_vector, dim=-1)
        base_delta = mat_primary - base_core
        stream_align = torch.sum(
            F.normalize(mat_primary, dim=-1) * axis_norm.unsqueeze(1),
            dim=-1,
            keepdim=True,
        )
        axis_component = axis_norm.unsqueeze(1) * stream_align
        delta = base_delta + self.delta_gain * axis_component
        iterations = 0
        last_change = torch.zeros(1, device=mat_primary.device)
        if self.max_iters > 0:
            for idx in range(self.max_iters):
                delta_mean = delta.mean(dim=1)
                refine_inp = torch.cat([delta_mean, axis_vector, diag], dim=-1)
                refine = torch.tanh(self.refine_proj(refine_inp)).unsqueeze(1)
                delta = delta + refine
                last_change = refine.norm(dim=-1).mean()
                iterations = idx + 1
                if last_change.item() < self.tol:
                    break
        debug = {
            "axis": axis_vector.detach(),
            "diag": diag.detach(),
            "iterations": iterations,
            "residual": float(last_change.detach().item()),
        }
        return delta, debug
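
# Shape sketch: with batch=4, streams=3, dim=256,
#
#     tri = SyntheticNeuronTriangle(256, 3)
#     delta, dbg = tri(torch.randn(4, 3, 256), torch.randn(4, 3, 256), torch.randn(4, 1, 256))
#     # delta: (4, 3, 256); dbg["iterations"] <= max_iters; dbg["residual"] is the
#     # mean norm of the last refinement step (the loop stops early once it < tol).
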
def build_builtin_agent(name: str, embed_dim: int) -> nn.Module:
    normalized = name.strip().lower()
    if normalized in {"brock", "brockman", "brock ia"}:
        module = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, embed_dim * 2),
            nn.GELU(),
            nn.Linear(embed_dim * 2, embed_dim),
        )
    elif normalized in {"chatgpt", "chatgpt 5.1", "chatgpt5.1", "gpt51"}:
        module = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, embed_dim),
            nn.SiLU(),
            nn.Linear(embed_dim, embed_dim),
        )
    elif normalized in {"code", "code ia", "coder"}:
        module = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, embed_dim),
        )
    elif normalized in {"critic", "mirror", "refiner"}:
        module = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, embed_dim * 3 // 2),
            nn.GELU(),
            nn.Linear(embed_dim * 3 // 2, embed_dim),
            nn.LayerNorm(embed_dim),
        )
    elif normalized in {"identity", "none"}:
        module = nn.Identity()
    else:
        raise ValueError(f"Unknown AI agent: {name}")
    module.agent_name = name
    return module
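
# Example sketch: style names are matched case-insensitively, and the original
# spelling is kept as the agent_name used in stage logs:
#
#     agent = build_builtin_agent("Brock", embed_dim=256)  # LayerNorm + 2x-expansion GELU MLP
#     y = agent(torch.randn(8, 256))                       # (8, 256)
#     agent.agent_name                                     # "Brock"
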
def _build_custom_agent(agent_def: Dict[str, object], embed_dim: int) -> nn.Module:
    if "style" in agent_def:
        module = build_builtin_agent(str(agent_def["style"]), embed_dim)
        module.agent_name = str(agent_def.get("name", agent_def["style"]))
        return module
    agent_type = str(agent_def.get("type", "mlp")).lower()
    hidden = int(agent_def.get("hidden", embed_dim * 2))
    dropout = float(agent_def.get("dropout", 0.0))
    if agent_type == "mlp":
        layers: List[nn.Module] = [
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, hidden),
            nn.GELU(),
        ]
        if dropout > 0:
            layers.append(nn.Dropout(dropout))
        layers.append(nn.Linear(hidden, embed_dim))
        module = nn.Sequential(*layers)
    elif agent_type == "linear":
        module = nn.Sequential(nn.LayerNorm(embed_dim), nn.Linear(embed_dim, embed_dim))
    else:
        raise ValueError(f"Unsupported custom agent type '{agent_type}'")
    module.agent_name = str(agent_def.get("name", agent_type))
    return module

def load_ia_config_file(path: str, embed_dim: int) -> Dict[str, object]:
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    stage_entries = data.get("stages", {})
    if not isinstance(stage_entries, dict):
        raise ValueError("The 'stages' field of the IA config file must be an object mapping stage→streams")
    stage_config: Dict[str, Dict[str, nn.Module]] = {}
    for stage, mapping in stage_entries.items():
        if not isinstance(mapping, dict):
            raise ValueError(f"Stage '{stage}' must map streams to agents")
        stage_config[stage] = {}
        for stream_key, agent_def in mapping.items():
            if isinstance(agent_def, str):
                module = build_builtin_agent(agent_def, embed_dim)
            elif isinstance(agent_def, dict):
                module = _build_custom_agent(agent_def, embed_dim)
            else:
                raise ValueError(f"Invalid agent for stage '{stage}' stream '{stream_key}'")
            stage_config[stage][str(stream_key)] = module
    stream_aliases = data.get("stream_aliases")
    if stream_aliases is not None and (not isinstance(stream_aliases, list) or not all(isinstance(x, str) for x in stream_aliases)):
        raise ValueError("'stream_aliases' must be a list of strings")
    return {
        "stream_aliases": stream_aliases,
        "stage_config": stage_config,
        "refinement_cycles": int(data.get("refinement_cycles", 0)),
        "cycle_stage_name": str(data.get("cycle_stage_name", "cycle")),
    }
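
# Illustrative config file for load_ia_config_file (hypothetical values; string
# entries resolve via build_builtin_agent, dict entries via _build_custom_agent):
#
#     {
#       "stream_aliases": ["X", "Y", "Z"],
#       "refinement_cycles": 2,
#       "cycle_stage_name": "cycle",
#       "stages": {
#         "base": {"0": "brock", "*": "identity"},
#         "cycle": {"global": {"type": "mlp", "hidden": 512, "dropout": 0.1, "name": "refiner_mlp"}}
#       }
#     }
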
# --------------------------
# Lorenz attractor generator (discrete)
# --------------------------
def lorenz_step(x, y, z, sigma=10.0, rho=28.0, beta=8 / 3, dt=0.01):
    dx = sigma * (y - x)
    dy = x * (rho - z) - y
    dz = x * y - beta * z
    xn = x + dx * dt
    yn = y + dy * dt
    zn = z + dz * dt
    return xn, yn, zn


def lorenz_sequence(length, init=(0.1, 0.0, 0.0), sigma=10.0, rho=28.0, beta=8 / 3, dt=0.01):
    x, y, z = init
    seq = []
    for _ in range(length):
        x, y, z = lorenz_step(x, y, z, sigma, rho, beta, dt)
        seq.append((x, y, z))
    return np.array(seq)  # shape (length, 3)

# --------------------------
# Rössler attractor (alternative chaos source)
# --------------------------
def rossler_step(x, y, z, a=0.2, b=0.2, c=5.7, dt=0.01):
    dx = -y - z
    dy = x + a * y
    dz = b + z * (x - c)
    xn = x + dx * dt
    yn = y + dy * dt
    zn = z + dz * dt
    return xn, yn, zn


def rossler_sequence(length, init=(0.1, 0.0, 0.0), a=0.2, b=0.2, c=5.7, dt=0.01):
    x, y, z = init
    seq = []
    for _ in range(length):
        x, y, z = rossler_step(x, y, z, a, b, c, dt)
        seq.append((x, y, z))
    return np.array(seq)
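
# Both generators integrate their ODEs with a plain Euler step, so `dt` trades
# accuracy for speed. Quick sketch:
#
#     lor = lorenz_sequence(1000)   # (1000, 3); chaotic regime for rho=28
#     ros = rossler_sequence(1000)  # (1000, 3)
#
# In VortexBetinaAntiHalluc.forward below these are z-scored per column and
# mixed by a learned softmax over attractor_selector before being projected
# into the embedding dimension.
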
# --------------------------
# Terminal Velocity Matching (Flow Matching inspired)
# --------------------------
def compute_terminal_velocity(embeddings: torch.Tensor, target_distribution: str = "gaussian") -> torch.Tensor:
    """
    Computes the terminal velocity that moves embeddings toward a target distribution.
    Inspired by Flow Matching / Rectified Flow.
    """
    batch_size, dim = embeddings.shape
    if target_distribution == "gaussian":
        # Target: standard isotropic Gaussian
        target = torch.randn_like(embeddings) * 0.1
    elif target_distribution == "uniform_sphere":
        # Target: surface of the unit sphere
        target = F.normalize(torch.randn_like(embeddings), dim=-1)
    else:
        target = torch.zeros_like(embeddings)
    # Velocity = direction toward the target (normalized), rescaled below
    velocity = target - embeddings
    velocity = F.normalize(velocity, dim=-1) * embeddings.norm(dim=-1, keepdim=True) * 0.1
    return velocity
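
# Magnitude sketch: the returned velocity is a unit direction toward the sampled
# target, rescaled to 10% of each embedding's own norm:
#
#     e = torch.randn(8, 256)
#     v = compute_terminal_velocity(e)   # v.norm(dim=-1) ≈ 0.1 * e.norm(dim=-1)
#
# Note the target is resampled on every call, so this acts as a stochastic
# one-step flow-matching-style correction rather than an integrated trajectory.
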
# --------------------------
# Spectral Energy Regularization
# --------------------------
def spectral_energy_loss(embeddings: torch.Tensor, target_rank: int = 32) -> torch.Tensor:
    """
    Penalizes energy concentrated in a few principal components.
    Pushes the singular spectrum toward a more uniform distribution.
    """
    if embeddings.shape[0] < 2:
        return torch.tensor(0.0, device=embeddings.device)
    centered = embeddings - embeddings.mean(dim=0, keepdim=True)
    # SVD to obtain the singular values
    try:
        _, s, _ = torch.linalg.svd(centered, full_matrices=False)
    except RuntimeError:
        return torch.tensor(0.0, device=embeddings.device)
    # Normalize into an energy distribution
    s_normalized = s / (s.sum() + 1e-8)
    # Spectral entropy (we want to maximize it = uniform distribution)
    spectral_entropy = -(s_normalized * (s_normalized + 1e-8).log()).sum()
    # Penalty: the lower the entropy, the higher the penalty
    max_entropy = math.log(min(embeddings.shape[0], embeddings.shape[1]))
    return (max_entropy - spectral_entropy) / max_entropy
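
# Behavior sketch: the loss lives in [0, 1]. Isotropic Gaussian batches have a
# near-flat singular spectrum (loss near 0); rank-1 batches concentrate all
# energy in one component (loss near 1):
#
#     spectral_energy_loss(torch.randn(64, 32))               # small
#     spectral_energy_loss(torch.randn(64, 1).repeat(1, 32))  # close to 1
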
# --------------------------
# Semantic Divergence Metrics
# --------------------------
def compute_angular_divergence(original: torch.Tensor, perturbed: torch.Tensor) -> float:
    """Computes the mean angular divergence between original and perturbed vectors."""
    cos_sim = F.cosine_similarity(original, perturbed, dim=-1)
    # Clamp to avoid numerical issues with acos
    cos_sim = cos_sim.clamp(-1.0, 1.0)
    angles = torch.acos(cos_sim)
    return angles.mean().item()


def compute_semantic_entropy(logits: torch.Tensor, top_k: int = 10) -> float:
    """Computes the semantic entropy of the probability distribution."""
    probs = F.softmax(logits, dim=-1)
    top_probs, _ = torch.topk(probs, min(top_k, probs.shape[-1]), dim=-1)
    # Renormalize the top-k probabilities
    top_probs = top_probs / (top_probs.sum(dim=-1, keepdim=True) + 1e-8)
    entropy = -(top_probs * (top_probs + 1e-8).log()).sum(dim=-1)
    return entropy.mean().item()

# --------------------------
# SIGReg-like regularizer (encourage isotropic Gaussian embeddings)
# --------------------------
def sigreg_loss(embeddings):
    # embeddings: (B, D)
    mu = embeddings.mean(dim=0)  # (D,)
    centered = embeddings - mu.unsqueeze(0)  # (B, D)
    # empirical covariance (D x D) approximated via (centered^T centered)
    B = embeddings.shape[0]
    cov = (centered.t() @ centered) / (B - 1 + 1e-8)  # (D, D)
    # penalty = norm(cov - I) + norm(mu)
    D = embeddings.shape[1]
    eye = torch.eye(D, device=embeddings.device)
    cov_pen = F.mse_loss(cov, eye)
    mu_pen = (mu.pow(2)).mean()
    return cov_pen + mu_pen
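
# Sanity sketch: for z ~ N(0, I) the empirical covariance approaches I and the
# mean approaches 0, so the penalty vanishes with batch size:
#
#     sigreg_loss(torch.randn(4096, 16))    # small, shrinks roughly as O(1/B)
#     sigreg_loss(torch.randn(16, 16) * 5)  # large: covariance ≈ 25·I, far from I
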
class VortexBetinaAntiHalluc(nn.Module):
    def __init__(
        self,
        embed_dim: int = 256,
        vortex_steps: int = 10,
        vortex_dt: float = 0.02,
        num_streams: int = 3,
        enable_rotation: bool = True,
        rotation_angle: float = math.pi / 4,
        rotation_threshold: float = 1e-4,
        rotation_clockwise: bool = False,
        enable_quadratic_reflection: bool = False,
        quadratic_boundary: float = 0.3,
        quadratic_strength: float = 0.5,
        boost_small_deltas: bool = True,
        delta_gain: float = 1.5,
        reflection_push: float = 0.25,
        stream_aliases: Optional[List[str]] = None,
        ia_stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None,
        refinement_cycles: int = 0,
        cycle_stage_name: str = "cycle",
        enforce_square_geometry: bool = True,
        square_rotation_degrees: float = 180.0,
        square_leak_ratio: float = 0.05,
        square_jitter_std_degrees: float = 0.0,
        enable_lorentz_transform: bool = False,
        lorentz_beta: float = 0.6,
        lorentz_axis_stream: int = 0,
        enable_triangle: bool = True,
        triangle_hidden_dim: int = 512,
        triangle_max_iters: int = 5,
        triangle_tol: float = 1e-4,
        triangle_delta_gain: float = 1.0,
    ):
        super().__init__()
        self.embed_dim = embed_dim
        self.vortex_steps = vortex_steps
        self.vortex_dt = vortex_dt
        self.num_streams = max(1, num_streams)
        self.enable_rotation = enable_rotation
        self.rotation_angle = rotation_angle
        self.rotation_threshold = rotation_threshold
        self.rotation_clockwise = rotation_clockwise
        self.enable_quadratic_reflection = enable_quadratic_reflection
        self.quadratic_boundary = nn.Parameter(torch.tensor(float(quadratic_boundary)))
        self.quadratic_strength = float(min(1.0, max(0.0, quadratic_strength)))
        self.boost_small_deltas = boost_small_deltas
        self.delta_gain = max(1.0, delta_gain)
        self.reflection_push = max(0.0, reflection_push)
        self.stream_aliases = self._prepare_stream_aliases(stream_aliases)
        self.refinement_cycles = max(0, refinement_cycles)
        self.cycle_stage_name = cycle_stage_name or "cycle"
        self.enforce_square_geometry = enforce_square_geometry
        self.square_rotation_radians = math.radians(square_rotation_degrees)
        self.square_leak_ratio = float(min(1.0, max(0.0, square_leak_ratio)))
        self.square_jitter_std = math.radians(max(0.0, square_jitter_std_degrees))
        beta = max(-0.999, min(0.999, lorentz_beta))
        self.enable_lorentz_transform = enable_lorentz_transform
        self.lorentz_beta = beta
        self.lorentz_axis_stream = max(0, lorentz_axis_stream)
        self.enable_triangle = enable_triangle
        self.ia_router = MultiIntelligenceRouter(
            num_streams=self.num_streams,
            stage_config=ia_stage_config,
            stream_aliases=self.stream_aliases,
        )
        self.triangle_module = (
            SyntheticNeuronTriangle(
                embed_dim,
                self.num_streams,
                hidden_dim=triangle_hidden_dim,
                max_iters=triangle_max_iters,
                tol=triangle_tol,
                delta_gain=triangle_delta_gain,
            )
            if self.enable_triangle
            else None
        )
        self.projector = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, embed_dim),
        )
        nn.init.kaiming_normal_(self.projector[1].weight, mode="fan_out", nonlinearity="relu")
        nn.init.constant_(self.projector[1].bias, 0.1)
        # Vortex dynamics parameters (chaos injection)
        self.vortex_linear = nn.Parameter(torch.randn(3, embed_dim) * 0.1)
        self.vortex_scale = nn.Parameter(torch.ones(embed_dim))
        # Terminal Velocity Matching parameters
        self.velocity_gate = nn.Parameter(torch.zeros(embed_dim))
        self.velocity_bias = nn.Parameter(torch.zeros(embed_dim))
        # Adaptive chaos parameters
        self.chaos_temperature = nn.Parameter(torch.tensor(1.0))
        self.chaos_gate = nn.Parameter(torch.tensor(0.0))
        self.attractor_selector = nn.Parameter(torch.tensor([0.7, 0.3]))  # [lorenz, rossler]
        self.last_flow_states: Dict[str, object] = {}
    def _prepare_stream_aliases(self, provided: Optional[List[str]]) -> List[str]:
        if provided:
            aliases = list(provided)
        else:
            aliases = ["X", "Y", "Z", "W", "V", "U", "T", "S"]
        if len(aliases) < self.num_streams:
            aliases.extend(f"S{idx}" for idx in range(len(aliases), self.num_streams))
        return aliases[: self.num_streams]

    def _rotate_matrix_plane(self, tensor: torch.Tensor, radians: float) -> torch.Tensor:
        if tensor.shape[-1] < 2:
            return tensor
        angle = radians % (2 * math.pi)
        if abs(angle) < 1e-9:
            return tensor
        rotated = tensor.clone()
        cos_theta = math.cos(angle)
        sin_theta = math.sin(angle)
        x = tensor[..., 0]
        y = tensor[..., 1]
        rotated[..., 0] = cos_theta * x - sin_theta * y
        rotated[..., 1] = sin_theta * x + cos_theta * y
        return rotated

    def _rotate_matrix_square(self, tensor: torch.Tensor) -> Tuple[torch.Tensor, float]:
        base_angle = abs(self.square_rotation_radians) % (2 * math.pi)
        if base_angle > math.pi:
            base_angle = 2 * math.pi - base_angle
        jitter = 0.0
        if self.square_jitter_std > 0:
            jitter = torch.randn(1, device=tensor.device).item() * self.square_jitter_std
        angle = max(0.0, min(math.pi, base_angle + jitter))
        if math.isclose(angle, 0.0, rel_tol=1e-6, abs_tol=1e-6):
            mirrored = tensor.clone()
        else:
            intensity = angle / math.pi
            mirrored = torch.lerp(tensor, -tensor, intensity)
        if self.square_leak_ratio > 0:
            mirrored = torch.lerp(mirrored, tensor, self.square_leak_ratio)
        return mirrored, angle

    def _select_axis_stream(self, tensor: torch.Tensor) -> torch.Tensor:
        stream_idx = min(self.lorentz_axis_stream, tensor.size(1) - 1)
        return tensor[:, stream_idx, :]

    def _lorentz_boost(
        self,
        spatial: torch.Tensor,
        reference_stream: torch.Tensor,
        time_like: torch.Tensor,
    ) -> Tuple[torch.Tensor, float, torch.Tensor]:
        beta = self.lorentz_beta
        gamma = 1.0 / math.sqrt(max(1e-8, 1.0 - beta**2))
        axis = F.normalize(reference_stream, dim=-1, eps=1e-6)
        parallel_scalar = torch.sum(spatial * axis, dim=-1, keepdim=True)
        parallel_vec = parallel_scalar * axis
        perpendicular_vec = spatial - parallel_vec
        t_prime = gamma * (time_like - beta * parallel_scalar)
        parallel_prime = gamma * (parallel_scalar - beta * time_like) * axis
        updated_spatial = parallel_prime + perpendicular_vec
        return updated_spatial, gamma, t_prime.abs()

    def invert(self, vec: torch.Tensor) -> torch.Tensor:
        return -vec

    def intersection_knowledge(self, vec: torch.Tensor, base_k: torch.Tensor) -> torch.Tensor:
        dot = torch.sum(vec * base_k, dim=-1, keepdim=True)
        norm_k = torch.norm(base_k, dim=-1, keepdim=True).pow(2).clamp_min(1e-8)
        return (dot / norm_k) * base_k
    def euler_vortex(self, state: torch.Tensor) -> torch.Tensor:
        sigma, rho, beta, gamma = 10.0, 28.0, 8.0 / 3.0, 1.0
        feature_dim = state.shape[-1] - 1
        base = state[..., :-1]
        w = state[..., -1:]
        updated_base = torch.zeros_like(base)
        for i in range(0, feature_dim, 3):
            chunk = base[..., i : i + 3]
            if chunk.shape[-1] < 3:
                chunk = F.pad(chunk, (0, 3 - chunk.shape[-1]))
            chunk = chunk.clone()
            for _ in range(self.vortex_steps):
                x = chunk[..., 0]
                y = chunk[..., 1]
                z = chunk[..., 2]
                dx = sigma * (y - x) * self.vortex_dt
                dy = (x * (rho - z) - y) * self.vortex_dt
                dz = (x * y - beta * z) * self.vortex_dt
                chunk = chunk + torch.stack([dx, dy, dz], dim=-1)
            span = min(3, feature_dim - i)
            updated_base[..., i : i + span] = chunk[..., :span]
        energy = torch.norm(base, dim=-1, keepdim=True).pow(2)
        w_iter = w.clone()
        for _ in range(self.vortex_steps):
            dw = (gamma * energy - w_iter) * self.vortex_dt
            w_iter = w_iter + dw
        updated = state.clone()
        updated[..., :-1] = updated_base
        updated[..., -1:] = w_iter
        return updated

    def rotate_difference(self, delta: torch.Tensor, anchor: torch.Tensor) -> torch.Tensor:  # noqa: ARG002
        # Rotation now confined to the (x, y) plane while keeping z (and higher dims) untouched.
        if delta.shape[-1] < 2:
            return delta
        angle = -abs(self.rotation_angle) if self.rotation_clockwise else abs(self.rotation_angle)
        cos_theta = math.cos(angle)
        sin_theta = math.sin(angle)
        rotated = delta.clone()
        x = delta[..., 0]
        y = delta[..., 1]
        rotated[..., 0] = cos_theta * x - sin_theta * y
        rotated[..., 1] = sin_theta * x + cos_theta * y
        if delta.shape[-1] >= 3:
            rotated[..., 2] = delta[..., 2]  # keep z static per vortex requirement
        return rotated

    def _get_quadratic_boundary(self) -> torch.Tensor:
        return self.quadratic_boundary.abs().clamp(0.05, 0.5)

    def quadratic_reflection(self, delta: torch.Tensor) -> Tuple[torch.Tensor, float]:
        """Reflect components between ±boundary and warp them quadratically (Pong-like bounce)."""
        if not self.enable_quadratic_reflection:
            return delta, 0.0
        boundary = self._get_quadratic_boundary()
        period = 2.0 * boundary
        magnitude = delta.abs()
        direction = torch.sign(delta)
        wrapped = torch.remainder(magnitude, period)
        mirrored = torch.where(wrapped > boundary, period - wrapped, wrapped)
        normalized = (mirrored / boundary).clamp(0.0, 1.0)
        squared = normalized.pow(2)
        bounced = direction * boundary * squared
        blended = torch.lerp(delta, bounced, self.quadratic_strength)
        bounce_ratio = (magnitude > boundary).float().mean().item()
        return blended, bounce_ratio
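
    # Scalar bounce sketch (boundary b): a component with |d| slightly above b is
    # folded back to b - (|d| - b), then warped by the quadratic (x/b)^2 profile
    # and blended with the original by quadratic_strength. E.g. with b = 0.3 and
    # strength 1.0, d = 0.4 wraps to 0.2, normalizes to 2/3, and bounces to
    # 0.3 * (2/3)^2 ≈ 0.133; bounce_ratio reports the fraction of components
    # with |d| > b.
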
    def forward(self, input_vec: torch.Tensor, chaos_factor: float = 1.0):
        """
        chaos_factor: aggressiveness multiplier (1.0 = normal, 5.0 = aggressive, 10.0 = insane)
        """
        if input_vec.dim() == 2:
            batch = input_vec.size(0)
            mat_input = input_vec.unsqueeze(1).expand(batch, self.num_streams, self.embed_dim)
        elif input_vec.dim() == 3:
            batch, streams, dim = input_vec.shape
            if streams != self.num_streams or dim != self.embed_dim:
                raise ValueError(
                    f"Expected tensor (batch,{self.num_streams},{self.embed_dim}), got {input_vec.shape}"
                )
            mat_input = input_vec
        else:
            raise ValueError("input_vec must have 2 or 3 dimensions")
        mat_flat = mat_input.reshape(-1, self.embed_dim)
        mat_primary = self.projector(mat_flat).reshape(-1, self.num_streams, self.embed_dim)
        stage_signatures: Dict[str, str] = {}
        mat_primary = self.ia_router.apply("base", mat_primary)
        stage_signatures["base"] = self.ia_router.stage_signature("base")
        mirrored_primary: Optional[torch.Tensor] = None
        square_tensor: Optional[torch.Tensor] = None
        square_angle_applied: Optional[float] = None
        if self.enforce_square_geometry:
            mirrored_primary, square_angle_applied = self._rotate_matrix_square(mat_primary)
            square_tensor = torch.stack([mat_primary, mirrored_primary], dim=2)
            mat_secondary = mirrored_primary
        else:
            mat_secondary = self.invert(mat_primary)
        mat_secondary = self.ia_router.apply("inversion", mat_secondary)
        stage_signatures["inversion"] = self.ia_router.stage_signature("inversion")
        base_k = mat_primary.mean(dim=1, keepdim=True) + 1e-4 * torch.randn_like(mat_primary[:, :1, :])
        inter_primary = self.intersection_knowledge(mat_primary, base_k)
        inter_secondary = self.intersection_knowledge(mat_secondary, base_k)
        approx_inter_full = 0.5 * (inter_primary + inter_secondary)
        approx_inter = approx_inter_full.mean(dim=1)
        flow_states: Dict[str, object] = {}
        flow_states["matrix_primary"] = mat_primary
        flow_states["matrix_secondary"] = mat_secondary
        flow_states["base_core"] = base_k.squeeze(1)
        if mirrored_primary is not None:
            flow_states["matrix_rot_180"] = mirrored_primary
        if square_tensor is not None:
            flow_states["matrix_square"] = square_tensor
            flow_states["square_leak_ratio"] = self.square_leak_ratio
        if square_angle_applied is not None:
            flow_states["square_angle"] = square_angle_applied
        triangle_debug: Dict[str, object] = {}
        if self.triangle_module is not None:
            delta_green, triangle_debug = self.triangle_module(mat_primary, mat_secondary, base_k)
        else:
            delta_green = inter_primary - base_k
        flow_states["xy_bridge_matrix"] = delta_green
        if triangle_debug:
            flow_states["triangle_axis"] = triangle_debug.get("axis")
            flow_states["triangle_diag"] = triangle_debug.get("diag")
            flow_states["triangle_iterations"] = triangle_debug.get("iterations", 0)
            flow_states["triangle_residual"] = triangle_debug.get("residual", 0.0)
        delta_norm_stream = torch.norm(delta_green, dim=-1, keepdim=True)
        if self.boost_small_deltas:
            boost_mask = delta_norm_stream < self.rotation_threshold
            if boost_mask.any():
                safe_norm = delta_norm_stream.clamp_min(1e-6)
                factor = (self.rotation_threshold / safe_norm) * self.delta_gain
                delta_green = torch.where(boost_mask, delta_green * factor, delta_green)
            boost_ratio = boost_mask.float().mean().item()
        else:
            boost_ratio = 0.0
        flow_states["matrix_green_boost"] = delta_green
        if self.enable_rotation:
            delta_flat = delta_green.reshape(-1, self.embed_dim)
            base_flat = base_k.expand(-1, self.num_streams, -1).reshape(-1, self.embed_dim)
            delta_norm = torch.norm(delta_flat, dim=-1, keepdim=True)
            rotation_mask = delta_norm > self.rotation_threshold
            if rotation_mask.any():
                rotated = self.rotate_difference(delta_flat, base_flat)
                delta_flat = torch.where(rotation_mask, rotated, delta_flat)
            delta_green = delta_flat.view(-1, self.num_streams, self.embed_dim)
            rotation_ratio = rotation_mask.float().mean().item()
        else:
            rotation_ratio = 0.0
        flow_states["matrix_green_rot"] = delta_green
        boundary_val = self._get_quadratic_boundary()
        flow_states["quadratic_boundary"] = boundary_val.detach().item()
        if self.enable_quadratic_reflection and self.reflection_push > 0:
            norm_after_rot = torch.norm(delta_green, dim=-1, keepdim=True)
            push_mask = norm_after_rot < boundary_val
            if push_mask.any():
                push_factor = 1.0 + self.reflection_push
                delta_green = torch.where(push_mask, delta_green * push_factor, delta_green)
            pre_reflect_push_ratio = push_mask.float().mean().item()
        else:
            pre_reflect_push_ratio = 0.0
        if self.enable_quadratic_reflection:
            delta_green, reflection_ratio = self.quadratic_reflection(delta_green)
        else:
            reflection_ratio = 0.0
        flow_states["matrix_green_reflect"] = delta_green
        mirror_reference = self.intersection_knowledge(mat_secondary, base_k)
        matrix_black = 0.5 * (2 * base_k - delta_green + mirror_reference)
        matrix_black = self.ia_router.apply("mirror", matrix_black)
        stage_signatures["mirror"] = self.ia_router.stage_signature("mirror")
        if self.enforce_square_geometry:
            x_stream = mat_primary[:, 0, :]
            x_output = mat_secondary[:, 0, :]
            matrix_black[:, 0, :] = x_output
            flow_states["square_input"] = x_stream
            flow_states["square_output"] = x_output
            flow_states["matrix_black_square"] = matrix_black
        flow_states["matrix_black"] = matrix_black
        delta_inter = matrix_black.mean(dim=1)
        cycle_signatures: List[str] = []
        if self.refinement_cycles > 0:
            refined_delta = delta_inter
            for cycle_idx in range(self.refinement_cycles):
                refined_delta = self.ia_router.apply(
                    self.cycle_stage_name,
                    refined_delta,
                    cycle_idx=cycle_idx,
                )
                cycle_signatures.append(self.ia_router.stage_signature(self.cycle_stage_name))
            delta_inter = refined_delta
        stage_signatures[self.cycle_stage_name] = cycle_signatures[-1] if cycle_signatures else "identity"
        flow_states["delta_pre_lorentz"] = delta_inter
        flow_states["ia_cycle_signatures"] = cycle_signatures
        w = torch.norm(delta_inter, dim=-1, keepdim=True)
        lorentz_gamma = 1.0
        if self.enable_lorentz_transform:
            reference_stream = self._select_axis_stream(mat_primary)
            delta_inter, lorentz_gamma, w = self._lorentz_boost(delta_inter, reference_stream, w)
            flow_states["lorentz_reference"] = reference_stream
            flow_states["delta_lorentz"] = delta_inter
            flow_states["lorentz_gamma"] = lorentz_gamma
            flow_states["lorentz_time"] = w
        delta_before_chaos = delta_inter
        # CHAOS FACTOR APPLICATION (OVERDRIVE) - VORTEX DYNAMICS V2
        if chaos_factor != 1.0:
            batch_size = delta_inter.shape[0]
            # 1. Generate chaos sequences from both attractors
            lor_seq_np = lorenz_sequence(batch_size, init=(0.1, 0.0, 0.0))
            ros_seq_np = rossler_sequence(batch_size, init=(0.1, 0.0, 0.0))
            # Normalize sequences
            lor_seq_np = (lor_seq_np - lor_seq_np.mean(axis=0)) / (lor_seq_np.std(axis=0) + 1e-8)
            ros_seq_np = (ros_seq_np - ros_seq_np.mean(axis=0)) / (ros_seq_np.std(axis=0) + 1e-8)
            lor_tensor = torch.tensor(lor_seq_np, dtype=delta_inter.dtype, device=delta_inter.device)
            ros_tensor = torch.tensor(ros_seq_np, dtype=delta_inter.dtype, device=delta_inter.device)
            # 2. Adaptive attractor mixing (learnable weights)
            attractor_weights = F.softmax(self.attractor_selector, dim=0)
            mixed_chaos = attractor_weights[0] * lor_tensor + attractor_weights[1] * ros_tensor
            # 3. Map 3D chaos -> embedding dimension
            perturb = mixed_chaos @ self.vortex_linear
            perturb = perturb * self.vortex_scale
            # 4. Terminal Velocity Matching (flow-like correction)
            velocity = compute_terminal_velocity(delta_inter, target_distribution="gaussian")
            gated_velocity = velocity * torch.sigmoid(self.velocity_gate) + self.velocity_bias
            # 5. Normalize perturbation magnitude
            emb_norm = delta_inter.norm(dim=1, keepdim=True) + 1e-8
            pert_norm = perturb.norm(dim=1, keepdim=True) + 1e-8
            normalized_perturb = perturb * (emb_norm / pert_norm)
            # 6. Adaptive temperature scaling + learned gate
            chaos_scale = torch.sigmoid(self.chaos_temperature + self.chaos_gate)
            effective_chaos = chaos_factor * chaos_scale
            # Gate chaos intensity if semantic entropy explodes (uncertain corrections)
            delta_entropy = compute_semantic_entropy(delta_inter.unsqueeze(1))
            chaos_entropy_gate = 1.0 if delta_entropy < 1.0 else 0.5
            effective_chaos = effective_chaos * chaos_entropy_gate
            # 7. Apply combined perturbation: chaos + velocity flow
            delta_inter = delta_inter + effective_chaos * normalized_perturb + 0.1 * gated_velocity
            # Store chaos metrics
            flow_states["attractor_mix"] = attractor_weights.detach().cpu().tolist()
            flow_states["effective_chaos"] = effective_chaos.item()
            flow_states["chaos_scale"] = chaos_scale.item()
            flow_states["chaos_entropy_gate"] = chaos_entropy_gate
            flow_states["angular_divergence"] = compute_angular_divergence(delta_before_chaos, delta_inter)
            # Gain boost to increase boundary hits when chaos is controlled
            delta_inter = delta_inter * 1.5
        flow_states["delta_output"] = delta_inter
        state = torch.cat([approx_inter, delta_inter, w], dim=-1)
        evolved = self.euler_vortex(state)
        flow_states["output_corner"] = evolved[..., :-1]
        flow_states["ia_stage_logs"] = self.ia_router.describe_all_stages()
        self.last_flow_states = flow_states
        overlap = F.cosine_similarity(inter_primary.view(-1, self.embed_dim), inter_secondary.view(-1, self.embed_dim), dim=-1)
        overlap = overlap.view(-1, self.num_streams).mean(dim=1)
        annulation = torch.norm(mat_primary - (-mat_secondary), dim=-1).pow(2).mean(dim=1)
        vortex_sink = evolved[..., -1]
        hall_penalty = (1 - overlap).clamp_min(0.0)
        approx_pull = (
            torch.norm(mat_primary.mean(dim=1) - approx_inter, dim=-1).pow(2)
            + torch.norm(mat_secondary.mean(dim=1) - approx_inter, dim=-1).pow(2)
        )
        # SIGReg regularization (isotropic Gaussian enforcement)
        sig_loss = sigreg_loss(delta_inter)
        # Spectral energy regularization (spreads energy uniformly across components)
        spectral_loss = spectral_energy_loss(delta_inter)
        delta_probs = F.softmax(delta_inter, dim=-1)
        semantic_entropy = -(delta_probs * (delta_probs + 1e-8).log()).sum(dim=-1).mean()
        semantic_entropy_val = semantic_entropy.item()
        boundary_val = self._get_quadratic_boundary()
        boundary_reg = (0.2 - boundary_val).clamp_min(0.0).pow(2)
        loss = (
            annulation.mean()
            + 0.5 * hall_penalty.mean()
            + 0.25 * approx_pull.mean()
            - 0.1 * vortex_sink.mean()
            + 0.05 * sig_loss
            + 0.02 * spectral_loss  # pushes toward a uniform spectral distribution
            + 0.02 * boundary_reg
            + 0.05 * semantic_entropy
        )
        metrics = {
            "annulation": annulation.mean().item(),
            "cosine_overlap": overlap.mean().item(),
            "vortex_energy": vortex_sink.mean().item(),
            "sigreg_loss": sig_loss.item(),
            "spectral_loss": spectral_loss.item() if isinstance(spectral_loss, torch.Tensor) else spectral_loss,
            "boundary_reg": boundary_reg.mean().item() if isinstance(boundary_reg, torch.Tensor) else boundary_reg,
            "rotation_ratio": rotation_ratio,
            "approx_alignment": approx_pull.mean().item(),
            "reflection_ratio": reflection_ratio,
            "reflect_ratio": reflection_ratio,
            "boost_ratio": boost_ratio,
            "reflection_push_ratio": pre_reflect_push_ratio,
            "ia_base": stage_signatures.get("base", "identity"),
            "ia_inversion": stage_signatures.get("inversion", "identity"),
            "ia_mirror": stage_signatures.get("mirror", "identity"),
            "ia_cycle": " || ".join(cycle_signatures) if cycle_signatures else stage_signatures.get(self.cycle_stage_name, "identity"),
            "lorentz_gamma": lorentz_gamma,
            "square_angle_deg": math.degrees(square_angle_applied) if square_angle_applied is not None else 0.0,
            "square_leak_ratio": self.square_leak_ratio,
            "angular_divergence": flow_states.get("angular_divergence", 0.0),
            "attractor_mix": flow_states.get("attractor_mix", [1.0, 0.0]),
            "effective_chaos": flow_states.get("effective_chaos", 1.0),
            "semantic_entropy_approx": semantic_entropy_val,
            "triangle_iters": triangle_debug.get("iterations", 0) if triangle_debug else 0,
            "triangle_residual": triangle_debug.get("residual", 0.0) if triangle_debug else 0.0,
        }
        delta_norm = torch.norm(delta_inter, dim=-1)
        metrics["delta_norm_mean"] = delta_norm.mean().item()
        metrics["delta_norm_max"] = delta_norm.max().item()
        return evolved, loss, metrics, delta_inter

class PortugueseSentenceDataset(Dataset):
    def __init__(
        self,
        sentences: List[str],
        tokenizer: AutoTokenizer,
        embedding_model: SentenceTransformer,
        mask_prob: float = 0.15,
        max_seq_length: int = 512,
        precompute_embeddings: bool = False,
        embedding_batch_size: int = 64,
    ):
        self.sentences = sentences
        self.tokenizer = tokenizer
        self.embedding_model = embedding_model
        self.mask_prob = mask_prob
        self.max_seq_length = max_seq_length
        self.precompute_embeddings = precompute_embeddings
        self.embedding_batch_size = max(1, embedding_batch_size)
        self.embeddings: Optional[torch.Tensor] = None
        self._embedding_cache: Dict[int, torch.Tensor] = {}
        if self.precompute_embeddings:
            self._precompute_all_embeddings()

    def __len__(self) -> int:
        return len(self.sentences)

    def _mask_tokens(self, input_ids: torch.Tensor, special_mask: torch.Tensor) -> Dict[str, torch.Tensor]:
        labels = input_ids.clone()
        probability_matrix = torch.full(labels.shape, self.mask_prob)
        probability_matrix.masked_fill_(special_mask.bool(), 0.0)
        masked_indices = torch.bernoulli(probability_matrix).bool()
        if not masked_indices.any():
            candidate_positions = (~special_mask.bool()).nonzero(as_tuple=False).view(-1)
            choice = candidate_positions[torch.randint(0, candidate_positions.numel(), (1,)).item()]
            masked_indices[choice] = True
        labels[~masked_indices] = -100
        input_ids = input_ids.clone()
        input_ids[masked_indices] = self.tokenizer.mask_token_id
        return {"input_ids": input_ids, "labels": labels}

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        sentence = self.sentences[idx]
        encoding = self.tokenizer(
            sentence,
            return_tensors="pt",
            return_special_tokens_mask=True,
            truncation=True,
            max_length=self.max_seq_length,
        )
        input_ids = encoding["input_ids"].squeeze(0)
        attention_mask = encoding["attention_mask"].squeeze(0)
        special_mask = encoding["special_tokens_mask"].squeeze(0)
        masked = self._mask_tokens(input_ids, special_mask)
        embedding = self._get_embedding(idx)
        return {
            "input_ids": masked["input_ids"],
            "attention_mask": attention_mask,
            "labels": masked["labels"],
            "embedding": embedding,
        }

    def _precompute_all_embeddings(self) -> None:
        chunks: List[torch.Tensor] = []
        total = len(self.sentences)
        for start in range(0, total, self.embedding_batch_size):
            batch = self.sentences[start : start + self.embedding_batch_size]
            batch_embeds = self.embedding_model.encode(
                batch,
                convert_to_tensor=True,
                show_progress_bar=False,
                batch_size=self.embedding_batch_size,
            )
            chunks.append(batch_embeds.float().cpu())
        self.embeddings = torch.cat(chunks, dim=0) if chunks else torch.empty(0)

    def _compute_single_embedding(self, sentence: str) -> torch.Tensor:
        embed = self.embedding_model.encode(
            sentence,
            convert_to_tensor=True,
            show_progress_bar=False,
            batch_size=1,
        )
        return embed.float().cpu()

    def _get_embedding(self, idx: int) -> torch.Tensor:
        if self.embeddings is not None:
            return self.embeddings[idx]
        if idx not in self._embedding_cache:
            self._embedding_cache[idx] = self._compute_single_embedding(self.sentences[idx])
        return self._embedding_cache[idx]

def build_collate_fn(tokenizer: AutoTokenizer):
    pad_id = tokenizer.pad_token_id

    def collate(batch: List[Dict[str, torch.Tensor]]) -> Dict[str, torch.Tensor]:
        input_ids = pad_sequence([item["input_ids"] for item in batch], batch_first=True, padding_value=pad_id)
        attention_mask = pad_sequence([item["attention_mask"] for item in batch], batch_first=True, padding_value=0)
        labels = pad_sequence([item["labels"] for item in batch], batch_first=True, padding_value=-100)
        embeddings = torch.stack([item["embedding"] for item in batch])
        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
            "embedding": embeddings,
        }

    return collate
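
# Wiring sketch (assumes a tokenizer and sentence-transformers model are loaded):
#
#     dataset = PortugueseSentenceDataset(sample_sentences(), tokenizer, embedder,
#                                         precompute_embeddings=True)
#     loader = DataLoader(dataset, batch_size=8, shuffle=True,
#                         collate_fn=build_collate_fn(tokenizer))
#     # Each batch: padded input_ids/attention_mask, labels with -100 on
#     # unmasked positions, and one sentence embedding per example.
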
class BetinaTrainer:
    def __init__(
        self,
        vortex: VortexBetinaAntiHalluc,
        tokenizer: AutoTokenizer,
        embedding_model: SentenceTransformer,
        mlm_model: AutoModelForMaskedLM,
        raw_embedding_dim: int,
        embed_dim: int,
        lambda_vortex: float = 0.5,
        learning_rate: float = 1e-4,
        mlm_learning_rate: float = 5e-5,
        freeze_mlm: bool = False,
        freeze_projectors: bool = False,
        correction_max_norm: float | None = None,
        chaos_factor: float = 1.0,
        eval_chaos_factor: float = 1.0,
        device: torch.device | None = None,
        max_seq_length: int = 512,
    ):
        self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.vortex = vortex.to(self.device)
        self.tokenizer = tokenizer
        self.embedding_model = embedding_model
        self.mlm_model = mlm_model.to(self.device)
        self.lambda_vortex = lambda_vortex
        self.chaos_factor = chaos_factor
        self.eval_chaos_factor = eval_chaos_factor
        self.embedding_projector = nn.Linear(raw_embedding_dim, embed_dim).to(self.device)
        hidden_size = self.mlm_model.config.hidden_size
        self.correction_projector = nn.Linear(embed_dim, hidden_size).to(self.device)
        self.freeze_projectors = freeze_projectors
        self.correction_max_norm = correction_max_norm if correction_max_norm and correction_max_norm > 0 else None
        self.max_seq_length = max_seq_length
        mlm_params = list(self.mlm_model.parameters())
        if freeze_mlm:
            for param in mlm_params:
                param.requires_grad = False
        projector_params = list(self.embedding_projector.parameters()) + list(self.correction_projector.parameters())
        if freeze_projectors:
            for param in projector_params:
                param.requires_grad = False
        trainable_projectors = [] if freeze_projectors else projector_params
        vortex_params = list(self.vortex.parameters()) + trainable_projectors
        optimizer_groups = [
            {"params": vortex_params, "lr": learning_rate},
        ]
        if not freeze_mlm:
            optimizer_groups.append({"params": mlm_params, "lr": mlm_learning_rate})
        self.optimizer = optim.AdamW(optimizer_groups)
        self.freeze_mlm = freeze_mlm
        self.scaler = betina_grad_scaler(self.device.type, enabled=self.device.type == "cuda")

    def _project_correction(self, delta: torch.Tensor) -> torch.Tensor:
        correction = self.correction_projector(delta)
        if self.correction_max_norm is not None:
            # Clamp the L2 norm of each per-sample correction vector (rows of the (B, H) tensor).
            correction = correction.renorm(p=2, dim=0, maxnorm=self.correction_max_norm)
        return correction
    def train(self, dataloader: DataLoader, epochs: int = 5, grad_clip: float = 1.0) -> List[Dict[str, float]]:
        history: List[Dict[str, float]] = []
        self.vortex.train()
        self.mlm_model.train()
        for epoch in range(epochs):
            for step, batch in enumerate(dataloader):
                input_ids = batch["input_ids"].to(self.device)
                attention_mask = batch["attention_mask"].to(self.device)
                labels = batch["labels"].to(self.device)
                embeds = batch["embedding"].to(self.device)
                self.optimizer.zero_grad(set_to_none=True)
                with betina_autocast(self.device.type, enabled=self.device.type == "cuda"):
                    projected = self.embedding_projector(embeds)
                    _, vortex_loss, metrics, delta = self.vortex(projected, chaos_factor=self.chaos_factor)
                    if self.freeze_mlm:
                        with torch.no_grad():
                            outputs = self.mlm_model(
                                input_ids=input_ids,
                                attention_mask=attention_mask,
                                output_hidden_states=True,
                                return_dict=True,
                            )
                    else:
                        outputs = self.mlm_model(
                            input_ids=input_ids,
                            attention_mask=attention_mask,
                            output_hidden_states=True,
                            return_dict=True,
                        )
                    hidden = outputs.hidden_states[-1]
                    correction = self._project_correction(delta).unsqueeze(1)
                    attention_mask_f = attention_mask.unsqueeze(-1).float()
                    mask_focus = (input_ids == self.tokenizer.mask_token_id).unsqueeze(-1).float()
                    weight_mask = 0.5 * attention_mask_f + 0.5 * mask_focus
                    corrected_hidden = hidden + correction * weight_mask
                    if hasattr(self.mlm_model, "cls"):
                        logits = self.mlm_model.cls(corrected_hidden)
                    else:
                        logits = self.mlm_model.get_output_embeddings()(corrected_hidden)
                    mask_positions = labels != -100
                    if mask_positions.any():
                        mlm_loss = F.cross_entropy(logits[mask_positions], labels[mask_positions])
                    else:
                        mlm_loss = torch.zeros(1, device=self.device)
                    total_loss = mlm_loss + self.lambda_vortex * vortex_loss
                self.scaler.scale(total_loss).backward()
                # Unscale before clipping so grad_clip acts on the true gradient norms.
                self.scaler.unscale_(self.optimizer)
                torch.nn.utils.clip_grad_norm_(self.parameters(), grad_clip)
                self.scaler.step(self.optimizer)
                self.scaler.update()
                perplexity = torch.exp(mlm_loss.detach())
                record = {
                    "epoch": epoch + 1,
                    "step": step + 1,
                    "mlm_loss": mlm_loss.detach().item(),
                    "vortex_loss": vortex_loss.detach().item(),
                    "total_loss": total_loss.detach().item(),
                    "perplexity": perplexity.item(),
                    "vortex_energy": metrics["vortex_energy"],
                    "cosine_overlap": metrics["cosine_overlap"],
                    "rotation_ratio": metrics["rotation_ratio"],
                    "approx_alignment": metrics["approx_alignment"],
                    "reflection_ratio": metrics["reflection_ratio"],
                    "boost_ratio": metrics.get("boost_ratio", 0.0),
                    "reflection_push_ratio": metrics.get("reflection_push_ratio", 0.0),
                }
                history.append(record)
                if step % 10 == 0:
                    print(
                        f"Epoch {record['epoch']:03d} Step {record['step']:04d} | "
                        f"Total {record['total_loss']:.4f} | MLM {record['mlm_loss']:.4f} | "
                        f"PPL {record['perplexity']:.4f} | "
                        f"Vortex {record['vortex_loss']:.4f} | Overlap {record['cosine_overlap']:.4f} | "
                        f"Energy {record['vortex_energy']:.4f} | Rotation {record['rotation_ratio']:.3f} | "
                        f"Reflect {record['reflection_ratio']:.3f} | Boost {record['boost_ratio']:.3f} | "
                        f"PreReflect {record['reflection_push_ratio']:.3f} | Approx {record['approx_alignment']:.4f}"
                    )
        return history
    def parameters(self):
        for module in (self.vortex, self.embedding_projector, self.correction_projector, self.mlm_model):
            for param in module.parameters():
                if param.requires_grad:
                    yield param

    @torch.no_grad()
    def evaluate_perplexity(self, dataloader: DataLoader, apply_correction: bool = True) -> float:
        self.vortex.eval()
        self.mlm_model.eval()
        total_loss = 0.0
        total_tokens = 0
        for batch in dataloader:
            input_ids = batch["input_ids"].to(self.device)
            attention_mask = batch["attention_mask"].to(self.device)
            labels = batch["labels"].to(self.device)
            embeds = batch["embedding"].to(self.device)
            outputs = self.mlm_model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                output_hidden_states=True,
                return_dict=True,
            )
            hidden = outputs.hidden_states[-1]
            if apply_correction:
                projected = self.embedding_projector(embeds)
                _, _, _, delta = self.vortex(projected, chaos_factor=self.eval_chaos_factor)
                correction = self._project_correction(delta).unsqueeze(1)
                attention_mask_f = attention_mask.unsqueeze(-1).float()
                mask_focus = (input_ids == self.tokenizer.mask_token_id).unsqueeze(-1).float()
                weight_mask = 0.5 * attention_mask_f + 0.5 * mask_focus
                hidden = hidden + correction * weight_mask
            if hasattr(self.mlm_model, "cls"):
                logits = self.mlm_model.cls(hidden)
            else:
                logits = self.mlm_model.get_output_embeddings()(hidden)
            mask_positions = labels != -100
            if mask_positions.any():
                loss = F.cross_entropy(logits[mask_positions], labels[mask_positions], reduction="sum")
                total_loss += loss.item()
                total_tokens += mask_positions.sum().item()
        if total_tokens == 0:
            return float("inf")
        return math.exp(total_loss / total_tokens)
| def save(self, output_dir: str) -> None: | |
| output_path = Path(output_dir) | |
| output_path.mkdir(parents=True, exist_ok=True) | |
| torch.save(self.vortex.state_dict(), output_path / "vortex.pt") | |
| torch.save(self.embedding_projector.state_dict(), output_path / "embedding_projector.pt") | |
| torch.save(self.correction_projector.state_dict(), output_path / "correction_projector.pt") | |
| self.mlm_model.save_pretrained(output_path / "mlm") | |
| self.tokenizer.save_pretrained(output_path / "mlm") | |
| @torch.no_grad() | |
| def fill_masks( | |
| self, | |
| texts: List[str], | |
| top_k: int = 5, | |
| apply_correction: bool = True, | |
| ) -> List[List[List[Tuple[str, float]]]]: | |
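| """For each input text, return one list per [MASK] holding top_k (token, probability) pairs. | |
| Illustrative call (probabilities made up): fill_masks(["A capital de Portugal é [MASK]."], top_k=2) | |
| -> [[[("Lisboa", 0.9), ("Porto", 0.05)]]] (indices: text, mask, candidate). | |
| """ | |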
| self.vortex.eval() | |
| self.mlm_model.eval() | |
| encodings = self.tokenizer( | |
| texts, | |
| return_tensors="pt", | |
| padding=True, | |
| truncation=True, | |
| max_length=self.max_seq_length, | |
| ) | |
| input_ids = encodings["input_ids"].to(self.device) | |
| attention_mask = encodings["attention_mask"].to(self.device) | |
| outputs = self.mlm_model( | |
| input_ids=input_ids, | |
| attention_mask=attention_mask, | |
| output_hidden_states=True, | |
| return_dict=True, | |
| ) | |
| hidden = outputs.hidden_states[-1] | |
| if apply_correction: | |
| embeds = self.embedding_model.encode(texts, convert_to_tensor=True, show_progress_bar=False).to(self.device) | |
| projected = self.embedding_projector(embeds) | |
| _, _, _, delta = self.vortex(projected, chaos_factor=self.eval_chaos_factor) | |
| correction = self._project_correction(delta).unsqueeze(1) | |
| attention_mask_f = attention_mask.unsqueeze(-1).float() | |
| mask_focus = (input_ids == self.tokenizer.mask_token_id).unsqueeze(-1).float() | |
| weight_mask = 0.5 * attention_mask_f + 0.5 * mask_focus | |
| hidden = hidden + correction * weight_mask | |
| if hasattr(self.mlm_model, "cls"): | |
| logits = self.mlm_model.cls(hidden) | |
| else: | |
| logits = self.mlm_model.get_output_embeddings()(hidden) | |
| mask_positions = (input_ids == self.tokenizer.mask_token_id) | |
| results: List[List[List[Tuple[str, float]]]] = [] | |
| for batch_index in range(input_ids.size(0)): | |
| batch_tokens: List[List[Tuple[str, float]]] = [] | |
| positions = mask_positions[batch_index].nonzero(as_tuple=False).view(-1) | |
| for position in positions: | |
| token_logits = logits[batch_index, position] | |
| token_probs = F.softmax(token_logits, dim=-1) | |
| topk = torch.topk(token_probs, top_k) | |
| decoded: List[Tuple[str, float]] = [] | |
| for token_id, prob in zip(topk.indices, topk.values): | |
| token = self.tokenizer.decode([token_id.item()]).strip() | |
| decoded.append((token, prob.item())) | |
| batch_tokens.append(decoded) | |
| results.append(batch_tokens) | |
| return results | |
| def run_demo(embed_dim: int = 4, batch_size: int = 3, seed: int = 123) -> None: | |
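| """Smoke test: run the standalone vortex on random embeddings and print its metrics.""" | |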
| torch.manual_seed(seed) | |
| model = VortexBetinaAntiHalluc(embed_dim=embed_dim) | |
| inputs = torch.randn(batch_size, embed_dim) | |
| evolved, loss, metrics, delta = model(inputs) | |
| print("Dimensão de entrada:", embed_dim) | |
| print("Inputs:", inputs) | |
| print("Estado evoluído shape:", evolved.shape) | |
| print("Delta shape:", delta.shape) | |
| print("Loss:", loss.item()) | |
| for key, value in metrics.items(): | |
| print(f"{key}: {value}") | |
| def sample_sentences() -> List[str]: | |
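| """Built-in fallback corpus; sentences are intentionally Portuguese to match the Portuguese BERT MLM.""" | |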
| return [ | |
| "O céu de Lisboa estava completamente claro naquela manhã.", | |
| "A inteligência coletiva da equipe resolveu o problema rapidamente.", | |
| "O gato preto dormia tranquilo sobre o sofá da sala.", | |
| "A orquestra executou a sinfonia com uma precisão impressionante.", | |
| "Os dados indicam uma redução consistente nas alucinações do modelo.", | |
| "A pesquisa científica requer paciência, rigor e curiosidade constante.", | |
| "A ponte antiga foi restaurada para preservar o patrimônio cultural.", | |
| "O sistema Betina ajusta embeddings para evitar distorções semânticas.", | |
| ] | |
| def load_sentences_from_args(args: argparse.Namespace) -> List[str]: | |
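| """Select the sentence source: --dataset-file takes precedence, then --dataset-hf, else built-in samples.""" | |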
| if args.dataset_file: | |
| path = Path(args.dataset_file) | |
| if not path.exists(): | |
| raise FileNotFoundError(f"Dataset file not found: {path}") | |
| sentences = [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()] | |
| if not sentences: | |
| raise ValueError(f"Dataset file {path} is empty") | |
| return sentences if args.dataset_limit is None else sentences[: args.dataset_limit] | |
| if args.dataset_hf: | |
| if load_dataset is None: | |
| raise ImportError("Install the 'datasets' package to use --dataset-hf option") | |
| split = args.dataset_split | |
| if args.dataset_limit and ":" not in split: | |
| split = f"{split}[:{args.dataset_limit}]" | |
| dataset_name = args.dataset_hf | |
| config_name: Optional[str] = args.dataset_hf_config or None | |
| try: | |
| dataset = _safe_load_dataset( | |
| dataset_name, | |
| config_name, | |
| split=split, | |
| hf_token=args.hf_token, | |
| trust_remote_code=args.trust_remote_code, | |
| ) | |
| except Exception as exc: | |
| message = str(exc).lower() | |
| scripts_blocked = "dataset scripts are no longer supported" in message | |
| trust_flag_blocked = "trust_remote_code" in message | |
| if dataset_name == "wikipedia" and (scripts_blocked or trust_flag_blocked): | |
| fallback_name = "wikimedia/wikipedia" | |
| fallback_config = config_name or "20231101.pt" | |
| print( | |
| "The 'wikipedia' dataset now ships as a parquet snapshot and no longer accepts remote scripts." | |
| f" Automatically switching to {fallback_name} ({fallback_config})." | |
| ) | |
| dataset = _safe_load_dataset( | |
| fallback_name, | |
| fallback_config, | |
| split=split, | |
| hf_token=args.hf_token, | |
| trust_remote_code=False, | |
| ) | |
| elif scripts_blocked and not args.trust_remote_code: | |
| hint = ( | |
| "This dataset requires remote code. Re-run with --trust-remote-code to enable" | |
| " scripts from the dataset author. Only do this if you trust the source." | |
| ) | |
| raise RuntimeError(hint) from exc | |
| if "gated dataset" in message or "403" in message or "401" in message: | |
| hint = ( | |
| "Dataset protegido requer autenticação. Informe --hf-token <TOKEN>, defina HF_TOKEN/HUGGINGFACE_TOKEN" | |
| " ou utilize --hf-token-file apontando para o token salvo pelo huggingface-cli. Também é possível" | |
| " executar 'huggingface-cli login' para gerar ~/.cache/huggingface/token.\nVocê pode obter o token em" | |
| " https://huggingface.co/settings/tokens." | |
| ) | |
| raise RuntimeError(hint) from exc | |
| raise | |
| sentences: List[str] = [] | |
| text_field = args.dataset_text_field | |
| limit = args.dataset_limit | |
| for item in dataset: | |
| text = item.get(text_field) | |
| if isinstance(text, str) and text.strip(): | |
| sentences.append(text.strip()) | |
| if limit is not None and len(sentences) >= limit: | |
| break | |
| if not sentences: | |
| raise ValueError("No sentences extracted from the specified dataset") | |
| return sentences | |
| return sample_sentences() | |
| def print_gain_summary( | |
| prompts: List[str], | |
| base_fills: List[List[List[Tuple[str, float]]]], | |
| betina_fills: List[List[List[Tuple[str, float]]]], | |
| ) -> None: | |
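| """Print per-prompt, per-mask top-1 changes between base and Betina-corrected predictions.""" | |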
| print("\nResumo de ganhos top-1:") | |
| for prompt, base_masks, betina_masks in zip(prompts, base_fills, betina_fills): | |
| prompt_head = prompt if len(prompt) <= 60 else f"{prompt[:57]}..." | |
| for idx, (base_group, betina_group) in enumerate(zip(base_masks, betina_masks), start=1): | |
| if not base_group or not betina_group: | |
| continue | |
| base_token, base_prob = base_group[0] | |
| betina_token, betina_prob = betina_group[0] | |
| delta = betina_prob - base_prob | |
| if base_prob > 0: | |
| rel = delta / base_prob * 100.0 | |
| rel_text = f"{rel:+.2f}%" | |
| else: | |
| rel_text = "n/d" | |
| change_desc = "mantido" if betina_token == base_token else f"{base_token} -> {betina_token}" | |
| print( | |
| f" [{prompt_head}] máscara {idx}: {change_desc} | base {base_prob:.4f} -> betina {betina_prob:.4f} ({rel_text})" | |
| ) | |
| def _prepare_debug_value(value, max_examples: int): | |
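| """Recursively convert tensors/dicts/lists into JSON-serializable values, keeping at most max_examples leading rows per tensor.""" | |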
| if isinstance(value, torch.Tensor): | |
| limited = value.detach().cpu() | |
| if limited.dim() >= 1: | |
| limited = limited[:max_examples] | |
| return limited.tolist() | |
| if isinstance(value, dict): | |
| return {key: _prepare_debug_value(val, max_examples) for key, val in value.items()} | |
| if isinstance(value, (list, tuple)): | |
| return [_prepare_debug_value(item, max_examples) for item in value] | |
| if isinstance(value, (float, int, str)) or value is None: | |
| return value | |
| return str(value) | |
| def dump_square_debug(flow_states: Dict[str, object], metrics: Dict[str, float], output_path: str, max_examples: int = 1) -> Path: | |
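| """Serialize vortex flow states and metrics to JSON at output_path; returns the resolved Path.""" | |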
| output = Path(output_path).expanduser() | |
| payload = { | |
| "max_examples": max(1, max_examples), | |
| "metrics": {key: float(value) if isinstance(value, (int, float)) else value for key, value in metrics.items()}, | |
| "flow_states": {key: _prepare_debug_value(val, max_examples) for key, val in flow_states.items()}, | |
| } | |
| output.parent.mkdir(parents=True, exist_ok=True) | |
| output.write_text(json.dumps(payload, indent=2), encoding="utf-8") | |
| return output | |
| def main(argv: Optional[List[str]] = None): | |
| parser = argparse.ArgumentParser(description="Training entry point for the Betina anti-hallucination model") | |
| parser.add_argument("--train", action="store_true", help="Run the full training loop") | |
| parser.add_argument("--epochs", type=int, default=5, help="Number of training epochs") | |
| parser.add_argument("--batch-size", type=int, default=4, help="Batch size") | |
| parser.add_argument("--embed-dim", type=int, default=256, choices=[128, 256], help="Internal vortex dimension") | |
| parser.add_argument("--lambda-vortex", type=float, default=0.1, help="Weight of the vortex loss") | |
| parser.add_argument("--chaos-factor", type=float, default=1.0, help="Chaos factor applied during training") | |
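| """CLI entry point: resolve the HF token, build dataset/vortex/trainer, then optionally train and evaluate.""" | |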
| parser.add_argument( | |
| "--eval-chaos-factor", | |
| type=float, | |
| default=1.0, | |
| help="Fator de caos usado em avaliação e inferência (permite medir diferentes regimes)", | |
| ) | |
| parser.add_argument("--learning-rate", type=float, default=1e-4, help="Learning rate para o vórtice e projetores") | |
| parser.add_argument("--mlm-learning-rate", type=float, default=5e-5, help="Learning rate para o modelo de linguagem") | |
| parser.add_argument("--freeze-mlm", action="store_true", help="Congela os pesos do modelo de linguagem durante o treino") | |
| parser.add_argument("--freeze-projectors", action="store_true", help="Congela os projetores de embedding/correção (modo inferência)") | |
| parser.add_argument( | |
| "--correction-max-norm", | |
| type=float, | |
| default=None, | |
| help="Clampa o vetor de correção Betina a esta norma L2 (<=0 desativa)", | |
| ) | |
| parser.add_argument("--output-dir", type=str, default="outputs/betina_vortex", help="Diretório para salvar o modelo") | |
| parser.add_argument("--device", type=str, default=None, help="Força execução em cuda ou cpu") | |
| parser.add_argument("--top-k", type=int, default=5, help="Top-k para avaliação de máscara") | |
| parser.add_argument("--skip-eval", action="store_true", help="Pula avaliação pós-treino") | |
| parser.add_argument("--eval-prompts", nargs="*", default=None, help="Prompts personalizados contendo [MASK] para avaliação") | |
| parser.add_argument("--dataset-file", type=str, default=None, help="Arquivo de texto com uma sentença por linha") | |
| parser.add_argument("--dataset-hf", type=str, default=None, help="Nome do dataset Hugging Face, ex.: oscar") | |
| parser.add_argument("--dataset-hf-config", type=str, default=None, help="Config do dataset Hugging Face, ex.: unshuffled_deduplicated_pt") | |
| parser.add_argument("--dataset-split", type=str, default="train[:1000]", help="Split do dataset Hugging Face") | |
| parser.add_argument("--dataset-text-field", type=str, default="text", help="Campo de texto no dataset Hugging Face") | |
| parser.add_argument("--dataset-limit", type=int, default=None, help="Limite de sentenças carregadas") | |
| parser.add_argument("--hf-token", type=str, default=None, help="Token de autenticação da Hugging Face (ou defina HF_TOKEN)") | |
| parser.add_argument( | |
| "--hf-token-file", | |
| type=str, | |
| default=None, | |
| help="Arquivo contendo o token da Hugging Face (padrão: ~/.cache/huggingface/token)", | |
| ) | |
| parser.add_argument("--trust-remote-code", action="store_true", help="Permite datasets com script remoto (requer confiança no autor)") | |
| parser.add_argument("--force-download", action="store_true", help="Força novo download dos pesos do modelo de linguagem") | |
| parser.add_argument("--disable-rotation", action="store_true", help="Desativa a rotação do delta do vórtice") | |
| parser.add_argument("--rotation-angle", type=float, default=math.pi / 4, help="Ângulo (rad) para rotacionar o delta quando ativado") | |
| parser.add_argument("--rotation-threshold", type=float, default=1e-4, help="Norma mínima do delta para aplicar rotação") | |
| parser.add_argument("--rotation-clockwise", action="store_true", help="Força rotação horária (inverte o sinal do ângulo)") | |
| parser.add_argument( | |
| "--enable-quadratic-reflection", | |
| action="store_true", | |
| help="Ativa reflexão quadrática estilo bolinha/bastão no delta do vórtice", | |
| ) | |
| parser.add_argument( | |
| "--quadratic-boundary", | |
| type=float, | |
| default=1.0, | |
| help="Magnitudes acima desse valor são refletidas (parede virtual)", | |
| ) | |
| parser.add_argument( | |
| "--quadratic-strength", | |
| type=float, | |
| default=0.5, | |
| help="Mistura (0-1) entre o delta original e o refletido quadrático", | |
| ) | |
| parser.add_argument( | |
| "--disable-triangle", | |
| action="store_true", | |
| help="Desativa o neurônio sintético triangular que confronta X, Y e contrabase", | |
| ) | |
| parser.add_argument( | |
| "--triangle-hidden-dim", | |
| type=int, | |
| default=512, | |
| help="Dimensão oculta usada dentro do neurônio triangular", | |
| ) | |
| parser.add_argument( | |
| "--triangle-max-iters", | |
| type=int, | |
| default=5, | |
| help="Iterações máximas de refinamento (porquês) do triângulo", | |
| ) | |
| parser.add_argument( | |
| "--triangle-tol", | |
| type=float, | |
| default=1e-4, | |
| help="Tolerância para encerrar refinamento triangular (quanto menor, mais perguntas)", | |
| ) | |
| parser.add_argument( | |
| "--triangle-delta-gain", | |
| type=float, | |
| default=1.0, | |
| help="Ganho aplicado ao eixo integrador do triângulo", | |
| ) | |
| parser.add_argument( | |
| "--disable-square-geometry", | |
| action="store_true", | |
| help="Desativa o giro de 180° que forma o quadrado X↔X⁻¹", | |
| ) | |
| parser.add_argument( | |
| "--square-rotation-degrees", | |
| type=float, | |
| default=180.0, | |
| help="Ângulo aplicado ao girar a matriz completa (180° gera o quadrado perfeito)", | |
| ) | |
| parser.add_argument( | |
| "--square-leak-ratio", | |
| type=float, | |
| default=0.05, | |
| help="Mistura o quadrado com a matriz original (0 mantém oposição perfeita, 1 ignora o giro)", | |
| ) | |
| parser.add_argument( | |
| "--square-jitter-std-deg", | |
| type=float, | |
| default=0.0, | |
| help="Desvio padrão em graus para injetar ruído aleatório no giro quadrado", | |
| ) | |
| parser.add_argument( | |
| "--square-debug-json", | |
| type=str, | |
| default=None, | |
| help="Se definido, salva um dump JSON com as matrizes primária/secundária e métricas do vórtice", | |
| ) | |
| parser.add_argument( | |
| "--square-debug-max", | |
| type=int, | |
| default=1, | |
| help="Número máximo de exemplos incluídos no dump quadrado", | |
| ) | |
| parser.add_argument( | |
| "--enable-lorentz-transform", | |
| action="store_true", | |
| help="Aplica transformação de Lorentz no delta final para medir o resultado físico", | |
| ) | |
| parser.add_argument( | |
| "--lorentz-beta", | |
| type=float, | |
| default=0.6, | |
| help="Fração da velocidade da luz usada no boost de Lorentz", | |
| ) | |
| parser.add_argument( | |
| "--lorentz-axis-stream", | |
| type=int, | |
| default=0, | |
| help="Stream usado como eixo espacial (0=X, 1=Y, etc.) na transformação de Lorentz", | |
| ) | |
| parser.add_argument( | |
| "--ia-config", | |
| type=str, | |
| default=None, | |
| help="Arquivo JSON descrevendo quais IAs assumem cada estágio/stream do fluxo matriz", | |
| ) | |
| parser.add_argument( | |
| "--refinement-cycles", | |
| type=int, | |
| default=0, | |
| help="Quantidade de ciclos circundantes de refinamento IA aplicados ao delta final", | |
| ) | |
| parser.add_argument( | |
| "--cycle-stage-name", | |
| type=str, | |
| default="cycle", | |
| help="Nome do estágio IA usado durante cada ciclo circundante", | |
| ) | |
| parser.add_argument("--max-seq-length", type=int, default=512, help="Comprimento máximo de tokens por exemplo (padrão BERT)") | |
| parser.add_argument("--precompute-embeddings", action="store_true", help="Codifica todas as sentenças antes do treino (requer muita RAM)") | |
| parser.add_argument("--embedding-batch-size", type=int, default=64, help="Batch interno para geração de embeddings (encode)") | |
| if argv is None: | |
| argv_list = sys.argv[1:] | |
| if "ipykernel" in sys.modules: | |
| filtered: List[str] = [] | |
| skip_next = False | |
| for item in argv_list: | |
| if skip_next: | |
| skip_next = False | |
| continue | |
| if item == "-f": | |
| skip_next = True | |
| continue | |
| if item.startswith("-f="): | |
| continue | |
| filtered.append(item) | |
| argv_list = filtered | |
| else: | |
| argv_list = list(argv) | |
| parsed, unknown = parser.parse_known_args(argv_list) | |
| if unknown: | |
| print(f"Ignorando argumentos desconhecidos: {unknown}") | |
| args = parsed | |
| args.hf_token, token_source = resolve_hf_token(args.hf_token, args.hf_token_file) | |
| if args.hf_token and token_source: | |
| print(f"Token Hugging Face detectado via {token_source}.") | |
| device = torch.device(args.device) if args.device else torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| print(f"Usando dispositivo: {device}") | |
| embedding_model_name = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2" | |
| tokenizer_name = "neuralmind/bert-base-portuguese-cased" | |
| embedding_model = SentenceTransformer(embedding_model_name, device=str(device) if device.type == "cuda" else "cpu") | |
| tokenizer = AutoTokenizer.from_pretrained(tokenizer_name) | |
| if tokenizer.pad_token is None: | |
| tokenizer.pad_token = tokenizer.eos_token or tokenizer.sep_token or tokenizer.cls_token | |
| try: | |
| mlm_model = AutoModelForMaskedLM.from_pretrained(tokenizer_name, force_download=args.force_download) | |
| except Exception as exc: | |
| print(f"Download failed: {exc}. Try checking internet or cache.") | |
| raise | |
| sentences = load_sentences_from_args(args) | |
| dataset = PortugueseSentenceDataset( | |
| sentences, | |
| tokenizer, | |
| embedding_model, | |
| max_seq_length=args.max_seq_length, | |
| precompute_embeddings=args.precompute_embeddings, | |
| embedding_batch_size=args.embedding_batch_size, | |
| ) | |
| collate_fn = build_collate_fn(tokenizer) | |
| dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True, collate_fn=collate_fn) | |
| eval_dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=False, collate_fn=collate_fn) | |
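| # The same dataset backs both loaders: shuffled batches for training, deterministic order for evaluation. | |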
| stream_aliases: Optional[List[str]] = None | |
| ia_stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None | |
| refinement_cycles = args.refinement_cycles | |
| config_cycle_stage = args.cycle_stage_name | |
| if args.ia_config: | |
| ia_config_data = load_ia_config_file(args.ia_config, args.embed_dim) | |
| print(f"Config IA carregada de {args.ia_config}") | |
| stream_aliases = ia_config_data.get("stream_aliases") | |
| ia_stage_config = ia_config_data.get("stage_config") # type: ignore[assignment] | |
| config_cycles = ia_config_data.get("refinement_cycles") | |
| if isinstance(config_cycles, int): | |
| refinement_cycles = config_cycles | |
| config_cycle_stage = str(ia_config_data.get("cycle_stage_name", config_cycle_stage)) | |
| vortex = VortexBetinaAntiHalluc( | |
| embed_dim=args.embed_dim, | |
| enable_rotation=not args.disable_rotation, | |
| rotation_angle=args.rotation_angle, | |
| rotation_threshold=args.rotation_threshold, | |
| rotation_clockwise=args.rotation_clockwise, | |
| enable_quadratic_reflection=args.enable_quadratic_reflection, | |
| quadratic_boundary=args.quadratic_boundary, | |
| quadratic_strength=args.quadratic_strength, | |
| stream_aliases=stream_aliases, | |
| ia_stage_config=ia_stage_config, | |
| refinement_cycles=refinement_cycles, | |
| cycle_stage_name=config_cycle_stage, | |
| enforce_square_geometry=not args.disable_square_geometry, | |
| square_rotation_degrees=args.square_rotation_degrees, | |
| square_leak_ratio=args.square_leak_ratio, | |
| square_jitter_std_degrees=args.square_jitter_std_deg, | |
| enable_lorentz_transform=args.enable_lorentz_transform, | |
| lorentz_beta=args.lorentz_beta, | |
| lorentz_axis_stream=args.lorentz_axis_stream, | |
| enable_triangle=not args.disable_triangle, | |
| triangle_hidden_dim=args.triangle_hidden_dim, | |
| triangle_max_iters=args.triangle_max_iters, | |
| triangle_tol=args.triangle_tol, | |
| triangle_delta_gain=args.triangle_delta_gain, | |
| ) | |
| trainer = BetinaTrainer( | |
| vortex=vortex, | |
| tokenizer=tokenizer, | |
| embedding_model=embedding_model, | |
| mlm_model=mlm_model, | |
| raw_embedding_dim=embedding_model.get_sentence_embedding_dimension(), | |
| embed_dim=args.embed_dim, | |
| lambda_vortex=args.lambda_vortex, | |
| learning_rate=args.learning_rate, | |
| mlm_learning_rate=args.mlm_learning_rate, | |
| freeze_mlm=args.freeze_mlm, | |
| freeze_projectors=args.freeze_projectors, | |
| correction_max_norm=args.correction_max_norm, | |
| eval_chaos_factor=args.eval_chaos_factor, | |
| chaos_factor=args.chaos_factor, | |
| device=device, | |
| max_seq_length=args.max_seq_length, | |
| ) | |
| if args.square_debug_json: | |
| square_max = max(1, args.square_debug_max) | |
| try: | |
| sample_batch = next(iter(dataloader)) | |
| except StopIteration as exc: # pragma: no cover - empty dataset | |
| raise RuntimeError("Cannot generate the square dump: the dataset is empty") from exc | |
| sample_embeddings = sample_batch["embedding"][:square_max].to(device) | |
| with torch.no_grad(): | |
| projected = trainer.embedding_projector(sample_embeddings) | |
| _, _, metrics_debug, _ = trainer.vortex(projected, chaos_factor=trainer.eval_chaos_factor) | |
| dump_square_debug( | |
| trainer.vortex.last_flow_states, | |
| metrics_debug, | |
| args.square_debug_json, | |
| max_examples=min(square_max, sample_embeddings.size(0)), | |
| ) | |
| print(f"Dump quadrado salvo em {args.square_debug_json}") | |
| if args.train: | |
| print("Iniciando treinamento...") | |
| history = trainer.train(dataloader, epochs=args.epochs) | |
| print(f"Treinamento finalizado com {len(history)} passos") | |
| trainer.save(args.output_dir) | |
| print(f"Modelos salvos em {args.output_dir}") | |
| if not args.skip_eval: | |
| ppl_base = trainer.evaluate_perplexity(eval_dataloader, apply_correction=False) | |
| ppl_betina = trainer.evaluate_perplexity(eval_dataloader, apply_correction=True) | |
| print(f"\nPerplexity sem correção: {ppl_base:.4f}") | |
| print(f"Perplexity com correção Betina: {ppl_betina:.4f}") | |
| ppl_delta = ppl_base - ppl_betina | |
| if ppl_base > 0: | |
| ppl_rel = ppl_delta / ppl_base * 100.0 | |
| print(f"Ganho absoluto: {ppl_delta:+.4f} | Ganho relativo: {ppl_rel:+.2f}%") | |
| else: | |
| print(f"Ganho absoluto: {ppl_delta:+.4f} | Ganho relativo: n/d") | |
| eval_prompts = args.eval_prompts or [ | |
| "O modelo Betina evita [MASK] durante a geração.", | |
| "A capital de Portugal é [MASK].", | |
| "A IA Betina corrige [MASK] via vórtice.", | |
| "O vórtice no Betina filtra [MASK] para reduzir alucinações.", | |
| ] | |
| print("\nPreenchimento sem correção:") | |
| base_fills = trainer.fill_masks(eval_prompts, top_k=args.top_k, apply_correction=False) | |
| for prompt, tokens in zip(eval_prompts, base_fills): | |
| print(prompt) | |
| for idx, group in enumerate(tokens, start=1): | |
| formatted = [f"{token} ({prob:.4f})" for token, prob in group] | |
| print(f" Mascara {idx}: {formatted}") | |
| print("\nPreenchimento com correção Betina:") | |
| betina_fills = trainer.fill_masks(eval_prompts, top_k=args.top_k, apply_correction=True) | |
| for prompt, tokens in zip(eval_prompts, betina_fills): | |
| print(prompt) | |
| for idx, group in enumerate(tokens, start=1): | |
| formatted = [f"{token} ({prob:.4f})" for token, prob in group] | |
| print(f" Mascara {idx}: {formatted}") | |
| print_gain_summary(eval_prompts, base_fills, betina_fills) | |
| if __name__ == "__main__": | |
| main() |