+from __future__ import annotations
 import os
 import chromadb
 import uuid
 from loguru import logger
 import xmltodict

-from AgentCrew.modules.llm.base import BaseLLMService
 from .base_service import BaseMemoryService
 from AgentCrew.modules.prompts.constants import (
     SEMANTIC_EXTRACTING,
     PRE_ANALYZE_PROMPT,
 )
 import chromadb.utils.embedding_functions as embedding_functions
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from chromadb import Collection
+    from AgentCrew.modules.llm.base import BaseLLMService

 # Configuration constants
 DEFAULT_CHUNK_SIZE = 200  # words per chunk
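Note on the new TYPE_CHECKING block: it keeps Collection and BaseLLMService visible to type checkers while deferring the runtime import, which is the usual way to break an import cycle. A minimal sketch of the pattern, reusing the module path from this diff (the helper function itself is hypothetical):

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers, never at runtime,
    # so importing here cannot create a circular runtime import.
    from AgentCrew.modules.llm.base import BaseLLMService


def describe_llm(llm_service: BaseLLMService) -> str:
    # With `from __future__ import annotations`, annotations stay strings,
    # so BaseLLMService does not need to be importable when this module loads.
    return f"{llm_service.provider_name}:{llm_service.model}"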
@@ -68,7 +73,29 @@ def __init__(
         elif self.llm_service.provider_name == "github_copilot":
             self.llm_service.model = "gpt-4o-mini"

+        self._collection = None
+        self.collection_name = collection_name
+        # Configuration for chunking
+        self.chunk_size = DEFAULT_CHUNK_SIZE
+        self.chunk_overlap = DEFAULT_CHUNK_OVERLAP
+        self.current_embedding_context = None
+
+        self.context_embedding = []
+        self.current_conversation_context: Dict[str, Any] = {}
+
+        # Memory queue infrastructure
+        self._conversation_queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)
+        self._memory_thread = None
+        self._memory_stop_event = Event()
+
+        # Start worker thread
+        self._start_memory_worker()
+
+    def _initialize_collection(self) -> Collection:
         # Create or get collection for storing memories
+
+        if self._collection is not None:
+            return self._collection
         if os.getenv("VOYAGE_API_KEY"):
             from .voyageai_ef import VoyageEmbeddingFunction

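The queue, thread handle, and stop event introduced above back the worker started by _start_memory_worker(), whose body is outside this diff; a sketch of the usual shape of such a worker, under that assumption (the handler callback is a placeholder, not AgentCrew code):

import queue
from threading import Event, Thread


class QueueWorkerSketch:
    """Illustrative only: the daemon-thread/stop-event pattern set up in __init__."""

    def __init__(self, handler, maxsize: int = 100):
        self._conversation_queue = queue.Queue(maxsize=maxsize)
        self._memory_stop_event = Event()
        self._handler = handler
        self._memory_thread = Thread(target=self._worker, daemon=True)
        self._memory_thread.start()

    def _worker(self):
        # Drain queued operations until the stop event is set.
        while not self._memory_stop_event.is_set():
            try:
                item = self._conversation_queue.get(timeout=1.0)
            except queue.Empty:
                continue  # wake up roughly once a second to re-check the stop event
            try:
                self._handler(item)
            finally:
                self._conversation_queue.task_done()

    def stop(self):
        self._memory_stop_event.set()
        self._memory_thread.join(timeout=5.0)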
@@ -93,25 +120,12 @@ def __init__(
         else:
             self.embedding_function = embedding_functions.DefaultEmbeddingFunction()

-        self.collection = self.client.get_or_create_collection(
-            name=collection_name,
+        self._collection = self.client.get_or_create_collection(
+            name=self.collection_name,
             embedding_function=self.embedding_function,  # type:ignore
         )
-        # Configuration for chunking
-        self.chunk_size = DEFAULT_CHUNK_SIZE
-        self.chunk_overlap = DEFAULT_CHUNK_OVERLAP
-        self.current_embedding_context = None
-
-        self.context_embedding = []
-        self.current_conversation_context: Dict[str, Any] = {}
-
-        # Memory queue infrastructure
-        self._conversation_queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)
-        self._memory_thread = None
-        self._memory_stop_event = Event()
-
-        # Start worker thread
-        self._start_memory_worker()
+        self.cleanup_old_memories(months=1)
+        return self._collection

     def _create_chunks(self, text: str) -> List[str]:
         """
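Taken together, the two hunks above defer collection creation to the first call of _initialize_collection(), cache the handle on self._collection, and prune month-old memories right after the collection is built. A condensed sketch of that lazy-initialization pattern against a persistent Chroma client (class name and storage path are illustrative):

import chromadb


class LazyCollectionSketch:
    """Illustrative only: build the ChromaDB collection on first use and cache it."""

    def __init__(self, persist_path: str, collection_name: str):
        self.client = chromadb.PersistentClient(path=persist_path)
        self.collection_name = collection_name
        self._collection = None  # not created until something actually needs it

    def _initialize_collection(self):
        if self._collection is not None:
            return self._collection  # reuse the cached handle on later calls
        self._collection = self.client.get_or_create_collection(
            name=self.collection_name,
        )
        return self._collection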
@@ -219,6 +233,7 @@ def store_conversation(
     async def _store_conversation_internal(self, operation_data: Dict[str, Any]):
         """Internal method to actually store conversation (runs in worker thread)."""
         try:
+            collection = self._initialize_collection()
             user_message = operation_data["user_message"]
             assistant_response = operation_data["assistant_response"]
             agent_name = operation_data["agent_name"]
@@ -227,7 +242,7 @@ async def _store_conversation_internal(self, operation_data: Dict[str, Any]):
             # Use the existing storage logic but make it synchronous
             ids = []
             memory_data = {}
-            avaialble_ids = self.collection.get(
+            avaialble_ids = collection.get(
                 where={
                     "agent": agent_name,
                 },
@@ -310,14 +325,14 @@ async def _store_conversation_internal(self, operation_data: Dict[str, Any]):

             # Add to ChromaDB collection (existing logic)
             if ids:
-                self.collection.upsert(
+                collection.upsert(
                     ids=[ids[0]],
                     documents=[conversation_document],
                     embeddings=conversation_embedding,
                     metadatas=[metadata],
                 )
             else:
-                self.collection.add(
+                collection.add(
                     documents=[conversation_document],
                     embeddings=conversation_embedding,
                     metadatas=[metadata],
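For reference, the branch above overwrites an existing memory entry via upsert when a stored id was found for the agent and otherwise adds a new one; a standalone illustration of the two calls (collection name, ids, and metadata are placeholders, and documents are embedded with the collection's default embedding function):

import chromadb

collection = chromadb.Client().get_or_create_collection(name="memories")

# upsert replaces whatever is stored under an id that already exists
collection.upsert(
    ids=["conv-123"],
    documents=["updated conversation summary"],
    metadatas=[{"agent": "default"}],
)

# add inserts new entries and expects ids that are not already stored
collection.add(
    ids=["conv-456"],
    documents=["new conversation summary"],
    metadatas=[{"agent": "default"}],
)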
@@ -397,6 +412,7 @@ def retrieve_memory(
         Returns:
             Formatted string of relevant memories
         """
+        collection = self._initialize_collection()

         and_conditions: List[Dict[str, Any]] = []

@@ -410,7 +426,7 @@ def retrieve_memory(
         if to_date:
             and_conditions.append({"date": {"$lte": to_date}})

-        results = self.collection.query(
+        results = collection.query(
             query_texts=[keywords],
             n_results=10,
             where={"$and": and_conditions}
@@ -512,11 +528,12 @@ def cleanup_old_memories(self, months: int = 1) -> int:
         Returns:
             Number of memories removed
         """
+        collection = self._initialize_collection()
         # Calculate the cutoff date
         cutoff_date = datetime.now() - timedelta(days=30 * months)

         # Get all memories
-        all_memories = self.collection.get()
+        all_memories = collection.get()

         # Find IDs to remove
         ids_to_remove = []
@@ -537,7 +554,7 @@ def cleanup_old_memories(self, months: int = 1) -> int:

         # Remove the old memories
         if ids_to_remove:
-            self.collection.delete(ids=ids_to_remove)
+            collection.delete(ids=ids_to_remove)

         return len(ids_to_remove)

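The loop that actually fills ids_to_remove sits outside this hunk; a sketch of what that filter presumably does, assuming each memory's metadata carries an ISO-formatted "date" string as the retrieval filters elsewhere in this file suggest (the helper name is invented for illustration):

from datetime import datetime, timedelta


def find_expired_ids(all_memories: dict, months: int = 1) -> list:
    # Hypothetical helper: collect ids whose "date" metadata is older than the cutoff.
    cutoff_date = datetime.now() - timedelta(days=30 * months)
    ids_to_remove = []
    for mem_id, metadata in zip(all_memories["ids"], all_memories["metadatas"]):
        date_str = (metadata or {}).get("date")
        if date_str and datetime.fromisoformat(date_str) < cutoff_date:
            ids_to_remove.append(mem_id)
    return ids_to_remove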
@@ -558,6 +575,7 @@ def forget_topic(
             Dict with success status and information about the operation
         """
         try:
+            collection = self._initialize_collection()
             # Query for memories related to the topic
             and_conditions: List[Dict[str, Any]] = []

@@ -568,7 +586,7 @@ def forget_topic(
                 and_conditions.append({"date": {"$gte": from_date}})
             if to_date:
                 and_conditions.append({"date": {"$lte": to_date}})
-            results = self.collection.query(
+            results = collection.query(
                 query_texts=[topic],
                 n_results=100,
                 where={"$and": and_conditions}
@@ -594,7 +612,7 @@ def forget_topic(
                         conversation_ids.add(conv_id)

             # Get all memories to find those with matching conversation IDs
-            all_memories = self.collection.get()
+            all_memories = collection.get()

             # Find IDs to remove
             ids_to_remove = []
@@ -605,7 +623,7 @@ def forget_topic(

             # Remove the memories
             if ids_to_remove:
-                self.collection.delete(ids=ids_to_remove)
+                collection.delete(ids=ids_to_remove)

             return {
                 "success": True,
@@ -622,7 +640,8 @@ def forget_topic(
             }

     def forget_ids(self, ids: List[str], agent_name: str = "None") -> Dict[str, Any]:
-        self.collection.delete(ids=ids, where={"agent": agent_name})
+        collection = self._initialize_collection()
+        collection.delete(ids=ids, where={"agent": agent_name})

         return {
             "success": True,
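forget_ids now resolves the collection lazily as well; note that passing both ids and a where filter to delete removes only the entries that satisfy both constraints, as in this standalone illustration (ids, documents, and agent names are placeholders):

import chromadb

collection = chromadb.Client().get_or_create_collection(name="memories")
collection.add(
    ids=["a", "b"],
    documents=["chunk from agent one", "chunk from agent two"],
    metadatas=[{"agent": "one"}, {"agent": "two"}],
)

# Only "a" is removed: "b" is listed in ids, but its metadata fails the where filter.
collection.delete(ids=["a", "b"], where={"agent": "one"})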