Skip to content

Commit 2cc7ae7

Browse files
committed
cache refactor
1 parent 27ec42d commit 2cc7ae7

File tree

4 files changed

+72
-101
lines changed

4 files changed

+72
-101
lines changed

augmenta/augmenta.py

Lines changed: 2 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from augmenta.utils.prompt_formatter import format_examples, format_prompt
1111
from augmenta.agent import AugmentaAgent
1212
from augmenta.cache import CacheManager
13-
from augmenta.cache.process import handle_process_resumption, setup_caching, apply_cached_results
13+
from augmenta.cache.process import setup_cache_handling, apply_cached_results
1414
from augmenta.config.read_config import load_config, get_config_values
1515
import logfire
1616

@@ -73,52 +73,6 @@ def load_input_data(config_data: Dict[str, Any]) -> pd.DataFrame:
7373
raise ValueError(f"Failed to read input CSV file '{input_csv}': {str(e)}")
7474

7575

76-
def setup_cache_handling(
    config_data: Dict[str, Any],
    config_path: Path,
    cache_enabled: bool,
    process_id: Optional[str],
    auto_resume: bool,
    df: pd.DataFrame
) -> Tuple[Optional[str], Optional[CacheManager], Dict[int, Any]]:
    """Prepare the caching layer for a run.

    First resolves whether an earlier unfinished process should be
    resumed, then initialises (or re-attaches to) the cache for the
    current run.

    Args:
        config_data: Parsed configuration values (must contain "input_csv")
        config_path: Location of the configuration file on disk
        cache_enabled: False disables caching entirely
        process_id: Explicit process ID to resume, if any
        auto_resume: Whether to offer resuming a matching unfinished process
        df: Input data already loaded from CSV

    Returns:
        Tuple of (process ID, cache manager, cached results); all-empty
        values when caching is disabled.
    """
    # Caching switched off: nothing to resume, nothing to record.
    if not cache_enabled:
        return None, None, {}

    csv_path = config_data["input_csv"]

    # Decide whether a previous unfinished process should be resumed.
    process_id = handle_process_resumption(
        config_data=config_data,
        config_path=config_path,
        csv_path=csv_path,
        no_cache=not cache_enabled,
        resume=process_id,
        no_auto_resume=not auto_resume,
    )

    # Create (or re-attach to) the cache-backed process record.
    cache_manager, process_id, cached_results = setup_caching(
        config_data=config_data,
        csv_path=csv_path,
        cache_enabled=cache_enabled,
        df_length=len(df),
        process_id=process_id,
    )

    return process_id, cache_manager, cached_results
