2 changes: 1 addition & 1 deletion worker/tests/policy/test_manager.py
@@ -37,7 +37,7 @@ def test_start_policy(policy_manager, sample_policy):
     policy_manager.start_policy("policy1", sample_policy)
 
     # Check that PolicyRunner.setup was called with correct arguments
-    mock_runner.setup.assert_called_once_with("policy1", None, sample_policy)
+    mock_runner.setup.assert_called_once_with("policy1", None, sample_policy, policy_manager.job_store)
 
     # Ensure the policy runner was added to the manager's runners
     assert "policy1" in policy_manager.runners
7 changes: 5 additions & 2 deletions worker/tests/test_server.py
@@ -154,8 +154,11 @@ def mock_get_metric(name):
     response = client.get("/api/v1/status")
     mock_version_semver.assert_called_once()
     assert response.status_code == 200
-    assert response.json()["version"] == "1.0.0"
-    assert "up_time_seconds" in response.json()
+    data = response.json()
+    assert data["version"] == "1.0.0"
+    assert "up_time_seconds" in data
+    assert "policies" in data
+    assert isinstance(data["policies"], list)
     assert mock_api_requests.add.call_count == 1
     assert mock_api_response_latency.record.call_count == 1

68 changes: 67 additions & 1 deletion worker/worker/policy/manager.py
@@ -8,6 +8,7 @@
 import yaml
 
 from worker.models import DiodeConfig, Policy, PolicyRequest
+from worker.policy.job import JobStore
 from worker.policy.runner import PolicyRunner
 
 # Set up logging
@@ -46,6 +47,7 @@ def __init__(self):
         self.runners = dict[str, PolicyRunner]()
         self.config = None
         self.loaded_modules = set()
+        self.job_store = JobStore()
 
     def get_loaded_modules(self):
         """Return the loaded modules."""
@@ -76,7 +78,7 @@ def start_policy(self, name: str, policy: Policy):
             raise ValueError(f"policy '{name}' already exists")
 
         runner = PolicyRunner()
-        runner.setup(name, self.config, policy)
+        runner.setup(name, self.config, policy, self.job_store)
         self.loaded_modules.add(policy.config.package)
         self.runners[name] = runner
 
@@ -132,3 +134,67 @@ def stop(self):
             logger.info(f"Stopping policy '{name}'")
             runner.stop()
         self.runners = {}
+
+    def get_policy_statuses(self) -> list[dict]:
+        """
+        Get all policies with their status and jobs.
+
+        Returns
+        -------
+        list[dict]: List of policy status dictionaries with name, status, and jobs.
+
+        """
+        all_jobs = self.job_store.get_all_policies_with_jobs()
+        statuses = []
+
+        # Get statuses for all policies that have runners
+        for name in self.runners:
+            jobs = self.job_store.get_jobs_for_policy(name)
+            status = "unknown"
+            if len(jobs) > 0:
+                latest_job = jobs[-1]
+                status = latest_job.status.value
+            statuses.append(
+                {
+                    "name": name,
+                    "status": status,
+                    "jobs": [
+                        {
+                            "id": job.id,
+                            "status": job.status.value,
+                            "reason": job.reason,
+                            "entity_count": job.entity_count,
+                            "created_at": job.created_at.isoformat(),
+                            "updated_at": job.updated_at.isoformat(),
+                        }
+                        for job in jobs
+                    ],
+                }
+            )
+
+        # Also include policies that have jobs but no active runner
+        for name, jobs in all_jobs.items():
+            if name not in self.runners:
+                status = "unknown"
+                if len(jobs) > 0:
+                    latest_job = jobs[-1]
+                    status = latest_job.status.value
+                statuses.append(
+                    {
+                        "name": name,
+                        "status": status,
+                        "jobs": [
+                            {
+                                "id": job.id,
+                                "status": job.status.value,
+                                "reason": job.reason,
+                                "entity_count": job.entity_count,
+                                "created_at": job.created_at.isoformat(),
+                                "updated_at": job.updated_at.isoformat(),
+                            }
+                            for job in jobs
+                        ],
+                    }
+                )
+
+        return statuses
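
The diff imports JobStore and JobStatus from worker/policy/job.py, but that file is not shown here. For reference only, a minimal in-memory store consistent with every call site in this diff (create_job, update_job, get_jobs_for_policy, get_all_policies_with_jobs, and the job fields serialized above) might look like the sketch below; the locking and the str() coercion of the failure reason are assumptions:

    import threading
    import uuid
    from datetime import datetime
    from enum import Enum


    class JobStatus(Enum):
        # States observed in this diff; any others are assumptions.
        RUNNING = "running"
        COMPLETED = "completed"
        FAILED = "failed"


    class Job:
        """One execution record for a policy run."""

        def __init__(self, policy_name: str):
            self.id = str(uuid.uuid4())
            self.policy_name = policy_name
            self.status = JobStatus.RUNNING
            self.reason = None
            self.entity_count = 0
            self.created_at = datetime.now()
            self.updated_at = datetime.now()


    class JobStore:
        """Thread-safe, in-memory job history keyed by policy name."""

        def __init__(self):
            self._jobs: dict[str, list[Job]] = {}
            self._lock = threading.Lock()

        def create_job(self, policy_name: str) -> Job:
            job = Job(policy_name)
            with self._lock:
                self._jobs.setdefault(policy_name, []).append(job)
            return job

        def update_job(self, policy_name: str, job_id: str, status: JobStatus,
                       reason=None, entity_count: int = 0) -> None:
            with self._lock:
                for job in self._jobs.get(policy_name, []):
                    if job.id == job_id:
                        job.status = status
                        job.reason = str(reason) if reason is not None else None
                        job.entity_count = entity_count
                        job.updated_at = datetime.now()

        def get_jobs_for_policy(self, policy_name: str) -> list[Job]:
            with self._lock:
                return list(self._jobs.get(policy_name, []))

        def get_all_policies_with_jobs(self) -> dict[str, list[Job]]:
            with self._lock:
                return dict(self._jobs)
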
142 changes: 100 additions & 42 deletions worker/worker/policy/runner.py
@@ -15,6 +15,7 @@
 from worker.backend import Backend, load_class
 from worker.metrics import get_metric
 from worker.models import DiodeConfig, Policy, Status
+from worker.policy.job import JobStatus, JobStore
 
 # Set up logging
 logging.basicConfig(level=logging.INFO)
@@ -33,8 +34,9 @@ def __init__(self):
         self.policy = None
         self.status = Status.NEW
         self.scheduler = BackgroundScheduler()
+        self.job_store: JobStore | None = None
 
-    def setup(self, name: str, diode_config: DiodeConfig, policy: Policy):
+    def setup(self, name: str, diode_config: DiodeConfig, policy: Policy, job_store: JobStore | None = None):
         """
         Set up the policy runner.
 
@@ -43,8 +45,10 @@ def setup(self, name: str, diode_config: DiodeConfig, policy: Policy):
         name: Policy name.
         diode_config: Diode configuration data.
         policy: Policy configuration data.
+        job_store: Optional JobStore for tracking jobs.
 
         """
+        self.job_store = job_store
         self.name = name.replace("\r\n", "").replace("\n", "")
         policy.config.package = policy.config.package.replace("\r\n", "").replace(
             "\n", ""
@@ -120,54 +124,108 @@ def run(
         policy: Policy configuration.
 
         """
-        policy_executions = get_metric("policy_executions")
-        if policy_executions:
-            policy_executions.add(1, {"policy": self.name})
+        job = self._create_job()
+        self._record_execution_metrics()
 
         exec_start_time = time.perf_counter()
+        entity_count = 0
         try:
             entities = backend.run(self.name, policy)
-            metadata = {
-                "policy_name": self.name,
-                "worker_backend": self.metadata.name,
-            }
-
-            for chunk_num, entity_chunk in enumerate(self._create_message_chunks(entities), 1):
-                chunk_size_mb = self._estimate_message_size(entity_chunk) / (1024 * 1024)
-                logger.debug(
-                    f"Ingesting chunk {chunk_num} with {len(entity_chunk)} entities (~{chunk_size_mb:.2f} MB)"
-                )
-                response = client.ingest(entities=entity_chunk, metadata=metadata)
-                if response.errors:
-                    raise RuntimeError(f"Chunk {chunk_num} ingestion failed: {response.errors}")
-                logger.debug(f"Chunk {chunk_num} ingested successfully")
-
-            logger.info(f"Policy {self.name}: Successfully ingested {len(entities)} entities in {chunk_num} chunks")
-            run_success = get_metric("backend_execution_success")
-            if run_success:
-                run_success.add(
-                    1,
-                    {
-                        "policy": self.name,
-                        "backend": self.metadata.name,
-                        "app_name": self.metadata.app_name,
-                        "app_version": self.metadata.app_version,
-                    },
-                )
+            metadata = self._build_metadata(job)
+
+            entity_count = len(entities)
+            if entity_count == 0:
+                logger.info(f"Policy {self.name}: No entities to ingest")
+                self._update_job_status(job, JobStatus.COMPLETED, None, 0)
+                return
+
+            chunk_num = self._ingest_entities(client, entities, metadata)
+            logger.info(f"Policy {self.name}: Successfully ingested {entity_count} entities in {chunk_num} chunks")
+
+            self._handle_success(job, entity_count)
         except Exception as e:
             logger.error(f"Policy {self.name}: {e}")
-            run_failure = get_metric("backend_execution_failure")
-            if run_failure:
-                run_failure.add(
-                    1,
-                    {
-                        "policy": self.name,
-                        "backend": self.metadata.name,
-                        "app_name": self.metadata.app_name,
-                        "app_version": self.metadata.app_version,
-                    },
-                )
+            self._handle_failure(job, e, entity_count)
 
+        self._record_latency_metric(exec_start_time)
+
+    def _create_job(self):
+        """Create a job if job_store is available."""
+        if self.job_store:
+            return self.job_store.create_job(self.name)
+        return None
+
+    def _record_execution_metrics(self):
+        """Record policy execution metrics."""
+        policy_executions = get_metric("policy_executions")
+        if policy_executions:
+            policy_executions.add(1, {"policy": self.name})
+
+    def _build_metadata(self, job):
+        """Build metadata dictionary for ingestion."""
+        metadata = {
+            "policy_name": self.name,
+            "worker_backend": self.metadata.name,
+        }
+        if job:
+            metadata["job_id"] = job.id
+        return metadata
+
+    def _update_job_status(self, job, status: JobStatus, reason, entity_count: int):
+        """Update job status if job_store and job are available."""
+        if self.job_store and job:
+            self.job_store.update_job(self.name, job.id, status, reason=reason, entity_count=entity_count)
+
+    def _ingest_entities(
+        self, client: DiodeClient | DiodeDryRunClient, entities: list[ingester_pb2.Entity], metadata: dict
+    ) -> int:
+        """Ingest entities in chunks and return the number of chunks."""
+        chunk_num = 0
+        for chunk_num, entity_chunk in enumerate(self._create_message_chunks(entities), 1):
+            chunk_size_mb = self._estimate_message_size(entity_chunk) / (1024 * 1024)
+            logger.debug(
+                f"Ingesting chunk {chunk_num} with {len(entity_chunk)} entities (~{chunk_size_mb:.2f} MB)"
+            )
+            response = client.ingest(entities=entity_chunk, metadata=metadata)
+            if response.errors:
+                raise RuntimeError(f"Chunk {chunk_num} ingestion failed: {response.errors}")
+            logger.debug(f"Chunk {chunk_num} ingested successfully")
+        return chunk_num
+
+    def _handle_success(self, job, entity_count: int):
+        """Handle successful execution by updating job status and recording success metrics."""
+        self._update_job_status(job, JobStatus.COMPLETED, None, entity_count)
+
+        run_success = get_metric("backend_execution_success")
+        if run_success:
+            run_success.add(
+                1,
+                {
+                    "policy": self.name,
+                    "backend": self.metadata.name,
+                    "app_name": self.metadata.app_name,
+                    "app_version": self.metadata.app_version,
+                },
+            )
+
+    def _handle_failure(self, job, error: Exception, entity_count: int):
+        """Handle failed execution by updating job status and recording failure metrics."""
+        self._update_job_status(job, JobStatus.FAILED, error, entity_count)
+
+        run_failure = get_metric("backend_execution_failure")
+        if run_failure:
+            run_failure.add(
+                1,
+                {
+                    "policy": self.name,
+                    "backend": self.metadata.name,
+                    "app_name": self.metadata.app_name,
+                    "app_version": self.metadata.app_version,
+                },
+            )
+
+    def _record_latency_metric(self, exec_start_time: float):
+        """Record backend execution latency metric."""
         backend_execution_latency = get_metric("backend_execution_latency")
         if backend_execution_latency:
             exec_duration = (time.perf_counter() - exec_start_time) * 1000
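
_build_metadata produces the same dictionary the old inline code built, extended with the job id when a store is wired in. With a job present, the metadata passed to client.ingest would look roughly like this (values invented for illustration):

    {
        "policy_name": "device-sync",
        "worker_backend": "example-backend",
        "job_id": "5f2b0c1e",
    }
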
3 changes: 2 additions & 1 deletion worker/worker/server.py
@@ -120,13 +120,14 @@ def read_status():
 
     Returns
     -------
-    dict: The status of the server.
+    dict: The status of the server including policies with jobs.
 
     """
     time_diff = datetime.now() - start_time
     return {
         "version": version_semver(),
         "up_time_seconds": round(time_diff.total_seconds()),
+        "policies": manager.get_policy_statuses(),
     }
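
With this change, GET /api/v1/status returns the job history alongside the existing fields. A hypothetical response, with all values invented, could look like:

    {
      "version": "1.0.0",
      "up_time_seconds": 120,
      "policies": [
        {
          "name": "policy1",
          "status": "completed",
          "jobs": [
            {
              "id": "5f2b0c1e",
              "status": "completed",
              "reason": null,
              "entity_count": 42,
              "created_at": "2025-01-01T12:00:00",
              "updated_at": "2025-01-01T12:00:05"
            }
          ]
        }
      ]
    }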

