bettina3.0 / bettina.py
reynaldo22's picture
Update bettina.py
762811e verified
import argparse
import json
import math
import os
import sys
import numpy as np
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from contextlib import nullcontext
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset
try:
from torch.amp import autocast as _autocast, GradScaler as _GradScaler
def betina_autocast(device_type: str, enabled: bool = True):
if not enabled or device_type != "cuda":
return nullcontext()
return _autocast(device_type=device_type, enabled=enabled)
def betina_grad_scaler(device_type: str, enabled: bool = True):
if not enabled or device_type != "cuda":
return _NoOpGradScaler()
return _GradScaler(device_type=device_type, enabled=enabled)
except ImportError: # pragma: no cover
from torch.cuda.amp import autocast as _autocast, GradScaler as _GradScaler
def betina_autocast(device_type: str, enabled: bool = True):
if not enabled or device_type != "cuda":
return nullcontext()
return _autocast(enabled=enabled)
def betina_grad_scaler(device_type: str, enabled: bool = True):
if not enabled or device_type != "cuda":
return _NoOpGradScaler()
return _GradScaler(enabled=enabled)
try:
from sentence_transformers import SentenceTransformer
except ImportError as exc: # pragma: no cover
raise ImportError("Install sentence-transformers to run the Betina pipeline") from exc
try:
from transformers import AutoModelForMaskedLM, AutoTokenizer
except ImportError as exc: # pragma: no cover
raise ImportError("Install transformers to run the Betina pipeline") from exc
try:
from datasets import load_dataset # type: ignore[import-not-found]
except ImportError: # pragma: no cover
load_dataset = None
def _safe_load_dataset(
path: str,
name: Optional[str],
*,
split: str,
hf_token: Optional[str],
trust_remote_code: bool,
):
if load_dataset is None:
raise ImportError("Install the 'datasets' package to use Hugging Face corpora")
base_kwargs = {"split": split, "trust_remote_code": trust_remote_code}
attempts: List[Dict[str, Optional[str]]] = []
if hf_token:
attempts.append({"token": hf_token})
attempts.append({"use_auth_token": hf_token})
attempts.append({})
last_error: Optional[Exception] = None
for extra in attempts:
try:
return load_dataset(path, name, **base_kwargs, **extra)
except TypeError as err:
last_error = err
continue
except ValueError as err:
if "use_auth_token" in str(err).lower():
last_error = err
continue
raise
if last_error:
raise last_error
raise RuntimeError(f"Falha ao carregar dataset {path}/{name}")
def _read_hf_token_file(path: Path) -> Optional[str]:
try:
content = path.read_text(encoding="utf-8").strip()
except OSError:
return None
if not content:
return None
first_line = content.splitlines()[0].strip()
return first_line or None
def resolve_hf_token(explicit_token: Optional[str], token_file: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
"""Resolve o token HF preferindo argumento, env vars e arquivo do huggingface-cli."""
if explicit_token and explicit_token.strip():
return explicit_token.strip(), "--hf-token"
env_token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
if env_token and env_token.strip():
return env_token.strip(), "env"
file_candidates: List[Path] = []
if token_file:
file_candidates.append(Path(token_file).expanduser())
else:
hf_home = os.getenv("HF_HOME")
if hf_home:
file_candidates.append(Path(hf_home).expanduser() / "token")
file_candidates.append(Path.home() / ".cache" / "huggingface" / "token")
file_candidates.append(Path.home() / ".huggingface" / "token")
for candidate in file_candidates:
token = _read_hf_token_file(candidate)
if token:
return token, str(candidate)
return None, None
class _NoOpGradScaler:
def __init__(self):
pass
def scale(self, loss):
return loss
def step(self, optimizer):
optimizer.step()
def update(self):
pass
def unscale_(self, optimizer):
pass
def state_dict(self):
return {}
def load_state_dict(self, state):
pass
class CallableAgentAdapter(nn.Module):
def __init__(self, fn: Callable[[torch.Tensor], torch.Tensor], name: str):
super().__init__()
self.fn = fn
self.agent_name = name or getattr(fn, "__name__", "callable_agent")
def forward(self, tensor: torch.Tensor) -> torch.Tensor: # noqa: D401
return self.fn(tensor)
class MultiIntelligenceRouter(nn.Module):
"""Despacha cada estágio do fluxo matriz para IAs distintas por stream/etapa."""
def __init__(
self,
num_streams: int,
*,
stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None,
stream_aliases: Optional[List[str]] = None,
):
super().__init__()
self.num_streams = num_streams
self.stream_aliases = stream_aliases or [f"S{idx}" for idx in range(num_streams)]
self.stage_modules = nn.ModuleDict()
self.stage_logs: Dict[str, List[Dict[str, str]]] = {}
if stage_config:
self.apply_stage_config(stage_config)
def apply_stage_config(self, stage_config: Dict[str, Dict[str, nn.Module]]) -> None:
for stage_name, mapping in stage_config.items():
module_dict = nn.ModuleDict()
for key, module in mapping.items():
module_dict[str(key)] = self._wrap_module(stage_name, key, module)
self.stage_modules[stage_name] = module_dict
def register_stage(self, stage_name: str, mapping: Dict[str, nn.Module]) -> None:
module_dict = nn.ModuleDict()
for key, module in mapping.items():
module_dict[str(key)] = self._wrap_module(stage_name, key, module)
self.stage_modules[stage_name] = module_dict
def _wrap_module(self, stage: str, key: str | int, module: nn.Module | Callable) -> nn.Module:
if isinstance(module, list):
wrapped = [self._wrap_module(stage, f"{key}_{idx}", item) for idx, item in enumerate(module)]
seq = nn.Sequential(*wrapped)
if not hasattr(seq, "agent_name"):
seq.agent_name = f"seq_{stage}_{key}"
return seq
if isinstance(module, nn.Module):
if not hasattr(module, "agent_name"):
module.agent_name = module.__class__.__name__
return module
if callable(module):
name = getattr(module, "agent_name", None) or getattr(module, "__name__", f"{stage}_{key}_fn")
return CallableAgentAdapter(module, name)
raise TypeError(f"Módulo IA inválido para estágio {stage}/{key}: {type(module)}")
def _select_module(self, stage_dict: nn.ModuleDict, key: str | int | None) -> Optional[nn.Module]:
if key is not None:
candidate_key = str(key)
if candidate_key in stage_dict:
return stage_dict[candidate_key]
for fallback in ("*", "default", "-1"):
if fallback in stage_dict:
return stage_dict[fallback]
return None
def apply(self, stage: str, tensor: torch.Tensor, *, cycle_idx: Optional[int] = None) -> torch.Tensor:
stage_dict = self.stage_modules[stage] if stage in self.stage_modules else None
log: List[Dict[str, str]] = []
if stage_dict is None:
self.stage_logs[stage] = log
return tensor
if tensor.dim() == 3:
outputs = []
for stream_idx in range(tensor.size(1)):
module = self._select_module(stage_dict, stream_idx)
if module is None and stream_idx < len(self.stream_aliases):
module = self._select_module(stage_dict, self.stream_aliases[stream_idx])
chunk = tensor[:, stream_idx, :]
if module is not None:
chunk = module(chunk)
log.append(
{
"stream": self.stream_aliases[stream_idx] if stream_idx < len(self.stream_aliases) else str(stream_idx),
"agent": getattr(module, "agent_name", module.__class__.__name__),
}
)
outputs.append(chunk)
stacked = torch.stack(outputs, dim=1)
self.stage_logs[stage] = log
return stacked
module = None
if cycle_idx is not None:
module = self._select_module(stage_dict, cycle_idx)
if module is None:
module = self._select_module(stage_dict, "global")
if module is None:
module = self._select_module(stage_dict, None)
if module is None:
self.stage_logs[stage] = log
return tensor
updated = module(tensor)
alias = f"cycle_{cycle_idx}" if cycle_idx is not None else "global"
log.append({"stream": alias, "agent": getattr(module, "agent_name", module.__class__.__name__)})
self.stage_logs[stage] = log
return updated
def stage_signature(self, stage: str) -> str:
logs = self.stage_logs.get(stage, [])
if not logs:
return "identity"
return " | ".join(f"{entry['stream']}{entry['agent']}" for entry in logs)
def describe_all_stages(self) -> Dict[str, List[Dict[str, str]]]:
return {stage: list(entries) for stage, entries in self.stage_logs.items()}
class SyntheticNeuronTriangle(nn.Module):
"""Refina deltas considerando uma base triangular (X,Y,contrabase)."""
def __init__(
self,
embed_dim: int,
num_streams: int,
*,
hidden_dim: int = 512,
max_iters: int = 5,
tol: float = 1e-4,
delta_gain: float = 1.0,
) -> None:
super().__init__()
self.embed_dim = embed_dim
self.num_streams = num_streams
self.max_iters = max(0, max_iters)
self.tol = max(1e-6, tol)
self.delta_gain = float(delta_gain)
seed_in = embed_dim * 4 # X, Y, contrabase média, diagonal
refine_in = embed_dim * 3 # delta médio, eixo integrado, diagonal
self.seed_proj = nn.Sequential(
nn.LayerNorm(seed_in),
nn.Linear(seed_in, hidden_dim),
nn.GELU(),
nn.Linear(hidden_dim, embed_dim),
)
self.refine_proj = nn.Sequential(
nn.LayerNorm(refine_in),
nn.Linear(refine_in, hidden_dim),
nn.SiLU(),
nn.Linear(hidden_dim, embed_dim),
)
def forward(
self,
mat_primary: torch.Tensor,
mat_secondary: torch.Tensor,
base_core: torch.Tensor,
) -> Tuple[torch.Tensor, Dict[str, object]]:
batch, streams, dim = mat_primary.shape
x_stream = mat_primary[:, 0, :]
y_stream = mat_primary[:, 1, :] if streams > 1 else mat_secondary[:, 0, :]
contra_stream = mat_secondary[:, 0, :]
contra_mean = mat_secondary.mean(dim=1)
base_center = torch.stack([x_stream, y_stream], dim=1).mean(dim=1)
diag = torch.stack([x_stream - contra_stream, y_stream - contra_mean], dim=1).mean(dim=1)
seed_features = torch.cat([base_center, contra_mean, base_core.squeeze(1), diag], dim=-1)
axis_vector = torch.tanh(self.seed_proj(seed_features))
axis_norm = F.normalize(axis_vector, dim=-1)
base_delta = mat_primary - base_core
stream_align = torch.sum(
F.normalize(mat_primary, dim=-1) * axis_norm.unsqueeze(1),
dim=-1,
keepdim=True,
)
axis_component = axis_norm.unsqueeze(1) * stream_align
delta = base_delta + self.delta_gain * axis_component
iterations = 0
last_change = torch.zeros(1, device=mat_primary.device)
if self.max_iters > 0:
for idx in range(self.max_iters):
delta_mean = delta.mean(dim=1)
refine_inp = torch.cat([delta_mean, axis_vector, diag], dim=-1)
refine = torch.tanh(self.refine_proj(refine_inp)).unsqueeze(1)
delta = delta + refine
last_change = refine.norm(dim=-1).mean()
iterations = idx + 1
if last_change.item() < self.tol:
break
debug = {
"axis": axis_vector.detach(),
"diag": diag.detach(),
"iterations": iterations,
"residual": float(last_change.detach().item()),
}
return delta, debug
def build_builtin_agent(name: str, embed_dim: int) -> nn.Module:
normalized = name.strip().lower()
if normalized in {"brock", "brockman", "brock ia"}:
module = nn.Sequential(
nn.LayerNorm(embed_dim),
nn.Linear(embed_dim, embed_dim * 2),
nn.GELU(),
nn.Linear(embed_dim * 2, embed_dim),
)
elif normalized in {"chatgpt", "chatgpt 5.1", "chatgpt5.1", "gpt51"}:
module = nn.Sequential(
nn.LayerNorm(embed_dim),
nn.Linear(embed_dim, embed_dim),
nn.SiLU(),
nn.Linear(embed_dim, embed_dim),
)
elif normalized in {"code", "code ia", "coder"}:
module = nn.Sequential(
nn.LayerNorm(embed_dim),
nn.Linear(embed_dim, embed_dim),
)
elif normalized in {"critic", "mirror", "refiner"}:
module = nn.Sequential(
nn.LayerNorm(embed_dim),
nn.Linear(embed_dim, embed_dim * 3 // 2),
nn.GELU(),
nn.Linear(embed_dim * 3 // 2, embed_dim),
nn.LayerNorm(embed_dim),
)
elif normalized in {"identity", "none"}:
module = nn.Identity()
else:
raise ValueError(f"Agente IA desconhecido: {name}")
module.agent_name = name
return module
def _build_custom_agent(agent_def: Dict[str, object], embed_dim: int) -> nn.Module:
if "style" in agent_def:
module = build_builtin_agent(str(agent_def["style"]), embed_dim)
module.agent_name = str(agent_def.get("name", agent_def["style"]))
return module
agent_type = str(agent_def.get("type", "mlp")).lower()
hidden = int(agent_def.get("hidden", embed_dim * 2))
dropout = float(agent_def.get("dropout", 0.0))
if agent_type == "mlp":
layers: List[nn.Module] = [
nn.LayerNorm(embed_dim),
nn.Linear(embed_dim, hidden),
nn.GELU(),
]
if dropout > 0:
layers.append(nn.Dropout(dropout))
layers.append(nn.Linear(hidden, embed_dim))
module = nn.Sequential(*layers)
elif agent_type == "linear":
module = nn.Sequential(nn.LayerNorm(embed_dim), nn.Linear(embed_dim, embed_dim))
else:
raise ValueError(f"Tipo de agente custom '{agent_type}' não suportado")
module.agent_name = str(agent_def.get("name", agent_type))
return module
def load_ia_config_file(path: str, embed_dim: int) -> Dict[str, object]:
data = json.loads(Path(path).read_text(encoding="utf-8"))
stage_entries = data.get("stages", {})
if not isinstance(stage_entries, dict):
raise ValueError("Campo 'stages' do arquivo IA precisa ser um objeto mapeando estágio→streams")
stage_config: Dict[str, Dict[str, nn.Module]] = {}
for stage, mapping in stage_entries.items():
if not isinstance(mapping, dict):
raise ValueError(f"Estágio '{stage}' precisa mapear streams para agentes")
stage_config[stage] = {}
for stream_key, agent_def in mapping.items():
if isinstance(agent_def, str):
module = build_builtin_agent(agent_def, embed_dim)
elif isinstance(agent_def, dict):
module = _build_custom_agent(agent_def, embed_dim)
else:
raise ValueError(f"Agente inválido para estágio '{stage}' stream '{stream_key}'")
stage_config[stage][str(stream_key)] = module
stream_aliases = data.get("stream_aliases")
if stream_aliases is not None and (not isinstance(stream_aliases, list) or not all(isinstance(x, str) for x in stream_aliases)):
raise ValueError("'stream_aliases' deve ser uma lista de strings")
return {
"stream_aliases": stream_aliases,
"stage_config": stage_config,
"refinement_cycles": int(data.get("refinement_cycles", 0)),
"cycle_stage_name": str(data.get("cycle_stage_name", "cycle")),
}
# --------------------------
# Lorenz attractor generator (discrete)
# --------------------------
def lorenz_step(x, y, z, sigma=10.0, rho=28.0, beta=8/3, dt=0.01):
dx = sigma * (y - x)
dy = x * (rho - z) - y
dz = x * y - beta * z
xn = x + dx * dt
yn = y + dy * dt
zn = z + dz * dt
return xn, yn, zn
def lorenz_sequence(length, init=(0.1, 0.0, 0.0), sigma=10.0, rho=28.0, beta=8/3, dt=0.01):
x, y, z = init
seq = []
for _ in range(length):
x, y, z = lorenz_step(x, y, z, sigma, rho, beta, dt)
seq.append((x, y, z))
return np.array(seq) # shape (length, 3)
# --------------------------
# Rössler attractor (alternative chaos source)
# --------------------------
def rossler_step(x, y, z, a=0.2, b=0.2, c=5.7, dt=0.01):
dx = -y - z
dy = x + a * y
dz = b + z * (x - c)
xn = x + dx * dt
yn = y + dy * dt
zn = z + dz * dt
return xn, yn, zn
def rossler_sequence(length, init=(0.1, 0.0, 0.0), a=0.2, b=0.2, c=5.7, dt=0.01):
x, y, z = init
seq = []
for _ in range(length):
x, y, z = rossler_step(x, y, z, a, b, c, dt)
seq.append((x, y, z))
return np.array(seq)
# --------------------------
# Terminal Velocity Matching (Flow Matching inspired)
# --------------------------
def compute_terminal_velocity(embeddings: torch.Tensor, target_distribution: str = "gaussian") -> torch.Tensor:
"""
Calcula a velocidade terminal para mover embeddings em direção a uma distribuição alvo.
Inspirado em Flow Matching / Rectified Flow.
"""
batch_size, dim = embeddings.shape
if target_distribution == "gaussian":
# Alvo: Gaussiana isotrópica padrão
target = torch.randn_like(embeddings) * 0.1
elif target_distribution == "uniform_sphere":
# Alvo: superfície de esfera unitária
target = F.normalize(torch.randn_like(embeddings), dim=-1)
else:
target = torch.zeros_like(embeddings)
# Velocidade = direção do alvo - posição atual (normalizada)
velocity = target - embeddings
velocity = F.normalize(velocity, dim=-1) * embeddings.norm(dim=-1, keepdim=True) * 0.1
return velocity
# --------------------------
# Spectral Energy Regularization
# --------------------------
def spectral_energy_loss(embeddings: torch.Tensor, target_rank: int = 32) -> torch.Tensor:
"""
Penaliza energia concentrada em poucos componentes principais.
Força distribuição mais uniforme do espectro singular.
"""
if embeddings.shape[0] < 2:
return torch.tensor(0.0, device=embeddings.device)
centered = embeddings - embeddings.mean(dim=0, keepdim=True)
# SVD para obter valores singulares
try:
_, s, _ = torch.linalg.svd(centered, full_matrices=False)
except RuntimeError:
return torch.tensor(0.0, device=embeddings.device)
# Normaliza para distribuição de energia
s_normalized = s / (s.sum() + 1e-8)
# Entropia do espectro (queremos maximizar = distribuição uniforme)
spectral_entropy = -(s_normalized * (s_normalized + 1e-8).log()).sum()
# Penalidade: quanto menor a entropia, maior a penalidade
max_entropy = math.log(min(embeddings.shape[0], embeddings.shape[1]))
return (max_entropy - spectral_entropy) / max_entropy
# --------------------------
# Semantic Divergence Metrics
# --------------------------
def compute_angular_divergence(original: torch.Tensor, perturbed: torch.Tensor) -> float:
"""Calcula a divergência angular média entre vetores originais e perturbados."""
cos_sim = F.cosine_similarity(original, perturbed, dim=-1)
# Clamp para evitar problemas numéricos com acos
cos_sim = cos_sim.clamp(-1.0, 1.0)
angles = torch.acos(cos_sim)
return angles.mean().item()
def compute_semantic_entropy(logits: torch.Tensor, top_k: int = 10) -> float:
"""Calcula a entropia semântica da distribuição de probabilidade."""
probs = F.softmax(logits, dim=-1)
top_probs, _ = torch.topk(probs, min(top_k, probs.shape[-1]), dim=-1)
# Renormaliza top-k
top_probs = top_probs / (top_probs.sum(dim=-1, keepdim=True) + 1e-8)
entropy = -(top_probs * (top_probs + 1e-8).log()).sum(dim=-1)
return entropy.mean().item()
# --------------------------
# SIGReg-like regularizer (encourage isotropic Gaussian embeddings)
# --------------------------
def sigreg_loss(embeddings):
# embeddings: (B, D)
mu = embeddings.mean(dim=0) # (D,)
centered = embeddings - mu.unsqueeze(0) # (B, D)
# empirical covariance (D x D) approx via (centered^T center)
B = embeddings.shape[0]
cov = (centered.t() @ centered) / (B - 1 + 1e-8) # (D, D)
# penalty = norm(cov - I) + norm(mu)
D = embeddings.shape[1]
eye = torch.eye(D, device=embeddings.device)
cov_pen = F.mse_loss(cov, eye)
mu_pen = (mu.pow(2)).mean()
return cov_pen + mu_pen
class VortexBetinaAntiHalluc(nn.Module):
def __init__(
self,
embed_dim: int = 256,
vortex_steps: int = 10,
vortex_dt: float = 0.02,
num_streams: int = 3,
enable_rotation: bool = True,
rotation_angle: float = math.pi / 4,
rotation_threshold: float = 1e-4,
rotation_clockwise: bool = False,
enable_quadratic_reflection: bool = False,
quadratic_boundary: float = 0.3,
quadratic_strength: float = 0.5,
boost_small_deltas: bool = True,
delta_gain: float = 1.5,
reflection_push: float = 0.25,
stream_aliases: Optional[List[str]] = None,
ia_stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None,
refinement_cycles: int = 0,
cycle_stage_name: str = "cycle",
enforce_square_geometry: bool = True,
square_rotation_degrees: float = 180.0,
square_leak_ratio: float = 0.05,
square_jitter_std_degrees: float = 0.0,
enable_lorentz_transform: bool = False,
lorentz_beta: float = 0.6,
lorentz_axis_stream: int = 0,
enable_triangle: bool = True,
triangle_hidden_dim: int = 512,
triangle_max_iters: int = 5,
triangle_tol: float = 1e-4,
triangle_delta_gain: float = 1.0,
):
super().__init__()
self.embed_dim = embed_dim
self.vortex_steps = vortex_steps
self.vortex_dt = vortex_dt
self.num_streams = max(1, num_streams)
self.enable_rotation = enable_rotation
self.rotation_angle = rotation_angle
self.rotation_threshold = rotation_threshold
self.rotation_clockwise = rotation_clockwise
self.enable_quadratic_reflection = enable_quadratic_reflection
self.quadratic_boundary = nn.Parameter(torch.tensor(float(quadratic_boundary)))
self.quadratic_strength = float(min(1.0, max(0.0, quadratic_strength)))
self.boost_small_deltas = boost_small_deltas
self.delta_gain = max(1.0, delta_gain)
self.reflection_push = max(0.0, reflection_push)
self.stream_aliases = self._prepare_stream_aliases(stream_aliases)
self.refinement_cycles = max(0, refinement_cycles)
self.cycle_stage_name = cycle_stage_name or "cycle"
self.enforce_square_geometry = enforce_square_geometry
self.square_rotation_radians = math.radians(square_rotation_degrees)
self.square_leak_ratio = float(min(1.0, max(0.0, square_leak_ratio)))
self.square_jitter_std = math.radians(max(0.0, square_jitter_std_degrees))
beta = max(-0.999, min(0.999, lorentz_beta))
self.enable_lorentz_transform = enable_lorentz_transform
self.lorentz_beta = beta
self.lorentz_axis_stream = max(0, lorentz_axis_stream)
self.enable_triangle = enable_triangle
self.ia_router = MultiIntelligenceRouter(
num_streams=self.num_streams,
stage_config=ia_stage_config,
stream_aliases=self.stream_aliases,
)
self.triangle_module = (
SyntheticNeuronTriangle(
embed_dim,
self.num_streams,
hidden_dim=triangle_hidden_dim,
max_iters=triangle_max_iters,
tol=triangle_tol,
delta_gain=triangle_delta_gain,
)
if self.enable_triangle
else None
)
self.projector = nn.Sequential(
nn.LayerNorm(embed_dim),
nn.Linear(embed_dim, embed_dim),
)
nn.init.kaiming_normal_(self.projector[1].weight, mode="fan_out", nonlinearity="relu")
nn.init.constant_(self.projector[1].bias, 0.1)
# Vortex Dynamics Parameters (Chaos Injection)
self.vortex_linear = nn.Parameter(torch.randn(3, embed_dim) * 0.1)
self.vortex_scale = nn.Parameter(torch.ones(embed_dim))
# Terminal Velocity Matching parameters
self.velocity_gate = nn.Parameter(torch.zeros(embed_dim))
self.velocity_bias = nn.Parameter(torch.zeros(embed_dim))
# Adaptive chaos parameters
self.chaos_temperature = nn.Parameter(torch.tensor(1.0))
self.chaos_gate = nn.Parameter(torch.tensor(0.0))
self.attractor_selector = nn.Parameter(torch.tensor([0.7, 0.3])) # [lorenz, rossler]
self.last_flow_states: Dict[str, object] = {}
def _prepare_stream_aliases(self, provided: Optional[List[str]]) -> List[str]:
if provided:
aliases = list(provided)
else:
aliases = ["X", "Y", "Z", "W", "V", "U", "T", "S"]
if len(aliases) < self.num_streams:
aliases.extend(f"S{idx}" for idx in range(len(aliases), self.num_streams))
return aliases[: self.num_streams]
def _rotate_matrix_plane(self, tensor: torch.Tensor, radians: float) -> torch.Tensor:
if tensor.shape[-1] < 2:
return tensor
angle = radians % (2 * math.pi)
if abs(angle) < 1e-9:
return tensor
rotated = tensor.clone()
cos_theta = math.cos(angle)
sin_theta = math.sin(angle)
x = tensor[..., 0]
y = tensor[..., 1]
rotated[..., 0] = cos_theta * x - sin_theta * y
rotated[..., 1] = sin_theta * x + cos_theta * y
return rotated
def _rotate_matrix_square(self, tensor: torch.Tensor) -> Tuple[torch.Tensor, float]:
base_angle = abs(self.square_rotation_radians) % (2 * math.pi)
if base_angle > math.pi:
base_angle = 2 * math.pi - base_angle
jitter = 0.0
if self.square_jitter_std > 0:
jitter = torch.randn(1, device=tensor.device).item() * self.square_jitter_std
angle = max(0.0, min(math.pi, base_angle + jitter))
if math.isclose(angle, 0.0, rel_tol=1e-6, abs_tol=1e-6):
mirrored = tensor.clone()
else:
intensity = angle / math.pi
mirrored = torch.lerp(tensor, -tensor, intensity)
if self.square_leak_ratio > 0:
mirrored = torch.lerp(mirrored, tensor, self.square_leak_ratio)
return mirrored, angle
def _select_axis_stream(self, tensor: torch.Tensor) -> torch.Tensor:
stream_idx = min(self.lorentz_axis_stream, tensor.size(1) - 1)
return tensor[:, stream_idx, :]
def _lorentz_boost(
self,
spatial: torch.Tensor,
reference_stream: torch.Tensor,
time_like: torch.Tensor,
) -> Tuple[torch.Tensor, float, torch.Tensor]:
beta = self.lorentz_beta
gamma = 1.0 / math.sqrt(max(1e-8, 1.0 - beta**2))
axis = F.normalize(reference_stream, dim=-1, eps=1e-6)
parallel_scalar = torch.sum(spatial * axis, dim=-1, keepdim=True)
parallel_vec = parallel_scalar * axis
perpendicular_vec = spatial - parallel_vec
t_prime = gamma * (time_like - beta * parallel_scalar)
parallel_prime = gamma * (parallel_scalar - beta * time_like) * axis
updated_spatial = parallel_prime + perpendicular_vec
return updated_spatial, gamma, t_prime.abs()
def invert(self, vec: torch.Tensor) -> torch.Tensor:
return -vec
def intersection_knowledge(self, vec: torch.Tensor, base_k: torch.Tensor) -> torch.Tensor:
dot = torch.sum(vec * base_k, dim=-1, keepdim=True)
norm_k = torch.norm(base_k, dim=-1, keepdim=True).pow(2).clamp_min(1e-8)
return (dot / norm_k) * base_k
def euler_vortex(self, state: torch.Tensor) -> torch.Tensor:
sigma, rho, beta, gamma = 10.0, 28.0, 8.0 / 3.0, 1.0
feature_dim = state.shape[-1] - 1
base = state[..., :-1]
w = state[..., -1:]
updated_base = torch.zeros_like(base)
for i in range(0, feature_dim, 3):
chunk = base[..., i : i + 3]
if chunk.shape[-1] < 3:
chunk = F.pad(chunk, (0, 3 - chunk.shape[-1]))
chunk = chunk.clone()
for _ in range(self.vortex_steps):
x = chunk[..., 0]
y = chunk[..., 1]
z = chunk[..., 2]
dx = sigma * (y - x) * self.vortex_dt
dy = (x * (rho - z) - y) * self.vortex_dt
dz = (x * y - beta * z) * self.vortex_dt
chunk = chunk + torch.stack([dx, dy, dz], dim=-1)
span = min(3, feature_dim - i)
updated_base[..., i : i + span] = chunk[..., :span]
energy = torch.norm(base, dim=-1, keepdim=True).pow(2)
w_iter = w.clone()
for _ in range(self.vortex_steps):
dw = (gamma * energy - w_iter) * self.vortex_dt
w_iter = w_iter + dw
updated = state.clone()
updated[..., :-1] = updated_base
updated[..., -1:] = w_iter
return updated
def rotate_difference(self, delta: torch.Tensor, anchor: torch.Tensor) -> torch.Tensor: # noqa: ARG002
# Rotation now confined to the (x, y) plane while keeping z (and higher dims) untouched.
if delta.shape[-1] < 2:
return delta
angle = -abs(self.rotation_angle) if self.rotation_clockwise else abs(self.rotation_angle)
cos_theta = math.cos(angle)
sin_theta = math.sin(angle)
rotated = delta.clone()
x = delta[..., 0]
y = delta[..., 1]
rotated[..., 0] = cos_theta * x - sin_theta * y
rotated[..., 1] = sin_theta * x + cos_theta * y
if delta.shape[-1] >= 3:
rotated[..., 2] = delta[..., 2] # keep z static per vortex requirement
return rotated
def _get_quadratic_boundary(self) -> torch.Tensor:
return self.quadratic_boundary.abs().clamp(0.05, 0.5)
def quadratic_reflection(self, delta: torch.Tensor) -> Tuple[torch.Tensor, float]:
"""Reflect components between ±boundary and warp them quadratically (Pong-like bounce)."""
if not self.enable_quadratic_reflection:
return delta, 0.0
boundary = self._get_quadratic_boundary()
period = 2.0 * boundary
magnitude = delta.abs()
direction = torch.sign(delta)
wrapped = torch.remainder(magnitude, period)
mirrored = torch.where(wrapped > boundary, period - wrapped, wrapped)
normalized = (mirrored / boundary).clamp(0.0, 1.0)
squared = normalized.pow(2)
bounced = direction * boundary * squared
blended = torch.lerp(delta, bounced, self.quadratic_strength)
bounce_ratio = (magnitude > boundary).float().mean().item()
return blended, bounce_ratio
def forward(self, input_vec: torch.Tensor, chaos_factor: float = 1.0):
"""
chaos_factor: Multiplicador de agressividade (1.0 = normal, 5.0 = agressivo, 10.0 = insano)
"""
if input_vec.dim() == 2:
batch = input_vec.size(0)
mat_input = input_vec.unsqueeze(1).expand(batch, self.num_streams, self.embed_dim)
elif input_vec.dim() == 3:
batch, streams, dim = input_vec.shape
if streams != self.num_streams or dim != self.embed_dim:
raise ValueError(
f"Esperava tensor (batch,{self.num_streams},{self.embed_dim}), recebi {input_vec.shape}"
)
mat_input = input_vec
else:
raise ValueError("input_vec precisa ter 2 ou 3 dimensões")
mat_flat = mat_input.reshape(-1, self.embed_dim)
mat_primary = self.projector(mat_flat).reshape(-1, self.num_streams, self.embed_dim)
stage_signatures: Dict[str, str] = {}
mat_primary = self.ia_router.apply("base", mat_primary)
stage_signatures["base"] = self.ia_router.stage_signature("base")
mirrored_primary: Optional[torch.Tensor] = None
square_tensor: Optional[torch.Tensor] = None
square_angle_applied: Optional[float] = None
if self.enforce_square_geometry:
mirrored_primary, square_angle_applied = self._rotate_matrix_square(mat_primary)
square_tensor = torch.stack([mat_primary, mirrored_primary], dim=2)
mat_secondary = mirrored_primary
else:
mat_secondary = self.invert(mat_primary)
mat_secondary = self.ia_router.apply("inversion", mat_secondary)
stage_signatures["inversion"] = self.ia_router.stage_signature("inversion")
base_k = mat_primary.mean(dim=1, keepdim=True) + 1e-4 * torch.randn_like(mat_primary[:, :1, :])
inter_primary = self.intersection_knowledge(mat_primary, base_k)
inter_secondary = self.intersection_knowledge(mat_secondary, base_k)
approx_inter_full = 0.5 * (inter_primary + inter_secondary)
approx_inter = approx_inter_full.mean(dim=1)
flow_states: Dict[str, object] = {}
flow_states["matrix_primary"] = mat_primary
flow_states["matrix_secondary"] = mat_secondary
flow_states["base_core"] = base_k.squeeze(1)
if mirrored_primary is not None:
flow_states["matrix_rot_180"] = mirrored_primary
if square_tensor is not None:
flow_states["matrix_square"] = square_tensor
flow_states["square_leak_ratio"] = self.square_leak_ratio
if square_angle_applied is not None:
flow_states["square_angle"] = square_angle_applied
triangle_debug: Dict[str, object] = {}
if self.triangle_module is not None:
delta_green, triangle_debug = self.triangle_module(mat_primary, mat_secondary, base_k)
else:
delta_green = inter_primary - base_k
flow_states["xy_bridge_matrix"] = delta_green
if triangle_debug:
flow_states["triangle_axis"] = triangle_debug.get("axis")
flow_states["triangle_diag"] = triangle_debug.get("diag")
flow_states["triangle_iterations"] = triangle_debug.get("iterations", 0)
flow_states["triangle_residual"] = triangle_debug.get("residual", 0.0)
delta_norm_stream = torch.norm(delta_green, dim=-1, keepdim=True)
if self.boost_small_deltas:
boost_mask = delta_norm_stream < self.rotation_threshold
if boost_mask.any():
safe_norm = delta_norm_stream.clamp_min(1e-6)
factor = (self.rotation_threshold / safe_norm) * self.delta_gain
delta_green = torch.where(boost_mask, delta_green * factor, delta_green)
boost_ratio = boost_mask.float().mean().item()
else:
boost_ratio = 0.0
flow_states["matrix_green_boost"] = delta_green
if self.enable_rotation:
delta_flat = delta_green.reshape(-1, self.embed_dim)
base_flat = base_k.expand(-1, self.num_streams, -1).reshape(-1, self.embed_dim)
delta_norm = torch.norm(delta_flat, dim=-1, keepdim=True)
rotation_mask = delta_norm > self.rotation_threshold
if rotation_mask.any():
rotated = self.rotate_difference(delta_flat, base_flat)
delta_flat = torch.where(rotation_mask, rotated, delta_flat)
delta_green = delta_flat.view(-1, self.num_streams, self.embed_dim)
rotation_ratio = rotation_mask.float().mean().item()
else:
rotation_ratio = 0.0
flow_states["matrix_green_rot"] = delta_green
boundary_val = self._get_quadratic_boundary()
flow_states["quadratic_boundary"] = boundary_val.detach().item()
if self.enable_quadratic_reflection and self.reflection_push > 0:
norm_after_rot = torch.norm(delta_green, dim=-1, keepdim=True)
push_mask = norm_after_rot < boundary_val
if push_mask.any():
push_factor = 1.0 + self.reflection_push
delta_green = torch.where(push_mask, delta_green * push_factor, delta_green)
pre_reflect_push_ratio = push_mask.float().mean().item()
else:
pre_reflect_push_ratio = 0.0
if self.enable_quadratic_reflection:
delta_green, reflection_ratio = self.quadratic_reflection(delta_green)
else:
reflection_ratio = 0.0
flow_states["matrix_green_reflect"] = delta_green
mirror_reference = self.intersection_knowledge(mat_secondary, base_k)
matrix_black = 0.5 * (2 * base_k - delta_green + mirror_reference)
matrix_black = self.ia_router.apply("mirror", matrix_black)
stage_signatures["mirror"] = self.ia_router.stage_signature("mirror")
if self.enforce_square_geometry:
x_stream = mat_primary[:, 0, :]
x_output = mat_secondary[:, 0, :]
matrix_black[:, 0, :] = x_output
flow_states["square_input"] = x_stream
flow_states["square_output"] = x_output
flow_states["matrix_black_square"] = matrix_black
flow_states["matrix_black"] = matrix_black
delta_inter = matrix_black.mean(dim=1)
cycle_signatures: List[str] = []
if self.refinement_cycles > 0:
refined_delta = delta_inter
for cycle_idx in range(self.refinement_cycles):
refined_delta = self.ia_router.apply(
self.cycle_stage_name,
refined_delta,
cycle_idx=cycle_idx,
)
cycle_signatures.append(self.ia_router.stage_signature(self.cycle_stage_name))
delta_inter = refined_delta
stage_signatures[self.cycle_stage_name] = cycle_signatures[-1] if cycle_signatures else "identity"
flow_states["delta_pre_lorentz"] = delta_inter
flow_states["ia_cycle_signatures"] = cycle_signatures
w = torch.norm(delta_inter, dim=-1, keepdim=True)
lorentz_gamma = 1.0
if self.enable_lorentz_transform:
reference_stream = self._select_axis_stream(mat_primary)
delta_inter, lorentz_gamma, w = self._lorentz_boost(delta_inter, reference_stream, w)
flow_states["lorentz_reference"] = reference_stream
flow_states["delta_lorentz"] = delta_inter
flow_states["lorentz_gamma"] = lorentz_gamma
flow_states["lorentz_time"] = w
delta_before_chaos = delta_inter
# APLICAÇÃO DO FATOR CAOS (OVERDRIVE) - VORTEX DYNAMICS V2
if chaos_factor != 1.0:
batch_size = delta_inter.shape[0]
# 1. Generate chaos sequences from both attractors
lor_seq_np = lorenz_sequence(batch_size, init=(0.1, 0.0, 0.0))
ros_seq_np = rossler_sequence(batch_size, init=(0.1, 0.0, 0.0))
# Normalize sequences
lor_seq_np = (lor_seq_np - lor_seq_np.mean(axis=0)) / (lor_seq_np.std(axis=0) + 1e-8)
ros_seq_np = (ros_seq_np - ros_seq_np.mean(axis=0)) / (ros_seq_np.std(axis=0) + 1e-8)
lor_tensor = torch.tensor(lor_seq_np, dtype=delta_inter.dtype, device=delta_inter.device)
ros_tensor = torch.tensor(ros_seq_np, dtype=delta_inter.dtype, device=delta_inter.device)
# 2. Adaptive attractor mixing (learnable weights)
attractor_weights = F.softmax(self.attractor_selector, dim=0)
mixed_chaos = attractor_weights[0] * lor_tensor + attractor_weights[1] * ros_tensor
# 3. Map 3D Chaos -> Embedding Dimension
perturb = mixed_chaos @ self.vortex_linear
perturb = perturb * self.vortex_scale
# 4. Terminal Velocity Matching (Flow-like correction)
velocity = compute_terminal_velocity(delta_inter, target_distribution="gaussian")
gated_velocity = velocity * torch.sigmoid(self.velocity_gate) + self.velocity_bias
# 5. Normalize perturbation magnitude
emb_norm = delta_inter.norm(dim=1, keepdim=True) + 1e-8
pert_norm = perturb.norm(dim=1, keepdim=True) + 1e-8
normalized_perturb = perturb * (emb_norm / pert_norm)
# 6. Adaptive temperature scaling + learned gate
chaos_scale = torch.sigmoid(self.chaos_temperature + self.chaos_gate)
effective_chaos = chaos_factor * chaos_scale
# Gate chaos intensity if semantic entropy explodes (uncertain corrections)
delta_entropy = compute_semantic_entropy(delta_inter.unsqueeze(1))
chaos_entropy_gate = 1.0 if delta_entropy < 1.0 else 0.5
effective_chaos = effective_chaos * chaos_entropy_gate
# 7. Apply combined perturbation: chaos + velocity flow
delta_inter = delta_inter + effective_chaos * normalized_perturb + 0.1 * gated_velocity
# Store chaos metrics
flow_states["attractor_mix"] = attractor_weights.detach().cpu().tolist()
flow_states["effective_chaos"] = effective_chaos.item()
flow_states["chaos_scale"] = chaos_scale.item()
flow_states["chaos_entropy_gate"] = chaos_entropy_gate
flow_states["angular_divergence"] = compute_angular_divergence(delta_before_chaos, delta_inter)
# Gain boost to increase boundary hits when chaos is controlled
delta_inter = delta_inter * 1.5
flow_states["delta_output"] = delta_inter
state = torch.cat([approx_inter, delta_inter, w], dim=-1)
evolved = self.euler_vortex(state)
flow_states["output_corner"] = evolved[..., :-1]
flow_states["ia_stage_logs"] = self.ia_router.describe_all_stages()
self.last_flow_states = flow_states
overlap = F.cosine_similarity(inter_primary.view(-1, self.embed_dim), inter_secondary.view(-1, self.embed_dim), dim=-1)
overlap = overlap.view(-1, self.num_streams).mean(dim=1)
annulation = torch.norm(mat_primary - (-mat_secondary), dim=-1).pow(2).mean(dim=1)
vortex_sink = evolved[..., -1]
hall_penalty = (1 - overlap).clamp_min(0.0)
approx_pull = (
torch.norm(mat_primary.mean(dim=1) - approx_inter, dim=-1).pow(2)
+ torch.norm(mat_secondary.mean(dim=1) - approx_inter, dim=-1).pow(2)
)
# SIGReg Regularization (Isotropic Gaussian Enforcement)
sig_loss = sigreg_loss(delta_inter)
# Spectral Energy Regularization (distribui energia uniformemente)
spectral_loss = spectral_energy_loss(delta_inter)
delta_probs = F.softmax(delta_inter, dim=-1)
semantic_entropy = -(delta_probs * (delta_probs + 1e-8).log()).sum(dim=-1).mean()
semantic_entropy_val = semantic_entropy.item()
boundary_val = self._get_quadratic_boundary()
boundary_reg = (0.2 - boundary_val).clamp_min(0.0).pow(2)
loss = (
annulation.mean()
+ 0.5 * hall_penalty.mean()
+ 0.25 * approx_pull.mean()
- 0.1 * vortex_sink.mean()
+ 0.05 * sig_loss
+ 0.02 * spectral_loss # Força distribuição espectral uniforme
+ 0.02 * boundary_reg
+ 0.05 * semantic_entropy
)
metrics = {
"annulation": annulation.mean().item(),
"cosine_overlap": overlap.mean().item(),
"vortex_energy": vortex_sink.mean().item(),
"sigreg_loss": sig_loss.item(),
"spectral_loss": spectral_loss.item() if isinstance(spectral_loss, torch.Tensor) else spectral_loss,
"boundary_reg": boundary_reg.mean().item() if isinstance(boundary_reg, torch.Tensor) else boundary_reg,
"rotation_ratio": rotation_ratio,
"approx_alignment": approx_pull.mean().item(),
"reflection_ratio": reflection_ratio,
"reflect_ratio": reflection_ratio,
"boost_ratio": boost_ratio,
"reflection_push_ratio": pre_reflect_push_ratio,
"ia_base": stage_signatures.get("base", "identity"),
"ia_inversion": stage_signatures.get("inversion", "identity"),
"ia_mirror": stage_signatures.get("mirror", "identity"),
"ia_cycle": " || ".join(cycle_signatures) if cycle_signatures else stage_signatures.get(self.cycle_stage_name, "identity"),
"lorentz_gamma": lorentz_gamma,
"square_angle_deg": math.degrees(square_angle_applied) if square_angle_applied is not None else 0.0,
"square_leak_ratio": self.square_leak_ratio,
"angular_divergence": flow_states.get("angular_divergence", 0.0),
"attractor_mix": flow_states.get("attractor_mix", [1.0, 0.0]),
"effective_chaos": flow_states.get("effective_chaos", 1.0),
"semantic_entropy_approx": semantic_entropy_val,
"triangle_iters": triangle_debug.get("iterations", 0) if triangle_debug else 0,
"triangle_residual": triangle_debug.get("residual", 0.0) if triangle_debug else 0.0,
}
delta_norm = torch.norm(delta_inter, dim=-1)
metrics["delta_norm_mean"] = delta_norm.mean().item()
metrics["delta_norm_max"] = delta_norm.max().item()
return evolved, loss, metrics, delta_inter
class PortugueseSentenceDataset(Dataset):
def __init__(
self,
sentences: List[str],
tokenizer: AutoTokenizer,
embedding_model: SentenceTransformer,
mask_prob: float = 0.15,
max_seq_length: int = 512,
precompute_embeddings: bool = False,
embedding_batch_size: int = 64,
):
self.sentences = sentences
self.tokenizer = tokenizer
self.embedding_model = embedding_model
self.mask_prob = mask_prob
self.max_seq_length = max_seq_length
self.precompute_embeddings = precompute_embeddings
self.embedding_batch_size = max(1, embedding_batch_size)
self.embeddings: Optional[torch.Tensor] = None
self._embedding_cache: Dict[int, torch.Tensor] = {}
if self.precompute_embeddings:
self._precompute_all_embeddings()
def __len__(self) -> int:
return len(self.sentences)
def _mask_tokens(self, input_ids: torch.Tensor, special_mask: torch.Tensor) -> Dict[str, torch.Tensor]:
labels = input_ids.clone()
probability_matrix = torch.full(labels.shape, self.mask_prob)
probability_matrix.masked_fill_(special_mask.bool(), 0.0)
masked_indices = torch.bernoulli(probability_matrix).bool()
if not masked_indices.any():
candidate_positions = (~special_mask.bool()).nonzero(as_tuple=False).view(-1)
choice = candidate_positions[torch.randint(0, candidate_positions.numel(), (1,)).item()]
masked_indices[choice] = True
labels[~masked_indices] = -100
input_ids = input_ids.clone()
input_ids[masked_indices] = self.tokenizer.mask_token_id
return {"input_ids": input_ids, "labels": labels}
def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
sentence = self.sentences[idx]
encoding = self.tokenizer(
sentence,
return_tensors="pt",
return_special_tokens_mask=True,
truncation=True,
max_length=self.max_seq_length,
)
input_ids = encoding["input_ids"].squeeze(0)
attention_mask = encoding["attention_mask"].squeeze(0)
special_mask = encoding["special_tokens_mask"].squeeze(0)
masked = self._mask_tokens(input_ids, special_mask)
embedding = self._get_embedding(idx)
return {
"input_ids": masked["input_ids"],
"attention_mask": attention_mask,
"labels": masked["labels"],
"embedding": embedding,
}
@torch.no_grad()
def _precompute_all_embeddings(self) -> None:
chunks: List[torch.Tensor] = []
total = len(self.sentences)
for start in range(0, total, self.embedding_batch_size):
batch = self.sentences[start : start + self.embedding_batch_size]
batch_embeds = self.embedding_model.encode(
batch,
convert_to_tensor=True,
show_progress_bar=False,
batch_size=self.embedding_batch_size,
)
chunks.append(batch_embeds.float().cpu())
self.embeddings = torch.cat(chunks, dim=0) if chunks else torch.empty(0)
@torch.no_grad()
def _compute_single_embedding(self, sentence: str) -> torch.Tensor:
embed = self.embedding_model.encode(
sentence,
convert_to_tensor=True,
show_progress_bar=False,
batch_size=1,
)
return embed.float().cpu()
def _get_embedding(self, idx: int) -> torch.Tensor:
if self.embeddings is not None:
return self.embeddings[idx]
if idx not in self._embedding_cache:
self._embedding_cache[idx] = self._compute_single_embedding(self.sentences[idx])
return self._embedding_cache[idx]
def build_collate_fn(tokenizer: AutoTokenizer):
pad_id = tokenizer.pad_token_id
def collate(batch: List[Dict[str, torch.Tensor]]) -> Dict[str, torch.Tensor]:
input_ids = pad_sequence([item["input_ids"] for item in batch], batch_first=True, padding_value=pad_id)
attention_mask = pad_sequence([item["attention_mask"] for item in batch], batch_first=True, padding_value=0)
labels = pad_sequence([item["labels"] for item in batch], batch_first=True, padding_value=-100)
embeddings = torch.stack([item["embedding"] for item in batch])
return {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": labels,
"embedding": embeddings,
}
return collate
class BetinaTrainer:
def __init__(
self,
vortex: VortexBetinaAntiHalluc,
tokenizer: AutoTokenizer,
embedding_model: SentenceTransformer,
mlm_model: AutoModelForMaskedLM,
raw_embedding_dim: int,
embed_dim: int,
lambda_vortex: float = 0.5,
learning_rate: float = 1e-4,
mlm_learning_rate: float = 5e-5,
freeze_mlm: bool = False,
freeze_projectors: bool = False,
correction_max_norm: float | None = None,
chaos_factor: float = 1.0,
eval_chaos_factor: float = 1.0,
device: torch.device | None = None,
max_seq_length: int = 512,
):
self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.vortex = vortex.to(self.device)
self.tokenizer = tokenizer
self.embedding_model = embedding_model
self.mlm_model = mlm_model.to(self.device)
self.lambda_vortex = lambda_vortex
self.chaos_factor = chaos_factor
self.eval_chaos_factor = eval_chaos_factor
self.embedding_projector = nn.Linear(raw_embedding_dim, embed_dim).to(self.device)
hidden_size = self.mlm_model.config.hidden_size
self.correction_projector = nn.Linear(embed_dim, hidden_size).to(self.device)
self.freeze_projectors = freeze_projectors
self.correction_max_norm = correction_max_norm if correction_max_norm and correction_max_norm > 0 else None
self.max_seq_length = max_seq_length
mlm_params = list(self.mlm_model.parameters())
if freeze_mlm:
for param in mlm_params:
param.requires_grad = False
projector_params = list(self.embedding_projector.parameters()) + list(self.correction_projector.parameters())
if freeze_projectors:
for param in projector_params:
param.requires_grad = False
trainable_projectors = [] if freeze_projectors else projector_params
vortex_params = list(self.vortex.parameters()) + trainable_projectors
optimizer_groups = [
{"params": vortex_params, "lr": learning_rate},
]
if not freeze_mlm:
optimizer_groups.append({"params": mlm_params, "lr": mlm_learning_rate})
self.optimizer = optim.AdamW(optimizer_groups)
self.freeze_mlm = freeze_mlm
self.scaler = betina_grad_scaler(self.device.type, enabled=self.device.type == "cuda")
def _project_correction(self, delta: torch.Tensor) -> torch.Tensor:
correction = self.correction_projector(delta)
if self.correction_max_norm is not None:
dim = correction.dim() - 1 if correction.dim() > 0 else 0
correction = correction.renorm(p=2, dim=dim, maxnorm=self.correction_max_norm)
return correction
def train(self, dataloader: DataLoader, epochs: int = 5, grad_clip: float = 1.0) -> List[Dict[str, float]]:
history: List[Dict[str, float]] = []
self.vortex.train()
self.mlm_model.train()
for epoch in range(epochs):
for step, batch in enumerate(dataloader):
input_ids = batch["input_ids"].to(self.device)
attention_mask = batch["attention_mask"].to(self.device)
labels = batch["labels"].to(self.device)
embeds = batch["embedding"].to(self.device)
self.optimizer.zero_grad(set_to_none=True)
with betina_autocast(self.device.type, enabled=self.device.type == "cuda"):
projected = self.embedding_projector(embeds)
_, vortex_loss, metrics, delta = self.vortex(projected, chaos_factor=self.chaos_factor)
if self.freeze_mlm:
with torch.no_grad():
outputs = self.mlm_model(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True,
return_dict=True,
)
else:
outputs = self.mlm_model(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True,
return_dict=True,
)
hidden = outputs.hidden_states[-1]
correction = self._project_correction(delta).unsqueeze(1)
attention_mask_f = attention_mask.unsqueeze(-1).float()
mask_focus = (input_ids == self.tokenizer.mask_token_id).unsqueeze(-1).float()
weight_mask = 0.5 * attention_mask_f + 0.5 * mask_focus
corrected_hidden = hidden + correction * weight_mask
if hasattr(self.mlm_model, "cls"):
logits = self.mlm_model.cls(corrected_hidden)
else:
logits = self.mlm_model.get_output_embeddings()(corrected_hidden)
mask_positions = labels != -100
if mask_positions.any():
mlm_loss = F.cross_entropy(logits[mask_positions], labels[mask_positions])
else:
mlm_loss = torch.zeros(1, device=self.device)
total_loss = mlm_loss + self.lambda_vortex * vortex_loss
self.scaler.scale(total_loss).backward()
torch.nn.utils.clip_grad_norm_(self.parameters(), grad_clip)
self.scaler.step(self.optimizer)
self.scaler.update()
perplexity = torch.exp(mlm_loss.detach())
record = {
"epoch": epoch + 1,
"step": step + 1,
"mlm_loss": mlm_loss.detach().item(),
"vortex_loss": vortex_loss.detach().item(),
"total_loss": total_loss.detach().item(),
"perplexity": perplexity.item(),
"vortex_energy": metrics["vortex_energy"],
"cosine_overlap": metrics["cosine_overlap"],
"rotation_ratio": metrics["rotation_ratio"],
"approx_alignment": metrics["approx_alignment"],
"reflection_ratio": metrics["reflection_ratio"],
"boost_ratio": metrics.get("boost_ratio", 0.0),
"reflection_push_ratio": metrics.get("reflection_push_ratio", 0.0),
}
history.append(record)
if step % 10 == 0:
print(
f"Epoch {record['epoch']:03d} Step {record['step']:04d} | "
f"Total {record['total_loss']:.4f} | MLM {record['mlm_loss']:.4f} | "
f"PPL {record['perplexity']:.4f} | "
f"Vortex {record['vortex_loss']:.4f} | Overlap {record['cosine_overlap']:.4f} | "
f"Energy {record['vortex_energy']:.4f} | Rotation {record['rotation_ratio']:.3f} | "
f"Reflect {record['reflection_ratio']:.3f} | Boost {record['boost_ratio']:.3f} | "
f"PreReflect {record['reflection_push_ratio']:.3f} | Approx {record['approx_alignment']:.4f}"
)
return history
def parameters(self):
for module in (self.vortex, self.embedding_projector, self.correction_projector, self.mlm_model):
for param in module.parameters():
if param.requires_grad:
yield param
@torch.no_grad()
def evaluate_perplexity(self, dataloader: DataLoader, apply_correction: bool = True) -> float:
self.vortex.eval()
self.mlm_model.eval()
total_loss = 0.0
total_tokens = 0
for batch in dataloader:
input_ids = batch["input_ids"].to(self.device)
attention_mask = batch["attention_mask"].to(self.device)
labels = batch["labels"].to(self.device)
embeds = batch["embedding"].to(self.device)
outputs = self.mlm_model(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True,
return_dict=True,
)
hidden = outputs.hidden_states[-1]
if apply_correction:
projected = self.embedding_projector(embeds)
_, _, _, delta = self.vortex(projected, chaos_factor=self.eval_chaos_factor)
correction = self._project_correction(delta).unsqueeze(1)
attention_mask_f = attention_mask.unsqueeze(-1).float()
mask_focus = (input_ids == self.tokenizer.mask_token_id).unsqueeze(-1).float()
weight_mask = 0.5 * attention_mask_f + 0.5 * mask_focus
hidden = hidden + correction * weight_mask
if hasattr(self.mlm_model, "cls"):
logits = self.mlm_model.cls(hidden)
else:
logits = self.mlm_model.get_output_embeddings()(hidden)
mask_positions = labels != -100
if mask_positions.any():
loss = F.cross_entropy(logits[mask_positions], labels[mask_positions], reduction="sum")
total_loss += loss.item()
total_tokens += mask_positions.sum().item()
if total_tokens == 0:
return float("inf")
return math.exp(total_loss / total_tokens)
@torch.no_grad()
def save(self, output_dir: str) -> None:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
torch.save(self.vortex.state_dict(), output_path / "vortex.pt")
torch.save(self.embedding_projector.state_dict(), output_path / "embedding_projector.pt")
torch.save(self.correction_projector.state_dict(), output_path / "correction_projector.pt")
self.mlm_model.save_pretrained(output_path / "mlm")
self.tokenizer.save_pretrained(output_path / "mlm")
@torch.no_grad()
def fill_masks(
self,
texts: List[str],
top_k: int = 5,
apply_correction: bool = True,
) -> List[List[List[Tuple[str, float]]]]:
self.vortex.eval()
self.mlm_model.eval()
encodings = self.tokenizer(
texts,
return_tensors="pt",
padding=True,
truncation=True,
max_length=self.max_seq_length,
)
input_ids = encodings["input_ids"].to(self.device)
attention_mask = encodings["attention_mask"].to(self.device)
outputs = self.mlm_model(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True,
return_dict=True,
)
hidden = outputs.hidden_states[-1]
if apply_correction:
embeds = self.embedding_model.encode(texts, convert_to_tensor=True, show_progress_bar=False).to(self.device)
projected = self.embedding_projector(embeds)
_, _, _, delta = self.vortex(projected, chaos_factor=self.eval_chaos_factor)
correction = self._project_correction(delta).unsqueeze(1)
attention_mask_f = attention_mask.unsqueeze(-1).float()
mask_focus = (input_ids == self.tokenizer.mask_token_id).unsqueeze(-1).float()
weight_mask = 0.5 * attention_mask_f + 0.5 * mask_focus
hidden = hidden + correction * weight_mask
if hasattr(self.mlm_model, "cls"):
logits = self.mlm_model.cls(hidden)
else:
logits = self.mlm_model.get_output_embeddings()(hidden)
mask_positions = (input_ids == self.tokenizer.mask_token_id)
results: List[List[List[Tuple[str, float]]]] = []
for batch_index in range(input_ids.size(0)):
batch_tokens: List[List[Tuple[str, float]]] = []
positions = mask_positions[batch_index].nonzero(as_tuple=False).view(-1)
for position in positions:
token_logits = logits[batch_index, position]
token_probs = F.softmax(token_logits, dim=-1)
topk = torch.topk(token_probs, top_k)
decoded: List[Tuple[str, float]] = []
for token_id, prob in zip(topk.indices, topk.values):
token = self.tokenizer.decode([token_id]).strip()
decoded.append((token, prob.item()))
batch_tokens.append(decoded)
results.append(batch_tokens)
return results
def run_demo(embed_dim: int = 4, batch_size: int = 3, seed: int = 123) -> None:
torch.manual_seed(seed)
model = VortexBetinaAntiHalluc(embed_dim=embed_dim)
inputs = torch.randn(batch_size, embed_dim)
evolved, loss, metrics, delta = model(inputs)
print("Dimensão de entrada:", embed_dim)
print("Inputs:", inputs)
print("Estado evoluído shape:", evolved.shape)
print("Delta shape:", delta.shape)
print("Loss:", loss.item())
for key, value in metrics.items():
print(f"{key}: {value}")
def sample_sentences() -> List[str]:
return [
"O céu de Lisboa estava completamente claro naquela manhã.",
"A inteligência coletiva da equipe resolveu o problema rapidamente.",
"O gato preto dormia tranquilo sobre o sofá da sala.",
"A orquestra executou a sinfonia com uma precisão impressionante.",
"Os dados indicam uma redução consistente nas alucinações do modelo.",
"A pesquisa científica requer paciência, rigor e curiosidade constante.",
"A ponte antiga foi restaurada para preservar o patrimônio cultural.",
"O sistema Betina ajusta embeddings para evitar distorções semânticas.",
]
def load_sentences_from_args(args: argparse.Namespace) -> List[str]:
if args.dataset_file:
path = Path(args.dataset_file)
if not path.exists():
raise FileNotFoundError(f"Dataset file not found: {path}")
sentences = [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()]
if not sentences:
raise ValueError(f"Dataset file {path} is empty")
return sentences if args.dataset_limit is None else sentences[: args.dataset_limit]
if args.dataset_hf:
if load_dataset is None:
raise ImportError("Install the 'datasets' package to use --dataset-hf option")
split = args.dataset_split
if args.dataset_limit and ":" not in split:
split = f"{split}[:{args.dataset_limit}]"
dataset_name = args.dataset_hf
config_name: Optional[str] = args.dataset_hf_config or None
try:
dataset = _safe_load_dataset(
dataset_name,
config_name,
split=split,
hf_token=args.hf_token,
trust_remote_code=args.trust_remote_code,
)
except Exception as exc:
message = str(exc).lower()
scripts_blocked = "dataset scripts are no longer supported" in message
trust_flag_blocked = "trust_remote_code" in message
if dataset_name == "wikipedia" and (scripts_blocked or trust_flag_blocked):
fallback_name = "wikimedia/wikipedia"
fallback_config = config_name or "20231101.pt"
print(
"Dataset 'wikipedia' agora usa snapshot parquet e não aceita mais scripts remotos."
f" Alternando automaticamente para {fallback_name} ({fallback_config})."
)
dataset = _safe_load_dataset(
fallback_name,
fallback_config,
split=split,
hf_token=args.hf_token,
trust_remote_code=False,
)
elif scripts_blocked and not args.trust_remote_code:
hint = (
"O dataset solicita código remoto. Reexecute com --trust-remote-code para habilitar"
" scripts do autor do dataset. Apenas use se confiar na fonte."
)
raise RuntimeError(hint) from exc
if "gated dataset" in message or "403" in message or "401" in message:
hint = (
"Dataset protegido requer autenticação. Informe --hf-token <TOKEN>, defina HF_TOKEN/HUGGINGFACE_TOKEN"
" ou utilize --hf-token-file apontando para o token salvo pelo huggingface-cli. Também é possível"
" executar 'huggingface-cli login' para gerar ~/.cache/huggingface/token.\nVocê pode obter o token em"
" https://huggingface.co/settings/tokens."
)
raise RuntimeError(hint) from exc
raise
sentences: List[str] = []
text_field = args.dataset_text_field
limit = args.dataset_limit
for item in dataset:
text = item.get(text_field)
if isinstance(text, str) and text.strip():
sentences.append(text.strip())
if limit is not None and len(sentences) >= limit:
break
if not sentences:
raise ValueError("No sentences extracted from the specified dataset")
return sentences
return sample_sentences()
def print_gain_summary(
prompts: List[str],
base_fills: List[List[List[Tuple[str, float]]]],
betina_fills: List[List[List[Tuple[str, float]]]],
) -> None:
print("\nResumo de ganhos top-1:")
for prompt, base_masks, betina_masks in zip(prompts, base_fills, betina_fills):
prompt_head = prompt if len(prompt) <= 60 else f"{prompt[:57]}..."
for idx, (base_group, betina_group) in enumerate(zip(base_masks, betina_masks), start=1):
if not base_group or not betina_group:
continue
base_token, base_prob = base_group[0]
betina_token, betina_prob = betina_group[0]
delta = betina_prob - base_prob
if base_prob > 0:
rel = delta / base_prob * 100.0
rel_text = f"{rel:+.2f}%"
else:
rel_text = "n/d"
change_desc = "mantido" if betina_token == base_token else f"{base_token} -> {betina_token}"
print(
f" [{prompt_head}] máscara {idx}: {change_desc} | base {base_prob:.4f} -> betina {betina_prob:.4f} ({rel_text})"
)
def _prepare_debug_value(value, max_examples: int):
if isinstance(value, torch.Tensor):
limited = value.detach().cpu()
if limited.dim() >= 1:
limited = limited[:max_examples]
return limited.tolist()
if isinstance(value, dict):
return {key: _prepare_debug_value(val, max_examples) for key, val in value.items()}
if isinstance(value, (list, tuple)):
return [_prepare_debug_value(item, max_examples) for item in value]
if isinstance(value, (float, int, str)) or value is None:
return value
return str(value)
def dump_square_debug(flow_states: Dict[str, object], metrics: Dict[str, float], output_path: str, max_examples: int = 1) -> Path:
output = Path(output_path).expanduser()
payload = {
"max_examples": max(1, max_examples),
"metrics": {key: float(value) if isinstance(value, (int, float)) else value for key, value in metrics.items()},
"flow_states": {key: _prepare_debug_value(val, max_examples) for key, val in flow_states.items()},
}
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(json.dumps(payload, indent=2), encoding="utf-8")
return output
def main(argv: list[str] | None = None):
parser = argparse.ArgumentParser(description="Treinamento do modelo Betina anti-hallucination")
parser.add_argument("--train", action="store_true", help="Executa treinamento completo")
parser.add_argument("--epochs", type=int, default=5, help="Número de épocas para treinar")
parser.add_argument("--batch-size", type=int, default=4, help="Tamanho do batch")
parser.add_argument("--embed-dim", type=int, default=256, choices=[128, 256], help="Dimensão interna do vórtice")
parser.add_argument("--lambda-vortex", type=float, default=0.1, help="Peso da loss do vórtice")
parser.add_argument("--chaos-factor", type=float, default=1.0, help="Fator de caos aplicado durante o treinamento")
parser.add_argument(
"--eval-chaos-factor",
type=float,
default=1.0,
help="Fator de caos usado em avaliação e inferência (permite medir diferentes regimes)",
)
parser.add_argument("--learning-rate", type=float, default=1e-4, help="Learning rate para o vórtice e projetores")
parser.add_argument("--mlm-learning-rate", type=float, default=5e-5, help="Learning rate para o modelo de linguagem")
parser.add_argument("--freeze-mlm", action="store_true", help="Congela os pesos do modelo de linguagem durante o treino")
parser.add_argument("--freeze-projectors", action="store_true", help="Congela os projetores de embedding/correção (modo inferência)")
parser.add_argument(
"--correction-max-norm",
type=float,
default=None,
help="Clampa o vetor de correção Betina a esta norma L2 (<=0 desativa)",
)
parser.add_argument("--output-dir", type=str, default="outputs/betina_vortex", help="Diretório para salvar o modelo")
parser.add_argument("--device", type=str, default=None, help="Força execução em cuda ou cpu")
parser.add_argument("--top-k", type=int, default=5, help="Top-k para avaliação de máscara")
parser.add_argument("--skip-eval", action="store_true", help="Pula avaliação pós-treino")
parser.add_argument("--eval-prompts", nargs="*", default=None, help="Prompts personalizados contendo [MASK] para avaliação")
parser.add_argument("--dataset-file", type=str, default=None, help="Arquivo de texto com uma sentença por linha")
parser.add_argument("--dataset-hf", type=str, default=None, help="Nome do dataset Hugging Face, ex.: oscar")
parser.add_argument("--dataset-hf-config", type=str, default=None, help="Config do dataset Hugging Face, ex.: unshuffled_deduplicated_pt")
parser.add_argument("--dataset-split", type=str, default="train[:1000]", help="Split do dataset Hugging Face")
parser.add_argument("--dataset-text-field", type=str, default="text", help="Campo de texto no dataset Hugging Face")
parser.add_argument("--dataset-limit", type=int, default=None, help="Limite de sentenças carregadas")
parser.add_argument("--hf-token", type=str, default=None, help="Token de autenticação da Hugging Face (ou defina HF_TOKEN)")
parser.add_argument(
"--hf-token-file",
type=str,
default=None,
help="Arquivo contendo o token da Hugging Face (padrão: ~/.cache/huggingface/token)",
)
parser.add_argument("--trust-remote-code", action="store_true", help="Permite datasets com script remoto (requer confiança no autor)")
parser.add_argument("--force-download", action="store_true", help="Força novo download dos pesos do modelo de linguagem")
parser.add_argument("--disable-rotation", action="store_true", help="Desativa a rotação do delta do vórtice")
parser.add_argument("--rotation-angle", type=float, default=math.pi / 4, help="Ângulo (rad) para rotacionar o delta quando ativado")
parser.add_argument("--rotation-threshold", type=float, default=1e-4, help="Norma mínima do delta para aplicar rotação")
parser.add_argument("--rotation-clockwise", action="store_true", help="Força rotação horária (inverte o sinal do ângulo)")
parser.add_argument(
"--enable-quadratic-reflection",
action="store_true",
help="Ativa reflexão quadrática estilo bolinha/bastão no delta do vórtice",
)
parser.add_argument(
"--quadratic-boundary",
type=float,
default=1.0,
help="Magnitudes acima desse valor são refletidas (parede virtual)",
)
parser.add_argument(
"--quadratic-strength",
type=float,
default=0.5,
help="Mistura (0-1) entre o delta original e o refletido quadrático",
)
parser.add_argument(
"--disable-triangle",
action="store_true",
help="Desativa o neurônio sintético triangular que confronta X, Y e contrabase",
)
parser.add_argument(
"--triangle-hidden-dim",
type=int,
default=512,
help="Dimensão oculta usada dentro do neurônio triangular",
)
parser.add_argument(
"--triangle-max-iters",
type=int,
default=5,
help="Iterações máximas de refinamento (porquês) do triângulo",
)
parser.add_argument(
"--triangle-tol",
type=float,
default=1e-4,
help="Tolerância para encerrar refinamento triangular (quanto menor, mais perguntas)",
)
parser.add_argument(
"--triangle-delta-gain",
type=float,
default=1.0,
help="Ganho aplicado ao eixo integrador do triângulo",
)
parser.add_argument(
"--disable-square-geometry",
action="store_true",
help="Desativa o giro de 180° que forma o quadrado X↔X⁻¹",
)
parser.add_argument(
"--square-rotation-degrees",
type=float,
default=180.0,
help="Ângulo aplicado ao girar a matriz completa (180° gera o quadrado perfeito)",
)
parser.add_argument(
"--square-leak-ratio",
type=float,
default=0.05,
help="Mistura o quadrado com a matriz original (0 mantém oposição perfeita, 1 ignora o giro)",
)
parser.add_argument(
"--square-jitter-std-deg",
type=float,
default=0.0,
help="Desvio padrão em graus para injetar ruído aleatório no giro quadrado",
)
parser.add_argument(
"--square-debug-json",
type=str,
default=None,
help="Se definido, salva um dump JSON com as matrizes primária/secundária e métricas do vórtice",
)
parser.add_argument(
"--square-debug-max",
type=int,
default=1,
help="Número máximo de exemplos incluídos no dump quadrado",
)
parser.add_argument(
"--enable-lorentz-transform",
action="store_true",
help="Aplica transformação de Lorentz no delta final para medir o resultado físico",
)
parser.add_argument(
"--lorentz-beta",
type=float,
default=0.6,
help="Fração da velocidade da luz usada no boost de Lorentz",
)
parser.add_argument(
"--lorentz-axis-stream",
type=int,
default=0,
help="Stream usado como eixo espacial (0=X, 1=Y, etc.) na transformação de Lorentz",
)
parser.add_argument(
"--ia-config",
type=str,
default=None,
help="Arquivo JSON descrevendo quais IAs assumem cada estágio/stream do fluxo matriz",
)
parser.add_argument(
"--refinement-cycles",
type=int,
default=0,
help="Quantidade de ciclos circundantes de refinamento IA aplicados ao delta final",
)
parser.add_argument(
"--cycle-stage-name",
type=str,
default="cycle",
help="Nome do estágio IA usado durante cada ciclo circundante",
)
parser.add_argument("--max-seq-length", type=int, default=512, help="Comprimento máximo de tokens por exemplo (padrão BERT)")
parser.add_argument("--precompute-embeddings", action="store_true", help="Codifica todas as sentenças antes do treino (requer muita RAM)")
parser.add_argument("--embedding-batch-size", type=int, default=64, help="Batch interno para geração de embeddings (encode)")
if argv is None:
argv_list = sys.argv[1:]
if "ipykernel" in sys.modules:
filtered: List[str] = []
skip_next = False
for item in argv_list:
if skip_next:
skip_next = False
continue
if item == "-f":
skip_next = True
continue
if item.startswith("-f="):
continue
filtered.append(item)
argv_list = filtered
else:
argv_list = list(argv)
parsed, unknown = parser.parse_known_args(argv_list)
if unknown:
print(f"Ignorando argumentos desconhecidos: {unknown}")
args = parsed
args.hf_token, token_source = resolve_hf_token(args.hf_token, args.hf_token_file)
if args.hf_token and token_source:
print(f"Token Hugging Face detectado via {token_source}.")
device = torch.device(args.device) if args.device else torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Usando dispositivo: {device}")
embedding_model_name = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
tokenizer_name = "neuralmind/bert-base-portuguese-cased"
embedding_model = SentenceTransformer(embedding_model_name, device=str(device) if device.type == "cuda" else "cpu")
tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token or tokenizer.sep_token or tokenizer.cls_token
try:
mlm_model = AutoModelForMaskedLM.from_pretrained(tokenizer_name, force_download=args.force_download)
except Exception as exc:
print(f"Download failed: {exc}. Try checking internet or cache.")
raise
sentences = load_sentences_from_args(args)
dataset = PortugueseSentenceDataset(
sentences,
tokenizer,
embedding_model,
max_seq_length=args.max_seq_length,
precompute_embeddings=args.precompute_embeddings,
embedding_batch_size=args.embedding_batch_size,
)
collate_fn = build_collate_fn(tokenizer)
dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True, collate_fn=collate_fn)
eval_dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=False, collate_fn=collate_fn)
stream_aliases: Optional[List[str]] = None
ia_stage_config: Optional[Dict[str, Dict[str, nn.Module]]] = None
refinement_cycles = args.refinement_cycles
config_cycle_stage = args.cycle_stage_name
if args.ia_config:
ia_config_data = load_ia_config_file(args.ia_config, args.embed_dim)
print(f"Config IA carregada de {args.ia_config}")
stream_aliases = ia_config_data.get("stream_aliases")
ia_stage_config = ia_config_data.get("stage_config") # type: ignore[assignment]
config_cycles = ia_config_data.get("refinement_cycles")
if isinstance(config_cycles, int):
refinement_cycles = config_cycles
config_cycle_stage = str(ia_config_data.get("cycle_stage_name", config_cycle_stage))
vortex = VortexBetinaAntiHalluc(
embed_dim=args.embed_dim,
enable_rotation=not args.disable_rotation,
rotation_angle=args.rotation_angle,
rotation_threshold=args.rotation_threshold,
rotation_clockwise=args.rotation_clockwise,
enable_quadratic_reflection=args.enable_quadratic_reflection,
quadratic_boundary=args.quadratic_boundary,
quadratic_strength=args.quadratic_strength,
stream_aliases=stream_aliases,
ia_stage_config=ia_stage_config,
refinement_cycles=refinement_cycles,
cycle_stage_name=config_cycle_stage,
enforce_square_geometry=not args.disable_square_geometry,
square_rotation_degrees=args.square_rotation_degrees,
square_leak_ratio=args.square_leak_ratio,
square_jitter_std_degrees=args.square_jitter_std_deg,
enable_lorentz_transform=args.enable_lorentz_transform,
lorentz_beta=args.lorentz_beta,
lorentz_axis_stream=args.lorentz_axis_stream,
enable_triangle=not args.disable_triangle,
triangle_hidden_dim=args.triangle_hidden_dim,
triangle_max_iters=args.triangle_max_iters,
triangle_tol=args.triangle_tol,
triangle_delta_gain=args.triangle_delta_gain,
)
trainer = BetinaTrainer(
vortex=vortex,
tokenizer=tokenizer,
embedding_model=embedding_model,
mlm_model=mlm_model,
raw_embedding_dim=embedding_model.get_sentence_embedding_dimension(),
embed_dim=args.embed_dim,
lambda_vortex=args.lambda_vortex,
learning_rate=args.learning_rate,
mlm_learning_rate=args.mlm_learning_rate,
freeze_mlm=args.freeze_mlm,
freeze_projectors=args.freeze_projectors,
correction_max_norm=args.correction_max_norm,
eval_chaos_factor=args.eval_chaos_factor,
chaos_factor=args.chaos_factor,
device=device,
max_seq_length=args.max_seq_length,
)
if args.square_debug_json:
square_max = max(1, args.square_debug_max)
try:
sample_batch = next(iter(dataloader))
except StopIteration as exc: # pragma: no cover - dataset vazio
raise RuntimeError("Não é possível gerar dump quadrado: dataset vazio") from exc
sample_embeddings = sample_batch["embedding"][:square_max].to(device)
with torch.no_grad():
projected = trainer.embedding_projector(sample_embeddings)
_, _, metrics_debug, _ = trainer.vortex(projected, chaos_factor=trainer.eval_chaos_factor)
dump_square_debug(
trainer.vortex.last_flow_states,
metrics_debug,
args.square_debug_json,
max_examples=min(square_max, sample_embeddings.size(0)),
)
print(f"Dump quadrado salvo em {args.square_debug_json}")
if args.train:
print("Iniciando treinamento...")
history = trainer.train(dataloader, epochs=args.epochs)
print(f"Treinamento finalizado com {len(history)} passos")
trainer.save(args.output_dir)
print(f"Modelos salvos em {args.output_dir}")
if not args.skip_eval:
ppl_base = trainer.evaluate_perplexity(eval_dataloader, apply_correction=False)
ppl_betina = trainer.evaluate_perplexity(eval_dataloader, apply_correction=True)
print(f"\nPerplexity sem correção: {ppl_base:.4f}")
print(f"Perplexity com correção Betina: {ppl_betina:.4f}")
ppl_delta = ppl_base - ppl_betina
if ppl_base > 0:
ppl_rel = ppl_delta / ppl_base * 100.0
print(f"Ganho absoluto: {ppl_delta:+.4f} | Ganho relativo: {ppl_rel:+.2f}%")
else:
print(f"Ganho absoluto: {ppl_delta:+.4f} | Ganho relativo: n/d")
eval_prompts = args.eval_prompts or [
"O modelo Betina evita [MASK] durante a geração.",
"A capital de Portugal é [MASK].",
"A IA Betina corrige [MASK] via vórtice.",
"O vórtice no Betina filtra [MASK] para reduzir alucinações.",
]
print("\nPreenchimento sem correção:")
base_fills = trainer.fill_masks(eval_prompts, top_k=args.top_k, apply_correction=False)
for prompt, tokens in zip(eval_prompts, base_fills):
print(prompt)
for idx, group in enumerate(tokens, start=1):
formatted = [f"{token} ({prob:.4f})" for token, prob in group]
print(f" Mascara {idx}: {formatted}")
print("\nPreenchimento com correção Betina:")
betina_fills = trainer.fill_masks(eval_prompts, top_k=args.top_k, apply_correction=True)
for prompt, tokens in zip(eval_prompts, betina_fills):
print(prompt)
for idx, group in enumerate(tokens, start=1):
formatted = [f"{token} ({prob:.4f})" for token, prob in group]
print(f" Mascara {idx}: {formatted}")
print_gain_summary(eval_prompts, base_fills, betina_fills)
if __name__ == "__main__":
main()