120-
121-
12276
async def process_augmenta(
12377
config_path: Union[str, Path],
12478
cache_enabled: bool = True,
@@ -152,8 +106,7 @@ async def process_augmenta(
152106

153107
# Set up agent
154108
agent = setup_agent(config_data)
155-
156-
# Load input data
109+
# Load input data
157110
df = load_input_data(config_data)
158111

159112
# Handle caching setup

augmenta/cache/__init__.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,20 @@
66
from .exceptions import CacheError, DatabaseError, ValidationError
77
from .models import ProcessStatus
88
from .process import (
9-
handle_process_resumption,
9+
get_cache_manager,
1010
handle_cache_cleanup,
11-
setup_caching,
11+
setup_cache_handling,
1212
apply_cached_results
1313
)
1414
from .manager import CacheManager
1515

1616
__all__ = [
1717
# Core cache management
1818
'CacheManager',
19-
20-
# Process handling
21-
'handle_process_resumption',
19+
'get_cache_manager',
20+
# Process handling
2221
'handle_cache_cleanup',
23-
'setup_caching',
22+
'setup_cache_handling',
2423
'apply_cached_results',
2524

2625
# Models and exceptions

augmenta/cache/manager.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,22 @@ def __new__(cls, *args, **kwargs) -> 'CacheManager':
3333
cls._instance = super().__new__(cls)
3434
return cls._instance
3535

36-
    def __init__(self, cache_dir: Optional[Path] = None, auto_cleanup_days: int = 30) -> None:
        # Guarded by the class-level lock so two threads constructing the
        # singleton concurrently cannot both run the initialisation body.
        with self._lock:
            # Singleton: __new__ returns the shared instance, so __init__
            # runs on every CacheManager() call — bail out after the first.
            if hasattr(self, 'initialized'):
                return

            # Default cache location: .augmenta/cache under the current
            # working directory.
            self.cache_dir = cache_dir or Path(os.getcwd()) / '.augmenta' / 'cache'
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            self.db_path = self.cache_dir / 'cache.db'
            # Retention window: process rows older than this many days are
            # purged by _cleanup_old_processes() below.
            self.auto_cleanup_days = auto_cleanup_days

            # Pending writes are queued here; presumably drained by the
            # thread started in _start_writer_thread() — confirm there.
            self.write_queue = Queue()
            self.is_running = True

            self.db = DatabaseConnection(self.db_path)
            self._start_writer_thread()
            self._cleanup_old_processes()  # Auto-cleanup on startup
            # Flush/stop the writer machinery on interpreter exit.
            atexit.register(self.cleanup)
            self.initialized = True
5254

@@ -181,6 +183,17 @@ def mark_process_completed(self, process_id: str) -> None:
181183
(datetime.now(), process_id)
182184
))
183185

186+
def _cleanup_old_processes(self) -> None:
187+
"""Clean up processes older than the specified days."""
188+
try:
189+
cutoff = datetime.now() - timedelta(days=self.auto_cleanup_days)
190+
with self.db.get_connection() as conn:
191+
result = conn.execute("DELETE FROM processes WHERE last_updated < ?", (cutoff,))
192+
if result.rowcount > 0:
193+
logger.info(f"Cleaned up {result.rowcount} old processes from cache")
194+
except Exception as e:
195+
logger.error(f"Error during automatic cache cleanup: {e}")
196+
184197
def cleanup_old_processes(self, days: int = 30) -> None:
185198
"""Clean up processes older than specified days."""
186199
validate_int(days, "Days")

augmenta/cache/process.py

Lines changed: 51 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -9,73 +9,79 @@
99
from augmenta.utils.get_hash import get_hash
1010
from .manager import CacheManager
1111

12-
def handle_process_resumption(
    config_data: Dict[str, Any],
    config_path: Path,
    csv_path: Path,
    no_cache: bool = False,
    resume: Optional[str] = None,
    no_auto_resume: bool = False,
    cache_manager: Optional[Any] = None
) -> Optional[str]:
    """Handle process resumption logic.

    Returns the process ID to resume, or None to start fresh. An
    explicit ``resume`` ID — or caching / auto-resume being disabled —
    short-circuits the interactive prompt; otherwise the user is asked
    whether to resume any unfinished process matching this run.
    """
    # An explicit answer (or a disabled feature) ends the decision here.
    if resume or no_cache or no_auto_resume:
        return resume

    manager = cache_manager if cache_manager is not None else CacheManager()

    # Fingerprint the run by hashing both the config and the input CSV.
    fingerprint = get_hash({
        'config': get_hash(config_data),
        'csv': get_hash(csv_path),
    })

    candidate = manager.find_unfinished_process(fingerprint)
    if not candidate:
        return None

    # Show the user what would be resumed before asking.
    click.echo(manager.get_process_summary(candidate))
    if click.confirm("Would you like to resume this process?"):
        return candidate.process_id
    return None
12+
def get_cache_manager() -> CacheManager:
    """Return the shared CacheManager instance.

    CacheManager enforces singleton semantics in ``__new__``, so
    constructing it here always yields the one process-wide instance.
    """
    return CacheManager()
3915

def _fingerprint_run(config_data: Dict[str, Any]) -> str:
    """Hash the config together with its input CSV to identify a run."""
    config_hash = get_hash(config_data)
    csv_hash = get_hash(config_data["input_csv"])
    return get_hash({'config': config_hash, 'csv': csv_hash})


def setup_cache_handling(
    config_data: Dict[str, Any],
    config_path: Path,
    cache_enabled: bool,
    process_id: Optional[str],
    auto_resume: bool,
    df: pd.DataFrame
) -> Tuple[Optional[str], Optional[CacheManager], Dict[int, Any]]:
    """Set up caching configuration.

    Args:
        config_data: Configuration dictionary (must contain "input_csv")
        config_path: Path to configuration file (unused here; kept for
            interface stability)
        cache_enabled: Whether caching is enabled
        process_id: Optional process ID for resuming
        auto_resume: Whether to auto-resume previous processes
        df: Loaded DataFrame

    Returns:
        Tuple of (process ID, cache manager, cached results)
    """
    if not cache_enabled:
        return None, None, {}

    # Initialize cache manager once (singleton)
    cache_manager = get_cache_manager()

    # Only look for a resumable process when no explicit ID was given
    # and auto-resume is allowed.
    if not process_id and auto_resume:
        combined_hash = _fingerprint_run(config_data)

        # Offer to resume a matching unfinished process
        if unfinished_process := cache_manager.find_unfinished_process(combined_hash):
            summary = cache_manager.get_process_summary(unfinished_process)
            click.echo(summary)
            if click.confirm("Would you like to resume this process?"):
                process_id = unfinished_process.process_id

    # Set up or resume process
    if not process_id:
        # Start new process (hash computation shared via _fingerprint_run
        # instead of being duplicated inline)
        process_id = cache_manager.start_process(_fingerprint_run(config_data), len(df))
    else:
        # Update existing process
        with cache_manager.db.get_connection() as conn:
            conn.execute(
                "UPDATE processes SET status = 'running', last_updated = ? WHERE process_id = ?",
                (datetime.now(), process_id)
            )

    # Get cached results
    cached_results = cache_manager.get_cached_results(process_id)

    return process_id, cache_manager, cached_results
7076

7177
def apply_cached_results(
7278
df: pd.DataFrame,
7379
process_id: str,
74-
cache_manager: Optional[Any] = None
80+
cache_manager: Optional[CacheManager] = None
7581
) -> pd.DataFrame:
7682
"""Apply cached results to a DataFrame."""
7783
if cache_manager is None:
78-
cache_manager = CacheManager()
84+
cache_manager = get_cache_manager()
7985

8086
cached_results = cache_manager.get_cached_results(process_id)
8187
for row_index, result in cached_results.items():
@@ -86,7 +92,7 @@ def apply_cached_results(
8692
def handle_cache_cleanup(cache_manager: Optional[Any] = None) -> None:
8793
"""Clean up cache by removing the cache database file."""
8894
if cache_manager is None:
89-
cache_manager = CacheManager()
95+
cache_manager = get_cache_manager()
9096

9197
try:
9298
# Ensure all pending writes are processed

0 commit comments

Comments
 (0)