File size: 6,378 Bytes
c0ef6d4 0194326 e900a8d 0194326 c0ef6d4 45df059 c0ef6d4 0194326 c0ef6d4 45df059 c0ef6d4 0194326 c0ef6d4 196ee96 c0ef6d4 0194326 c0ef6d4 0194326 c0ef6d4 196ee96 c0ef6d4 0194326 c0ef6d4 0194326 c0ef6d4 45df059 c0ef6d4 196ee96 c0ef6d4 196ee96 0194326 45df059 0194326 c0ef6d4 0194326 c0ef6d4 0194326 e900a8d c0ef6d4 45df059 c0ef6d4 0194326 c0ef6d4 0194326 c0ef6d4 0194326 c0ef6d4 45df059 c0ef6d4 0194326 c0ef6d4 0194326 c0ef6d4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
import json
import time
from typing import Dict, Any, Optional
from core.memory import load_user_state, save_user_state
import logging
from datetime import datetime
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SessionManager:
"""Manages user sessions and conversation context"""
def __init__(self, session_timeout: int = 3600):
"""Initialize session manager
Args:
session_timeout: Session timeout in seconds (default: 1 hour)
"""
self.session_timeout = session_timeout
def get_session(self, user_id: str) -> Dict[str, Any]:
"""Retrieve user session data
Args:
user_id: Unique identifier for the user
Returns:
Dictionary containing session data
"""
try:
state = load_user_state(user_id)
if not state:
logger.info(f"Creating new session for user {user_id}")
return self._create_new_session()
# Check if session has expired
last_activity = float(state.get('last_activity', 0))
if time.time() - last_activity > self.session_timeout:
logger.info(f"Session expired for user {user_id}, creating new session")
return self._create_new_session()
# Deserialize complex data types
for key, value in state.items():
if isinstance(value, str):
try:
# Try to parse as JSON for lists/dicts
if value.startswith('[') or value.startswith('{'):
state[key] = json.loads(value)
except (json.JSONDecodeError, TypeError):
# Keep as string if not valid JSON
pass
return state
except Exception as e:
logger.error(f"Error retrieving session for user {user_id}: {e}")
return self._create_new_session()
def update_session(self, user_id: str, data: Dict[str, Any]) -> bool:
"""Update user session data
Args:
user_id: Unique identifier for the user
data: Data to update in the session
Returns:
Boolean indicating success
"""
try:
# Get existing session
session = self.get_session(user_id)
# Update with new data
session.update(data)
session['last_activity'] = time.time()
# Convert complex data types to strings for Redis
redis_data = {}
for key, value in session.items():
if isinstance(value, (list, dict)):
redis_data[key] = json.dumps(value)
elif isinstance(value, (int, float, str, bool)):
redis_data[key] = value
else:
redis_data[key] = str(value)
# Save updated session
result = save_user_state(user_id, redis_data)
if result:
logger.debug(f"Successfully updated session for user {user_id}")
else:
logger.warning(f"Failed to save session for user {user_id}")
return result
except Exception as e:
logger.error(f"Error updating session for user {user_id}: {e}")
return False
def update_session_with_ai_coordination(self, user_id: str, ai_data: Dict) -> bool:
"""Update session with AI coordination data"""
try:
# Get existing session
session = self.get_session(user_id)
# Add AI coordination tracking
if 'ai_coordination' not in session:
session['ai_coordination'] = {
'requests_processed': 0,
'ollama_responses': 0,
'hf_responses': 0,
'last_coordination': None
}
coord_data = session['ai_coordination']
coord_data['requests_processed'] += 1
coord_data['last_coordination'] = datetime.now().isoformat()
# Track response types
if 'immediate_response' in ai_data:
coord_data['ollama_responses'] += 1
if ai_data.get('hf_response'):
coord_data['hf_responses'] += 1
# Convert complex data to JSON strings for Redis
redis_data = {}
for key, value in session.items():
if isinstance(value, (dict, list)):
redis_data[key] = json.dumps(value)
else:
redis_data[key] = value
# Save updated session
result = save_user_state(user_id, redis_data)
if result:
logger.debug(f"Successfully updated coordination session for user {user_id}")
else:
logger.warning(f"Failed to save coordination session for user {user_id}")
return result
except Exception as e:
logger.error(f"Error updating coordination session for user {user_id}: {e}")
return False
def clear_session(self, user_id: str) -> bool:
"""Clear user session data
Args:
user_id: Unique identifier for the user
Returns:
Boolean indicating success
"""
try:
result = save_user_state(user_id, {})
if result:
logger.info(f"Cleared session for user {user_id}")
return result
except Exception as e:
logger.error(f"Error clearing session for user {user_id}: {e}")
return False
def _create_new_session(self) -> Dict[str, Any]:
"""Create a new session with default values
Returns:
Dictionary containing new session data
"""
session = {
'conversation': [],
'preferences': {},
'last_activity': time.time(),
'created_at': time.time()
}
logger.debug("Created new session")
return session
# Global session manager instance
session_manager = SessionManager()
|