diff --git a/server/alembic/versions/b6f4c2d8e9a1_namespace_observability_events.py b/server/alembic/versions/b6f4c2d8e9a1_namespace_observability_events.py index c4bb1951..16420ac5 100644 --- a/server/alembic/versions/b6f4c2d8e9a1_namespace_observability_events.py +++ b/server/alembic/versions/b6f4c2d8e9a1_namespace_observability_events.py @@ -9,6 +9,7 @@ from __future__ import annotations import sqlalchemy as sa + from alembic import op # revision identifiers, used by Alembic. @@ -18,26 +19,40 @@ depends_on = None +_EVENT_TABLE = "control_execution_events" +_NAMESPACE_COLUMN = "namespace_key" +_TARGET_PK_COLUMNS = ["namespace_key", "control_execution_id"] +_LEGACY_PK_COLUMNS = ["control_execution_id"] + + +def _column_names() -> set[str]: + return {column["name"] for column in sa.inspect(op.get_bind()).get_columns(_EVENT_TABLE)} + + +def _replace_primary_key(columns: list[str]) -> None: + pk = sa.inspect(op.get_bind()).get_pk_constraint(_EVENT_TABLE) + current_columns = list(pk.get("constrained_columns") or []) + if current_columns == columns: + return + + constraint_name = pk.get("name") + if constraint_name: + op.drop_constraint(constraint_name, _EVENT_TABLE, type_="primary") + op.create_primary_key("control_execution_events_pkey", _EVENT_TABLE, columns) + + def upgrade() -> None: - op.add_column( - "control_execution_events", - sa.Column( - "namespace_key", - sa.String(length=255), - server_default=sa.text("'default'"), - nullable=False, - ), - ) - op.drop_constraint( - "control_execution_events_pkey", - "control_execution_events", - type_="primary", - ) - op.create_primary_key( - "control_execution_events_pkey", - "control_execution_events", - ["namespace_key", "control_execution_id"], - ) + if _NAMESPACE_COLUMN not in _column_names(): + op.add_column( + _EVENT_TABLE, + sa.Column( + _NAMESPACE_COLUMN, + sa.String(length=255), + server_default=sa.text("'default'"), + nullable=False, + ), + ) + _replace_primary_key(_TARGET_PK_COLUMNS) with op.get_context().autocommit_block(): op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_events_agent_time") op.execute( @@ -57,14 +72,6 @@ def downgrade() -> None: ON control_execution_events (agent_name, timestamp DESC) """ ) - op.drop_constraint( - "control_execution_events_pkey", - "control_execution_events", - type_="primary", - ) - op.create_primary_key( - "control_execution_events_pkey", - "control_execution_events", - ["control_execution_id"], - ) - op.drop_column("control_execution_events", "namespace_key") + if _NAMESPACE_COLUMN in _column_names(): + _replace_primary_key(_LEGACY_PK_COLUMNS) + op.drop_column(_EVENT_TABLE, _NAMESPACE_COLUMN) diff --git a/server/tests/test_data_model_v1_alembic_migration.py b/server/tests/test_data_model_v1_alembic_migration.py index 7ca22e3a..9e6764fa 100644 --- a/server/tests/test_data_model_v1_alembic_migration.py +++ b/server/tests/test_data_model_v1_alembic_migration.py @@ -6,12 +6,12 @@ from pathlib import Path import pytest -from alembic import command from alembic.config import Config from sqlalchemy import create_engine, inspect, text from sqlalchemy.engine import Engine, make_url from agent_control_server.config import db_config +from alembic import command SERVER_DIR = Path(__file__).resolve().parents[1] PRE_MIGRATION_REVISION = "c1e9f9c4a1d2" @@ -92,6 +92,17 @@ def _pk_columns(engine: Engine, table: str) -> list[str]: return list(inspect(engine).get_pk_constraint(table)["constrained_columns"]) +def _assert_observability_namespace_schema(engine: Engine) -> None: + assert "namespace_key" in _column_names(engine, "control_execution_events") + assert _pk_columns(engine, "control_execution_events") == [ + "namespace_key", + "control_execution_id", + ] + indexes = _index_names(engine, "control_execution_events") + assert "ix_events_namespace_agent_time" in indexes + assert "ix_events_agent_time" not in indexes + + def test_upgrade_applies_namespace_columns_and_constraints( alembic_config: Config, temp_engine: Engine ) -> None: @@ -229,14 +240,56 @@ def test_observability_namespace_migration_scopes_event_primary_key( ) -> None: command.upgrade(alembic_config, OBSERVABILITY_NAMESPACE_REVISION) - assert "namespace_key" in _column_names(temp_engine, "control_execution_events") - assert _pk_columns(temp_engine, "control_execution_events") == [ - "namespace_key", - "control_execution_id", - ] - indexes = _index_names(temp_engine, "control_execution_events") - assert "ix_events_namespace_agent_time" in indexes - assert "ix_events_agent_time" not in indexes + _assert_observability_namespace_schema(temp_engine) + + +def test_observability_namespace_migration_recovers_when_column_preexists( + alembic_config: Config, temp_engine: Engine +) -> None: + command.upgrade(alembic_config, MIGRATION_REVISION) + + with temp_engine.begin() as conn: + conn.execute( + text( + "ALTER TABLE control_execution_events " + "ADD COLUMN namespace_key VARCHAR(255) DEFAULT 'default' NOT NULL" + ) + ) + + command.upgrade(alembic_config, OBSERVABILITY_NAMESPACE_REVISION) + + _assert_observability_namespace_schema(temp_engine) + + +def test_observability_namespace_migration_recovers_when_primary_key_preexists( + alembic_config: Config, temp_engine: Engine +) -> None: + command.upgrade(alembic_config, MIGRATION_REVISION) + + with temp_engine.begin() as conn: + conn.execute( + text( + "ALTER TABLE control_execution_events " + "ADD COLUMN namespace_key VARCHAR(255) DEFAULT 'default' NOT NULL" + ) + ) + conn.execute( + text( + "ALTER TABLE control_execution_events " + "DROP CONSTRAINT control_execution_events_pkey" + ) + ) + conn.execute( + text( + "ALTER TABLE control_execution_events " + "ADD CONSTRAINT control_execution_events_pkey " + "PRIMARY KEY (namespace_key, control_execution_id)" + ) + ) + + command.upgrade(alembic_config, OBSERVABILITY_NAMESPACE_REVISION) + + _assert_observability_namespace_schema(temp_engine) def test_downgrade_rejects_cross_namespace_agents_duplicates(