al1kss committed on
Commit 803f81a · verified · 1 Parent(s): 736444f

Create enhanced_database_manager.py

Files changed (1)
  1. enhanced_database_manager.py +616 -0
enhanced_database_manager.py ADDED
@@ -0,0 +1,616 @@
+ import asyncio
+ import os
+ import json
+ import logging
+ import numpy as np
+ import pickle
+ import gzip
+ import asyncpg
+ from typing import Dict, List, Optional, Any, Tuple
+ from datetime import datetime
+ import uuid
+ import base64
+ 
+ class EnhancedDatabaseManager:
+     """Enhanced Database Manager that stores everything in PostgreSQL + Vercel Blob"""
+ 
+     def __init__(self, database_url: str):
+         self.database_url = database_url
+         self.pool = None
+         self.logger = logging.getLogger(__name__)
+ 
+     async def connect(self):
+         """Initialize database connection pool"""
+         try:
+             self.pool = await asyncpg.create_pool(
+                 self.database_url,
+                 min_size=2,
+                 max_size=20,
+                 command_timeout=60
+             )
+             self.logger.info("Enhanced database connection pool created successfully")
+ 
+             # Create all necessary tables
+             await self._create_all_tables()
+ 
+         except Exception as e:
+             self.logger.error(f"Database connection failed: {e}")
+             raise
+ 
+     async def _create_all_tables(self):
+         """Create all tables for comprehensive storage"""
+         async with self.pool.acquire() as conn:
+             await conn.execute("""
+                 -- RAG instances metadata
+                 CREATE TABLE IF NOT EXISTS rag_instances (
+                     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+                     ai_type VARCHAR(50) NOT NULL,
+                     user_id VARCHAR(100),
+                     ai_id VARCHAR(100),
+                     name VARCHAR(255) NOT NULL,
+                     description TEXT,
+ 
+                     -- Storage references
+                     blob_url TEXT,
+                     config_json JSONB,
+ 
+                     -- Statistics
+                     total_chunks INTEGER DEFAULT 0,
+                     total_tokens INTEGER DEFAULT 0,
+                     file_count INTEGER DEFAULT 0,
+ 
+                     -- Timestamps
+                     created_at TIMESTAMP DEFAULT NOW(),
+                     updated_at TIMESTAMP DEFAULT NOW(),
+                     last_accessed_at TIMESTAMP DEFAULT NOW(),
+ 
+                     -- Status
+                     status VARCHAR(20) DEFAULT 'active',
+ 
+                     UNIQUE(ai_type, user_id, ai_id)
+                 );
+ 
+                 -- Knowledge files metadata
+                 CREATE TABLE IF NOT EXISTS knowledge_files (
+                     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+                     rag_instance_id UUID REFERENCES rag_instances(id) ON DELETE CASCADE,
+                     filename VARCHAR(255) NOT NULL,
+                     original_filename VARCHAR(255),
+                     file_type VARCHAR(50),
+                     file_size INTEGER,
+ 
+                     -- Content storage
+                     content_text TEXT,
+                     content_blob BYTEA,
+ 
+                     -- Processing info
+                     processed_at TIMESTAMP DEFAULT NOW(),
+                     processing_status VARCHAR(20) DEFAULT 'pending',
+                     token_count INTEGER DEFAULT 0,
+ 
+                     -- Timestamps
+                     created_at TIMESTAMP DEFAULT NOW(),
+                     updated_at TIMESTAMP DEFAULT NOW()
+                 );
+ 
+                 -- RAG graph data (for large graphs, store in chunks)
+                 CREATE TABLE IF NOT EXISTS rag_graph_data (
+                     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+                     rag_instance_id UUID REFERENCES rag_instances(id) ON DELETE CASCADE,
+                     data_type VARCHAR(20) NOT NULL, -- 'nodes', 'edges', 'attrs'
+                     chunk_index INTEGER DEFAULT 0,
+                     chunk_data JSONB,
+                     created_at TIMESTAMP DEFAULT NOW()
+                 );
+ 
+                 -- RAG vector data (for large embeddings, store in chunks)
+                 CREATE TABLE IF NOT EXISTS rag_vector_data (
+                     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+                     rag_instance_id UUID REFERENCES rag_instances(id) ON DELETE CASCADE,
+                     data_type VARCHAR(20) NOT NULL, -- 'embeddings', 'metadata'
+                     chunk_index INTEGER DEFAULT 0,
+                     chunk_data JSONB,
+                     created_at TIMESTAMP DEFAULT NOW()
+                 );
+ 
+                 -- User conversations
+                 CREATE TABLE IF NOT EXISTS conversations (
+                     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+                     user_id VARCHAR(100) NOT NULL,
+                     rag_instance_id UUID REFERENCES rag_instances(id) ON DELETE CASCADE,
+                     title VARCHAR(255),
+                     created_at TIMESTAMP DEFAULT NOW(),
+                     updated_at TIMESTAMP DEFAULT NOW(),
+                     is_active BOOLEAN DEFAULT TRUE
+                 );
+ 
+                 -- Conversation messages
+                 CREATE TABLE IF NOT EXISTS conversation_messages (
+                     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+                     conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
+                     role VARCHAR(20) NOT NULL, -- 'user', 'assistant'
+                     content TEXT NOT NULL,
+                     metadata JSONB DEFAULT '{}',
+                     created_at TIMESTAMP DEFAULT NOW()
+                 );
+ 
+                 -- System statistics
+                 CREATE TABLE IF NOT EXISTS system_stats (
+                     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+                     stat_date DATE DEFAULT CURRENT_DATE,
+                     total_rag_instances INTEGER DEFAULT 0,
+                     total_conversations INTEGER DEFAULT 0,
+                     total_messages INTEGER DEFAULT 0,
+                     total_knowledge_files INTEGER DEFAULT 0,
+                     created_at TIMESTAMP DEFAULT NOW(),
+                     UNIQUE(stat_date)
+                 );
+ 
+                 -- Create indexes for performance
+                 CREATE INDEX IF NOT EXISTS idx_rag_instances_lookup ON rag_instances(ai_type, user_id, ai_id);
+                 CREATE INDEX IF NOT EXISTS idx_rag_instances_status ON rag_instances(status);
+                 CREATE INDEX IF NOT EXISTS idx_rag_instances_user ON rag_instances(user_id);
+                 CREATE INDEX IF NOT EXISTS idx_knowledge_files_rag ON knowledge_files(rag_instance_id);
+                 CREATE INDEX IF NOT EXISTS idx_conversations_user ON conversations(user_id);
+                 CREATE INDEX IF NOT EXISTS idx_conversation_messages_conv ON conversation_messages(conversation_id);
+                 CREATE INDEX IF NOT EXISTS idx_rag_graph_data_rag ON rag_graph_data(rag_instance_id);
+                 CREATE INDEX IF NOT EXISTS idx_rag_vector_data_rag ON rag_vector_data(rag_instance_id);
+             """)
+ 
+             self.logger.info("Enhanced database tables created/verified successfully")
+ 
+     async def save_complete_rag_instance(
+         self,
+         ai_type: str,
+         user_id: Optional[str],
+         ai_id: Optional[str],
+         name: str,
+         description: Optional[str],
+         rag_state: Dict[str, Any],
+         blob_url: Optional[str] = None
+     ) -> str:
+         """Save complete RAG instance with all data to database"""
+ 
+         async with self.pool.acquire() as conn:
+             async with conn.transaction():
+                 # Save main RAG instance.
+                 # Note: PostgreSQL treats NULLs as distinct in UNIQUE constraints, so the
+                 # ON CONFLICT upsert only deduplicates rows where user_id and ai_id are non-NULL.
+                 rag_instance_id = await conn.fetchval("""
+                     INSERT INTO rag_instances (
+                         ai_type, user_id, ai_id, name, description, blob_url,
+                         config_json, total_chunks, total_tokens, file_count
+                     ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+                     ON CONFLICT (ai_type, user_id, ai_id) DO UPDATE SET
+                         name = EXCLUDED.name,
+                         description = EXCLUDED.description,
+                         blob_url = EXCLUDED.blob_url,
+                         config_json = EXCLUDED.config_json,
+                         total_chunks = EXCLUDED.total_chunks,
+                         total_tokens = EXCLUDED.total_tokens,
+                         file_count = EXCLUDED.file_count,
+                         updated_at = NOW()
+                     RETURNING id;
+                 """,
+                     ai_type, user_id, ai_id, name, description, blob_url,
+                     json.dumps(rag_state.get('config', {})),
+                     len(rag_state.get('vectors', {}).get('embeddings', [])),
+                     self._estimate_tokens(rag_state),
+                     0
+                 )
+ 
+                 # Clear existing graph and vector data
+                 await conn.execute("""
+                     DELETE FROM rag_graph_data WHERE rag_instance_id = $1
+                 """, rag_instance_id)
+ 
+                 await conn.execute("""
+                     DELETE FROM rag_vector_data WHERE rag_instance_id = $1
+                 """, rag_instance_id)
+ 
+                 # Save graph data in chunks
+                 graph_data = rag_state.get('graph', {})
+                 await self._save_graph_data(conn, rag_instance_id, graph_data)
+ 
+                 # Save vector data in chunks
+                 vector_data = rag_state.get('vectors', {})
+                 await self._save_vector_data(conn, rag_instance_id, vector_data)
+ 
+                 return str(rag_instance_id)
+ 
+     async def _save_graph_data(self, conn, rag_instance_id: str, graph_data: Dict[str, Any]):
+         """Save graph data in chunks to avoid size limits"""
+ 
+         # Save nodes in chunks
+         nodes = graph_data.get('nodes', [])
+         if nodes:
+             chunk_size = 1000  # Adjust based on your needs
+             for i in range(0, len(nodes), chunk_size):
+                 chunk = nodes[i:i + chunk_size]
+                 await conn.execute("""
+                     INSERT INTO rag_graph_data (rag_instance_id, data_type, chunk_index, chunk_data)
+                     VALUES ($1, $2, $3, $4)
+                 """, rag_instance_id, 'nodes', i // chunk_size, json.dumps(chunk))
+ 
+         # Save edges in chunks
+         edges = graph_data.get('edges', [])
+         if edges:
+             chunk_size = 1000
+             for i in range(0, len(edges), chunk_size):
+                 chunk = edges[i:i + chunk_size]
+                 await conn.execute("""
+                     INSERT INTO rag_graph_data (rag_instance_id, data_type, chunk_index, chunk_data)
+                     VALUES ($1, $2, $3, $4)
+                 """, rag_instance_id, 'edges', i // chunk_size, json.dumps(chunk))
+ 
+         # Save graph attributes
+         graph_attrs = graph_data.get('graph_attrs', {})
+         if graph_attrs:
+             await conn.execute("""
+                 INSERT INTO rag_graph_data (rag_instance_id, data_type, chunk_index, chunk_data)
+                 VALUES ($1, $2, $3, $4)
+             """, rag_instance_id, 'attrs', 0, json.dumps(graph_attrs))
+ 
+     async def _save_vector_data(self, conn, rag_instance_id: str, vector_data: Dict[str, Any]):
+         """Save vector data in chunks to avoid size limits"""
+ 
+         # Save embeddings in chunks
+         embeddings = vector_data.get('embeddings', [])
+         if embeddings:
+             chunk_size = 100  # Smaller chunks for embeddings
+             for i in range(0, len(embeddings), chunk_size):
+                 chunk = embeddings[i:i + chunk_size]
+                 await conn.execute("""
+                     INSERT INTO rag_vector_data (rag_instance_id, data_type, chunk_index, chunk_data)
+                     VALUES ($1, $2, $3, $4)
+                 """, rag_instance_id, 'embeddings', i // chunk_size, json.dumps(chunk))
+ 
+         # Save metadata
+         metadata = vector_data.get('metadata', [])
+         if metadata:
+             await conn.execute("""
+                 INSERT INTO rag_vector_data (rag_instance_id, data_type, chunk_index, chunk_data)
+                 VALUES ($1, $2, $3, $4)
+             """, rag_instance_id, 'metadata', 0, json.dumps(metadata))
+ 
+     async def load_complete_rag_instance(
+         self,
+         ai_type: str,
+         user_id: Optional[str] = None,
+         ai_id: Optional[str] = None
+     ) -> Optional[Dict[str, Any]]:
+         """Load complete RAG instance from database"""
+ 
+         async with self.pool.acquire() as conn:
+             # Get main RAG instance (IS NOT DISTINCT FROM also matches rows where
+             # user_id/ai_id are NULL, which a plain "=" comparison would never match)
+             rag_instance = await conn.fetchrow("""
+                 SELECT id, ai_type, user_id, ai_id, name, description, blob_url,
+                        config_json, total_chunks, total_tokens, file_count,
+                        created_at, updated_at, last_accessed_at, status
+                 FROM rag_instances
+                 WHERE ai_type = $1
+                   AND user_id IS NOT DISTINCT FROM $2
+                   AND ai_id IS NOT DISTINCT FROM $3
+                   AND status = 'active'
+             """, ai_type, user_id, ai_id)
+ 
+             if not rag_instance:
+                 return None
+ 
+             # Update last accessed time
+             await conn.execute("""
+                 UPDATE rag_instances SET last_accessed_at = NOW() WHERE id = $1
+             """, rag_instance['id'])
+ 
+             # Load graph data
+             graph_data = await self._load_graph_data(conn, rag_instance['id'])
+ 
+             # Load vector data
+             vector_data = await self._load_vector_data(conn, rag_instance['id'])
+ 
+             # asyncpg returns JSONB columns as JSON strings unless a codec is registered,
+             # so decode config_json before handing it back
+             config = rag_instance['config_json']
+             if isinstance(config, str):
+                 config = json.loads(config)
+ 
+             return {
+                 "metadata": dict(rag_instance),
+                 "rag_state": {
+                     "graph": graph_data,
+                     "vectors": vector_data,
+                     "config": config or {},
+                     "version": "1.0"
+                 }
+             }
+ 
+     async def _load_graph_data(self, conn, rag_instance_id: str) -> Dict[str, Any]:
+         """Load graph data from chunks"""
+ 
+         def _decode(value):
+             # chunk_data is JSONB; asyncpg returns it as a JSON string by default
+             return json.loads(value) if isinstance(value, str) else value
+ 
+         # Load nodes
+         nodes_chunks = await conn.fetch("""
+             SELECT chunk_index, chunk_data FROM rag_graph_data
+             WHERE rag_instance_id = $1 AND data_type = 'nodes'
+             ORDER BY chunk_index
+         """, rag_instance_id)
+ 
+         nodes = []
+         for chunk_row in nodes_chunks:
+             nodes.extend(_decode(chunk_row['chunk_data']))
+ 
+         # Load edges
+         edges_chunks = await conn.fetch("""
+             SELECT chunk_index, chunk_data FROM rag_graph_data
+             WHERE rag_instance_id = $1 AND data_type = 'edges'
+             ORDER BY chunk_index
+         """, rag_instance_id)
+ 
+         edges = []
+         for chunk_row in edges_chunks:
+             edges.extend(_decode(chunk_row['chunk_data']))
+ 
+         # Load graph attributes
+         attrs_row = await conn.fetchrow("""
+             SELECT chunk_data FROM rag_graph_data
+             WHERE rag_instance_id = $1 AND data_type = 'attrs'
+         """, rag_instance_id)
+ 
+         graph_attrs = _decode(attrs_row['chunk_data']) if attrs_row else {}
+ 
+         return {
+             "nodes": nodes,
+             "edges": edges,
+             "graph_attrs": graph_attrs
+         }
+ 
+     async def _load_vector_data(self, conn, rag_instance_id: str) -> Dict[str, Any]:
+         """Load vector data from chunks"""
+ 
+         def _decode(value):
+             # chunk_data is JSONB; asyncpg returns it as a JSON string by default
+             return json.loads(value) if isinstance(value, str) else value
+ 
+         # Load embeddings
+         embeddings_chunks = await conn.fetch("""
+             SELECT chunk_index, chunk_data FROM rag_vector_data
+             WHERE rag_instance_id = $1 AND data_type = 'embeddings'
+             ORDER BY chunk_index
+         """, rag_instance_id)
+ 
+         embeddings = []
+         for chunk_row in embeddings_chunks:
+             embeddings.extend(_decode(chunk_row['chunk_data']))
+ 
+         # Load metadata
+         metadata_row = await conn.fetchrow("""
+             SELECT chunk_data FROM rag_vector_data
+             WHERE rag_instance_id = $1 AND data_type = 'metadata'
+         """, rag_instance_id)
+ 
+         metadata = _decode(metadata_row['chunk_data']) if metadata_row else []
+ 
+         return {
+             "embeddings": embeddings,
+             "metadata": metadata,
+             "dimension": 1024
+         }
+ 
+     async def save_knowledge_file(
+         self,
+         rag_instance_id: str,
+         filename: str,
+         original_filename: str,
+         file_type: str,
+         file_size: int,
+         content_text: str,
+         content_blob: Optional[bytes] = None
+     ) -> str:
+         """Save knowledge file to database"""
+ 
+         async with self.pool.acquire() as conn:
+             file_id = await conn.fetchval("""
+                 INSERT INTO knowledge_files (
+                     rag_instance_id, filename, original_filename, file_type,
+                     file_size, content_text, content_blob, processing_status
+                 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+                 RETURNING id
+             """, rag_instance_id, filename, original_filename, file_type,
+                 file_size, content_text, content_blob, 'processed')
+ 
+             return str(file_id)
+ 
+     async def get_knowledge_files(self, rag_instance_id: str) -> List[Dict[str, Any]]:
+         """Get all knowledge files for a RAG instance"""
+ 
+         async with self.pool.acquire() as conn:
+             files = await conn.fetch("""
+                 SELECT id, filename, original_filename, file_type, file_size,
+                        content_text, processing_status, token_count,
+                        created_at, updated_at
+                 FROM knowledge_files
+                 WHERE rag_instance_id = $1
+                 ORDER BY created_at DESC
+             """, rag_instance_id)
+ 
+             return [dict(file) for file in files]
+ 
+     async def list_user_rag_instances(self, user_id: str) -> List[Dict[str, Any]]:
+         """List all RAG instances for a user"""
+         async with self.pool.acquire() as conn:
+             results = await conn.fetch("""
+                 SELECT id, ai_type, ai_id, name, description, total_chunks,
+                        total_tokens, file_count, created_at, updated_at,
+                        last_accessed_at, status
+                 FROM rag_instances
+                 WHERE user_id = $1 AND status = 'active'
+                 ORDER BY created_at DESC
+             """, user_id)
+ 
+             return [dict(row) for row in results]
+ 
+     async def save_conversation(
+         self,
+         user_id: str,
+         rag_instance_id: str,
+         title: Optional[str] = None
+     ) -> str:
+         """Save conversation to database"""
+ 
+         async with self.pool.acquire() as conn:
+             conversation_id = await conn.fetchval("""
+                 INSERT INTO conversations (user_id, rag_instance_id, title)
+                 VALUES ($1, $2, $3)
+                 RETURNING id
+             """, user_id, rag_instance_id, title)
+ 
+             return str(conversation_id)
+ 
+     async def save_conversation_message(
+         self,
+         conversation_id: str,
+         role: str,
+         content: str,
+         metadata: Optional[Dict[str, Any]] = None
+     ) -> str:
+         """Save conversation message to database"""
+ 
+         async with self.pool.acquire() as conn:
+             message_id = await conn.fetchval("""
+                 INSERT INTO conversation_messages (conversation_id, role, content, metadata)
+                 VALUES ($1, $2, $3, $4)
+                 RETURNING id
+             """, conversation_id, role, content, json.dumps(metadata or {}))
+ 
+             # Update conversation timestamp
+             await conn.execute("""
+                 UPDATE conversations SET updated_at = NOW() WHERE id = $1
+             """, conversation_id)
+ 
+             return str(message_id)
+ 
+     async def get_conversation_messages(
+         self,
+         conversation_id: str,
+         limit: int = 50
+     ) -> List[Dict[str, Any]]:
+         """Get conversation messages from database"""
+ 
+         async with self.pool.acquire() as conn:
+             messages = await conn.fetch("""
+                 SELECT id, role, content, metadata, created_at
+                 FROM conversation_messages
+                 WHERE conversation_id = $1
+                 ORDER BY created_at DESC
+                 LIMIT $2
+             """, conversation_id, limit)
+ 
+             return [dict(msg) for msg in reversed(messages)]
+ 
+     async def get_user_conversations(self, user_id: str) -> List[Dict[str, Any]]:
+         """Get all conversations for a user"""
+ 
+         async with self.pool.acquire() as conn:
+             conversations = await conn.fetch("""
+                 SELECT c.id, c.title, c.created_at, c.updated_at,
+                        r.name as ai_name, r.ai_type,
+                        (SELECT content FROM conversation_messages
+                         WHERE conversation_id = c.id
+                         ORDER BY created_at DESC LIMIT 1) as last_message
+                 FROM conversations c
+                 JOIN rag_instances r ON c.rag_instance_id = r.id
+                 WHERE c.user_id = $1 AND c.is_active = TRUE
+                 ORDER BY c.updated_at DESC
+             """, user_id)
+ 
+             return [dict(conv) for conv in conversations]
+ 
+     async def update_system_stats(self):
+         """Update system statistics"""
+ 
+         async with self.pool.acquire() as conn:
+             # Get current counts
+             stats = await conn.fetchrow("""
+                 SELECT
+                     (SELECT COUNT(*) FROM rag_instances WHERE status = 'active') as rag_count,
+                     (SELECT COUNT(*) FROM conversations WHERE is_active = TRUE) as conv_count,
+                     (SELECT COUNT(*) FROM conversation_messages) as msg_count,
+                     (SELECT COUNT(*) FROM knowledge_files) as file_count
+             """)
+ 
+             # Update stats for today
+             await conn.execute("""
+                 INSERT INTO system_stats (
+                     stat_date, total_rag_instances, total_conversations,
+                     total_messages, total_knowledge_files
+                 ) VALUES (CURRENT_DATE, $1, $2, $3, $4)
+                 ON CONFLICT (stat_date) DO UPDATE SET
+                     total_rag_instances = EXCLUDED.total_rag_instances,
+                     total_conversations = EXCLUDED.total_conversations,
+                     total_messages = EXCLUDED.total_messages,
+                     total_knowledge_files = EXCLUDED.total_knowledge_files
+             """, stats['rag_count'], stats['conv_count'], stats['msg_count'], stats['file_count'])
+ 
+     async def get_system_stats(self) -> Dict[str, Any]:
+         """Get system statistics"""
+ 
+         async with self.pool.acquire() as conn:
+             stats = await conn.fetchrow("""
+                 SELECT * FROM system_stats
+                 ORDER BY stat_date DESC
+                 LIMIT 1
+             """)
+ 
+             return dict(stats) if stats else {}
+ 
+     async def delete_rag_instance(self, rag_instance_id: str):
+         """Soft delete a RAG instance"""
+ 
+         async with self.pool.acquire() as conn:
+             await conn.execute("""
+                 UPDATE rag_instances
+                 SET status = 'deleted', updated_at = NOW()
+                 WHERE id = $1
+             """, rag_instance_id)
+ 
+     async def cleanup_old_data(self, days_old: int = 30):
+         """Clean up old data from database"""
+ 
+         async with self.pool.acquire() as conn:
+             # Clean up old deleted RAG instances.
+             # asyncpg uses $n placeholders, so build the interval with make_interval
+             # instead of interpolating '%s days' into the SQL text.
+             await conn.execute("""
+                 DELETE FROM rag_instances
+                 WHERE status = 'deleted' AND updated_at < NOW() - make_interval(days => $1)
+             """, days_old)
+ 
+             # Clean up old system stats (keep last 90 days)
+             await conn.execute("""
+                 DELETE FROM system_stats
+                 WHERE stat_date < CURRENT_DATE - INTERVAL '90 days'
+             """)
+ 
+     def _estimate_tokens(self, rag_state: Dict[str, Any]) -> int:
+         """Estimate token count from RAG state"""
+         try:
+             # Simple estimation based on serialized size
+             content_size = len(json.dumps(rag_state))
+             return content_size // 4  # Rough estimate: 4 chars per token
+         except (TypeError, ValueError):
+             return 0
+ 
+     async def get_database_size(self) -> Dict[str, Any]:
+         """Get database size information"""
+ 
+         async with self.pool.acquire() as conn:
+             size_info = await conn.fetchrow("""
+                 SELECT
+                     pg_size_pretty(pg_database_size(current_database())) as total_size,
+                     (SELECT COUNT(*) FROM rag_instances) as rag_instances,
+                     (SELECT COUNT(*) FROM knowledge_files) as knowledge_files,
+                     (SELECT COUNT(*) FROM conversations) as conversations,
+                     (SELECT COUNT(*) FROM conversation_messages) as messages,
+                     (SELECT COUNT(*) FROM rag_graph_data) as graph_chunks,
+                     (SELECT COUNT(*) FROM rag_vector_data) as vector_chunks
+             """)
+ 
+             return dict(size_info)
+ 
+     async def test_connection(self) -> bool:
+         """Test database connection"""
+         try:
+             async with self.pool.acquire() as conn:
+                 await conn.fetchval("SELECT 1")
+                 return True
+         except Exception as e:
+             self.logger.error(f"Database connection test failed: {e}")
+             return False
+ 
+     async def close(self):
+         """Close database connection pool"""
+         if self.pool:
+             await self.pool.close()
+             self.logger.info("Database connection pool closed")