diff --git "a/bettina.py" "b/bettina.py"
--- "a/bettina.py"
+++ "b/bettina.py"
@@ -1,1543 +1,1943 @@
-import argparse
-import json
-import math
-import os
-import sys
-from pathlib import Path
-from typing import Callable, Dict, List, Optional, Tuple
-
-import torch
-import torch.nn as nn
-import torch.nn.functional as F
-import torch.optim as optim
-from contextlib import nullcontext
-from torch.nn.utils.rnn import pad_sequence
-from torch.utils.data import DataLoader, Dataset
-
-try:
-    from torch.amp import autocast as _autocast, GradScaler as _GradScaler
-
-    def betina_autocast(device_type: str, enabled: bool = True):
-        if not enabled or device_type != "cuda":
-            return nullcontext()
-        return _autocast(device_type=device_type, enabled=enabled)
-
-    def betina_grad_scaler(device_type: str, enabled: bool = True):
-        if not enabled or device_type != "cuda":
-            return _NoOpGradScaler()
-        return _GradScaler(device_type=device_type, enabled=enabled)
-
-except ImportError: # pragma: no cover
-    from torch.cuda.amp import autocast as _autocast, GradScaler as _GradScaler
-
-    def betina_autocast(device_type: str, enabled: bool = True):
-        if not enabled or device_type != "cuda":
-            return nullcontext()
-        return _autocast(enabled=enabled)
-
-    def betina_grad_scaler(device_type: str, enabled: bool = True):
-        if not enabled or device_type != "cuda":
-            return _NoOpGradScaler()
-        return _GradScaler(enabled=enabled)
-
-try:
-    from sentence_transformers import SentenceTransformer
-except ImportError as exc: # pragma: no cover
-    raise ImportError("Install sentence-transformers to run the Betina pipeline") from exc
-
-try:
-    from transformers import AutoModelForMaskedLM, AutoTokenizer
-except ImportError as exc: # pragma: no cover
-    raise ImportError("Install transformers to run the Betina pipeline") from exc
-
-try:
-    from datasets import load_dataset # type: ignore[import-not-found]
-except ImportError: # pragma: no cover
-    load_dataset = None
-
-
-def _safe_load_dataset(
-    path: str,
-    name: Optional[str],
-    *,
-    split: str,
-    hf_token: Optional[str],
-    trust_remote_code: bool,
-):
-    if load_dataset is None:
-        raise ImportError("Install the 'datasets' package to use Hugging Face corpora")
-    base_kwargs = {"split": split, "trust_remote_code": trust_remote_code}
-    attempts: List[Dict[str, Optional[str]]] = []
-    if hf_token:
-        attempts.append({"token": hf_token})
-        attempts.append({"use_auth_token": hf_token})
-    attempts.append({})
-    last_error: Optional[Exception] = None
-    for extra in attempts:
-        try:
-            return load_dataset(path, name, **base_kwargs, **extra)
-        except TypeError as err:
-            last_error = err
-            continue
-        except ValueError as err:
-            if "use_auth_token" in str(err).lower():
-                last_error = err
-                continue
-            raise
-    if last_error:
-        raise last_error
-    raise RuntimeError(f"Falha ao carregar dataset {path}/{name}")
-
-
-def _read_hf_token_file(path: Path) -> Optional[str]:
-    try:
-        content = path.read_text(encoding="utf-8").strip()
-    except OSError:
-        return None
-    if not content:
-        return None
-    first_line = content.splitlines()[0].strip()
-    return first_line or None
-
-
-def resolve_hf_token(explicit_token: Optional[str], token_file: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
-    """Resolve o token HF preferindo argumento, env vars e arquivo do huggingface-cli."""
-    if explicit_token and explicit_token.strip():
-        return explicit_token.strip(), "--hf-token"
-    env_token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
-    if env_token and env_token.strip():
-        return env_token.strip(), "env"
"env" - file_candidates: List[Path] = [] - if token_file: - file_candidates.append(Path(token_file).expanduser()) - else: - hf_home = os.getenv("HF_HOME") - if hf_home: - file_candidates.append(Path(hf_home).expanduser() / "token") - file_candidates.append(Path.home() / ".cache" / "huggingface" / "token") - file_candidates.append(Path.home() / ".huggingface" / "token") - for candidate in file_candidates: - token = _read_hf_token_file(candidate) - if token: - return token, str(candidate) - return None, None - - -class _NoOpGradScaler: - def __init__(self): - pass - - def scale(self, loss): - return loss - - def step(self, optimizer): - optimizer.step() - - def update(self): - pass - - def unscale_(self, optimizer): - pass - - def state_dict(self): - return {} - - def load_state_dict(self, state): - pass - - -class CallableAgentAdapter(nn.Module): - def __init__(self, fn: Callable[[torch.Tensor], torch.Tensor], name: str): - super().__init__() - self.fn = fn - self.agent_name = name or getattr(fn, "__name__", "callable_agent") - - def forward(self, tensor: torch.Tensor) -> torch.Tensor: # noqa: D401 - return self.fn(tensor) - - -class MultiIntelligenceRouter(nn.Module): - """Despacha cada estágio do fluxo matriz para IAs distintas por stream/etapa.""" - - def __init__( - self, - num_streams: int, - *, - stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None, - stream_aliases: Optional[List[str]] = None, - ): - super().__init__() - self.num_streams = num_streams - self.stream_aliases = stream_aliases or [f"S{idx}" for idx in range(num_streams)] - self.stage_modules = nn.ModuleDict() - self.stage_logs: Dict[str, List[Dict[str, str]]] = {} - if stage_config: - self.apply_stage_config(stage_config) - - def apply_stage_config(self, stage_config: Dict[str, Dict[str, nn.Module]]) -> None: - for stage_name, mapping in stage_config.items(): - module_dict = nn.ModuleDict() - for key, module in mapping.items(): - module_dict[str(key)] = self._wrap_module(stage_name, key, module) - self.stage_modules[stage_name] = module_dict - - def register_stage(self, stage_name: str, mapping: Dict[str, nn.Module]) -> None: - module_dict = nn.ModuleDict() - for key, module in mapping.items(): - module_dict[str(key)] = self._wrap_module(stage_name, key, module) - self.stage_modules[stage_name] = module_dict - - def _wrap_module(self, stage: str, key: str | int, module: nn.Module | Callable) -> nn.Module: - if isinstance(module, list): - wrapped = [self._wrap_module(stage, f"{key}_{idx}", item) for idx, item in enumerate(module)] - seq = nn.Sequential(*wrapped) - if not hasattr(seq, "agent_name"): - seq.agent_name = f"seq_{stage}_{key}" - return seq - if isinstance(module, nn.Module): - if not hasattr(module, "agent_name"): - module.agent_name = module.__class__.__name__ - return module - if callable(module): - name = getattr(module, "agent_name", None) or getattr(module, "__name__", f"{stage}_{key}_fn") - return CallableAgentAdapter(module, name) - raise TypeError(f"Módulo IA inválido para estágio {stage}/{key}: {type(module)}") - - def _select_module(self, stage_dict: nn.ModuleDict, key: str | int | None) -> Optional[nn.Module]: - if key is not None: - candidate_key = str(key) - if candidate_key in stage_dict: - return stage_dict[candidate_key] - for fallback in ("*", "default", "-1"): - if fallback in stage_dict: - return stage_dict[fallback] - return None - - def apply(self, stage: str, tensor: torch.Tensor, *, cycle_idx: Optional[int] = None) -> torch.Tensor: - stage_dict = self.stage_modules[stage] if 
-        log: List[Dict[str, str]] = []
-        if stage_dict is None:
-            self.stage_logs[stage] = log
-            return tensor
-        if tensor.dim() == 3:
-            outputs = []
-            for stream_idx in range(tensor.size(1)):
-                module = self._select_module(stage_dict, stream_idx)
-                if module is None and stream_idx < len(self.stream_aliases):
-                    module = self._select_module(stage_dict, self.stream_aliases[stream_idx])
-                chunk = tensor[:, stream_idx, :]
-                if module is not None:
-                    chunk = module(chunk)
-                    log.append(
-                        {
-                            "stream": self.stream_aliases[stream_idx] if stream_idx < len(self.stream_aliases) else str(stream_idx),
-                            "agent": getattr(module, "agent_name", module.__class__.__name__),
-                        }
-                    )
-                outputs.append(chunk)
-            stacked = torch.stack(outputs, dim=1)
-            self.stage_logs[stage] = log
-            return stacked
-        module = None
-        if cycle_idx is not None:
-            module = self._select_module(stage_dict, cycle_idx)
-        if module is None:
-            module = self._select_module(stage_dict, "global")
-        if module is None:
-            module = self._select_module(stage_dict, None)
-        if module is None:
-            self.stage_logs[stage] = log
-            return tensor
-        updated = module(tensor)
-        alias = f"cycle_{cycle_idx}" if cycle_idx is not None else "global"
-        log.append({"stream": alias, "agent": getattr(module, "agent_name", module.__class__.__name__)})
-        self.stage_logs[stage] = log
-        return updated
-
-    def stage_signature(self, stage: str) -> str:
-        logs = self.stage_logs.get(stage, [])
-        if not logs:
-            return "identity"
-        return " | ".join(f"{entry['stream']}→{entry['agent']}" for entry in logs)
-
-    def describe_all_stages(self) -> Dict[str, List[Dict[str, str]]]:
-        return {stage: list(entries) for stage, entries in self.stage_logs.items()}
-
-
-def build_builtin_agent(name: str, embed_dim: int) -> nn.Module:
-    normalized = name.strip().lower()
-    if normalized in {"brock", "brockman", "brock ia"}:
-        module = nn.Sequential(
-            nn.LayerNorm(embed_dim),
-            nn.Linear(embed_dim, embed_dim * 2),
-            nn.GELU(),
-            nn.Linear(embed_dim * 2, embed_dim),
-        )
-    elif normalized in {"chatgpt", "chatgpt 5.1", "chatgpt5.1", "gpt51"}:
-        module = nn.Sequential(
-            nn.LayerNorm(embed_dim),
-            nn.Linear(embed_dim, embed_dim),
-            nn.SiLU(),
-            nn.Linear(embed_dim, embed_dim),
-        )
-    elif normalized in {"code", "code ia", "coder"}:
-        module = nn.Sequential(
-            nn.LayerNorm(embed_dim),
-            nn.Linear(embed_dim, embed_dim),
-        )
-    elif normalized in {"critic", "mirror", "refiner"}:
-        module = nn.Sequential(
-            nn.LayerNorm(embed_dim),
-            nn.Linear(embed_dim, embed_dim * 3 // 2),
-            nn.GELU(),
-            nn.Linear(embed_dim * 3 // 2, embed_dim),
-            nn.LayerNorm(embed_dim),
-        )
-    elif normalized in {"identity", "none"}:
-        module = nn.Identity()
-    else:
-        raise ValueError(f"Agente IA desconhecido: {name}")
-    module.agent_name = name
-    return module
-
-
-def _build_custom_agent(agent_def: Dict[str, object], embed_dim: int) -> nn.Module:
-    if "style" in agent_def:
-        module = build_builtin_agent(str(agent_def["style"]), embed_dim)
-        module.agent_name = str(agent_def.get("name", agent_def["style"]))
-        return module
-    agent_type = str(agent_def.get("type", "mlp")).lower()
-    hidden = int(agent_def.get("hidden", embed_dim * 2))
-    dropout = float(agent_def.get("dropout", 0.0))
-    if agent_type == "mlp":
-        layers: List[nn.Module] = [
-            nn.LayerNorm(embed_dim),
-            nn.Linear(embed_dim, hidden),
-            nn.GELU(),
-        ]
-        if dropout > 0:
-            layers.append(nn.Dropout(dropout))
-        layers.append(nn.Linear(hidden, embed_dim))
-        module = nn.Sequential(*layers)
-    elif agent_type == "linear":
-        module = nn.Sequential(nn.LayerNorm(embed_dim), nn.Linear(embed_dim, embed_dim))
-    else:
-        raise ValueError(f"Tipo de agente custom '{agent_type}' não suportado")
-    module.agent_name = str(agent_def.get("name", agent_type))
-    return module
-
-
-def load_ia_config_file(path: str, embed_dim: int) -> Dict[str, object]:
-    data = json.loads(Path(path).read_text(encoding="utf-8"))
-    stage_entries = data.get("stages", {})
-    if not isinstance(stage_entries, dict):
-        raise ValueError("Campo 'stages' do arquivo IA precisa ser um objeto mapeando estágio→streams")
-    stage_config: Dict[str, Dict[str, nn.Module]] = {}
-    for stage, mapping in stage_entries.items():
-        if not isinstance(mapping, dict):
-            raise ValueError(f"Estágio '{stage}' precisa mapear streams para agentes")
-        stage_config[stage] = {}
-        for stream_key, agent_def in mapping.items():
-            if isinstance(agent_def, str):
-                module = build_builtin_agent(agent_def, embed_dim)
-            elif isinstance(agent_def, dict):
-                module = _build_custom_agent(agent_def, embed_dim)
-            else:
-                raise ValueError(f"Agente inválido para estágio '{stage}' stream '{stream_key}'")
-            stage_config[stage][str(stream_key)] = module
-    stream_aliases = data.get("stream_aliases")
-    if stream_aliases is not None and (not isinstance(stream_aliases, list) or not all(isinstance(x, str) for x in stream_aliases)):
-        raise ValueError("'stream_aliases' deve ser uma lista de strings")
-    return {
-        "stream_aliases": stream_aliases,
-        "stage_config": stage_config,
-        "refinement_cycles": int(data.get("refinement_cycles", 0)),
-        "cycle_stage_name": str(data.get("cycle_stage_name", "cycle")),
-    }
-
-
-class VortexBetinaAntiHalluc(nn.Module):
-    def __init__(
-        self,
-        embed_dim: int = 256,
-        vortex_steps: int = 10,
-        vortex_dt: float = 0.02,
-        num_streams: int = 3,
-        enable_rotation: bool = True,
-        rotation_angle: float = math.pi / 4,
-        rotation_threshold: float = 1e-4,
-        rotation_clockwise: bool = False,
-        enable_quadratic_reflection: bool = False,
-        quadratic_boundary: float = 1.0,
-        quadratic_strength: float = 0.5,
-        boost_small_deltas: bool = True,
-        delta_gain: float = 1.5,
-        reflection_push: float = 0.25,
-        stream_aliases: Optional[List[str]] = None,
-        ia_stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None,
-        refinement_cycles: int = 0,
-        cycle_stage_name: str = "cycle",
-        enforce_square_geometry: bool = True,
-        square_rotation_degrees: float = 180.0,
-        square_leak_ratio: float = 0.05,
-        square_jitter_std_degrees: float = 0.0,
-        enable_lorentz_transform: bool = False,
-        lorentz_beta: float = 0.6,
-        lorentz_axis_stream: int = 0,
-    ):
-        super().__init__()
-        self.embed_dim = embed_dim
-        self.vortex_steps = vortex_steps
-        self.vortex_dt = vortex_dt
-        self.num_streams = max(1, num_streams)
-        self.enable_rotation = enable_rotation
-        self.rotation_angle = rotation_angle
-        self.rotation_threshold = rotation_threshold
-        self.rotation_clockwise = rotation_clockwise
-        self.enable_quadratic_reflection = enable_quadratic_reflection
-        self.quadratic_boundary = quadratic_boundary
-        self.quadratic_strength = float(min(1.0, max(0.0, quadratic_strength)))
-        self.boost_small_deltas = boost_small_deltas
-        self.delta_gain = max(1.0, delta_gain)
-        self.reflection_push = max(0.0, reflection_push)
-        self.stream_aliases = self._prepare_stream_aliases(stream_aliases)
-        self.refinement_cycles = max(0, refinement_cycles)
-        self.cycle_stage_name = cycle_stage_name or "cycle"
-        self.enforce_square_geometry = enforce_square_geometry
-        self.square_rotation_radians = math.radians(square_rotation_degrees)
-        self.square_leak_ratio = float(min(1.0, max(0.0, square_leak_ratio)))
-        self.square_jitter_std = math.radians(max(0.0, square_jitter_std_degrees))
-        beta = max(-0.999, min(0.999, lorentz_beta))
-        self.enable_lorentz_transform = enable_lorentz_transform
-        self.lorentz_beta = beta
-        self.lorentz_axis_stream = max(0, lorentz_axis_stream)
-        self.ia_router = MultiIntelligenceRouter(
-            num_streams=self.num_streams,
-            stage_config=ia_stage_config,
-            stream_aliases=self.stream_aliases,
-        )
-        self.projector = nn.Sequential(
-            nn.LayerNorm(embed_dim),
-            nn.Linear(embed_dim, embed_dim),
-        )
-        self.last_flow_states: Dict[str, object] = {}
-
-    def _prepare_stream_aliases(self, provided: Optional[List[str]]) -> List[str]:
-        if provided:
-            aliases = list(provided)
-        else:
-            aliases = ["X", "Y", "Z", "W", "V", "U", "T", "S"]
-        if len(aliases) < self.num_streams:
-            aliases.extend(f"S{idx}" for idx in range(len(aliases), self.num_streams))
-        return aliases[: self.num_streams]
-
-    def _rotate_matrix_plane(self, tensor: torch.Tensor, radians: float) -> torch.Tensor:
-        if tensor.shape[-1] < 2:
-            return tensor
-        angle = radians % (2 * math.pi)
-        if abs(angle) < 1e-9:
-            return tensor
-        rotated = tensor.clone()
-        cos_theta = math.cos(angle)
-        sin_theta = math.sin(angle)
-        x = tensor[..., 0]
-        y = tensor[..., 1]
-        rotated[..., 0] = cos_theta * x - sin_theta * y
-        rotated[..., 1] = sin_theta * x + cos_theta * y
-        return rotated
-
-    def _rotate_matrix_square(self, tensor: torch.Tensor) -> Tuple[torch.Tensor, float]:
-        base_angle = abs(self.square_rotation_radians) % (2 * math.pi)
-        if base_angle > math.pi:
-            base_angle = 2 * math.pi - base_angle
-        jitter = 0.0
-        if self.square_jitter_std > 0:
-            jitter = torch.randn(1, device=tensor.device).item() * self.square_jitter_std
-        angle = max(0.0, min(math.pi, base_angle + jitter))
-        if math.isclose(angle, 0.0, rel_tol=1e-6, abs_tol=1e-6):
-            mirrored = tensor.clone()
-        else:
-            intensity = angle / math.pi
-            mirrored = torch.lerp(tensor, -tensor, intensity)
-        if self.square_leak_ratio > 0:
-            mirrored = torch.lerp(mirrored, tensor, self.square_leak_ratio)
-        return mirrored, angle
-
-    def _select_axis_stream(self, tensor: torch.Tensor) -> torch.Tensor:
-        stream_idx = min(self.lorentz_axis_stream, tensor.size(1) - 1)
-        return tensor[:, stream_idx, :]
-
-    def _lorentz_boost(
-        self,
-        spatial: torch.Tensor,
-        reference_stream: torch.Tensor,
-        time_like: torch.Tensor,
-    ) -> Tuple[torch.Tensor, float, torch.Tensor]:
-        beta = self.lorentz_beta
-        gamma = 1.0 / math.sqrt(max(1e-8, 1.0 - beta**2))
-        axis = F.normalize(reference_stream, dim=-1, eps=1e-6)
-        parallel_scalar = torch.sum(spatial * axis, dim=-1, keepdim=True)
-        parallel_vec = parallel_scalar * axis
-        perpendicular_vec = spatial - parallel_vec
-        t_prime = gamma * (time_like - beta * parallel_scalar)
-        parallel_prime = gamma * (parallel_scalar - beta * time_like) * axis
-        updated_spatial = parallel_prime + perpendicular_vec
-        return updated_spatial, gamma, t_prime.abs()
-
-    def invert(self, vec: torch.Tensor) -> torch.Tensor:
-        return -vec
-
-    def intersection_knowledge(self, vec: torch.Tensor, base_k: torch.Tensor) -> torch.Tensor:
-        dot = torch.sum(vec * base_k, dim=-1, keepdim=True)
-        norm_k = torch.norm(base_k, dim=-1, keepdim=True).pow(2).clamp_min(1e-8)
-        return (dot / norm_k) * base_k
-
-    def euler_vortex(self, state: torch.Tensor) -> torch.Tensor:
-        sigma, rho, beta, gamma = 10.0, 28.0, 8.0 / 3.0, 1.0
-        feature_dim = state.shape[-1] - 1
-        base = state[..., :-1]
-        w = state[..., -1:]
-        updated_base = torch.zeros_like(base)
-        for i in range(0, feature_dim, 3):
-            chunk = base[..., i : i + 3]
-            if chunk.shape[-1] < 3:
-                chunk = F.pad(chunk, (0, 3 - chunk.shape[-1]))
-            chunk = chunk.clone()
-            for _ in range(self.vortex_steps):
-                x = chunk[..., 0]
-                y = chunk[..., 1]
-                z = chunk[..., 2]
-                dx = sigma * (y - x) * self.vortex_dt
-                dy = (x * (rho - z) - y) * self.vortex_dt
-                dz = (x * y - beta * z) * self.vortex_dt
-                chunk = chunk + torch.stack([dx, dy, dz], dim=-1)
-            span = min(3, feature_dim - i)
-            updated_base[..., i : i + span] = chunk[..., :span]
-        energy = torch.norm(base, dim=-1, keepdim=True).pow(2)
-        w_iter = w.clone()
-        for _ in range(self.vortex_steps):
-            dw = (gamma * energy - w_iter) * self.vortex_dt
-            w_iter = w_iter + dw
-        updated = state.clone()
-        updated[..., :-1] = updated_base
-        updated[..., -1:] = w_iter
-        return updated
-
-    def rotate_difference(self, delta: torch.Tensor, anchor: torch.Tensor) -> torch.Tensor: # noqa: ARG002
-        # Rotation now confined to the (x, y) plane while keeping z (and higher dims) untouched.
-        if delta.shape[-1] < 2:
-            return delta
-        angle = -abs(self.rotation_angle) if self.rotation_clockwise else abs(self.rotation_angle)
-        cos_theta = math.cos(angle)
-        sin_theta = math.sin(angle)
-        rotated = delta.clone()
-        x = delta[..., 0]
-        y = delta[..., 1]
-        rotated[..., 0] = cos_theta * x - sin_theta * y
-        rotated[..., 1] = sin_theta * x + cos_theta * y
-        if delta.shape[-1] >= 3:
-            rotated[..., 2] = delta[..., 2] # keep z static per vortex requirement
-        return rotated
-
-    def quadratic_reflection(self, delta: torch.Tensor) -> Tuple[torch.Tensor, float]:
-        """Reflect components between ±boundary and warp them quadratically (Pong-like bounce)."""
-        if not self.enable_quadratic_reflection or self.quadratic_boundary <= 0:
-            return delta, 0.0
-        boundary = self.quadratic_boundary
-        period = 2.0 * boundary
-        magnitude = delta.abs()
-        direction = torch.sign(delta)
-        wrapped = torch.remainder(magnitude, period)
-        mirrored = torch.where(wrapped > boundary, period - wrapped, wrapped)
-        normalized = (mirrored / boundary).clamp(0.0, 1.0)
-        squared = normalized.pow(2)
-        bounced = direction * boundary * squared
-        blended = torch.lerp(delta, bounced, self.quadratic_strength)
-        bounce_ratio = (magnitude > boundary).float().mean().item()
-        return blended, bounce_ratio
-
-    def forward(self, input_vec: torch.Tensor, chaos_factor: float = 1.0):
-        """
-        chaos_factor: Multiplicador de agressividade (1.0 = normal, 5.0 = agressivo, 10.0 = insano)
-        """
-        if input_vec.dim() == 2:
-            batch = input_vec.size(0)
-            mat_input = input_vec.unsqueeze(1).expand(batch, self.num_streams, self.embed_dim)
-        elif input_vec.dim() == 3:
-            batch, streams, dim = input_vec.shape
-            if streams != self.num_streams or dim != self.embed_dim:
-                raise ValueError(
-                    f"Esperava tensor (batch,{self.num_streams},{self.embed_dim}), recebi {input_vec.shape}"
-                )
-            mat_input = input_vec
-        else:
-            raise ValueError("input_vec precisa ter 2 ou 3 dimensões")
-
-        mat_flat = mat_input.reshape(-1, self.embed_dim)
-        mat_primary = self.projector(mat_flat).reshape(-1, self.num_streams, self.embed_dim)
-        stage_signatures: Dict[str, str] = {}
-        mat_primary = self.ia_router.apply("base", mat_primary)
-        stage_signatures["base"] = self.ia_router.stage_signature("base")
-        mirrored_primary: Optional[torch.Tensor] = None
-        square_tensor: Optional[torch.Tensor] = None
-        square_angle_applied: Optional[float] = None
-        if self.enforce_square_geometry:
-            mirrored_primary, square_angle_applied = self._rotate_matrix_square(mat_primary)
-            square_tensor = torch.stack([mat_primary, mirrored_primary], dim=2)
-            mat_secondary = mirrored_primary
-        else:
-            mat_secondary = self.invert(mat_primary)
-        mat_secondary = self.ia_router.apply("inversion", mat_secondary)
-        stage_signatures["inversion"] = self.ia_router.stage_signature("inversion")
-        base_k = mat_primary.mean(dim=1, keepdim=True) + 1e-4 * torch.randn_like(mat_primary[:, :1, :])
-        inter_primary = self.intersection_knowledge(mat_primary, base_k)
-        inter_secondary = self.intersection_knowledge(mat_secondary, base_k)
-        approx_inter_full = 0.5 * (inter_primary + inter_secondary)
-        approx_inter = approx_inter_full.mean(dim=1)
-        flow_states: Dict[str, object] = {}
-        flow_states["matrix_primary"] = mat_primary
-        flow_states["matrix_secondary"] = mat_secondary
-        flow_states["base_core"] = base_k.squeeze(1)
-        if mirrored_primary is not None:
-            flow_states["matrix_rot_180"] = mirrored_primary
-        if square_tensor is not None:
-            flow_states["matrix_square"] = square_tensor
-            flow_states["square_leak_ratio"] = self.square_leak_ratio
-        if square_angle_applied is not None:
-            flow_states["square_angle"] = square_angle_applied
-
-        delta_green = inter_primary - base_k
-        flow_states["xy_bridge_matrix"] = delta_green
-
-        delta_norm_stream = torch.norm(delta_green, dim=-1, keepdim=True)
-        if self.boost_small_deltas:
-            boost_mask = delta_norm_stream < self.rotation_threshold
-            if boost_mask.any():
-                safe_norm = delta_norm_stream.clamp_min(1e-6)
-                factor = (self.rotation_threshold / safe_norm) * self.delta_gain
-                delta_green = torch.where(boost_mask, delta_green * factor, delta_green)
-            boost_ratio = boost_mask.float().mean().item()
-        else:
-            boost_ratio = 0.0
-        flow_states["matrix_green_boost"] = delta_green
-
-        if self.enable_rotation:
-            delta_flat = delta_green.reshape(-1, self.embed_dim)
-            base_flat = base_k.expand(-1, self.num_streams, -1).reshape(-1, self.embed_dim)
-            delta_norm = torch.norm(delta_flat, dim=-1, keepdim=True)
-            rotation_mask = delta_norm > self.rotation_threshold
-            if rotation_mask.any():
-                rotated = self.rotate_difference(delta_flat, base_flat)
-                delta_flat = torch.where(rotation_mask, rotated, delta_flat)
-            delta_green = delta_flat.view(-1, self.num_streams, self.embed_dim)
-            rotation_ratio = rotation_mask.float().mean().item()
-        else:
-            rotation_ratio = 0.0
-        flow_states["matrix_green_rot"] = delta_green
-
-        if self.enable_quadratic_reflection and self.reflection_push > 0:
-            norm_after_rot = torch.norm(delta_green, dim=-1, keepdim=True)
-            push_mask = norm_after_rot < self.quadratic_boundary
-            if push_mask.any():
-                push_factor = 1.0 + self.reflection_push
-                delta_green = torch.where(push_mask, delta_green * push_factor, delta_green)
-            pre_reflect_push_ratio = push_mask.float().mean().item()
-        else:
-            pre_reflect_push_ratio = 0.0
-
-        if self.enable_quadratic_reflection:
-            delta_green, reflection_ratio = self.quadratic_reflection(delta_green)
-        else:
-            reflection_ratio = 0.0
-        flow_states["matrix_green_reflect"] = delta_green
-
-        mirror_reference = self.intersection_knowledge(mat_secondary, base_k)
-        matrix_black = 0.5 * (2 * base_k - delta_green + mirror_reference)
-        matrix_black = self.ia_router.apply("mirror", matrix_black)
-        stage_signatures["mirror"] = self.ia_router.stage_signature("mirror")
-        if self.enforce_square_geometry:
-            x_stream = mat_primary[:, 0, :]
-            x_output = mat_secondary[:, 0, :]
-            matrix_black[:, 0, :] = x_output
-            flow_states["square_input"] = x_stream
-            flow_states["square_output"] = x_output
-            flow_states["matrix_black_square"] = matrix_black
-
flow_states["matrix_black"] = matrix_black - - delta_inter = matrix_black.mean(dim=1) - cycle_signatures: List[str] = [] - if self.refinement_cycles > 0: - refined_delta = delta_inter - for cycle_idx in range(self.refinement_cycles): - refined_delta = self.ia_router.apply( - self.cycle_stage_name, - refined_delta, - cycle_idx=cycle_idx, - ) - cycle_signatures.append(self.ia_router.stage_signature(self.cycle_stage_name)) - delta_inter = refined_delta - stage_signatures[self.cycle_stage_name] = cycle_signatures[-1] if cycle_signatures else "identity" - flow_states["delta_pre_lorentz"] = delta_inter - flow_states["ia_cycle_signatures"] = cycle_signatures - - w = torch.norm(delta_inter, dim=-1, keepdim=True) - lorentz_gamma = 1.0 - if self.enable_lorentz_transform: - reference_stream = self._select_axis_stream(mat_primary) - delta_inter, lorentz_gamma, w = self._lorentz_boost(delta_inter, reference_stream, w) - flow_states["lorentz_reference"] = reference_stream - flow_states["delta_lorentz"] = delta_inter - flow_states["lorentz_gamma"] = lorentz_gamma - flow_states["lorentz_time"] = w - - # APLICAÇÃO DO FATOR CAOS (OVERDRIVE) - # Se chaos_factor > 1, amplificamos o delta final para forçar a correção - if chaos_factor != 1.0: - delta_inter = delta_inter * chaos_factor - - flow_states["delta_output"] = delta_inter - - state = torch.cat([approx_inter, delta_inter, w], dim=-1) - evolved = self.euler_vortex(state) - flow_states["output_corner"] = evolved[..., :-1] - flow_states["ia_stage_logs"] = self.ia_router.describe_all_stages() - self.last_flow_states = flow_states - overlap = F.cosine_similarity(inter_primary.view(-1, self.embed_dim), inter_secondary.view(-1, self.embed_dim), dim=-1) - overlap = overlap.view(-1, self.num_streams).mean(dim=1) - annulation = torch.norm(mat_primary - (-mat_secondary), dim=-1).pow(2).mean(dim=1) - vortex_sink = evolved[..., -1] - hall_penalty = (1 - overlap).clamp_min(0.0) - approx_pull = ( - torch.norm(mat_primary.mean(dim=1) - approx_inter, dim=-1).pow(2) - + torch.norm(mat_secondary.mean(dim=1) - approx_inter, dim=-1).pow(2) - ) - loss = ( - annulation.mean() - + 0.5 * hall_penalty.mean() - + 0.25 * approx_pull.mean() - - 0.1 * vortex_sink.mean() - ) - metrics = { - "annulation": annulation.mean().item(), - "cosine_overlap": overlap.mean().item(), - "vortex_energy": vortex_sink.mean().item(), - "rotation_ratio": rotation_ratio, - "approx_alignment": approx_pull.mean().item(), - "reflection_ratio": reflection_ratio, - "boost_ratio": boost_ratio, - "reflection_push_ratio": pre_reflect_push_ratio, - "ia_base": stage_signatures.get("base", "identity"), - "ia_inversion": stage_signatures.get("inversion", "identity"), - "ia_mirror": stage_signatures.get("mirror", "identity"), - "ia_cycle": " || ".join(cycle_signatures) if cycle_signatures else stage_signatures.get(self.cycle_stage_name, "identity"), - "lorentz_gamma": lorentz_gamma, - "square_angle_deg": math.degrees(square_angle_applied) if square_angle_applied is not None else 0.0, - "square_leak_ratio": self.square_leak_ratio, - } - return evolved, loss, metrics, delta_inter - - -class PortugueseSentenceDataset(Dataset): - def __init__( - self, - sentences: List[str], - tokenizer: AutoTokenizer, - embedding_model: SentenceTransformer, - mask_prob: float = 0.15, - max_seq_length: int = 512, - precompute_embeddings: bool = False, - embedding_batch_size: int = 64, - ): - self.sentences = sentences - self.tokenizer = tokenizer - self.embedding_model = embedding_model - self.mask_prob = mask_prob - 
-        self.max_seq_length = max_seq_length
-        self.precompute_embeddings = precompute_embeddings
-        self.embedding_batch_size = max(1, embedding_batch_size)
-        self.embeddings: Optional[torch.Tensor] = None
-        self._embedding_cache: Dict[int, torch.Tensor] = {}
-        if self.precompute_embeddings:
-            self._precompute_all_embeddings()
-
-    def __len__(self) -> int:
-        return len(self.sentences)
-
-    def _mask_tokens(self, input_ids: torch.Tensor, special_mask: torch.Tensor) -> Dict[str, torch.Tensor]:
-        labels = input_ids.clone()
-        probability_matrix = torch.full(labels.shape, self.mask_prob)
-        probability_matrix.masked_fill_(special_mask.bool(), 0.0)
-        masked_indices = torch.bernoulli(probability_matrix).bool()
-        if not masked_indices.any():
-            candidate_positions = (~special_mask.bool()).nonzero(as_tuple=False).view(-1)
-            choice = candidate_positions[torch.randint(0, candidate_positions.numel(), (1,)).item()]
-            masked_indices[choice] = True
-        labels[~masked_indices] = -100
-        input_ids = input_ids.clone()
-        input_ids[masked_indices] = self.tokenizer.mask_token_id
-        return {"input_ids": input_ids, "labels": labels}
-
-    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
-        sentence = self.sentences[idx]
-        encoding = self.tokenizer(
-            sentence,
-            return_tensors="pt",
-            return_special_tokens_mask=True,
-            truncation=True,
-            max_length=self.max_seq_length,
-        )
-        input_ids = encoding["input_ids"].squeeze(0)
-        attention_mask = encoding["attention_mask"].squeeze(0)
-        special_mask = encoding["special_tokens_mask"].squeeze(0)
-        masked = self._mask_tokens(input_ids, special_mask)
-        embedding = self._get_embedding(idx)
-        return {
-            "input_ids": masked["input_ids"],
-            "attention_mask": attention_mask,
-            "labels": masked["labels"],
-            "embedding": embedding,
-        }
-
-    @torch.no_grad()
-    def _precompute_all_embeddings(self) -> None:
-        chunks: List[torch.Tensor] = []
-        total = len(self.sentences)
-        for start in range(0, total, self.embedding_batch_size):
-            batch = self.sentences[start : start + self.embedding_batch_size]
-            batch_embeds = self.embedding_model.encode(
-                batch,
-                convert_to_tensor=True,
-                show_progress_bar=False,
-                batch_size=self.embedding_batch_size,
-            )
-            chunks.append(batch_embeds.float().cpu())
-        self.embeddings = torch.cat(chunks, dim=0) if chunks else torch.empty(0)
-
-    @torch.no_grad()
-    def _compute_single_embedding(self, sentence: str) -> torch.Tensor:
-        embed = self.embedding_model.encode(
-            sentence,
-            convert_to_tensor=True,
-            show_progress_bar=False,
-            batch_size=1,
-        )
-        return embed.float().cpu()
-
-    def _get_embedding(self, idx: int) -> torch.Tensor:
-        if self.embeddings is not None:
-            return self.embeddings[idx]
-        if idx not in self._embedding_cache:
-            self._embedding_cache[idx] = self._compute_single_embedding(self.sentences[idx])
-        return self._embedding_cache[idx]
-
-
-def build_collate_fn(tokenizer: AutoTokenizer):
-    pad_id = tokenizer.pad_token_id
-
-    def collate(batch: List[Dict[str, torch.Tensor]]) -> Dict[str, torch.Tensor]:
-        input_ids = pad_sequence([item["input_ids"] for item in batch], batch_first=True, padding_value=pad_id)
-        attention_mask = pad_sequence([item["attention_mask"] for item in batch], batch_first=True, padding_value=0)
-        labels = pad_sequence([item["labels"] for item in batch], batch_first=True, padding_value=-100)
-        embeddings = torch.stack([item["embedding"] for item in batch])
-        return {
-            "input_ids": input_ids,
-            "attention_mask": attention_mask,
-            "labels": labels,
-            "embedding": embeddings,
-        }
-
-    return collate
-
-
-class BetinaTrainer:
-    def __init__(
-        self,
-        vortex: VortexBetinaAntiHalluc,
-        tokenizer: AutoTokenizer,
-        embedding_model: SentenceTransformer,
-        mlm_model: AutoModelForMaskedLM,
-        raw_embedding_dim: int,
-        embed_dim: int,
-        lambda_vortex: float = 0.5,
-        learning_rate: float = 1e-4,
-        mlm_learning_rate: float = 5e-5,
-        freeze_mlm: bool = False,
-        freeze_projectors: bool = False,
-        correction_max_norm: float | None = None,
-        device: torch.device | None = None,
-        max_seq_length: int = 512,
-    ):
-        self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
-        self.vortex = vortex.to(self.device)
-        self.tokenizer = tokenizer
-        self.embedding_model = embedding_model
-        self.mlm_model = mlm_model.to(self.device)
-        self.lambda_vortex = lambda_vortex
-        self.embedding_projector = nn.Linear(raw_embedding_dim, embed_dim).to(self.device)
-        hidden_size = self.mlm_model.config.hidden_size
-        self.correction_projector = nn.Linear(embed_dim, hidden_size).to(self.device)
-        self.freeze_projectors = freeze_projectors
-        self.correction_max_norm = correction_max_norm if correction_max_norm and correction_max_norm > 0 else None
-        self.max_seq_length = max_seq_length
-        mlm_params = list(self.mlm_model.parameters())
-        if freeze_mlm:
-            for param in mlm_params:
-                param.requires_grad = False
-        projector_params = list(self.embedding_projector.parameters()) + list(self.correction_projector.parameters())
-        if freeze_projectors:
-            for param in projector_params:
-                param.requires_grad = False
-        trainable_projectors = [] if freeze_projectors else projector_params
-        vortex_params = list(self.vortex.parameters()) + trainable_projectors
-        optimizer_groups = [
-            {"params": vortex_params, "lr": learning_rate},
-        ]
-        if not freeze_mlm:
-            optimizer_groups.append({"params": mlm_params, "lr": mlm_learning_rate})
-        self.optimizer = optim.AdamW(optimizer_groups)
-        self.freeze_mlm = freeze_mlm
-        self.scaler = betina_grad_scaler(self.device.type, enabled=self.device.type == "cuda")
-
-    def _project_correction(self, delta: torch.Tensor) -> torch.Tensor:
-        correction = self.correction_projector(delta)
-        if self.correction_max_norm is not None:
-            dim = correction.dim() - 1 if correction.dim() > 0 else 0
-            correction = correction.renorm(p=2, dim=dim, maxnorm=self.correction_max_norm)
-        return correction
-
-    def train(self, dataloader: DataLoader, epochs: int = 5, grad_clip: float = 1.0) -> List[Dict[str, float]]:
-        history: List[Dict[str, float]] = []
-        self.vortex.train()
-        self.mlm_model.train()
-        for epoch in range(epochs):
-            for step, batch in enumerate(dataloader):
-                input_ids = batch["input_ids"].to(self.device)
-                attention_mask = batch["attention_mask"].to(self.device)
-                labels = batch["labels"].to(self.device)
-                embeds = batch["embedding"].to(self.device)
-                self.optimizer.zero_grad(set_to_none=True)
-                with betina_autocast(self.device.type, enabled=self.device.type == "cuda"):
-                    projected = self.embedding_projector(embeds)
-                    _, vortex_loss, metrics, delta = self.vortex(projected)
-                    if self.freeze_mlm:
-                        with torch.no_grad():
-                            outputs = self.mlm_model(
-                                input_ids=input_ids,
-                                attention_mask=attention_mask,
-                                output_hidden_states=True,
-                                return_dict=True,
-                            )
-                    else:
-                        outputs = self.mlm_model(
-                            input_ids=input_ids,
-                            attention_mask=attention_mask,
-                            output_hidden_states=True,
-                            return_dict=True,
-                        )
-                    hidden = outputs.hidden_states[-1]
-                    correction = self._project_correction(delta).unsqueeze(1)
-                    corrected_hidden = hidden + correction
-                    if hasattr(self.mlm_model, "cls"):
-                        logits = self.mlm_model.cls(corrected_hidden)
-                    else:
-                        logits = self.mlm_model.get_output_embeddings()(corrected_hidden)
-                    mask_positions = labels != -100
-                    if mask_positions.any():
-                        mlm_loss = F.cross_entropy(logits[mask_positions], labels[mask_positions])
-                    else:
-                        mlm_loss = torch.zeros(1, device=self.device)
-                    total_loss = mlm_loss + self.lambda_vortex * vortex_loss
-                self.scaler.scale(total_loss).backward()
-                torch.nn.utils.clip_grad_norm_(self.parameters(), grad_clip)
-                self.scaler.step(self.optimizer)
-                self.scaler.update()
-                perplexity = torch.exp(mlm_loss.detach())
-                record = {
-                    "epoch": epoch + 1,
-                    "step": step + 1,
-                    "mlm_loss": mlm_loss.detach().item(),
-                    "vortex_loss": vortex_loss.detach().item(),
-                    "total_loss": total_loss.detach().item(),
-                    "perplexity": perplexity.item(),
-                    "vortex_energy": metrics["vortex_energy"],
-                    "cosine_overlap": metrics["cosine_overlap"],
-                    "rotation_ratio": metrics["rotation_ratio"],
-                    "approx_alignment": metrics["approx_alignment"],
-                    "reflection_ratio": metrics["reflection_ratio"],
-                    "boost_ratio": metrics.get("boost_ratio", 0.0),
-                    "reflection_push_ratio": metrics.get("reflection_push_ratio", 0.0),
-                }
-                history.append(record)
-                if step % 10 == 0:
-                    print(
-                        f"Epoch {record['epoch']:03d} Step {record['step']:04d} | "
-                        f"Total {record['total_loss']:.4f} | MLM {record['mlm_loss']:.4f} | "
-                        f"PPL {record['perplexity']:.4f} | "
-                        f"Vortex {record['vortex_loss']:.4f} | Overlap {record['cosine_overlap']:.4f} | "
-                        f"Energy {record['vortex_energy']:.4f} | Rotation {record['rotation_ratio']:.3f} | "
-                        f"Reflect {record['reflection_ratio']:.3f} | Boost {record['boost_ratio']:.3f} | "
-                        f"PreReflect {record['reflection_push_ratio']:.3f} | Approx {record['approx_alignment']:.4f}"
-                    )
-        return history
-
-    def parameters(self):
-        for module in (self.vortex, self.embedding_projector, self.correction_projector, self.mlm_model):
-            for param in module.parameters():
-                if param.requires_grad:
-                    yield param
-
-    @torch.no_grad()
-    def evaluate_perplexity(self, dataloader: DataLoader, apply_correction: bool = True) -> float:
-        self.vortex.eval()
-        self.mlm_model.eval()
-        total_loss = 0.0
-        total_tokens = 0
-        for batch in dataloader:
-            input_ids = batch["input_ids"].to(self.device)
-            attention_mask = batch["attention_mask"].to(self.device)
-            labels = batch["labels"].to(self.device)
-            embeds = batch["embedding"].to(self.device)
-            outputs = self.mlm_model(
-                input_ids=input_ids,
-                attention_mask=attention_mask,
-                output_hidden_states=True,
-                return_dict=True,
-            )
-            hidden = outputs.hidden_states[-1]
-            if apply_correction:
-                projected = self.embedding_projector(embeds)
-                _, _, _, delta = self.vortex(projected)
-                correction = self._project_correction(delta).unsqueeze(1)
-                hidden = hidden + correction
-            if hasattr(self.mlm_model, "cls"):
-                logits = self.mlm_model.cls(hidden)
-            else:
-                logits = self.mlm_model.get_output_embeddings()(hidden)
-            mask_positions = labels != -100
-            if mask_positions.any():
-                loss = F.cross_entropy(logits[mask_positions], labels[mask_positions], reduction="sum")
-                total_loss += loss.item()
-                total_tokens += mask_positions.sum().item()
-        if total_tokens == 0:
-            return float("inf")
-        return math.exp(total_loss / total_tokens)
-
-    @torch.no_grad()
-    def save(self, output_dir: str) -> None:
-        output_path = Path(output_dir)
-        output_path.mkdir(parents=True, exist_ok=True)
-        torch.save(self.vortex.state_dict(), output_path / "vortex.pt")
-        torch.save(self.embedding_projector.state_dict(), output_path / "embedding_projector.pt")
-        torch.save(self.correction_projector.state_dict(), output_path / "correction_projector.pt")
"correction_projector.pt") - self.mlm_model.save_pretrained(output_path / "mlm") - self.tokenizer.save_pretrained(output_path / "mlm") - - @torch.no_grad() - def fill_masks( - self, - texts: List[str], - top_k: int = 5, - apply_correction: bool = True, - ) -> List[List[List[Tuple[str, float]]]]: - self.vortex.eval() - self.mlm_model.eval() - encodings = self.tokenizer( - texts, - return_tensors="pt", - padding=True, - truncation=True, - max_length=self.max_seq_length, - ) - input_ids = encodings["input_ids"].to(self.device) - attention_mask = encodings["attention_mask"].to(self.device) - outputs = self.mlm_model( - input_ids=input_ids, - attention_mask=attention_mask, - output_hidden_states=True, - return_dict=True, - ) - hidden = outputs.hidden_states[-1] - if apply_correction: - embeds = self.embedding_model.encode(texts, convert_to_tensor=True, show_progress_bar=False).to(self.device) - projected = self.embedding_projector(embeds) - _, _, _, delta = self.vortex(projected) - correction = self._project_correction(delta).unsqueeze(1) - hidden = hidden + correction - if hasattr(self.mlm_model, "cls"): - logits = self.mlm_model.cls(hidden) - else: - logits = self.mlm_model.get_output_embeddings()(hidden) - mask_positions = (input_ids == self.tokenizer.mask_token_id) - results: List[List[List[Tuple[str, float]]]] = [] - for batch_index in range(input_ids.size(0)): - batch_tokens: List[List[Tuple[str, float]]] = [] - positions = mask_positions[batch_index].nonzero(as_tuple=False).view(-1) - for position in positions: - token_logits = logits[batch_index, position] - token_probs = F.softmax(token_logits, dim=-1) - topk = torch.topk(token_probs, top_k) - decoded: List[Tuple[str, float]] = [] - for token_id, prob in zip(topk.indices, topk.values): - token = self.tokenizer.decode([token_id]).strip() - decoded.append((token, prob.item())) - batch_tokens.append(decoded) - results.append(batch_tokens) - return results - - -def run_demo(embed_dim: int = 4, batch_size: int = 3, seed: int = 123) -> None: - torch.manual_seed(seed) - model = VortexBetinaAntiHalluc(embed_dim=embed_dim) - inputs = torch.randn(batch_size, embed_dim) - evolved, loss, metrics, delta = model(inputs) - print("Dimensão de entrada:", embed_dim) - print("Inputs:", inputs) - print("Estado evoluído shape:", evolved.shape) - print("Delta shape:", delta.shape) - print("Loss:", loss.item()) - for key, value in metrics.items(): - print(f"{key}: {value}") - - -def sample_sentences() -> List[str]: - return [ - "O céu de Lisboa estava completamente claro naquela manhã.", - "A inteligência coletiva da equipe resolveu o problema rapidamente.", - "O gato preto dormia tranquilo sobre o sofá da sala.", - "A orquestra executou a sinfonia com uma precisão impressionante.", - "Os dados indicam uma redução consistente nas alucinações do modelo.", - "A pesquisa científica requer paciência, rigor e curiosidade constante.", - "A ponte antiga foi restaurada para preservar o patrimônio cultural.", - "O sistema Betina ajusta embeddings para evitar distorções semânticas.", - ] - - -def load_sentences_from_args(args: argparse.Namespace) -> List[str]: - if args.dataset_file: - path = Path(args.dataset_file) - if not path.exists(): - raise FileNotFoundError(f"Dataset file not found: {path}") - sentences = [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()] - if not sentences: - raise ValueError(f"Dataset file {path} is empty") - return sentences if args.dataset_limit is None else sentences[: args.dataset_limit] - if 
-    if args.dataset_hf:
-        if load_dataset is None:
-            raise ImportError("Install the 'datasets' package to use --dataset-hf option")
-        split = args.dataset_split
-        if args.dataset_limit and ":" not in split:
-            split = f"{split}[:{args.dataset_limit}]"
-        dataset_name = args.dataset_hf
-        config_name: Optional[str] = args.dataset_hf_config or None
-        try:
-            dataset = _safe_load_dataset(
-                dataset_name,
-                config_name,
-                split=split,
-                hf_token=args.hf_token,
-                trust_remote_code=args.trust_remote_code,
-            )
-        except Exception as exc:
-            message = str(exc).lower()
-            scripts_blocked = "dataset scripts are no longer supported" in message
-            trust_flag_blocked = "trust_remote_code" in message
-            if dataset_name == "wikipedia" and (scripts_blocked or trust_flag_blocked):
-                fallback_name = "wikimedia/wikipedia"
-                fallback_config = config_name or "20231101.pt"
-                print(
-                    "Dataset 'wikipedia' agora usa snapshot parquet e não aceita mais scripts remotos."
-                    f" Alternando automaticamente para {fallback_name} ({fallback_config})."
-                )
-                dataset = _safe_load_dataset(
-                    fallback_name,
-                    fallback_config,
-                    split=split,
-                    hf_token=args.hf_token,
-                    trust_remote_code=False,
-                )
-            elif scripts_blocked and not args.trust_remote_code:
-                hint = (
-                    "O dataset solicita código remoto. Reexecute com --trust-remote-code para habilitar"
-                    " scripts do autor do dataset. Apenas use se confiar na fonte."
-                )
-                raise RuntimeError(hint) from exc
-            if "gated dataset" in message or "403" in message or "401" in message:
-                hint = (
-                    "Dataset protegido requer autenticação. Informe --hf-token , defina HF_TOKEN/HUGGINGFACE_TOKEN"
-                    " ou utilize --hf-token-file apontando para o token salvo pelo huggingface-cli. Também é possível"
-                    " executar 'huggingface-cli login' para gerar ~/.cache/huggingface/token.\nVocê pode obter o token em"
-                    " https://huggingface.co/settings/tokens."
-                )
-                raise RuntimeError(hint) from exc
-            raise
-        sentences: List[str] = []
-        text_field = args.dataset_text_field
-        limit = args.dataset_limit
-        for item in dataset:
-            text = item.get(text_field)
-            if isinstance(text, str) and text.strip():
-                sentences.append(text.strip())
-            if limit is not None and len(sentences) >= limit:
-                break
-        if not sentences:
-            raise ValueError("No sentences extracted from the specified dataset")
-        return sentences
-    return sample_sentences()
-
-
-def print_gain_summary(
-    prompts: List[str],
-    base_fills: List[List[List[Tuple[str, float]]]],
-    betina_fills: List[List[List[Tuple[str, float]]]],
-) -> None:
-    print("\nResumo de ganhos top-1:")
-    for prompt, base_masks, betina_masks in zip(prompts, base_fills, betina_fills):
-        prompt_head = prompt if len(prompt) <= 60 else f"{prompt[:57]}..."
-        for idx, (base_group, betina_group) in enumerate(zip(base_masks, betina_masks), start=1):
-            if not base_group or not betina_group:
-                continue
-            base_token, base_prob = base_group[0]
-            betina_token, betina_prob = betina_group[0]
-            delta = betina_prob - base_prob
-            if base_prob > 0:
-                rel = delta / base_prob * 100.0
-                rel_text = f"{rel:+.2f}%"
-            else:
-                rel_text = "n/d"
-            change_desc = "mantido" if betina_token == base_token else f"{base_token} -> {betina_token}"
-            print(
-                f" [{prompt_head}] máscara {idx}: {change_desc} | base {base_prob:.4f} -> betina {betina_prob:.4f} ({rel_text})"
-            )
-
-
-def _prepare_debug_value(value, max_examples: int):
-    if isinstance(value, torch.Tensor):
-        limited = value.detach().cpu()
-        if limited.dim() >= 1:
-            limited = limited[:max_examples]
-        return limited.tolist()
-    if isinstance(value, dict):
-        return {key: _prepare_debug_value(val, max_examples) for key, val in value.items()}
-    if isinstance(value, (list, tuple)):
-        return [_prepare_debug_value(item, max_examples) for item in value]
-    if isinstance(value, (float, int, str)) or value is None:
-        return value
-    return str(value)
-
-
-def dump_square_debug(flow_states: Dict[str, object], metrics: Dict[str, float], output_path: str, max_examples: int = 1) -> Path:
-    output = Path(output_path).expanduser()
-    payload = {
-        "max_examples": max(1, max_examples),
-        "metrics": {key: float(value) if isinstance(value, (int, float)) else value for key, value in metrics.items()},
-        "flow_states": {key: _prepare_debug_value(val, max_examples) for key, val in flow_states.items()},
-    }
-    output.parent.mkdir(parents=True, exist_ok=True)
-    output.write_text(json.dumps(payload, indent=2), encoding="utf-8")
-    return output
-
-
-def main(argv: list[str] | None = None):
-    parser = argparse.ArgumentParser(description="Treinamento do modelo Betina anti-hallucination")
-    parser.add_argument("--train", action="store_true", help="Executa treinamento completo")
-    parser.add_argument("--epochs", type=int, default=5, help="Número de épocas para treinar")
-    parser.add_argument("--batch-size", type=int, default=4, help="Tamanho do batch")
-    parser.add_argument("--embed-dim", type=int, default=256, choices=[128, 256], help="Dimensão interna do vórtice")
-    parser.add_argument("--lambda-vortex", type=float, default=0.1, help="Peso da loss do vórtice")
-    parser.add_argument("--learning-rate", type=float, default=1e-4, help="Learning rate para o vórtice e projetores")
-    parser.add_argument("--mlm-learning-rate", type=float, default=5e-5, help="Learning rate para o modelo de linguagem")
-    parser.add_argument("--freeze-mlm", action="store_true", help="Congela os pesos do modelo de linguagem durante o treino")
-    parser.add_argument("--freeze-projectors", action="store_true", help="Congela os projetores de embedding/correção (modo inferência)")
-    parser.add_argument(
-        "--correction-max-norm",
-        type=float,
-        default=None,
-        help="Clampa o vetor de correção Betina a esta norma L2 (<=0 desativa)",
-    )
-    parser.add_argument("--output-dir", type=str, default="outputs/betina_vortex", help="Diretório para salvar o modelo")
-    parser.add_argument("--device", type=str, default=None, help="Força execução em cuda ou cpu")
-    parser.add_argument("--top-k", type=int, default=5, help="Top-k para avaliação de máscara")
-    parser.add_argument("--skip-eval", action="store_true", help="Pula avaliação pós-treino")
-    parser.add_argument("--eval-prompts", nargs="*", default=None, help="Prompts personalizados contendo [MASK] para avaliação")
parser.add_argument("--dataset-file", type=str, default=None, help="Arquivo de texto com uma sentença por linha") - parser.add_argument("--dataset-hf", type=str, default=None, help="Nome do dataset Hugging Face, ex.: oscar") - parser.add_argument("--dataset-hf-config", type=str, default=None, help="Config do dataset Hugging Face, ex.: unshuffled_deduplicated_pt") - parser.add_argument("--dataset-split", type=str, default="train[:1000]", help="Split do dataset Hugging Face") - parser.add_argument("--dataset-text-field", type=str, default="text", help="Campo de texto no dataset Hugging Face") - parser.add_argument("--dataset-limit", type=int, default=None, help="Limite de sentenças carregadas") - parser.add_argument("--hf-token", type=str, default=None, help="Token de autenticação da Hugging Face (ou defina HF_TOKEN)") - parser.add_argument( - "--hf-token-file", - type=str, - default=None, - help="Arquivo contendo o token da Hugging Face (padrão: ~/.cache/huggingface/token)", - ) - parser.add_argument("--trust-remote-code", action="store_true", help="Permite datasets com script remoto (requer confiança no autor)") - parser.add_argument("--force-download", action="store_true", help="Força novo download dos pesos do modelo de linguagem") - parser.add_argument("--disable-rotation", action="store_true", help="Desativa a rotação do delta do vórtice") - parser.add_argument("--rotation-angle", type=float, default=math.pi / 4, help="Ângulo (rad) para rotacionar o delta quando ativado") - parser.add_argument("--rotation-threshold", type=float, default=1e-4, help="Norma mínima do delta para aplicar rotação") - parser.add_argument("--rotation-clockwise", action="store_true", help="Força rotação horária (inverte o sinal do ângulo)") - parser.add_argument( - "--enable-quadratic-reflection", - action="store_true", - help="Ativa reflexão quadrática estilo bolinha/bastão no delta do vórtice", - ) - parser.add_argument( - "--quadratic-boundary", - type=float, - default=1.0, - help="Magnitudes acima desse valor são refletidas (parede virtual)", - ) - parser.add_argument( - "--quadratic-strength", - type=float, - default=0.5, - help="Mistura (0-1) entre o delta original e o refletido quadrático", - ) - parser.add_argument( - "--disable-square-geometry", - action="store_true", - help="Desativa o giro de 180° que forma o quadrado X↔X⁻¹", - ) - parser.add_argument( - "--square-rotation-degrees", - type=float, - default=180.0, - help="Ângulo aplicado ao girar a matriz completa (180° gera o quadrado perfeito)", - ) - parser.add_argument( - "--square-leak-ratio", - type=float, - default=0.05, - help="Mistura o quadrado com a matriz original (0 mantém oposição perfeita, 1 ignora o giro)", - ) - parser.add_argument( - "--square-jitter-std-deg", - type=float, - default=0.0, - help="Desvio padrão em graus para injetar ruído aleatório no giro quadrado", - ) - parser.add_argument( - "--square-debug-json", - type=str, - default=None, - help="Se definido, salva um dump JSON com as matrizes primária/secundária e métricas do vórtice", - ) - parser.add_argument( - "--square-debug-max", - type=int, - default=1, - help="Número máximo de exemplos incluídos no dump quadrado", - ) - parser.add_argument( - "--enable-lorentz-transform", - action="store_true", - help="Aplica transformação de Lorentz no delta final para medir o resultado físico", - ) - parser.add_argument( - "--lorentz-beta", - type=float, - default=0.6, - help="Fração da velocidade da luz usada no boost de Lorentz", - ) - parser.add_argument( - 
"--lorentz-axis-stream", - type=int, - default=0, - help="Stream usado como eixo espacial (0=X, 1=Y, etc.) na transformação de Lorentz", - ) - parser.add_argument( - "--ia-config", - type=str, - default=None, - help="Arquivo JSON descrevendo quais IAs assumem cada estágio/stream do fluxo matriz", - ) - parser.add_argument( - "--refinement-cycles", - type=int, - default=0, - help="Quantidade de ciclos circundantes de refinamento IA aplicados ao delta final", - ) - parser.add_argument( - "--cycle-stage-name", - type=str, - default="cycle", - help="Nome do estágio IA usado durante cada ciclo circundante", - ) - parser.add_argument("--max-seq-length", type=int, default=512, help="Comprimento máximo de tokens por exemplo (padrão BERT)") - parser.add_argument("--precompute-embeddings", action="store_true", help="Codifica todas as sentenças antes do treino (requer muita RAM)") - parser.add_argument("--embedding-batch-size", type=int, default=64, help="Batch interno para geração de embeddings (encode)") - if argv is None: - argv_list = sys.argv[1:] - if "ipykernel" in sys.modules: - filtered: List[str] = [] - skip_next = False - for item in argv_list: - if skip_next: - skip_next = False - continue - if item == "-f": - skip_next = True - continue - if item.startswith("-f="): - continue - filtered.append(item) - argv_list = filtered - else: - argv_list = list(argv) - - parsed, unknown = parser.parse_known_args(argv_list) - if unknown: - print(f"Ignorando argumentos desconhecidos: {unknown}") - args = parsed - - args.hf_token, token_source = resolve_hf_token(args.hf_token, args.hf_token_file) - if args.hf_token and token_source: - print(f"Token Hugging Face detectado via {token_source}.") - - device = torch.device(args.device) if args.device else torch.device("cuda" if torch.cuda.is_available() else "cpu") - print(f"Usando dispositivo: {device}") - - embedding_model_name = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2" - tokenizer_name = "neuralmind/bert-base-portuguese-cased" - - embedding_model = SentenceTransformer(embedding_model_name, device=str(device) if device.type == "cuda" else "cpu") - tokenizer = AutoTokenizer.from_pretrained(tokenizer_name) - if tokenizer.pad_token is None: - tokenizer.pad_token = tokenizer.eos_token or tokenizer.sep_token or tokenizer.cls_token - try: - mlm_model = AutoModelForMaskedLM.from_pretrained(tokenizer_name, force_download=args.force_download) - except Exception as exc: - print(f"Download failed: {exc}. 
-        raise
-
-    sentences = load_sentences_from_args(args)
-    dataset = PortugueseSentenceDataset(
-        sentences,
-        tokenizer,
-        embedding_model,
-        max_seq_length=args.max_seq_length,
-        precompute_embeddings=args.precompute_embeddings,
-        embedding_batch_size=args.embedding_batch_size,
-    )
-    collate_fn = build_collate_fn(tokenizer)
-    dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True, collate_fn=collate_fn)
-    eval_dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=False, collate_fn=collate_fn)
-
-    stream_aliases: Optional[List[str]] = None
-    ia_stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None
-    refinement_cycles = args.refinement_cycles
-    config_cycle_stage = args.cycle_stage_name
-    if args.ia_config:
-        ia_config_data = load_ia_config_file(args.ia_config, args.embed_dim)
-        print(f"Config IA carregada de {args.ia_config}")
-        stream_aliases = ia_config_data.get("stream_aliases")
-        ia_stage_config = ia_config_data.get("stage_config") # type: ignore[assignment]
-        config_cycles = ia_config_data.get("refinement_cycles")
-        if isinstance(config_cycles, int):
-            refinement_cycles = config_cycles
-        config_cycle_stage = str(ia_config_data.get("cycle_stage_name", config_cycle_stage))
-
-    vortex = VortexBetinaAntiHalluc(
-        embed_dim=args.embed_dim,
-        enable_rotation=not args.disable_rotation,
-        rotation_angle=args.rotation_angle,
-        rotation_threshold=args.rotation_threshold,
-        rotation_clockwise=args.rotation_clockwise,
-        enable_quadratic_reflection=args.enable_quadratic_reflection,
-        quadratic_boundary=args.quadratic_boundary,
-        quadratic_strength=args.quadratic_strength,
-        stream_aliases=stream_aliases,
-        ia_stage_config=ia_stage_config,
-        refinement_cycles=refinement_cycles,
-        cycle_stage_name=config_cycle_stage,
-        enforce_square_geometry=not args.disable_square_geometry,
-        square_rotation_degrees=args.square_rotation_degrees,
-        square_leak_ratio=args.square_leak_ratio,
-        square_jitter_std_degrees=args.square_jitter_std_deg,
-        enable_lorentz_transform=args.enable_lorentz_transform,
-        lorentz_beta=args.lorentz_beta,
-        lorentz_axis_stream=args.lorentz_axis_stream,
-    )
-    trainer = BetinaTrainer(
-        vortex=vortex,
-        tokenizer=tokenizer,
-        embedding_model=embedding_model,
-        mlm_model=mlm_model,
-        raw_embedding_dim=embedding_model.get_sentence_embedding_dimension(),
-        embed_dim=args.embed_dim,
-        lambda_vortex=args.lambda_vortex,
-        learning_rate=args.learning_rate,
-        mlm_learning_rate=args.mlm_learning_rate,
-        freeze_mlm=args.freeze_mlm,
-        freeze_projectors=args.freeze_projectors,
-        correction_max_norm=args.correction_max_norm,
-        device=device,
-        max_seq_length=args.max_seq_length,
-    )
-
-    if args.square_debug_json:
-        square_max = max(1, args.square_debug_max)
-        try:
-            sample_batch = next(iter(dataloader))
-        except StopIteration as exc: # pragma: no cover - dataset vazio
-            raise RuntimeError("Não é possível gerar dump quadrado: dataset vazio") from exc
-        sample_embeddings = sample_batch["embedding"][:square_max].to(device)
-        with torch.no_grad():
-            projected = trainer.embedding_projector(sample_embeddings)
-            _, _, metrics_debug, _ = trainer.vortex(projected)
-        dump_square_debug(
-            trainer.vortex.last_flow_states,
-            metrics_debug,
-            args.square_debug_json,
-            max_examples=min(square_max, sample_embeddings.size(0)),
-        )
-        print(f"Dump quadrado salvo em {args.square_debug_json}")
-
-    if args.train:
-        print("Iniciando treinamento...")
-        history = trainer.train(dataloader, epochs=args.epochs)
-        print(f"Treinamento finalizado com {len(history)} passos")
{len(history)} passos") - trainer.save(args.output_dir) - print(f"Modelos salvos em {args.output_dir}") - - if not args.skip_eval: - ppl_base = trainer.evaluate_perplexity(eval_dataloader, apply_correction=False) - ppl_betina = trainer.evaluate_perplexity(eval_dataloader, apply_correction=True) - print(f"\nPerplexity sem correção: {ppl_base:.4f}") - print(f"Perplexity com correção Betina: {ppl_betina:.4f}") - ppl_delta = ppl_base - ppl_betina - if ppl_base > 0: - ppl_rel = ppl_delta / ppl_base * 100.0 - print(f"Ganho absoluto: {ppl_delta:+.4f} | Ganho relativo: {ppl_rel:+.2f}%") - else: - print(f"Ganho absoluto: {ppl_delta:+.4f} | Ganho relativo: n/d") - - eval_prompts = args.eval_prompts or [ - "O modelo Betina evita [MASK] durante a geração.", - "A capital de Portugal é [MASK].", - "A IA Betina corrige [MASK] via vórtice.", - "O vórtice no Betina filtra [MASK] para reduzir alucinações.", - ] - print("\nPreenchimento sem correção:") - base_fills = trainer.fill_masks(eval_prompts, top_k=args.top_k, apply_correction=False) - for prompt, tokens in zip(eval_prompts, base_fills): - print(prompt) - for idx, group in enumerate(tokens, start=1): - formatted = [f"{token} ({prob:.4f})" for token, prob in group] - print(f" Mascara {idx}: {formatted}") - print("\nPreenchimento com correção Betina:") - betina_fills = trainer.fill_masks(eval_prompts, top_k=args.top_k, apply_correction=True) - for prompt, tokens in zip(eval_prompts, betina_fills): - print(prompt) - for idx, group in enumerate(tokens, start=1): - formatted = [f"{token} ({prob:.4f})" for token, prob in group] - print(f" Mascara {idx}: {formatted}") - - print_gain_summary(eval_prompts, base_fills, betina_fills) - - -if __name__ == "__main__": +import argparse +import json +import math +import os +import sys +import numpy as np +from pathlib import Path +from typing import Callable, Dict, List, Optional, Tuple + +import torch +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim +from contextlib import nullcontext +from torch.nn.utils.rnn import pad_sequence +from torch.utils.data import DataLoader, Dataset + +try: + from torch.amp import autocast as _autocast, GradScaler as _GradScaler + + def betina_autocast(device_type: str, enabled: bool = True): + if not enabled or device_type != "cuda": + return nullcontext() + return _autocast(device_type=device_type, enabled=enabled) + + def betina_grad_scaler(device_type: str, enabled: bool = True): + if not enabled or device_type != "cuda": + return _NoOpGradScaler() + return _GradScaler(device_type=device_type, enabled=enabled) + +except ImportError: # pragma: no cover + from torch.cuda.amp import autocast as _autocast, GradScaler as _GradScaler + + def betina_autocast(device_type: str, enabled: bool = True): + if not enabled or device_type != "cuda": + return nullcontext() + return _autocast(enabled=enabled) + + def betina_grad_scaler(device_type: str, enabled: bool = True): + if not enabled or device_type != "cuda": + return _NoOpGradScaler() + return _GradScaler(enabled=enabled) + +try: + from sentence_transformers import SentenceTransformer +except ImportError as exc: # pragma: no cover + raise ImportError("Install sentence-transformers to run the Betina pipeline") from exc + +try: + from transformers import AutoModelForMaskedLM, AutoTokenizer +except ImportError as exc: # pragma: no cover + raise ImportError("Install transformers to run the Betina pipeline") from exc + +try: + from datasets import load_dataset # type: ignore[import-not-found] +except 
ImportError: # pragma: no cover + load_dataset = None + + +def _safe_load_dataset( + path: str, + name: Optional[str], + *, + split: str, + hf_token: Optional[str], + trust_remote_code: bool, +): + if load_dataset is None: + raise ImportError("Install the 'datasets' package to use Hugging Face corpora") + base_kwargs = {"split": split, "trust_remote_code": trust_remote_code} + attempts: List[Dict[str, Optional[str]]] = [] + if hf_token: + attempts.append({"token": hf_token}) + attempts.append({"use_auth_token": hf_token}) + attempts.append({}) + last_error: Optional[Exception] = None + for extra in attempts: + try: + return load_dataset(path, name, **base_kwargs, **extra) + except TypeError as err: + last_error = err + continue + except ValueError as err: + if "use_auth_token" in str(err).lower(): + last_error = err + continue + raise + if last_error: + raise last_error + raise RuntimeError(f"Falha ao carregar dataset {path}/{name}") + + +def _read_hf_token_file(path: Path) -> Optional[str]: + try: + content = path.read_text(encoding="utf-8").strip() + except OSError: + return None + if not content: + return None + first_line = content.splitlines()[0].strip() + return first_line or None + + +def resolve_hf_token(explicit_token: Optional[str], token_file: Optional[str]) -> Tuple[Optional[str], Optional[str]]: + """Resolve o token HF preferindo argumento, env vars e arquivo do huggingface-cli.""" + if explicit_token and explicit_token.strip(): + return explicit_token.strip(), "--hf-token" + env_token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN") + if env_token and env_token.strip(): + return env_token.strip(), "env" + file_candidates: List[Path] = [] + if token_file: + file_candidates.append(Path(token_file).expanduser()) + else: + hf_home = os.getenv("HF_HOME") + if hf_home: + file_candidates.append(Path(hf_home).expanduser() / "token") + file_candidates.append(Path.home() / ".cache" / "huggingface" / "token") + file_candidates.append(Path.home() / ".huggingface" / "token") + for candidate in file_candidates: + token = _read_hf_token_file(candidate) + if token: + return token, str(candidate) + return None, None + + +class _NoOpGradScaler: + def __init__(self): + pass + + def scale(self, loss): + return loss + + def step(self, optimizer): + optimizer.step() + + def update(self): + pass + + def unscale_(self, optimizer): + pass + + def state_dict(self): + return {} + + def load_state_dict(self, state): + pass + + +class CallableAgentAdapter(nn.Module): + def __init__(self, fn: Callable[[torch.Tensor], torch.Tensor], name: str): + super().__init__() + self.fn = fn + self.agent_name = name or getattr(fn, "__name__", "callable_agent") + + def forward(self, tensor: torch.Tensor) -> torch.Tensor: # noqa: D401 + return self.fn(tensor) + + +class MultiIntelligenceRouter(nn.Module): + """Despacha cada estágio do fluxo matriz para IAs distintas por stream/etapa.""" + + def __init__( + self, + num_streams: int, + *, + stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None, + stream_aliases: Optional[List[str]] = None, + ): + super().__init__() + self.num_streams = num_streams + self.stream_aliases = stream_aliases or [f"S{idx}" for idx in range(num_streams)] + self.stage_modules = nn.ModuleDict() + self.stage_logs: Dict[str, List[Dict[str, str]]] = {} + if stage_config: + self.apply_stage_config(stage_config) + + def apply_stage_config(self, stage_config: Dict[str, Dict[str, nn.Module]]) -> None: + for stage_name, mapping in stage_config.items(): + module_dict = nn.ModuleDict() + 
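+# A minimal usage sketch of the precedence implemented by resolve_hf_token
+# above: explicit argument first, then environment variables, then token files
+# written by huggingface-cli. The token strings are hypothetical placeholders.
+def _demo_resolve_hf_token() -> None:
+    token, source = resolve_hf_token("hf_explicit_example", None)
+    assert (token, source) == ("hf_explicit_example", "--hf-token")
+    os.environ["HF_TOKEN"] = "hf_env_example"
+    token, source = resolve_hf_token(None, None)
+    assert (token, source) == ("hf_env_example", "env")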
for key, module in mapping.items(): + module_dict[str(key)] = self._wrap_module(stage_name, key, module) + self.stage_modules[stage_name] = module_dict + + def register_stage(self, stage_name: str, mapping: Dict[str, nn.Module]) -> None: + module_dict = nn.ModuleDict() + for key, module in mapping.items(): + module_dict[str(key)] = self._wrap_module(stage_name, key, module) + self.stage_modules[stage_name] = module_dict + + def _wrap_module(self, stage: str, key: str | int, module: nn.Module | Callable) -> nn.Module: + if isinstance(module, list): + wrapped = [self._wrap_module(stage, f"{key}_{idx}", item) for idx, item in enumerate(module)] + seq = nn.Sequential(*wrapped) + if not hasattr(seq, "agent_name"): + seq.agent_name = f"seq_{stage}_{key}" + return seq + if isinstance(module, nn.Module): + if not hasattr(module, "agent_name"): + module.agent_name = module.__class__.__name__ + return module + if callable(module): + name = getattr(module, "agent_name", None) or getattr(module, "__name__", f"{stage}_{key}_fn") + return CallableAgentAdapter(module, name) + raise TypeError(f"Módulo IA inválido para estágio {stage}/{key}: {type(module)}") + + def _select_module(self, stage_dict: nn.ModuleDict, key: str | int | None) -> Optional[nn.Module]: + if key is not None: + candidate_key = str(key) + if candidate_key in stage_dict: + return stage_dict[candidate_key] + for fallback in ("*", "default", "-1"): + if fallback in stage_dict: + return stage_dict[fallback] + return None + + def apply(self, stage: str, tensor: torch.Tensor, *, cycle_idx: Optional[int] = None) -> torch.Tensor: + stage_dict = self.stage_modules[stage] if stage in self.stage_modules else None + log: List[Dict[str, str]] = [] + if stage_dict is None: + self.stage_logs[stage] = log + return tensor + if tensor.dim() == 3: + outputs = [] + for stream_idx in range(tensor.size(1)): + module = self._select_module(stage_dict, stream_idx) + if module is None and stream_idx < len(self.stream_aliases): + module = self._select_module(stage_dict, self.stream_aliases[stream_idx]) + chunk = tensor[:, stream_idx, :] + if module is not None: + chunk = module(chunk) + log.append( + { + "stream": self.stream_aliases[stream_idx] if stream_idx < len(self.stream_aliases) else str(stream_idx), + "agent": getattr(module, "agent_name", module.__class__.__name__), + } + ) + outputs.append(chunk) + stacked = torch.stack(outputs, dim=1) + self.stage_logs[stage] = log + return stacked + module = None + if cycle_idx is not None: + module = self._select_module(stage_dict, cycle_idx) + if module is None: + module = self._select_module(stage_dict, "global") + if module is None: + module = self._select_module(stage_dict, None) + if module is None: + self.stage_logs[stage] = log + return tensor + updated = module(tensor) + alias = f"cycle_{cycle_idx}" if cycle_idx is not None else "global" + log.append({"stream": alias, "agent": getattr(module, "agent_name", module.__class__.__name__)}) + self.stage_logs[stage] = log + return updated + + def stage_signature(self, stage: str) -> str: + logs = self.stage_logs.get(stage, []) + if not logs: + return "identity" + return " | ".join(f"{entry['stream']}→{entry['agent']}" for entry in logs) + + def describe_all_stages(self) -> Dict[str, List[Dict[str, str]]]: + return {stage: list(entries) for stage, entries in self.stage_logs.items()} + + +class SyntheticNeuronTriangle(nn.Module): + """Refina deltas considerando uma base triangular (X,Y,contrabase).""" + + def __init__( + self, + embed_dim: int, + num_streams: int, + 
*, + hidden_dim: int = 512, + max_iters: int = 5, + tol: float = 1e-4, + delta_gain: float = 1.0, + ) -> None: + super().__init__() + self.embed_dim = embed_dim + self.num_streams = num_streams + self.max_iters = max(0, max_iters) + self.tol = max(1e-6, tol) + self.delta_gain = float(delta_gain) + seed_in = embed_dim * 4 # X, Y, contrabase média, diagonal + refine_in = embed_dim * 3 # delta médio, eixo integrado, diagonal + self.seed_proj = nn.Sequential( + nn.LayerNorm(seed_in), + nn.Linear(seed_in, hidden_dim), + nn.GELU(), + nn.Linear(hidden_dim, embed_dim), + ) + self.refine_proj = nn.Sequential( + nn.LayerNorm(refine_in), + nn.Linear(refine_in, hidden_dim), + nn.SiLU(), + nn.Linear(hidden_dim, embed_dim), + ) + + def forward( + self, + mat_primary: torch.Tensor, + mat_secondary: torch.Tensor, + base_core: torch.Tensor, + ) -> Tuple[torch.Tensor, Dict[str, object]]: + batch, streams, dim = mat_primary.shape + x_stream = mat_primary[:, 0, :] + y_stream = mat_primary[:, 1, :] if streams > 1 else mat_secondary[:, 0, :] + contra_stream = mat_secondary[:, 0, :] + contra_mean = mat_secondary.mean(dim=1) + base_center = torch.stack([x_stream, y_stream], dim=1).mean(dim=1) + diag = torch.stack([x_stream - contra_stream, y_stream - contra_mean], dim=1).mean(dim=1) + seed_features = torch.cat([base_center, contra_mean, base_core.squeeze(1), diag], dim=-1) + axis_vector = torch.tanh(self.seed_proj(seed_features)) + axis_norm = F.normalize(axis_vector, dim=-1) + base_delta = mat_primary - base_core + stream_align = torch.sum( + F.normalize(mat_primary, dim=-1) * axis_norm.unsqueeze(1), + dim=-1, + keepdim=True, + ) + axis_component = axis_norm.unsqueeze(1) * stream_align + delta = base_delta + self.delta_gain * axis_component + iterations = 0 + last_change = torch.zeros(1, device=mat_primary.device) + if self.max_iters > 0: + for idx in range(self.max_iters): + delta_mean = delta.mean(dim=1) + refine_inp = torch.cat([delta_mean, axis_vector, diag], dim=-1) + refine = torch.tanh(self.refine_proj(refine_inp)).unsqueeze(1) + delta = delta + refine + last_change = refine.norm(dim=-1).mean() + iterations = idx + 1 + if last_change.item() < self.tol: + break + debug = { + "axis": axis_vector.detach(), + "diag": diag.detach(), + "iterations": iterations, + "residual": float(last_change.detach().item()), + } + return delta, debug + + +def build_builtin_agent(name: str, embed_dim: int) -> nn.Module: + normalized = name.strip().lower() + if normalized in {"brock", "brockman", "brock ia"}: + module = nn.Sequential( + nn.LayerNorm(embed_dim), + nn.Linear(embed_dim, embed_dim * 2), + nn.GELU(), + nn.Linear(embed_dim * 2, embed_dim), + ) + elif normalized in {"chatgpt", "chatgpt 5.1", "chatgpt5.1", "gpt51"}: + module = nn.Sequential( + nn.LayerNorm(embed_dim), + nn.Linear(embed_dim, embed_dim), + nn.SiLU(), + nn.Linear(embed_dim, embed_dim), + ) + elif normalized in {"code", "code ia", "coder"}: + module = nn.Sequential( + nn.LayerNorm(embed_dim), + nn.Linear(embed_dim, embed_dim), + ) + elif normalized in {"critic", "mirror", "refiner"}: + module = nn.Sequential( + nn.LayerNorm(embed_dim), + nn.Linear(embed_dim, embed_dim * 3 // 2), + nn.GELU(), + nn.Linear(embed_dim * 3 // 2, embed_dim), + nn.LayerNorm(embed_dim), + ) + elif normalized in {"identity", "none"}: + module = nn.Identity() + else: + raise ValueError(f"Agente IA desconhecido: {name}") + module.agent_name = name + return module + + +def _build_custom_agent(agent_def: Dict[str, object], embed_dim: int) -> nn.Module: + if "style" in agent_def: + 
module = build_builtin_agent(str(agent_def["style"]), embed_dim) + module.agent_name = str(agent_def.get("name", agent_def["style"])) + return module + agent_type = str(agent_def.get("type", "mlp")).lower() + hidden = int(agent_def.get("hidden", embed_dim * 2)) + dropout = float(agent_def.get("dropout", 0.0)) + if agent_type == "mlp": + layers: List[nn.Module] = [ + nn.LayerNorm(embed_dim), + nn.Linear(embed_dim, hidden), + nn.GELU(), + ] + if dropout > 0: + layers.append(nn.Dropout(dropout)) + layers.append(nn.Linear(hidden, embed_dim)) + module = nn.Sequential(*layers) + elif agent_type == "linear": + module = nn.Sequential(nn.LayerNorm(embed_dim), nn.Linear(embed_dim, embed_dim)) + else: + raise ValueError(f"Tipo de agente custom '{agent_type}' não suportado") + module.agent_name = str(agent_def.get("name", agent_type)) + return module + + +def load_ia_config_file(path: str, embed_dim: int) -> Dict[str, object]: + data = json.loads(Path(path).read_text(encoding="utf-8")) + stage_entries = data.get("stages", {}) + if not isinstance(stage_entries, dict): + raise ValueError("Campo 'stages' do arquivo IA precisa ser um objeto mapeando estágio→streams") + stage_config: Dict[str, Dict[str, nn.Module]] = {} + for stage, mapping in stage_entries.items(): + if not isinstance(mapping, dict): + raise ValueError(f"Estágio '{stage}' precisa mapear streams para agentes") + stage_config[stage] = {} + for stream_key, agent_def in mapping.items(): + if isinstance(agent_def, str): + module = build_builtin_agent(agent_def, embed_dim) + elif isinstance(agent_def, dict): + module = _build_custom_agent(agent_def, embed_dim) + else: + raise ValueError(f"Agente inválido para estágio '{stage}' stream '{stream_key}'") + stage_config[stage][str(stream_key)] = module + stream_aliases = data.get("stream_aliases") + if stream_aliases is not None and (not isinstance(stream_aliases, list) or not all(isinstance(x, str) for x in stream_aliases)): + raise ValueError("'stream_aliases' deve ser uma lista de strings") + return { + "stream_aliases": stream_aliases, + "stage_config": stage_config, + "refinement_cycles": int(data.get("refinement_cycles", 0)), + "cycle_stage_name": str(data.get("cycle_stage_name", "cycle")), + } + + +# -------------------------- +# Lorenz attractor generator (discrete) +# -------------------------- +def lorenz_step(x, y, z, sigma=10.0, rho=28.0, beta=8/3, dt=0.01): + dx = sigma * (y - x) + dy = x * (rho - z) - y + dz = x * y - beta * z + xn = x + dx * dt + yn = y + dy * dt + zn = z + dz * dt + return xn, yn, zn + +def lorenz_sequence(length, init=(0.1, 0.0, 0.0), sigma=10.0, rho=28.0, beta=8/3, dt=0.01): + x, y, z = init + seq = [] + for _ in range(length): + x, y, z = lorenz_step(x, y, z, sigma, rho, beta, dt) + seq.append((x, y, z)) + return np.array(seq) # shape (length, 3) + + +# -------------------------- +# Rössler attractor (alternative chaos source) +# -------------------------- +def rossler_step(x, y, z, a=0.2, b=0.2, c=5.7, dt=0.01): + dx = -y - z + dy = x + a * y + dz = b + z * (x - c) + xn = x + dx * dt + yn = y + dy * dt + zn = z + dz * dt + return xn, yn, zn + +def rossler_sequence(length, init=(0.1, 0.0, 0.0), a=0.2, b=0.2, c=5.7, dt=0.01): + x, y, z = init + seq = [] + for _ in range(length): + x, y, z = rossler_step(x, y, z, a, b, c, dt) + seq.append((x, y, z)) + return np.array(seq) + + +# -------------------------- +# Terminal Velocity Matching (Flow Matching inspired) +# -------------------------- +def compute_terminal_velocity(embeddings: torch.Tensor, target_distribution: 
str = "gaussian") -> torch.Tensor: + """ + Calcula a velocidade terminal para mover embeddings em direção a uma distribuição alvo. + Inspirado em Flow Matching / Rectified Flow. + """ + batch_size, dim = embeddings.shape + if target_distribution == "gaussian": + # Alvo: Gaussiana isotrópica padrão + target = torch.randn_like(embeddings) * 0.1 + elif target_distribution == "uniform_sphere": + # Alvo: superfície de esfera unitária + target = F.normalize(torch.randn_like(embeddings), dim=-1) + else: + target = torch.zeros_like(embeddings) + + # Velocidade = direção do alvo - posição atual (normalizada) + velocity = target - embeddings + velocity = F.normalize(velocity, dim=-1) * embeddings.norm(dim=-1, keepdim=True) * 0.1 + return velocity + + +# -------------------------- +# Spectral Energy Regularization +# -------------------------- +def spectral_energy_loss(embeddings: torch.Tensor, target_rank: int = 32) -> torch.Tensor: + """ + Penaliza energia concentrada em poucos componentes principais. + Força distribuição mais uniforme do espectro singular. + """ + if embeddings.shape[0] < 2: + return torch.tensor(0.0, device=embeddings.device) + + centered = embeddings - embeddings.mean(dim=0, keepdim=True) + # SVD para obter valores singulares + try: + _, s, _ = torch.linalg.svd(centered, full_matrices=False) + except RuntimeError: + return torch.tensor(0.0, device=embeddings.device) + + # Normaliza para distribuição de energia + s_normalized = s / (s.sum() + 1e-8) + + # Entropia do espectro (queremos maximizar = distribuição uniforme) + spectral_entropy = -(s_normalized * (s_normalized + 1e-8).log()).sum() + + # Penalidade: quanto menor a entropia, maior a penalidade + max_entropy = math.log(min(embeddings.shape[0], embeddings.shape[1])) + return (max_entropy - spectral_entropy) / max_entropy + + +# -------------------------- +# Semantic Divergence Metrics +# -------------------------- +def compute_angular_divergence(original: torch.Tensor, perturbed: torch.Tensor) -> float: + """Calcula a divergência angular média entre vetores originais e perturbados.""" + cos_sim = F.cosine_similarity(original, perturbed, dim=-1) + # Clamp para evitar problemas numéricos com acos + cos_sim = cos_sim.clamp(-1.0, 1.0) + angles = torch.acos(cos_sim) + return angles.mean().item() + + +def compute_semantic_entropy(logits: torch.Tensor, top_k: int = 10) -> float: + """Calcula a entropia semântica da distribuição de probabilidade.""" + probs = F.softmax(logits, dim=-1) + top_probs, _ = torch.topk(probs, min(top_k, probs.shape[-1]), dim=-1) + # Renormaliza top-k + top_probs = top_probs / (top_probs.sum(dim=-1, keepdim=True) + 1e-8) + entropy = -(top_probs * (top_probs + 1e-8).log()).sum(dim=-1) + return entropy.mean().item() + + +# -------------------------- +# SIGReg-like regularizer (encourage isotropic Gaussian embeddings) +# -------------------------- +def sigreg_loss(embeddings): + # embeddings: (B, D) + mu = embeddings.mean(dim=0) # (D,) + centered = embeddings - mu.unsqueeze(0) # (B, D) + # empirical covariance (D x D) approx via (centered^T center) + B = embeddings.shape[0] + cov = (centered.t() @ centered) / (B - 1 + 1e-8) # (D, D) + # penalty = norm(cov - I) + norm(mu) + D = embeddings.shape[1] + eye = torch.eye(D, device=embeddings.device) + cov_pen = F.mse_loss(cov, eye) + mu_pen = (mu.pow(2)).mean() + return cov_pen + mu_pen + + +class VortexBetinaAntiHalluc(nn.Module): + def __init__( + self, + embed_dim: int = 256, + vortex_steps: int = 10, + vortex_dt: float = 0.02, + num_streams: int = 3, + 
enable_rotation: bool = True, + rotation_angle: float = math.pi / 4, + rotation_threshold: float = 1e-4, + rotation_clockwise: bool = False, + enable_quadratic_reflection: bool = False, + quadratic_boundary: float = 0.3, + quadratic_strength: float = 0.5, + boost_small_deltas: bool = True, + delta_gain: float = 1.5, + reflection_push: float = 0.25, + stream_aliases: Optional[List[str]] = None, + ia_stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None, + refinement_cycles: int = 0, + cycle_stage_name: str = "cycle", + enforce_square_geometry: bool = True, + square_rotation_degrees: float = 180.0, + square_leak_ratio: float = 0.05, + square_jitter_std_degrees: float = 0.0, + enable_lorentz_transform: bool = False, + lorentz_beta: float = 0.6, + lorentz_axis_stream: int = 0, + enable_triangle: bool = True, + triangle_hidden_dim: int = 512, + triangle_max_iters: int = 5, + triangle_tol: float = 1e-4, + triangle_delta_gain: float = 1.0, + ): + super().__init__() + self.embed_dim = embed_dim + self.vortex_steps = vortex_steps + self.vortex_dt = vortex_dt + self.num_streams = max(1, num_streams) + self.enable_rotation = enable_rotation + self.rotation_angle = rotation_angle + self.rotation_threshold = rotation_threshold + self.rotation_clockwise = rotation_clockwise + self.enable_quadratic_reflection = enable_quadratic_reflection + self.quadratic_boundary = nn.Parameter(torch.tensor(float(quadratic_boundary))) + self.quadratic_strength = float(min(1.0, max(0.0, quadratic_strength))) + self.boost_small_deltas = boost_small_deltas + self.delta_gain = max(1.0, delta_gain) + self.reflection_push = max(0.0, reflection_push) + self.stream_aliases = self._prepare_stream_aliases(stream_aliases) + self.refinement_cycles = max(0, refinement_cycles) + self.cycle_stage_name = cycle_stage_name or "cycle" + self.enforce_square_geometry = enforce_square_geometry + self.square_rotation_radians = math.radians(square_rotation_degrees) + self.square_leak_ratio = float(min(1.0, max(0.0, square_leak_ratio))) + self.square_jitter_std = math.radians(max(0.0, square_jitter_std_degrees)) + beta = max(-0.999, min(0.999, lorentz_beta)) + self.enable_lorentz_transform = enable_lorentz_transform + self.lorentz_beta = beta + self.lorentz_axis_stream = max(0, lorentz_axis_stream) + self.enable_triangle = enable_triangle + self.ia_router = MultiIntelligenceRouter( + num_streams=self.num_streams, + stage_config=ia_stage_config, + stream_aliases=self.stream_aliases, + ) + self.triangle_module = ( + SyntheticNeuronTriangle( + embed_dim, + self.num_streams, + hidden_dim=triangle_hidden_dim, + max_iters=triangle_max_iters, + tol=triangle_tol, + delta_gain=triangle_delta_gain, + ) + if self.enable_triangle + else None + ) + self.projector = nn.Sequential( + nn.LayerNorm(embed_dim), + nn.Linear(embed_dim, embed_dim), + ) + nn.init.kaiming_normal_(self.projector[1].weight, mode="fan_out", nonlinearity="relu") + nn.init.constant_(self.projector[1].bias, 0.1) + + # Vortex Dynamics Parameters (Chaos Injection) + self.vortex_linear = nn.Parameter(torch.randn(3, embed_dim) * 0.1) + self.vortex_scale = nn.Parameter(torch.ones(embed_dim)) + + # Terminal Velocity Matching parameters + self.velocity_gate = nn.Parameter(torch.zeros(embed_dim)) + self.velocity_bias = nn.Parameter(torch.zeros(embed_dim)) + + # Adaptive chaos parameters + self.chaos_temperature = nn.Parameter(torch.tensor(1.0)) + self.chaos_gate = nn.Parameter(torch.tensor(0.0)) + self.attractor_selector = nn.Parameter(torch.tensor([0.7, 0.3])) # [lorenz, rossler] + + 
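+# A minimal sketch of the chaos-mixing path that forward() below applies when
+# chaos_factor != 1.0: sample both attractor sequences defined above, z-score
+# each channel, blend with softmax weights, and project the 3-D trajectory into
+# the embedding dimension. The fixed weights and random projection here are
+# illustrative stand-ins for the learnable attractor_selector and vortex_linear
+# parameters.
+def _demo_chaos_mix(batch: int = 8, dim: int = 16) -> torch.Tensor:
+    lor = torch.tensor(lorenz_sequence(batch), dtype=torch.float32)
+    ros = torch.tensor(rossler_sequence(batch), dtype=torch.float32)
+    lor = (lor - lor.mean(dim=0)) / (lor.std(dim=0) + 1e-8)
+    ros = (ros - ros.mean(dim=0)) / (ros.std(dim=0) + 1e-8)
+    weights = F.softmax(torch.tensor([0.7, 0.3]), dim=0)
+    mixed = weights[0] * lor + weights[1] * ros  # (batch, 3)
+    return mixed @ (torch.randn(3, dim) * 0.1)   # (batch, dim) perturbation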
self.last_flow_states: Dict[str, object] = {} + + def _prepare_stream_aliases(self, provided: Optional[List[str]]) -> List[str]: + if provided: + aliases = list(provided) + else: + aliases = ["X", "Y", "Z", "W", "V", "U", "T", "S"] + if len(aliases) < self.num_streams: + aliases.extend(f"S{idx}" for idx in range(len(aliases), self.num_streams)) + return aliases[: self.num_streams] + + def _rotate_matrix_plane(self, tensor: torch.Tensor, radians: float) -> torch.Tensor: + if tensor.shape[-1] < 2: + return tensor + angle = radians % (2 * math.pi) + if abs(angle) < 1e-9: + return tensor + rotated = tensor.clone() + cos_theta = math.cos(angle) + sin_theta = math.sin(angle) + x = tensor[..., 0] + y = tensor[..., 1] + rotated[..., 0] = cos_theta * x - sin_theta * y + rotated[..., 1] = sin_theta * x + cos_theta * y + return rotated + + def _rotate_matrix_square(self, tensor: torch.Tensor) -> Tuple[torch.Tensor, float]: + base_angle = abs(self.square_rotation_radians) % (2 * math.pi) + if base_angle > math.pi: + base_angle = 2 * math.pi - base_angle + jitter = 0.0 + if self.square_jitter_std > 0: + jitter = torch.randn(1, device=tensor.device).item() * self.square_jitter_std + angle = max(0.0, min(math.pi, base_angle + jitter)) + if math.isclose(angle, 0.0, rel_tol=1e-6, abs_tol=1e-6): + mirrored = tensor.clone() + else: + intensity = angle / math.pi + mirrored = torch.lerp(tensor, -tensor, intensity) + if self.square_leak_ratio > 0: + mirrored = torch.lerp(mirrored, tensor, self.square_leak_ratio) + return mirrored, angle + + def _select_axis_stream(self, tensor: torch.Tensor) -> torch.Tensor: + stream_idx = min(self.lorentz_axis_stream, tensor.size(1) - 1) + return tensor[:, stream_idx, :] + + def _lorentz_boost( + self, + spatial: torch.Tensor, + reference_stream: torch.Tensor, + time_like: torch.Tensor, + ) -> Tuple[torch.Tensor, float, torch.Tensor]: + beta = self.lorentz_beta + gamma = 1.0 / math.sqrt(max(1e-8, 1.0 - beta**2)) + axis = F.normalize(reference_stream, dim=-1, eps=1e-6) + parallel_scalar = torch.sum(spatial * axis, dim=-1, keepdim=True) + parallel_vec = parallel_scalar * axis + perpendicular_vec = spatial - parallel_vec + t_prime = gamma * (time_like - beta * parallel_scalar) + parallel_prime = gamma * (parallel_scalar - beta * time_like) * axis + updated_spatial = parallel_prime + perpendicular_vec + return updated_spatial, gamma, t_prime.abs() + + def invert(self, vec: torch.Tensor) -> torch.Tensor: + return -vec + + def intersection_knowledge(self, vec: torch.Tensor, base_k: torch.Tensor) -> torch.Tensor: + dot = torch.sum(vec * base_k, dim=-1, keepdim=True) + norm_k = torch.norm(base_k, dim=-1, keepdim=True).pow(2).clamp_min(1e-8) + return (dot / norm_k) * base_k + + def euler_vortex(self, state: torch.Tensor) -> torch.Tensor: + sigma, rho, beta, gamma = 10.0, 28.0, 8.0 / 3.0, 1.0 + feature_dim = state.shape[-1] - 1 + base = state[..., :-1] + w = state[..., -1:] + updated_base = torch.zeros_like(base) + for i in range(0, feature_dim, 3): + chunk = base[..., i : i + 3] + if chunk.shape[-1] < 3: + chunk = F.pad(chunk, (0, 3 - chunk.shape[-1])) + chunk = chunk.clone() + for _ in range(self.vortex_steps): + x = chunk[..., 0] + y = chunk[..., 1] + z = chunk[..., 2] + dx = sigma * (y - x) * self.vortex_dt + dy = (x * (rho - z) - y) * self.vortex_dt + dz = (x * y - beta * z) * self.vortex_dt + chunk = chunk + torch.stack([dx, dy, dz], dim=-1) + span = min(3, feature_dim - i) + updated_base[..., i : i + span] = chunk[..., :span] + energy = torch.norm(base, dim=-1, 
keepdim=True).pow(2) + w_iter = w.clone() + for _ in range(self.vortex_steps): + dw = (gamma * energy - w_iter) * self.vortex_dt + w_iter = w_iter + dw + updated = state.clone() + updated[..., :-1] = updated_base + updated[..., -1:] = w_iter + return updated + + def rotate_difference(self, delta: torch.Tensor, anchor: torch.Tensor) -> torch.Tensor: # noqa: ARG002 + # Rotation now confined to the (x, y) plane while keeping z (and higher dims) untouched. + if delta.shape[-1] < 2: + return delta + angle = -abs(self.rotation_angle) if self.rotation_clockwise else abs(self.rotation_angle) + cos_theta = math.cos(angle) + sin_theta = math.sin(angle) + rotated = delta.clone() + x = delta[..., 0] + y = delta[..., 1] + rotated[..., 0] = cos_theta * x - sin_theta * y + rotated[..., 1] = sin_theta * x + cos_theta * y + if delta.shape[-1] >= 3: + rotated[..., 2] = delta[..., 2] # keep z static per vortex requirement + return rotated + + def _get_quadratic_boundary(self) -> torch.Tensor: + return self.quadratic_boundary.abs().clamp(0.05, 0.5) + + def quadratic_reflection(self, delta: torch.Tensor) -> Tuple[torch.Tensor, float]: + """Reflect components between ±boundary and warp them quadratically (Pong-like bounce).""" + if not self.enable_quadratic_reflection: + return delta, 0.0 + boundary = self._get_quadratic_boundary() + period = 2.0 * boundary + magnitude = delta.abs() + direction = torch.sign(delta) + wrapped = torch.remainder(magnitude, period) + mirrored = torch.where(wrapped > boundary, period - wrapped, wrapped) + normalized = (mirrored / boundary).clamp(0.0, 1.0) + squared = normalized.pow(2) + bounced = direction * boundary * squared + blended = torch.lerp(delta, bounced, self.quadratic_strength) + bounce_ratio = (magnitude > boundary).float().mean().item() + return blended, bounce_ratio + + def forward(self, input_vec: torch.Tensor, chaos_factor: float = 1.0): + """ + chaos_factor: Multiplicador de agressividade (1.0 = normal, 5.0 = agressivo, 10.0 = insano) + """ + if input_vec.dim() == 2: + batch = input_vec.size(0) + mat_input = input_vec.unsqueeze(1).expand(batch, self.num_streams, self.embed_dim) + elif input_vec.dim() == 3: + batch, streams, dim = input_vec.shape + if streams != self.num_streams or dim != self.embed_dim: + raise ValueError( + f"Esperava tensor (batch,{self.num_streams},{self.embed_dim}), recebi {input_vec.shape}" + ) + mat_input = input_vec + else: + raise ValueError("input_vec precisa ter 2 ou 3 dimensões") + + mat_flat = mat_input.reshape(-1, self.embed_dim) + mat_primary = self.projector(mat_flat).reshape(-1, self.num_streams, self.embed_dim) + stage_signatures: Dict[str, str] = {} + mat_primary = self.ia_router.apply("base", mat_primary) + stage_signatures["base"] = self.ia_router.stage_signature("base") + mirrored_primary: Optional[torch.Tensor] = None + square_tensor: Optional[torch.Tensor] = None + square_angle_applied: Optional[float] = None + if self.enforce_square_geometry: + mirrored_primary, square_angle_applied = self._rotate_matrix_square(mat_primary) + square_tensor = torch.stack([mat_primary, mirrored_primary], dim=2) + mat_secondary = mirrored_primary + else: + mat_secondary = self.invert(mat_primary) + mat_secondary = self.ia_router.apply("inversion", mat_secondary) + stage_signatures["inversion"] = self.ia_router.stage_signature("inversion") + base_k = mat_primary.mean(dim=1, keepdim=True) + 1e-4 * torch.randn_like(mat_primary[:, :1, :]) + inter_primary = self.intersection_knowledge(mat_primary, base_k) + inter_secondary = 
self.intersection_knowledge(mat_secondary, base_k) + approx_inter_full = 0.5 * (inter_primary + inter_secondary) + approx_inter = approx_inter_full.mean(dim=1) + flow_states: Dict[str, object] = {} + flow_states["matrix_primary"] = mat_primary + flow_states["matrix_secondary"] = mat_secondary + flow_states["base_core"] = base_k.squeeze(1) + if mirrored_primary is not None: + flow_states["matrix_rot_180"] = mirrored_primary + if square_tensor is not None: + flow_states["matrix_square"] = square_tensor + flow_states["square_leak_ratio"] = self.square_leak_ratio + if square_angle_applied is not None: + flow_states["square_angle"] = square_angle_applied + + triangle_debug: Dict[str, object] = {} + if self.triangle_module is not None: + delta_green, triangle_debug = self.triangle_module(mat_primary, mat_secondary, base_k) + else: + delta_green = inter_primary - base_k + flow_states["xy_bridge_matrix"] = delta_green + if triangle_debug: + flow_states["triangle_axis"] = triangle_debug.get("axis") + flow_states["triangle_diag"] = triangle_debug.get("diag") + flow_states["triangle_iterations"] = triangle_debug.get("iterations", 0) + flow_states["triangle_residual"] = triangle_debug.get("residual", 0.0) + + delta_norm_stream = torch.norm(delta_green, dim=-1, keepdim=True) + if self.boost_small_deltas: + boost_mask = delta_norm_stream < self.rotation_threshold + if boost_mask.any(): + safe_norm = delta_norm_stream.clamp_min(1e-6) + factor = (self.rotation_threshold / safe_norm) * self.delta_gain + delta_green = torch.where(boost_mask, delta_green * factor, delta_green) + boost_ratio = boost_mask.float().mean().item() + else: + boost_ratio = 0.0 + flow_states["matrix_green_boost"] = delta_green + + if self.enable_rotation: + delta_flat = delta_green.reshape(-1, self.embed_dim) + base_flat = base_k.expand(-1, self.num_streams, -1).reshape(-1, self.embed_dim) + delta_norm = torch.norm(delta_flat, dim=-1, keepdim=True) + rotation_mask = delta_norm > self.rotation_threshold + if rotation_mask.any(): + rotated = self.rotate_difference(delta_flat, base_flat) + delta_flat = torch.where(rotation_mask, rotated, delta_flat) + delta_green = delta_flat.view(-1, self.num_streams, self.embed_dim) + rotation_ratio = rotation_mask.float().mean().item() + else: + rotation_ratio = 0.0 + flow_states["matrix_green_rot"] = delta_green + + boundary_val = self._get_quadratic_boundary() + flow_states["quadratic_boundary"] = boundary_val.detach().item() + if self.enable_quadratic_reflection and self.reflection_push > 0: + norm_after_rot = torch.norm(delta_green, dim=-1, keepdim=True) + push_mask = norm_after_rot < boundary_val + if push_mask.any(): + push_factor = 1.0 + self.reflection_push + delta_green = torch.where(push_mask, delta_green * push_factor, delta_green) + pre_reflect_push_ratio = push_mask.float().mean().item() + else: + pre_reflect_push_ratio = 0.0 + + if self.enable_quadratic_reflection: + delta_green, reflection_ratio = self.quadratic_reflection(delta_green) + else: + reflection_ratio = 0.0 + flow_states["matrix_green_reflect"] = delta_green + + mirror_reference = self.intersection_knowledge(mat_secondary, base_k) + matrix_black = 0.5 * (2 * base_k - delta_green + mirror_reference) + matrix_black = self.ia_router.apply("mirror", matrix_black) + stage_signatures["mirror"] = self.ia_router.stage_signature("mirror") + if self.enforce_square_geometry: + x_stream = mat_primary[:, 0, :] + x_output = mat_secondary[:, 0, :] + matrix_black[:, 0, :] = x_output + flow_states["square_input"] = x_stream + 
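+# A standalone numeric sketch of the Pong-style fold implemented by
+# quadratic_reflection above: magnitudes are wrapped into [0, boundary] with a
+# triangle wave, warped quadratically, then blended with the original delta.
+# boundary and strength match the module defaults; the sample values are
+# illustrative.
+def _demo_quadratic_bounce(boundary: float = 0.3, strength: float = 0.5) -> torch.Tensor:
+    delta = torch.tensor([0.1, 0.35, 0.7, -0.9])
+    period = 2.0 * boundary
+    wrapped = torch.remainder(delta.abs(), period)
+    mirrored = torch.where(wrapped > boundary, period - wrapped, wrapped)
+    bounced = torch.sign(delta) * boundary * (mirrored / boundary).clamp(0.0, 1.0) ** 2
+    return torch.lerp(delta, bounced, strength)  # e.g. 0.7 -> ~0.37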
flow_states["square_output"] = x_output + flow_states["matrix_black_square"] = matrix_black + + flow_states["matrix_black"] = matrix_black + + delta_inter = matrix_black.mean(dim=1) + cycle_signatures: List[str] = [] + if self.refinement_cycles > 0: + refined_delta = delta_inter + for cycle_idx in range(self.refinement_cycles): + refined_delta = self.ia_router.apply( + self.cycle_stage_name, + refined_delta, + cycle_idx=cycle_idx, + ) + cycle_signatures.append(self.ia_router.stage_signature(self.cycle_stage_name)) + delta_inter = refined_delta + stage_signatures[self.cycle_stage_name] = cycle_signatures[-1] if cycle_signatures else "identity" + flow_states["delta_pre_lorentz"] = delta_inter + flow_states["ia_cycle_signatures"] = cycle_signatures + + w = torch.norm(delta_inter, dim=-1, keepdim=True) + lorentz_gamma = 1.0 + if self.enable_lorentz_transform: + reference_stream = self._select_axis_stream(mat_primary) + delta_inter, lorentz_gamma, w = self._lorentz_boost(delta_inter, reference_stream, w) + flow_states["lorentz_reference"] = reference_stream + flow_states["delta_lorentz"] = delta_inter + flow_states["lorentz_gamma"] = lorentz_gamma + flow_states["lorentz_time"] = w + + delta_before_chaos = delta_inter + + # APLICAÇÃO DO FATOR CAOS (OVERDRIVE) - VORTEX DYNAMICS V2 + if chaos_factor != 1.0: + batch_size = delta_inter.shape[0] + + # 1. Generate chaos sequences from both attractors + lor_seq_np = lorenz_sequence(batch_size, init=(0.1, 0.0, 0.0)) + ros_seq_np = rossler_sequence(batch_size, init=(0.1, 0.0, 0.0)) + + # Normalize sequences + lor_seq_np = (lor_seq_np - lor_seq_np.mean(axis=0)) / (lor_seq_np.std(axis=0) + 1e-8) + ros_seq_np = (ros_seq_np - ros_seq_np.mean(axis=0)) / (ros_seq_np.std(axis=0) + 1e-8) + + lor_tensor = torch.tensor(lor_seq_np, dtype=delta_inter.dtype, device=delta_inter.device) + ros_tensor = torch.tensor(ros_seq_np, dtype=delta_inter.dtype, device=delta_inter.device) + + # 2. Adaptive attractor mixing (learnable weights) + attractor_weights = F.softmax(self.attractor_selector, dim=0) + mixed_chaos = attractor_weights[0] * lor_tensor + attractor_weights[1] * ros_tensor + + # 3. Map 3D Chaos -> Embedding Dimension + perturb = mixed_chaos @ self.vortex_linear + perturb = perturb * self.vortex_scale + + # 4. Terminal Velocity Matching (Flow-like correction) + velocity = compute_terminal_velocity(delta_inter, target_distribution="gaussian") + gated_velocity = velocity * torch.sigmoid(self.velocity_gate) + self.velocity_bias + + # 5. Normalize perturbation magnitude + emb_norm = delta_inter.norm(dim=1, keepdim=True) + 1e-8 + pert_norm = perturb.norm(dim=1, keepdim=True) + 1e-8 + normalized_perturb = perturb * (emb_norm / pert_norm) + + # 6. Adaptive temperature scaling + learned gate + chaos_scale = torch.sigmoid(self.chaos_temperature + self.chaos_gate) + effective_chaos = chaos_factor * chaos_scale + + # Gate chaos intensity if semantic entropy explodes (uncertain corrections) + delta_entropy = compute_semantic_entropy(delta_inter.unsqueeze(1)) + chaos_entropy_gate = 1.0 if delta_entropy < 1.0 else 0.5 + effective_chaos = effective_chaos * chaos_entropy_gate + + # 7. 
Apply combined perturbation: chaos + velocity flow + delta_inter = delta_inter + effective_chaos * normalized_perturb + 0.1 * gated_velocity + + # Store chaos metrics + flow_states["attractor_mix"] = attractor_weights.detach().cpu().tolist() + flow_states["effective_chaos"] = effective_chaos.item() + flow_states["chaos_scale"] = chaos_scale.item() + flow_states["chaos_entropy_gate"] = chaos_entropy_gate + flow_states["angular_divergence"] = compute_angular_divergence(delta_before_chaos, delta_inter) + + # Gain boost to increase boundary hits when chaos is controlled + delta_inter = delta_inter * 1.5 + flow_states["delta_output"] = delta_inter + + state = torch.cat([approx_inter, delta_inter, w], dim=-1) + evolved = self.euler_vortex(state) + flow_states["output_corner"] = evolved[..., :-1] + flow_states["ia_stage_logs"] = self.ia_router.describe_all_stages() + self.last_flow_states = flow_states + overlap = F.cosine_similarity(inter_primary.view(-1, self.embed_dim), inter_secondary.view(-1, self.embed_dim), dim=-1) + overlap = overlap.view(-1, self.num_streams).mean(dim=1) + annulation = torch.norm(mat_primary - (-mat_secondary), dim=-1).pow(2).mean(dim=1) + vortex_sink = evolved[..., -1] + hall_penalty = (1 - overlap).clamp_min(0.0) + approx_pull = ( + torch.norm(mat_primary.mean(dim=1) - approx_inter, dim=-1).pow(2) + + torch.norm(mat_secondary.mean(dim=1) - approx_inter, dim=-1).pow(2) + ) + + # SIGReg Regularization (Isotropic Gaussian Enforcement) + sig_loss = sigreg_loss(delta_inter) + + # Spectral Energy Regularization (distribui energia uniformemente) + spectral_loss = spectral_energy_loss(delta_inter) + + delta_probs = F.softmax(delta_inter, dim=-1) + semantic_entropy = -(delta_probs * (delta_probs + 1e-8).log()).sum(dim=-1).mean() + semantic_entropy_val = semantic_entropy.item() + + boundary_val = self._get_quadratic_boundary() + boundary_reg = (0.2 - boundary_val).clamp_min(0.0).pow(2) + + loss = ( + annulation.mean() + + 0.5 * hall_penalty.mean() + + 0.25 * approx_pull.mean() + - 0.1 * vortex_sink.mean() + + 0.05 * sig_loss + + 0.02 * spectral_loss # Força distribuição espectral uniforme + + 0.02 * boundary_reg + + 0.05 * semantic_entropy + ) + metrics = { + "annulation": annulation.mean().item(), + "cosine_overlap": overlap.mean().item(), + "vortex_energy": vortex_sink.mean().item(), + "sigreg_loss": sig_loss.item(), + "spectral_loss": spectral_loss.item() if isinstance(spectral_loss, torch.Tensor) else spectral_loss, + "boundary_reg": boundary_reg.mean().item() if isinstance(boundary_reg, torch.Tensor) else boundary_reg, + "rotation_ratio": rotation_ratio, + "approx_alignment": approx_pull.mean().item(), + "reflection_ratio": reflection_ratio, + "reflect_ratio": reflection_ratio, + "boost_ratio": boost_ratio, + "reflection_push_ratio": pre_reflect_push_ratio, + "ia_base": stage_signatures.get("base", "identity"), + "ia_inversion": stage_signatures.get("inversion", "identity"), + "ia_mirror": stage_signatures.get("mirror", "identity"), + "ia_cycle": " || ".join(cycle_signatures) if cycle_signatures else stage_signatures.get(self.cycle_stage_name, "identity"), + "lorentz_gamma": lorentz_gamma, + "square_angle_deg": math.degrees(square_angle_applied) if square_angle_applied is not None else 0.0, + "square_leak_ratio": self.square_leak_ratio, + "angular_divergence": flow_states.get("angular_divergence", 0.0), + "attractor_mix": flow_states.get("attractor_mix", [1.0, 0.0]), + "effective_chaos": flow_states.get("effective_chaos", 1.0), + "semantic_entropy_approx": 
semantic_entropy_val, + "triangle_iters": triangle_debug.get("iterations", 0) if triangle_debug else 0, + "triangle_residual": triangle_debug.get("residual", 0.0) if triangle_debug else 0.0, + } + delta_norm = torch.norm(delta_inter, dim=-1) + metrics["delta_norm_mean"] = delta_norm.mean().item() + metrics["delta_norm_max"] = delta_norm.max().item() + return evolved, loss, metrics, delta_inter + + +class PortugueseSentenceDataset(Dataset): + def __init__( + self, + sentences: List[str], + tokenizer: AutoTokenizer, + embedding_model: SentenceTransformer, + mask_prob: float = 0.15, + max_seq_length: int = 512, + precompute_embeddings: bool = False, + embedding_batch_size: int = 64, + ): + self.sentences = sentences + self.tokenizer = tokenizer + self.embedding_model = embedding_model + self.mask_prob = mask_prob + self.max_seq_length = max_seq_length + self.precompute_embeddings = precompute_embeddings + self.embedding_batch_size = max(1, embedding_batch_size) + self.embeddings: Optional[torch.Tensor] = None + self._embedding_cache: Dict[int, torch.Tensor] = {} + if self.precompute_embeddings: + self._precompute_all_embeddings() + + def __len__(self) -> int: + return len(self.sentences) + + def _mask_tokens(self, input_ids: torch.Tensor, special_mask: torch.Tensor) -> Dict[str, torch.Tensor]: + labels = input_ids.clone() + probability_matrix = torch.full(labels.shape, self.mask_prob) + probability_matrix.masked_fill_(special_mask.bool(), 0.0) + masked_indices = torch.bernoulli(probability_matrix).bool() + if not masked_indices.any(): + candidate_positions = (~special_mask.bool()).nonzero(as_tuple=False).view(-1) + choice = candidate_positions[torch.randint(0, candidate_positions.numel(), (1,)).item()] + masked_indices[choice] = True + labels[~masked_indices] = -100 + input_ids = input_ids.clone() + input_ids[masked_indices] = self.tokenizer.mask_token_id + return {"input_ids": input_ids, "labels": labels} + + def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]: + sentence = self.sentences[idx] + encoding = self.tokenizer( + sentence, + return_tensors="pt", + return_special_tokens_mask=True, + truncation=True, + max_length=self.max_seq_length, + ) + input_ids = encoding["input_ids"].squeeze(0) + attention_mask = encoding["attention_mask"].squeeze(0) + special_mask = encoding["special_tokens_mask"].squeeze(0) + masked = self._mask_tokens(input_ids, special_mask) + embedding = self._get_embedding(idx) + return { + "input_ids": masked["input_ids"], + "attention_mask": attention_mask, + "labels": masked["labels"], + "embedding": embedding, + } + + @torch.no_grad() + def _precompute_all_embeddings(self) -> None: + chunks: List[torch.Tensor] = [] + total = len(self.sentences) + for start in range(0, total, self.embedding_batch_size): + batch = self.sentences[start : start + self.embedding_batch_size] + batch_embeds = self.embedding_model.encode( + batch, + convert_to_tensor=True, + show_progress_bar=False, + batch_size=self.embedding_batch_size, + ) + chunks.append(batch_embeds.float().cpu()) + self.embeddings = torch.cat(chunks, dim=0) if chunks else torch.empty(0) + + @torch.no_grad() + def _compute_single_embedding(self, sentence: str) -> torch.Tensor: + embed = self.embedding_model.encode( + sentence, + convert_to_tensor=True, + show_progress_bar=False, + batch_size=1, + ) + return embed.float().cpu() + + def _get_embedding(self, idx: int) -> torch.Tensor: + if self.embeddings is not None: + return self.embeddings[idx] + if idx not in self._embedding_cache: + self._embedding_cache[idx] = 
self._compute_single_embedding(self.sentences[idx]) + return self._embedding_cache[idx] + + +def build_collate_fn(tokenizer: AutoTokenizer): + pad_id = tokenizer.pad_token_id + + def collate(batch: List[Dict[str, torch.Tensor]]) -> Dict[str, torch.Tensor]: + input_ids = pad_sequence([item["input_ids"] for item in batch], batch_first=True, padding_value=pad_id) + attention_mask = pad_sequence([item["attention_mask"] for item in batch], batch_first=True, padding_value=0) + labels = pad_sequence([item["labels"] for item in batch], batch_first=True, padding_value=-100) + embeddings = torch.stack([item["embedding"] for item in batch]) + return { + "input_ids": input_ids, + "attention_mask": attention_mask, + "labels": labels, + "embedding": embeddings, + } + + return collate + + +class BetinaTrainer: + def __init__( + self, + vortex: VortexBetinaAntiHalluc, + tokenizer: AutoTokenizer, + embedding_model: SentenceTransformer, + mlm_model: AutoModelForMaskedLM, + raw_embedding_dim: int, + embed_dim: int, + lambda_vortex: float = 0.5, + learning_rate: float = 1e-4, + mlm_learning_rate: float = 5e-5, + freeze_mlm: bool = False, + freeze_projectors: bool = False, + correction_max_norm: float | None = None, + chaos_factor: float = 1.0, + eval_chaos_factor: float = 1.0, + device: torch.device | None = None, + max_seq_length: int = 512, + ): + self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu") + self.vortex = vortex.to(self.device) + self.tokenizer = tokenizer + self.embedding_model = embedding_model + self.mlm_model = mlm_model.to(self.device) + self.lambda_vortex = lambda_vortex + self.chaos_factor = chaos_factor + self.eval_chaos_factor = eval_chaos_factor + self.embedding_projector = nn.Linear(raw_embedding_dim, embed_dim).to(self.device) + hidden_size = self.mlm_model.config.hidden_size + self.correction_projector = nn.Linear(embed_dim, hidden_size).to(self.device) + self.freeze_projectors = freeze_projectors + self.correction_max_norm = correction_max_norm if correction_max_norm and correction_max_norm > 0 else None + self.max_seq_length = max_seq_length + mlm_params = list(self.mlm_model.parameters()) + if freeze_mlm: + for param in mlm_params: + param.requires_grad = False + projector_params = list(self.embedding_projector.parameters()) + list(self.correction_projector.parameters()) + if freeze_projectors: + for param in projector_params: + param.requires_grad = False + trainable_projectors = [] if freeze_projectors else projector_params + vortex_params = list(self.vortex.parameters()) + trainable_projectors + optimizer_groups = [ + {"params": vortex_params, "lr": learning_rate}, + ] + if not freeze_mlm: + optimizer_groups.append({"params": mlm_params, "lr": mlm_learning_rate}) + self.optimizer = optim.AdamW(optimizer_groups) + self.freeze_mlm = freeze_mlm + self.scaler = betina_grad_scaler(self.device.type, enabled=self.device.type == "cuda") + + def _project_correction(self, delta: torch.Tensor) -> torch.Tensor: + correction = self.correction_projector(delta) + if self.correction_max_norm is not None: + dim = correction.dim() - 1 if correction.dim() > 0 else 0 + correction = correction.renorm(p=2, dim=dim, maxnorm=self.correction_max_norm) + return correction + + def train(self, dataloader: DataLoader, epochs: int = 5, grad_clip: float = 1.0) -> List[Dict[str, float]]: + history: List[Dict[str, float]] = [] + self.vortex.train() + self.mlm_model.train() + for epoch in range(epochs): + for step, batch in enumerate(dataloader): + input_ids = 
batch["input_ids"].to(self.device) + attention_mask = batch["attention_mask"].to(self.device) + labels = batch["labels"].to(self.device) + embeds = batch["embedding"].to(self.device) + self.optimizer.zero_grad(set_to_none=True) + with betina_autocast(self.device.type, enabled=self.device.type == "cuda"): + projected = self.embedding_projector(embeds) + _, vortex_loss, metrics, delta = self.vortex(projected, chaos_factor=self.chaos_factor) + if self.freeze_mlm: + with torch.no_grad(): + outputs = self.mlm_model( + input_ids=input_ids, + attention_mask=attention_mask, + output_hidden_states=True, + return_dict=True, + ) + else: + outputs = self.mlm_model( + input_ids=input_ids, + attention_mask=attention_mask, + output_hidden_states=True, + return_dict=True, + ) + hidden = outputs.hidden_states[-1] + correction = self._project_correction(delta).unsqueeze(1) + attention_mask_f = attention_mask.unsqueeze(-1).float() + mask_focus = (input_ids == self.tokenizer.mask_token_id).unsqueeze(-1).float() + weight_mask = 0.5 * attention_mask_f + 0.5 * mask_focus + corrected_hidden = hidden + correction * weight_mask + if hasattr(self.mlm_model, "cls"): + logits = self.mlm_model.cls(corrected_hidden) + else: + logits = self.mlm_model.get_output_embeddings()(corrected_hidden) + mask_positions = labels != -100 + if mask_positions.any(): + mlm_loss = F.cross_entropy(logits[mask_positions], labels[mask_positions]) + else: + mlm_loss = torch.zeros(1, device=self.device) + total_loss = mlm_loss + self.lambda_vortex * vortex_loss + self.scaler.scale(total_loss).backward() + torch.nn.utils.clip_grad_norm_(self.parameters(), grad_clip) + self.scaler.step(self.optimizer) + self.scaler.update() + perplexity = torch.exp(mlm_loss.detach()) + record = { + "epoch": epoch + 1, + "step": step + 1, + "mlm_loss": mlm_loss.detach().item(), + "vortex_loss": vortex_loss.detach().item(), + "total_loss": total_loss.detach().item(), + "perplexity": perplexity.item(), + "vortex_energy": metrics["vortex_energy"], + "cosine_overlap": metrics["cosine_overlap"], + "rotation_ratio": metrics["rotation_ratio"], + "approx_alignment": metrics["approx_alignment"], + "reflection_ratio": metrics["reflection_ratio"], + "boost_ratio": metrics.get("boost_ratio", 0.0), + "reflection_push_ratio": metrics.get("reflection_push_ratio", 0.0), + } + history.append(record) + if step % 10 == 0: + print( + f"Epoch {record['epoch']:03d} Step {record['step']:04d} | " + f"Total {record['total_loss']:.4f} | MLM {record['mlm_loss']:.4f} | " + f"PPL {record['perplexity']:.4f} | " + f"Vortex {record['vortex_loss']:.4f} | Overlap {record['cosine_overlap']:.4f} | " + f"Energy {record['vortex_energy']:.4f} | Rotation {record['rotation_ratio']:.3f} | " + f"Reflect {record['reflection_ratio']:.3f} | Boost {record['boost_ratio']:.3f} | " + f"PreReflect {record['reflection_push_ratio']:.3f} | Approx {record['approx_alignment']:.4f}" + ) + return history + + def parameters(self): + for module in (self.vortex, self.embedding_projector, self.correction_projector, self.mlm_model): + for param in module.parameters(): + if param.requires_grad: + yield param + + @torch.no_grad() + def evaluate_perplexity(self, dataloader: DataLoader, apply_correction: bool = True) -> float: + self.vortex.eval() + self.mlm_model.eval() + total_loss = 0.0 + total_tokens = 0 + for batch in dataloader: + input_ids = batch["input_ids"].to(self.device) + attention_mask = batch["attention_mask"].to(self.device) + labels = batch["labels"].to(self.device) + embeds = batch["embedding"].to(self.device) 
+ outputs = self.mlm_model( + input_ids=input_ids, + attention_mask=attention_mask, + output_hidden_states=True, + return_dict=True, + ) + hidden = outputs.hidden_states[-1] + if apply_correction: + projected = self.embedding_projector(embeds) + _, _, _, delta = self.vortex(projected, chaos_factor=self.eval_chaos_factor) + correction = self._project_correction(delta).unsqueeze(1) + attention_mask_f = attention_mask.unsqueeze(-1).float() + mask_focus = (input_ids == self.tokenizer.mask_token_id).unsqueeze(-1).float() + weight_mask = 0.5 * attention_mask_f + 0.5 * mask_focus + hidden = hidden + correction * weight_mask + if hasattr(self.mlm_model, "cls"): + logits = self.mlm_model.cls(hidden) + else: + logits = self.mlm_model.get_output_embeddings()(hidden) + mask_positions = labels != -100 + if mask_positions.any(): + loss = F.cross_entropy(logits[mask_positions], labels[mask_positions], reduction="sum") + total_loss += loss.item() + total_tokens += mask_positions.sum().item() + if total_tokens == 0: + return float("inf") + return math.exp(total_loss / total_tokens) + + @torch.no_grad() + def save(self, output_dir: str) -> None: + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + torch.save(self.vortex.state_dict(), output_path / "vortex.pt") + torch.save(self.embedding_projector.state_dict(), output_path / "embedding_projector.pt") + torch.save(self.correction_projector.state_dict(), output_path / "correction_projector.pt") + self.mlm_model.save_pretrained(output_path / "mlm") + self.tokenizer.save_pretrained(output_path / "mlm") + + @torch.no_grad() + def fill_masks( + self, + texts: List[str], + top_k: int = 5, + apply_correction: bool = True, + ) -> List[List[List[Tuple[str, float]]]]: + self.vortex.eval() + self.mlm_model.eval() + encodings = self.tokenizer( + texts, + return_tensors="pt", + padding=True, + truncation=True, + max_length=self.max_seq_length, + ) + input_ids = encodings["input_ids"].to(self.device) + attention_mask = encodings["attention_mask"].to(self.device) + outputs = self.mlm_model( + input_ids=input_ids, + attention_mask=attention_mask, + output_hidden_states=True, + return_dict=True, + ) + hidden = outputs.hidden_states[-1] + if apply_correction: + embeds = self.embedding_model.encode(texts, convert_to_tensor=True, show_progress_bar=False).to(self.device) + projected = self.embedding_projector(embeds) + _, _, _, delta = self.vortex(projected, chaos_factor=self.eval_chaos_factor) + correction = self._project_correction(delta).unsqueeze(1) + attention_mask_f = attention_mask.unsqueeze(-1).float() + mask_focus = (input_ids == self.tokenizer.mask_token_id).unsqueeze(-1).float() + weight_mask = 0.5 * attention_mask_f + 0.5 * mask_focus + hidden = hidden + correction * weight_mask + if hasattr(self.mlm_model, "cls"): + logits = self.mlm_model.cls(hidden) + else: + logits = self.mlm_model.get_output_embeddings()(hidden) + mask_positions = (input_ids == self.tokenizer.mask_token_id) + results: List[List[List[Tuple[str, float]]]] = [] + for batch_index in range(input_ids.size(0)): + batch_tokens: List[List[Tuple[str, float]]] = [] + positions = mask_positions[batch_index].nonzero(as_tuple=False).view(-1) + for position in positions: + token_logits = logits[batch_index, position] + token_probs = F.softmax(token_logits, dim=-1) + topk = torch.topk(token_probs, top_k) + decoded: List[Tuple[str, float]] = [] + for token_id, prob in zip(topk.indices, topk.values): + token = self.tokenizer.decode([token_id]).strip() + decoded.append((token, 
prob.item())) + batch_tokens.append(decoded) + results.append(batch_tokens) + return results + + +def run_demo(embed_dim: int = 4, batch_size: int = 3, seed: int = 123) -> None: + torch.manual_seed(seed) + model = VortexBetinaAntiHalluc(embed_dim=embed_dim) + inputs = torch.randn(batch_size, embed_dim) + evolved, loss, metrics, delta = model(inputs) + print("Dimensão de entrada:", embed_dim) + print("Inputs:", inputs) + print("Estado evoluído shape:", evolved.shape) + print("Delta shape:", delta.shape) + print("Loss:", loss.item()) + for key, value in metrics.items(): + print(f"{key}: {value}") + + +def sample_sentences() -> List[str]: + return [ + "O céu de Lisboa estava completamente claro naquela manhã.", + "A inteligência coletiva da equipe resolveu o problema rapidamente.", + "O gato preto dormia tranquilo sobre o sofá da sala.", + "A orquestra executou a sinfonia com uma precisão impressionante.", + "Os dados indicam uma redução consistente nas alucinações do modelo.", + "A pesquisa científica requer paciência, rigor e curiosidade constante.", + "A ponte antiga foi restaurada para preservar o patrimônio cultural.", + "O sistema Betina ajusta embeddings para evitar distorções semânticas.", + ] + + +def load_sentences_from_args(args: argparse.Namespace) -> List[str]: + if args.dataset_file: + path = Path(args.dataset_file) + if not path.exists(): + raise FileNotFoundError(f"Dataset file not found: {path}") + sentences = [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()] + if not sentences: + raise ValueError(f"Dataset file {path} is empty") + return sentences if args.dataset_limit is None else sentences[: args.dataset_limit] + if args.dataset_hf: + if load_dataset is None: + raise ImportError("Install the 'datasets' package to use --dataset-hf option") + split = args.dataset_split + if args.dataset_limit and ":" not in split: + split = f"{split}[:{args.dataset_limit}]" + dataset_name = args.dataset_hf + config_name: Optional[str] = args.dataset_hf_config or None + try: + dataset = _safe_load_dataset( + dataset_name, + config_name, + split=split, + hf_token=args.hf_token, + trust_remote_code=args.trust_remote_code, + ) + except Exception as exc: + message = str(exc).lower() + scripts_blocked = "dataset scripts are no longer supported" in message + trust_flag_blocked = "trust_remote_code" in message + if dataset_name == "wikipedia" and (scripts_blocked or trust_flag_blocked): + fallback_name = "wikimedia/wikipedia" + fallback_config = config_name or "20231101.pt" + print( + "Dataset 'wikipedia' agora usa snapshot parquet e não aceita mais scripts remotos." + f" Alternando automaticamente para {fallback_name} ({fallback_config})." + ) + dataset = _safe_load_dataset( + fallback_name, + fallback_config, + split=split, + hf_token=args.hf_token, + trust_remote_code=False, + ) + elif scripts_blocked and not args.trust_remote_code: + hint = ( + "O dataset solicita código remoto. Reexecute com --trust-remote-code para habilitar" + " scripts do autor do dataset. Apenas use se confiar na fonte." + ) + raise RuntimeError(hint) from exc + if "gated dataset" in message or "403" in message or "401" in message: + hint = ( + "Dataset protegido requer autenticação. Informe --hf-token , defina HF_TOKEN/HUGGINGFACE_TOKEN" + " ou utilize --hf-token-file apontando para o token salvo pelo huggingface-cli. Também é possível" + " executar 'huggingface-cli login' para gerar ~/.cache/huggingface/token.\nVocê pode obter o token em" + " https://huggingface.co/settings/tokens." 
+
+
+def print_gain_summary(
+    prompts: List[str],
+    base_fills: List[List[List[Tuple[str, float]]]],
+    betina_fills: List[List[List[Tuple[str, float]]]],
+) -> None:
+    print("\nTop-1 gain summary:")
+    for prompt, base_masks, betina_masks in zip(prompts, base_fills, betina_fills):
+        prompt_head = prompt if len(prompt) <= 60 else f"{prompt[:57]}..."
+        for idx, (base_group, betina_group) in enumerate(zip(base_masks, betina_masks), start=1):
+            if not base_group or not betina_group:
+                continue
+            base_token, base_prob = base_group[0]
+            betina_token, betina_prob = betina_group[0]
+            delta = betina_prob - base_prob
+            if base_prob > 0:
+                rel = delta / base_prob * 100.0
+                rel_text = f"{rel:+.2f}%"
+            else:
+                rel_text = "n/a"
+            change_desc = "unchanged" if betina_token == base_token else f"{base_token} -> {betina_token}"
+            print(
+                f"  [{prompt_head}] mask {idx}: {change_desc} | base {base_prob:.4f} -> betina {betina_prob:.4f} ({rel_text})"
+            )
+
+
+def _prepare_debug_value(value, max_examples: int):
+    if isinstance(value, torch.Tensor):
+        limited = value.detach().cpu()
+        if limited.dim() >= 1:
+            limited = limited[:max_examples]
+        return limited.tolist()
+    if isinstance(value, dict):
+        return {key: _prepare_debug_value(val, max_examples) for key, val in value.items()}
+    if isinstance(value, (list, tuple)):
+        return [_prepare_debug_value(item, max_examples) for item in value]
+    if isinstance(value, (float, int, str)) or value is None:
+        return value
+    return str(value)
+
+
+def dump_square_debug(flow_states: Dict[str, object], metrics: Dict[str, float], output_path: str, max_examples: int = 1) -> Path:
+    output = Path(output_path).expanduser()
+    payload = {
+        "max_examples": max(1, max_examples),
+        "metrics": {key: float(value) if isinstance(value, (int, float)) else value for key, value in metrics.items()},
+        "flow_states": {key: _prepare_debug_value(val, max_examples) for key, val in flow_states.items()},
+    }
+    output.parent.mkdir(parents=True, exist_ok=True)
+    output.write_text(json.dumps(payload, indent=2), encoding="utf-8")
+    return output
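+
+
+# Shape of the JSON written by dump_square_debug, as a sketch. Only the three
+# top-level keys are fixed by the code above; the entries inside "metrics" and
+# "flow_states" come from the vortex at runtime, so the names below are
+# placeholders:
+#
+#   {
+#     "max_examples": 1,
+#     "metrics": {"some_metric": 0.0123},
+#     "flow_states": {"some_state": [[0.1, 0.2]]}
+#   }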
+
+
+def main(argv: list[str] | None = None):
+    parser = argparse.ArgumentParser(description="Training entry point for the Betina anti-hallucination model")
+    parser.add_argument("--train", action="store_true", help="Run the full training loop")
+    parser.add_argument("--epochs", type=int, default=5, help="Number of training epochs")
+    parser.add_argument("--batch-size", type=int, default=4, help="Batch size")
+    parser.add_argument("--embed-dim", type=int, default=256, choices=[128, 256], help="Internal vortex dimension")
+    parser.add_argument("--lambda-vortex", type=float, default=0.1, help="Weight of the vortex loss")
+    parser.add_argument("--chaos-factor", type=float, default=1.0, help="Chaos factor applied during training")
+    parser.add_argument(
+        "--eval-chaos-factor",
+        type=float,
+        default=1.0,
+        help="Chaos factor used for evaluation and inference (allows probing different regimes)",
+    )
+    parser.add_argument("--learning-rate", type=float, default=1e-4, help="Learning rate for the vortex and projectors")
+    parser.add_argument("--mlm-learning-rate", type=float, default=5e-5, help="Learning rate for the language model")
+    parser.add_argument("--freeze-mlm", action="store_true", help="Freeze the language-model weights during training")
+    parser.add_argument("--freeze-projectors", action="store_true", help="Freeze the embedding/correction projectors (inference mode)")
+    parser.add_argument(
+        "--correction-max-norm",
+        type=float,
+        default=None,
+        help="Clamp the Betina correction vector to this L2 norm (<=0 disables)",
+    )
+    parser.add_argument("--output-dir", type=str, default="outputs/betina_vortex", help="Directory where the model is saved")
+    parser.add_argument("--device", type=str, default=None, help="Force execution on cuda or cpu")
+    parser.add_argument("--top-k", type=int, default=5, help="Top-k for mask-filling evaluation")
+    parser.add_argument("--skip-eval", action="store_true", help="Skip post-training evaluation")
+    parser.add_argument("--eval-prompts", nargs="*", default=None, help="Custom evaluation prompts containing [MASK]")
+    parser.add_argument("--dataset-file", type=str, default=None, help="Text file with one sentence per line")
+    parser.add_argument("--dataset-hf", type=str, default=None, help="Hugging Face dataset name, e.g. oscar")
+    parser.add_argument("--dataset-hf-config", type=str, default=None, help="Hugging Face dataset config, e.g. unshuffled_deduplicated_pt")
+    parser.add_argument("--dataset-split", type=str, default="train[:1000]", help="Hugging Face dataset split")
+    parser.add_argument("--dataset-text-field", type=str, default="text", help="Text field in the Hugging Face dataset")
+    parser.add_argument("--dataset-limit", type=int, default=None, help="Maximum number of sentences to load")
+    parser.add_argument("--hf-token", type=str, default=None, help="Hugging Face authentication token (or set HF_TOKEN)")
+    parser.add_argument(
+        "--hf-token-file",
+        type=str,
+        default=None,
+        help="File containing the Hugging Face token (default: ~/.cache/huggingface/token)",
+    )
+    parser.add_argument("--trust-remote-code", action="store_true", help="Allow datasets with remote scripts (requires trusting the author)")
+    parser.add_argument("--force-download", action="store_true", help="Force a fresh download of the language-model weights")
+    parser.add_argument("--disable-rotation", action="store_true", help="Disable rotation of the vortex delta")
+    parser.add_argument("--rotation-angle", type=float, default=math.pi / 4, help="Angle (rad) used to rotate the delta when enabled")
+    parser.add_argument("--rotation-threshold", type=float, default=1e-4, help="Minimum delta norm required to apply the rotation")
+    parser.add_argument("--rotation-clockwise", action="store_true", help="Force clockwise rotation (flips the sign of the angle)")
+    parser.add_argument(
+        "--enable-quadratic-reflection",
+        action="store_true",
+        help="Enable ball-and-paddle style quadratic reflection of the vortex delta",
+    )
+    parser.add_argument(
+        "--quadratic-boundary",
+        type=float,
+        default=1.0,
+        help="Magnitudes above this value are reflected (virtual wall)",
+    )
+    parser.add_argument(
+        "--quadratic-strength",
+        type=float,
+        default=0.5,
+        help="Mix (0-1) between the original delta and its quadratic reflection",
+    )
+    parser.add_argument(
+        "--disable-triangle",
+        action="store_true",
+        help="Disable the triangular synthetic neuron that confronts X, Y and the counter-base",
+    )
+    parser.add_argument(
+        "--triangle-hidden-dim",
+        type=int,
+        default=512,
+        help="Hidden dimension used inside the triangular neuron",
+    )
+    parser.add_argument(
+        "--triangle-max-iters",
+        type=int,
+        default=5,
+        help="Maximum refinement iterations ('whys') of the triangle",
+    )
+    parser.add_argument(
+        "--triangle-tol",
+        type=float,
+        default=1e-4,
+        help="Tolerance for stopping the triangular refinement (smaller means more questions)",
+    )
+    parser.add_argument(
+        "--triangle-delta-gain",
+        type=float,
+        default=1.0,
+        help="Gain applied to the triangle's integrator axis",
+    )
+    parser.add_argument(
+        "--disable-square-geometry",
+        action="store_true",
+        help="Disable the 180° spin that forms the X↔X⁻¹ square",
+    )
+    parser.add_argument(
+        "--square-rotation-degrees",
+        type=float,
+        default=180.0,
+        help="Angle applied when spinning the full matrix (180° yields the perfect square)",
+    )
+    parser.add_argument(
+        "--square-leak-ratio",
+        type=float,
+        default=0.05,
+        help="Mix between the square and the original matrix (0 keeps perfect opposition, 1 ignores the spin)",
+    )
+    parser.add_argument(
+        "--square-jitter-std-deg",
+        type=float,
+        default=0.0,
+        help="Standard deviation in degrees of random noise injected into the square spin",
+    )
+    parser.add_argument(
+        "--square-debug-json",
+        type=str,
+        default=None,
+        help="If set, saves a JSON dump with the primary/secondary matrices and vortex metrics",
+    )
+    parser.add_argument(
+        "--square-debug-max",
+        type=int,
+        default=1,
+        help="Maximum number of examples included in the square dump",
+    )
+    parser.add_argument(
+        "--enable-lorentz-transform",
+        action="store_true",
+        help="Apply a Lorentz transformation to the final delta to measure the physical result",
+    )
+    parser.add_argument(
+        "--lorentz-beta",
+        type=float,
+        default=0.6,
+        help="Fraction of the speed of light used in the Lorentz boost",
+    )
+    parser.add_argument(
+        "--lorentz-axis-stream",
+        type=int,
+        default=0,
+        help="Stream used as the spatial axis (0=X, 1=Y, etc.) in the Lorentz transformation",
+    )
+    parser.add_argument(
+        "--ia-config",
+        type=str,
+        default=None,
+        help="JSON file describing which AIs take over each stage/stream of the matrix flow",
+    )
+    parser.add_argument(
+        "--refinement-cycles",
+        type=int,
+        default=0,
+        help="Number of surrounding AI refinement cycles applied to the final delta",
+    )
+    parser.add_argument(
+        "--cycle-stage-name",
+        type=str,
+        default="cycle",
+        help="AI stage name used during each surrounding cycle",
+    )
+    parser.add_argument("--max-seq-length", type=int, default=512, help="Maximum tokens per example (BERT default)")
+    parser.add_argument("--precompute-embeddings", action="store_true", help="Encode all sentences before training (requires a lot of RAM)")
+    parser.add_argument("--embedding-batch-size", type=int, default=64, help="Internal batch size for embedding generation (encode)")
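+
+    # Illustrative flag combinations (flags as defined above; the values are
+    # examples rather than tuned recommendations):
+    #   --enable-quadratic-reflection --quadratic-boundary 1.5 --quadratic-strength 0.3
+    #   --enable-lorentz-transform --lorentz-beta 0.6 --lorentz-axis-stream 0
+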
help="Dimensão oculta usada dentro do neurônio triangular", + ) + parser.add_argument( + "--triangle-max-iters", + type=int, + default=5, + help="Iterações máximas de refinamento (porquês) do triângulo", + ) + parser.add_argument( + "--triangle-tol", + type=float, + default=1e-4, + help="Tolerância para encerrar refinamento triangular (quanto menor, mais perguntas)", + ) + parser.add_argument( + "--triangle-delta-gain", + type=float, + default=1.0, + help="Ganho aplicado ao eixo integrador do triângulo", + ) + parser.add_argument( + "--disable-square-geometry", + action="store_true", + help="Desativa o giro de 180° que forma o quadrado X↔X⁻¹", + ) + parser.add_argument( + "--square-rotation-degrees", + type=float, + default=180.0, + help="Ângulo aplicado ao girar a matriz completa (180° gera o quadrado perfeito)", + ) + parser.add_argument( + "--square-leak-ratio", + type=float, + default=0.05, + help="Mistura o quadrado com a matriz original (0 mantém oposição perfeita, 1 ignora o giro)", + ) + parser.add_argument( + "--square-jitter-std-deg", + type=float, + default=0.0, + help="Desvio padrão em graus para injetar ruído aleatório no giro quadrado", + ) + parser.add_argument( + "--square-debug-json", + type=str, + default=None, + help="Se definido, salva um dump JSON com as matrizes primária/secundária e métricas do vórtice", + ) + parser.add_argument( + "--square-debug-max", + type=int, + default=1, + help="Número máximo de exemplos incluídos no dump quadrado", + ) + parser.add_argument( + "--enable-lorentz-transform", + action="store_true", + help="Aplica transformação de Lorentz no delta final para medir o resultado físico", + ) + parser.add_argument( + "--lorentz-beta", + type=float, + default=0.6, + help="Fração da velocidade da luz usada no boost de Lorentz", + ) + parser.add_argument( + "--lorentz-axis-stream", + type=int, + default=0, + help="Stream usado como eixo espacial (0=X, 1=Y, etc.) 
na transformação de Lorentz", + ) + parser.add_argument( + "--ia-config", + type=str, + default=None, + help="Arquivo JSON descrevendo quais IAs assumem cada estágio/stream do fluxo matriz", + ) + parser.add_argument( + "--refinement-cycles", + type=int, + default=0, + help="Quantidade de ciclos circundantes de refinamento IA aplicados ao delta final", + ) + parser.add_argument( + "--cycle-stage-name", + type=str, + default="cycle", + help="Nome do estágio IA usado durante cada ciclo circundante", + ) + parser.add_argument("--max-seq-length", type=int, default=512, help="Comprimento máximo de tokens por exemplo (padrão BERT)") + parser.add_argument("--precompute-embeddings", action="store_true", help="Codifica todas as sentenças antes do treino (requer muita RAM)") + parser.add_argument("--embedding-batch-size", type=int, default=64, help="Batch interno para geração de embeddings (encode)") + if argv is None: + argv_list = sys.argv[1:] + if "ipykernel" in sys.modules: + filtered: List[str] = [] + skip_next = False + for item in argv_list: + if skip_next: + skip_next = False + continue + if item == "-f": + skip_next = True + continue + if item.startswith("-f="): + continue + filtered.append(item) + argv_list = filtered + else: + argv_list = list(argv) + + parsed, unknown = parser.parse_known_args(argv_list) + if unknown: + print(f"Ignorando argumentos desconhecidos: {unknown}") + args = parsed + + args.hf_token, token_source = resolve_hf_token(args.hf_token, args.hf_token_file) + if args.hf_token and token_source: + print(f"Token Hugging Face detectado via {token_source}.") + + device = torch.device(args.device) if args.device else torch.device("cuda" if torch.cuda.is_available() else "cpu") + print(f"Usando dispositivo: {device}") + + embedding_model_name = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2" + tokenizer_name = "neuralmind/bert-base-portuguese-cased" + + embedding_model = SentenceTransformer(embedding_model_name, device=str(device) if device.type == "cuda" else "cpu") + tokenizer = AutoTokenizer.from_pretrained(tokenizer_name) + if tokenizer.pad_token is None: + tokenizer.pad_token = tokenizer.eos_token or tokenizer.sep_token or tokenizer.cls_token + try: + mlm_model = AutoModelForMaskedLM.from_pretrained(tokenizer_name, force_download=args.force_download) + except Exception as exc: + print(f"Download failed: {exc}. 
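+
+    # Rough data flow for the steps below (helper classes are defined earlier
+    # in this file): sentences -> PortugueseSentenceDataset (token ids plus
+    # sentence embeddings) -> DataLoader batches via build_collate_fn, which
+    # presumably pads with pad_sequence given the import at the top.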
+
+    sentences = load_sentences_from_args(args)
+    dataset = PortugueseSentenceDataset(
+        sentences,
+        tokenizer,
+        embedding_model,
+        max_seq_length=args.max_seq_length,
+        precompute_embeddings=args.precompute_embeddings,
+        embedding_batch_size=args.embedding_batch_size,
+    )
+    collate_fn = build_collate_fn(tokenizer)
+    dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True, collate_fn=collate_fn)
+    eval_dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=False, collate_fn=collate_fn)
+
+    stream_aliases: Optional[List[str]] = None
+    ia_stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None
+    refinement_cycles = args.refinement_cycles
+    config_cycle_stage = args.cycle_stage_name
+    if args.ia_config:
+        ia_config_data = load_ia_config_file(args.ia_config, args.embed_dim)
+        print(f"AI config loaded from {args.ia_config}")
+        stream_aliases = ia_config_data.get("stream_aliases")
+        ia_stage_config = ia_config_data.get("stage_config")  # type: ignore[assignment]
+        config_cycles = ia_config_data.get("refinement_cycles")
+        if isinstance(config_cycles, int):
+            refinement_cycles = config_cycles
+        config_cycle_stage = str(ia_config_data.get("cycle_stage_name", config_cycle_stage))
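+
+    # Sketch of an --ia-config file. The top-level keys match those read
+    # above; the "stage_config" entries are placeholders, since their exact
+    # schema is whatever load_ia_config_file accepts:
+    #   {
+    #     "stream_aliases": ["X", "Y"],
+    #     "refinement_cycles": 2,
+    #     "cycle_stage_name": "cycle",
+    #     "stage_config": {"cycle": {"0": "<module spec>"}}
+    #   }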
+
+    vortex = VortexBetinaAntiHalluc(
+        embed_dim=args.embed_dim,
+        enable_rotation=not args.disable_rotation,
+        rotation_angle=args.rotation_angle,
+        rotation_threshold=args.rotation_threshold,
+        rotation_clockwise=args.rotation_clockwise,
+        enable_quadratic_reflection=args.enable_quadratic_reflection,
+        quadratic_boundary=args.quadratic_boundary,
+        quadratic_strength=args.quadratic_strength,
+        stream_aliases=stream_aliases,
+        ia_stage_config=ia_stage_config,
+        refinement_cycles=refinement_cycles,
+        cycle_stage_name=config_cycle_stage,
+        enforce_square_geometry=not args.disable_square_geometry,
+        square_rotation_degrees=args.square_rotation_degrees,
+        square_leak_ratio=args.square_leak_ratio,
+        square_jitter_std_degrees=args.square_jitter_std_deg,
+        enable_lorentz_transform=args.enable_lorentz_transform,
+        lorentz_beta=args.lorentz_beta,
+        lorentz_axis_stream=args.lorentz_axis_stream,
+        enable_triangle=not args.disable_triangle,
+        triangle_hidden_dim=args.triangle_hidden_dim,
+        triangle_max_iters=args.triangle_max_iters,
+        triangle_tol=args.triangle_tol,
+        triangle_delta_gain=args.triangle_delta_gain,
+    )
+    trainer = BetinaTrainer(
+        vortex=vortex,
+        tokenizer=tokenizer,
+        embedding_model=embedding_model,
+        mlm_model=mlm_model,
+        raw_embedding_dim=embedding_model.get_sentence_embedding_dimension(),
+        embed_dim=args.embed_dim,
+        lambda_vortex=args.lambda_vortex,
+        learning_rate=args.learning_rate,
+        mlm_learning_rate=args.mlm_learning_rate,
+        freeze_mlm=args.freeze_mlm,
+        freeze_projectors=args.freeze_projectors,
+        correction_max_norm=args.correction_max_norm,
+        eval_chaos_factor=args.eval_chaos_factor,
+        chaos_factor=args.chaos_factor,
+        device=device,
+        max_seq_length=args.max_seq_length,
+    )
+
+    if args.square_debug_json:
+        square_max = max(1, args.square_debug_max)
+        try:
+            sample_batch = next(iter(dataloader))
+        except StopIteration as exc:  # pragma: no cover - empty dataset
+            raise RuntimeError("Cannot generate the square dump: dataset is empty") from exc
+        sample_embeddings = sample_batch["embedding"][:square_max].to(device)
+        with torch.no_grad():
+            projected = trainer.embedding_projector(sample_embeddings)
+            _, _, metrics_debug, _ = trainer.vortex(projected, chaos_factor=trainer.eval_chaos_factor)
+        dump_square_debug(
+            trainer.vortex.last_flow_states,
+            metrics_debug,
+            args.square_debug_json,
+            max_examples=min(square_max, sample_embeddings.size(0)),
+        )
+        print(f"Square dump saved to {args.square_debug_json}")
+
+    if args.train:
+        print("Starting training...")
+        history = trainer.train(dataloader, epochs=args.epochs)
+        print(f"Training finished after {len(history)} steps")
+        trainer.save(args.output_dir)
+        print(f"Models saved to {args.output_dir}")
+
+    if not args.skip_eval:
+        ppl_base = trainer.evaluate_perplexity(eval_dataloader, apply_correction=False)
+        ppl_betina = trainer.evaluate_perplexity(eval_dataloader, apply_correction=True)
+        print(f"\nPerplexity without correction: {ppl_base:.4f}")
+        print(f"Perplexity with Betina correction: {ppl_betina:.4f}")
+        ppl_delta = ppl_base - ppl_betina
+        if ppl_base > 0:
+            ppl_rel = ppl_delta / ppl_base * 100.0
+            print(f"Absolute gain: {ppl_delta:+.4f} | Relative gain: {ppl_rel:+.2f}%")
+        else:
+            print(f"Absolute gain: {ppl_delta:+.4f} | Relative gain: n/a")
+
+        # Default prompts stay in Portuguese to match the Portuguese MLM.
+        eval_prompts = args.eval_prompts or [
+            "O modelo Betina evita [MASK] durante a geração.",
+            "A capital de Portugal é [MASK].",
+            "A IA Betina corrige [MASK] via vórtice.",
+            "O vórtice no Betina filtra [MASK] para reduzir alucinações.",
+        ]
+        print("\nMask filling without correction:")
+        base_fills = trainer.fill_masks(eval_prompts, top_k=args.top_k, apply_correction=False)
+        for prompt, tokens in zip(eval_prompts, base_fills):
+            print(prompt)
+            for idx, group in enumerate(tokens, start=1):
+                formatted = [f"{token} ({prob:.4f})" for token, prob in group]
+                print(f"  Mask {idx}: {formatted}")
+        print("\nMask filling with Betina correction:")
+        betina_fills = trainer.fill_masks(eval_prompts, top_k=args.top_k, apply_correction=True)
+        for prompt, tokens in zip(eval_prompts, betina_fills):
+            print(prompt)
+            for idx, group in enumerate(tokens, start=1):
+                formatted = [f"{token} ({prob:.4f})" for token, prob in group]
+                print(f"  Mask {idx}: {formatted}")
+
+        print_gain_summary(eval_prompts, base_fills, betina_fills)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
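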