diff --git a/integration-tests/tests/_macros.luxinc b/integration-tests/tests/_macros.luxinc index 3c80f3287f..cd2a1d4f17 100644 --- a/integration-tests/tests/_macros.luxinc +++ b/integration-tests/tests/_macros.luxinc @@ -186,7 +186,12 @@ [macro start_electric_script shell_name port env] [shell $shell_name] - !ELECTRIC_PORT=$port $env ../scripts/electric_dev.sh --no-color + !ELECTRIC_DB_POOL_SIZE=4 \ + ELECTRIC_TWEAKS_PROCESS_REGISTRY_PARTITIONS=1 \ + ELECTRIC_TWEAKS_HTTP_API_NUM_ACCEPTORS=1 \ + ELECTRIC_PORT=$port \ + $env \ + ../scripts/electric_dev.sh --no-color [endmacro] [macro stop_electric] diff --git a/integration-tests/tests/insufficient-connection-resources.lux b/integration-tests/tests/insufficient-connection-resources.lux index 1ad6e4b22d..064d6978e4 100644 --- a/integration-tests/tests/insufficient-connection-resources.lux +++ b/integration-tests/tests/insufficient-connection-resources.lux @@ -7,10 +7,10 @@ ### ## Start a new Postgres cluster with limited max connections -[invoke setup_pg_with_shell_name "pg" "" "-c max_connections=10" ""] +[invoke setup_pg_with_shell_name "pg" "" "-c max_connections=4" ""] ## Start the sync service. -[invoke setup_electric] +[invoke setup_electric_with_env "ELECTRIC_DB_POOL_SIZE=5"] [shell electric] ??[info] Lock acquired from postgres diff --git a/packages/sync-service/.env.dev b/packages/sync-service/.env.dev index 3d989b1c42..325f617968 100644 --- a/packages/sync-service/.env.dev +++ b/packages/sync-service/.env.dev @@ -14,3 +14,4 @@ ELECTRIC_INSECURE=true ELECTRIC_TWEAKS_PROCESS_REGISTRY_PARTITIONS=1 ELECTRIC_TWEAKS_HTTP_API_NUM_ACCEPTORS=1 +ELECTRIC_DB_POOL_SIZE=4 diff --git a/packages/sync-service/.env.test b/packages/sync-service/.env.test index 7bf60edc10..fadc459e45 100644 --- a/packages/sync-service/.env.test +++ b/packages/sync-service/.env.test @@ -10,3 +10,4 @@ ELECTRIC_TEST_LOG_LEVEL=error DATABASE_URL=postgresql://postgres:password@localhost:54321/postgres?sslmode=disable ELECTRIC_QUERY_DATABASE_URL=postgresql://postgres:password@localhost:65432/postgres?sslmode=disable +ELECTRIC_DB_POOL_SIZE=4 diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index c49f7ca9d6..2eb39ed6cd 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -149,8 +149,15 @@ defmodule Electric.Application do # Gets the API-side configuration based on the same opts + application config # used for `configuration/1` defp api_configuration(opts) do - Electric.StackSupervisor.build_shared_opts(core_configuration(opts)) + {feature_flags, core_config} = + opts + |> core_configuration() + |> Electric.StackSupervisor.build_shared_opts() + |> Keyword.pop(:feature_flags) + + core_config |> Keyword.merge( + allow_subqueries?: Electric.Config.feature_flag_allow_subqueries() in feature_flags, long_poll_timeout: get_env(opts, :long_poll_timeout), max_age: get_env(opts, :cache_max_age), stale_age: get_env(opts, :cache_stale_age), diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index ca3790e4b1..29f6b8db8f 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -38,7 +38,9 @@ defmodule Electric.Config do @build_env Mix.env() - @known_feature_flags ~w[allow_subqueries suspend_consumers] + @feature_flag_allow_subqueries :allow_subqueries + @feature_flag_suspend_consumers :suspend_consumers + @known_feature_flags [@feature_flag_allow_subqueries, @feature_flag_suspend_consumers] @defaults [ ## Database @@ -91,6 +93,7 @@ defmodule Electric.Config do ## Misc process_registry_partitions: &Electric.Config.Defaults.process_registry_partitions/0, feature_flags: if(Mix.env() == :test, do: @known_feature_flags, else: []), + allow_subqueries?: Mix.env() == :test, publication_refresh_period: 60_000, schema_reconciler_period: 60_000, snapshot_timeout_to_first_data: :timer.seconds(30) @@ -98,6 +101,9 @@ defmodule Electric.Config do @installation_id_key "electric_installation_id" + def feature_flag_allow_subqueries, do: @feature_flag_allow_subqueries + def feature_flag_suspend_consumers, do: @feature_flag_suspend_consumers + def default(key) do case Keyword.fetch!(@defaults, key) do fun when is_function(fun, 0) -> fun.() @@ -473,7 +479,7 @@ defmodule Electric.Config do |> String.split(",") |> Enum.map(&String.trim/1) |> Enum.reject(&(&1 == "")) - |> Enum.split_with(&(&1 in @known_feature_flags)) + |> split_into_known_and_unknown_flags() |> case do {known, []} -> known @@ -481,7 +487,22 @@ defmodule Electric.Config do {_, unknown} -> raise Dotenvy.Error, message: - "Unknown feature flags specified: #{inspect(unknown)}. Known feature flags: #{inspect(@known_feature_flags)}" + "Unknown feature flags specified: #{format_flags(unknown)}. " <> + "Known feature flags: #{format_flags(@known_feature_flags)}" end end + + defp split_into_known_and_unknown_flags(inputs) do + known_flags = Map.new(@known_feature_flags, &{Atom.to_string(&1), &1}) + + Enum.reduce(inputs, {[], []}, fn input, {known, unknown} -> + if flag = Map.get(known_flags, input) do + {[flag | known], unknown} + else + {known, [input | unknown]} + end + end) + end + + defp format_flags(list), do: Enum.join(list, ", ") end diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index c8799174ee..2b7fc47aa0 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -20,8 +20,7 @@ defmodule Electric.Connection.Manager do connection_opts: [...], replication_opts: [...], pool_opts: [...], - timeline_opts: [...], - shape_cache_opts: [...]} + timeline_opts: [...] ] Supervisor.start_link(children, strategy: :one_for_one) @@ -75,10 +74,8 @@ defmodule Electric.Connection.Manager do :replication_opts, # Database connection pool options :pool_opts, - # Options specific to `Electric.Timeline` + # Basically stack_id and persistent_kv :timeline_opts, - # Options passed to the Shapes.Supervisor's start_link() function - :shape_cache_opts, # PID of the replication client :replication_client_pid, # Timer reference for the periodic replication client status check @@ -98,17 +95,10 @@ defmodule Electric.Connection.Manager do :pg_system_identifier, # PostgreSQL timeline ID :pg_timeline_id, - # User setting that determines whether the table publishing is to be automatically - # managed by the stack or whether it's the user's responsibility. - :manual_table_publishing?, # ID used for process labeling and sibling discovery :stack_id, # Registry used for stack events :stack_events_registry, - :inspector, - :tweaks, - :max_shapes, - :persistent_kv, purge_all_shapes?: false, # PIDs of the database connection pools pool_pids: %{admin: nil, snapshot: nil}, @@ -134,7 +124,6 @@ defmodule Electric.Connection.Manager do | {:replication_opts, Keyword.t()} | {:pool_opts, Keyword.t()} | {:timeline_opts, Keyword.t()} - | {:shape_cache_opts, Keyword.t()} @type options :: [option] @@ -270,7 +259,6 @@ defmodule Electric.Connection.Manager do pool_opts = Keyword.fetch!(opts, :pool_opts) timeline_opts = Keyword.fetch!(opts, :timeline_opts) - shape_cache_opts = Keyword.fetch!(opts, :shape_cache_opts) connection_backoff = Keyword.get(opts, :connection_backoff, ConnectionBackoff.init(1000, 10_000)) @@ -281,15 +269,9 @@ defmodule Electric.Connection.Manager do current_step: {:start_replication_client, nil}, pool_opts: pool_opts, timeline_opts: timeline_opts, - inspector: Keyword.fetch!(opts, :inspector), - shape_cache_opts: shape_cache_opts, connection_backoff: {connection_backoff, nil}, stack_id: stack_id, - stack_events_registry: Keyword.fetch!(opts, :stack_events_registry), - tweaks: Keyword.fetch!(opts, :tweaks), - persistent_kv: Keyword.fetch!(opts, :persistent_kv), - manual_table_publishing?: Keyword.get(opts, :manual_table_publishing?, false), - max_shapes: Keyword.fetch!(opts, :max_shapes) + stack_events_registry: Keyword.fetch!(opts, :stack_events_registry) } |> initialize_connection_opts(opts) @@ -458,7 +440,7 @@ defmodule Electric.Connection.Manager do Electric.CoreSupervisor.stop_shapes_supervisor(stack_id: state.stack_id) # Clean up the on-disk storage from all shapes. - Electric.Shapes.Supervisor.reset_storage(shape_cache_opts: state.shape_cache_opts) + Electric.Shapes.Supervisor.reset_storage(state.stack_id) # The ShapeStatusOwner process lives independently of connection or replication # supervisor. Purge all shapes from it before starting the replication supervisor. @@ -466,10 +448,7 @@ defmodule Electric.Connection.Manager do end if timeline_changed? do - Electric.Replication.PersistentReplicationState.reset( - stack_id: state.stack_id, - persistent_kv: state.persistent_kv - ) + Electric.Replication.PersistentReplicationState.reset(state.timeline_opts) dispatch_stack_event( {:warning, @@ -483,23 +462,11 @@ defmodule Electric.Connection.Manager do ) end - repl_sup_opts = [ - stack_id: state.stack_id, - shape_cache_opts: state.shape_cache_opts, - inspector: state.inspector, - pool_opts: state.pool_opts, - replication_opts: state.replication_opts, - tweaks: state.tweaks, - manual_table_publishing?: state.manual_table_publishing?, - persistent_kv: state.persistent_kv, - max_shapes: state.max_shapes - ] - - case Electric.CoreSupervisor.start_shapes_supervisor(repl_sup_opts) do + case Electric.CoreSupervisor.start_shapes_supervisor(stack_id: state.stack_id) do {:ok, _pid} -> :ok - {:error, {:already_started, _pid}} -> + {:error, :running} -> # Shapes supervisor is already running, which can happen if the # Connection.Manager is restarting :ok @@ -509,7 +476,7 @@ defmodule Electric.Connection.Manager do exit(reason) end - StatusMonitor.mark_integrety_checks_passed(state.stack_id, self()) + StatusMonitor.mark_integrity_checks_passed(state.stack_id, self()) state = %{ state diff --git a/packages/sync-service/lib/electric/core_supervisor.ex b/packages/sync-service/lib/electric/core_supervisor.ex index 6bd57c48e9..4bfa0a3e1b 100644 --- a/packages/sync-service/lib/electric/core_supervisor.ex +++ b/packages/sync-service/lib/electric/core_supervisor.ex @@ -27,7 +27,8 @@ defmodule Electric.CoreSupervisor do connection_manager_opts = Keyword.fetch!(opts, :connection_manager_opts) children = [ - {Electric.Connection.Supervisor, connection_manager_opts} + {Electric.Connection.Supervisor, connection_manager_opts}, + {Electric.Shapes.Supervisor, opts} |> Supervisor.child_spec(restart: :transient) ] Supervisor.init(children, strategy: :one_for_one, auto_shutdown: :any_significant) @@ -38,61 +39,7 @@ defmodule Electric.CoreSupervisor do initialization sequence. """ def start_shapes_supervisor(opts) do - stack_id = Keyword.fetch!(opts, :stack_id) - shape_cache_opts = Keyword.fetch!(opts, :shape_cache_opts) - replication_opts = Keyword.fetch!(opts, :replication_opts) - inspector = Keyword.fetch!(opts, :inspector) - persistent_kv = Keyword.fetch!(opts, :persistent_kv) - tweaks = Keyword.fetch!(opts, :tweaks) - - consumer_supervisor_spec = {Electric.Shapes.DynamicConsumerSupervisor, [stack_id: stack_id]} - - shape_cleaner_spec = - {Electric.ShapeCache.ShapeCleaner.CleanupTaskSupervisor, - [stack_id: stack_id] ++ Keyword.get(tweaks, :shape_cleaner_opts, [])} - - shape_cache_spec = {Electric.ShapeCache, shape_cache_opts} - - publication_manager_spec = - {Electric.Replication.PublicationManager, - stack_id: stack_id, - publication_name: Keyword.fetch!(replication_opts, :publication_name), - manual_table_publishing?: Keyword.fetch!(opts, :manual_table_publishing?), - db_pool: Electric.Connection.Manager.admin_pool(stack_id), - update_debounce_timeout: Keyword.get(tweaks, :publication_alter_debounce_ms, 0), - refresh_period: Keyword.get(tweaks, :publication_refresh_period, 60_000)} - - shape_log_collector_spec = - {Electric.Replication.ShapeLogCollector, - stack_id: stack_id, inspector: inspector, persistent_kv: persistent_kv} - - schema_reconciler_spec = - {Electric.Replication.SchemaReconciler, - stack_id: stack_id, - inspector: inspector, - period: Keyword.get(tweaks, :schema_reconciler_period, 60_000)} - - expiry_manager_spec = - {Electric.ShapeCache.ExpiryManager, - max_shapes: Keyword.fetch!(opts, :max_shapes), stack_id: stack_id} - - child_spec = - Supervisor.child_spec( - { - Electric.Shapes.Supervisor, - stack_id: stack_id, - consumer_supervisor: consumer_supervisor_spec, - shape_cleaner: shape_cleaner_spec, - shape_cache: shape_cache_spec, - publication_manager: publication_manager_spec, - log_collector: shape_log_collector_spec, - schema_reconciler: schema_reconciler_spec, - expiry_manager: expiry_manager_spec - }, - restart: :transient - ) - - Supervisor.start_child(name(opts), child_spec) + Supervisor.restart_child(name(opts), Electric.Shapes.Supervisor) end @doc """ @@ -102,16 +49,6 @@ defmodule Electric.CoreSupervisor do Returns :ok if the supervisor was stopped or wasn't running. """ def stop_shapes_supervisor(opts) do - case Supervisor.terminate_child(name(opts), Electric.Shapes.Supervisor) do - :ok -> - Supervisor.delete_child(name(opts), Electric.Shapes.Supervisor) - :ok - - {:error, :not_found} -> - :ok - - {:error, reason} -> - {:error, reason} - end + Supervisor.terminate_child(name(opts), Electric.Shapes.Supervisor) end end diff --git a/packages/sync-service/lib/electric/plug/health_check_plug.ex b/packages/sync-service/lib/electric/plug/health_check_plug.ex index 7be0ec76cd..265a794fd6 100644 --- a/packages/sync-service/lib/electric/plug/health_check_plug.ex +++ b/packages/sync-service/lib/electric/plug/health_check_plug.ex @@ -17,6 +17,7 @@ defmodule Electric.Plug.HealthCheckPlug do {status_code, status_text} = case StatusMonitor.status(config[:stack_id]) do %{conn: :waiting_on_lock, shape: _} -> {202, "waiting"} + %{conn: :waiting_on_integrity_checks, shape: _} -> {202, "starting"} %{conn: :starting, shape: _} -> {202, "starting"} %{conn: _, shape: :starting} -> {202, "starting"} %{conn: :up, shape: :up} -> {200, "active"} diff --git a/packages/sync-service/lib/electric/shape_cache/expiry_manager.ex b/packages/sync-service/lib/electric/shape_cache/expiry_manager.ex index 7b8c444a38..bc8ea6daf4 100644 --- a/packages/sync-service/lib/electric/shape_cache/expiry_manager.ex +++ b/packages/sync-service/lib/electric/shape_cache/expiry_manager.ex @@ -29,37 +29,36 @@ defmodule Electric.ShapeCache.ExpiryManager do Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) - state = - %{ - stack_id: stack_id, - max_shapes: Keyword.fetch!(opts, :max_shapes), - period: Keyword.fetch!(opts, :period) - } + state = %{ + stack_id: stack_id, + period: Keyword.fetch!(opts, :period) + } - if not is_nil(state.max_shapes), do: schedule_next_check(state) + schedule_next_check(state.period) {:ok, state} end - defp schedule_next_check(state) do - Process.send_after(self(), :maybe_expire_shapes, state.period) + defp schedule_next_check(period) do + Process.send_after(self(), :maybe_expire_shapes, period) end def handle_info(:maybe_expire_shapes, state) do - maybe_expire_shapes(state) - schedule_next_check(state) + maybe_expire_shapes(state.stack_id, max_shapes_config(state.stack_id)) + schedule_next_check(state.period) {:noreply, state} end - defp maybe_expire_shapes(%{max_shapes: nil}), do: :ok + defp maybe_expire_shapes(_stack_id, max_shapes) when is_nil(max_shapes) or max_shapes == 0, + do: :ok - defp maybe_expire_shapes(%{max_shapes: max_shapes} = state) do - case StatusMonitor.status(state.stack_id) do + defp maybe_expire_shapes(stack_id, max_shapes) when is_integer(max_shapes) and max_shapes > 0 do + case StatusMonitor.status(stack_id) do %{shape: :up} -> - shape_count = shape_count(state) + shape_count = shape_count(stack_id) if shape_count > max_shapes do - expire_shapes(shape_count, state) + expire_shapes(shape_count, max_shapes, stack_id) end status -> @@ -70,38 +69,40 @@ defmodule Electric.ShapeCache.ExpiryManager do end end - defp expire_shapes(shape_count, state) do - number_to_expire = shape_count - state.max_shapes - {handles_to_expire, min_age} = least_recently_used(state, number_to_expire) + defp expire_shapes(shape_count, max_shapes, stack_id) do + number_to_expire = shape_count - max_shapes + {handles_to_expire, min_age} = least_recently_used(stack_id, number_to_expire) Logger.info( "Expiring #{number_to_expire} shapes as the number of shapes " <> - "has exceeded the limit (#{state.max_shapes})" + "has exceeded the limit (#{max_shapes})" ) OpenTelemetry.with_span( "expiry_manager.expire_shapes", [ - max_shapes: state.max_shapes, + max_shapes: max_shapes, shape_count: shape_count, number_to_expire: number_to_expire, elapsed_minutes_since_use: min_age ], - fn -> - Electric.ShapeCache.ShapeCleaner.remove_shapes(state.stack_id, handles_to_expire) - end + fn -> Electric.ShapeCache.ShapeCleaner.remove_shapes(stack_id, handles_to_expire) end ) end - defp least_recently_used(%{stack_id: stack_id}, number_to_expire) do + defp least_recently_used(stack_id, number_to_expire) do OpenTelemetry.with_span("expiry_manager.get_least_recently_used", [], fn -> ShapeStatus.least_recently_used(stack_id, number_to_expire) end) end - defp shape_count(%{stack_id: stack_id}) do + defp shape_count(stack_id) do OpenTelemetry.with_span("expiry_manager.get_shape_count", [], fn -> ShapeStatus.count_shapes(stack_id) end) end + + defp max_shapes_config(stack_id) do + Electric.StackConfig.lookup(stack_id, :max_shapes) + end end diff --git a/packages/sync-service/lib/electric/shape_cache/shape_cleaner/cleanup_task_supervisor.ex b/packages/sync-service/lib/electric/shape_cache/shape_cleaner/cleanup_task_supervisor.ex index eb55f374a6..9069f28e03 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_cleaner/cleanup_task_supervisor.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_cleaner/cleanup_task_supervisor.ex @@ -19,16 +19,7 @@ defmodule Electric.ShapeCache.ShapeCleaner.CleanupTaskSupervisor do end def start_link(opts) do - {:ok, stack_id} = Keyword.fetch(opts, :stack_id) - - if on_cleanup_callback = Keyword.get(opts, :on_cleanup, nil) do - Electric.StackConfig.put( - stack_id, - {Electric.ShapeCache.ShapeCleaner, :on_cleanup}, - on_cleanup_callback - ) - end - + stack_id = Keyword.fetch!(opts, :stack_id) Task.Supervisor.start_link(name: name(stack_id)) end diff --git a/packages/sync-service/lib/electric/shapes/api.ex b/packages/sync-service/lib/electric/shapes/api.ex index 5d3081b858..5734d43b95 100644 --- a/packages/sync-service/lib/electric/shapes/api.ex +++ b/packages/sync-service/lib/electric/shapes/api.ex @@ -27,7 +27,7 @@ defmodule Electric.Shapes.Api do required: true ], allow_shape_deletion: [type: :boolean], - feature_flags: [type: {:list, :string}, default: []], + allow_subqueries?: [type: :boolean], keepalive_interval: [type: :integer], long_poll_timeout: [type: :integer], sse_timeout: [type: :integer], @@ -58,9 +58,9 @@ defmodule Electric.Shapes.Api do :stack_events_registry, :stack_id, :storage, - :feature_flags, :max_concurrent_requests, allow_shape_deletion: false, + allow_subqueries?: false, keepalive_interval: 21_000, long_poll_timeout: 20_000, sse_timeout: 60_000, @@ -152,7 +152,11 @@ defmodule Electric.Shapes.Api do def predefined_shape(%Api{} = api, shape_params) do with :ok <- hold_until_stack_ready(api), {:ok, params} <- normalise_shape_params(shape_params), - opts = Keyword.merge(params, inspector: api.inspector, feature_flags: api.feature_flags), + opts = + Keyword.merge(params, + inspector: api.inspector, + allow_subqueries?: api.allow_subqueries? + ), {:ok, shape} <- Shapes.Shape.new(opts) do {:ok, %{api | shape: shape}} end diff --git a/packages/sync-service/lib/electric/shapes/api/params.ex b/packages/sync-service/lib/electric/shapes/api/params.ex index c8390fbe20..08a3a48127 100644 --- a/packages/sync-service/lib/electric/shapes/api/params.ex +++ b/packages/sync-service/lib/electric/shapes/api/params.ex @@ -322,7 +322,7 @@ defmodule Electric.Shapes.Api.Params do columns: columns, replica: replica, inspector: api.inspector, - feature_flags: api.feature_flags, + allow_subqueries?: api.allow_subqueries?, storage: %{compaction: if(compaction_enabled?, do: :enabled, else: :disabled)}, log_mode: fetch_field!(changeset, :log) ) do diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 797c190e26..b9faf2739b 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -380,9 +380,7 @@ defmodule Electric.Shapes.Consumer do end defp consumer_suspend_enabled?(%{stack_id: stack_id}) do - # TODO: remove additional feature flag check once feature is stabilised - Electric.StackConfig.lookup(stack_id, :shape_enable_suspend?, true) and - "suspend_consumers" in Electric.StackConfig.lookup(stack_id, :feature_flags, []) + Electric.StackConfig.lookup!(stack_id, :shape_enable_suspend?) end defp consumer_can_suspend?(state) do @@ -813,7 +811,7 @@ defmodule Electric.Shapes.Consumer do end defp clean_table(table_oid, state) do - inspector = Electric.StackConfig.lookup!(state.stack_id, :inspector) + inspector = Electric.Postgres.Inspector.for_stack(state.stack_id) Inspector.clean(table_oid, inspector) end diff --git a/packages/sync-service/lib/electric/shapes/shape.ex b/packages/sync-service/lib/electric/shapes/shape.ex index dc71e5309f..803b36e86b 100644 --- a/packages/sync-service/lib/electric/shapes/shape.ex +++ b/packages/sync-service/lib/electric/shapes/shape.ex @@ -149,7 +149,9 @@ defmodule Electric.Shapes.Shape do type: :mod_arg, default: {Electric.Postgres.Inspector, Electric.DbPool} ], - feature_flags: [type: {:list, :string}, default: Electric.Config.get_env(:feature_flags)], + # The default value is only used in tests. In normal usage we expect this option to be + # passed in by the caller. + allow_subqueries?: [type: :boolean, default: Electric.Config.get_env(:allow_subqueries?)], storage: [ type: { :or, @@ -247,7 +249,7 @@ defmodule Electric.Shapes.Shape do defp validate_where_clause(where, %{inspector: inspector} = opts, refs) do with {:ok, where} <- Parser.parse_query(where), {:ok, subqueries} <- Parser.extract_subqueries(where), - :ok <- check_feature_flag(subqueries, opts), + :ok <- check_if_subqueries_are_allowed(subqueries, opts), {:ok, shape_dependencies} <- build_shape_dependencies(subqueries, opts), {:ok, dependency_refs} <- build_dependency_refs(shape_dependencies, inspector), all_refs = Map.merge(refs, dependency_refs), @@ -266,12 +268,11 @@ defmodule Electric.Shapes.Shape do end end - defp check_feature_flag(subqueries, opts) do - if subqueries != [] and - not Enum.member?(opts.feature_flags, "allow_subqueries") do - {:error, {:where, "Subqueries are not supported"}} - else + defp check_if_subqueries_are_allowed(subqueries, opts) do + if subqueries == [] or opts.allow_subqueries? do :ok + else + {:error, {:where, "Subqueries are not supported"}} end end diff --git a/packages/sync-service/lib/electric/shapes/supervisor.ex b/packages/sync-service/lib/electric/shapes/supervisor.ex index 06170e579d..36e0d58bd9 100644 --- a/packages/sync-service/lib/electric/shapes/supervisor.ex +++ b/packages/sync-service/lib/electric/shapes/supervisor.ex @@ -16,9 +16,7 @@ defmodule Electric.Shapes.Supervisor do Electric.ProcessRegistry.name(stack_ref, __MODULE__) end - def reset_storage(opts) do - shape_cache_opts = Keyword.fetch!(opts, :shape_cache_opts) - stack_id = Keyword.fetch!(shape_cache_opts, :stack_id) + def reset_storage(stack_id) do stack_storage = Electric.ShapeCache.Storage.for_stack(stack_id) Logger.info("Purging all shapes.") @@ -26,37 +24,56 @@ defmodule Electric.Shapes.Supervisor do end def start_link(opts) do - name = Access.get(opts, :name, name(opts)) - Supervisor.start_link(__MODULE__, opts, name: name) + stack_id = Keyword.fetch!(opts, :stack_id) + + # Start the sup only if the connection subsystem is up + case Electric.StatusMonitor.status(stack_id) do + %{conn: ready} when ready in [:up, :waiting_on_integrity_checks] -> + name = Access.get(opts, :name, name(opts)) + Supervisor.start_link(__MODULE__, opts, name: name) + + _ -> + :ignore + end end @impl Supervisor def init(opts) do stack_id = Keyword.fetch!(opts, :stack_id) + Process.set_label({:replication_supervisor, stack_id}) Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) Logger.info("Starting shape replication pipeline") - shape_cleaner = Keyword.fetch!(opts, :shape_cleaner) - log_collector = Keyword.fetch!(opts, :log_collector) - publication_manager = Keyword.fetch!(opts, :publication_manager) - consumer_supervisor = Keyword.fetch!(opts, :consumer_supervisor) - shape_cache = Keyword.fetch!(opts, :shape_cache) - expiry_manager = Keyword.fetch!(opts, :expiry_manager) - schema_reconciler = Keyword.fetch!(opts, :schema_reconciler) + inspector = Keyword.fetch!(opts, :inspector) + persistent_kv = Keyword.fetch!(opts, :persistent_kv) + + connection_manager_opts = Keyword.fetch!(opts, :connection_manager_opts) + replication_opts = Keyword.fetch!(connection_manager_opts, :replication_opts) + tweaks = Keyword.fetch!(opts, :tweaks) children = [ {Task.Supervisor, name: Electric.ProcessRegistry.name(stack_id, Electric.StackTaskSupervisor)}, - shape_cleaner, - log_collector, - publication_manager, - consumer_supervisor, - shape_cache, - expiry_manager, - schema_reconciler, + {Electric.ShapeCache.ShapeCleaner.CleanupTaskSupervisor, stack_id: stack_id}, + {Electric.Replication.ShapeLogCollector, + stack_id: stack_id, inspector: inspector, persistent_kv: persistent_kv}, + {Electric.Replication.PublicationManager, + stack_id: stack_id, + publication_name: Keyword.fetch!(replication_opts, :publication_name), + manual_table_publishing?: Keyword.get(opts, :manual_table_publishing?, false), + db_pool: Electric.Connection.Manager.admin_pool(stack_id), + update_debounce_timeout: Keyword.get(tweaks, :publication_alter_debounce_ms, 0), + refresh_period: Keyword.get(tweaks, :publication_refresh_period, 60_000)}, + {Electric.Shapes.DynamicConsumerSupervisor, stack_id: stack_id}, + {Electric.ShapeCache, stack_id: stack_id}, + {Electric.ShapeCache.ExpiryManager, stack_id: stack_id}, + {Electric.Replication.SchemaReconciler, + stack_id: stack_id, + inspector: inspector, + period: Keyword.get(tweaks, :schema_reconciler_period, 60_000)}, canary_spec(stack_id) ] diff --git a/packages/sync-service/lib/electric/stack_config.ex b/packages/sync-service/lib/electric/stack_config.ex index 4f9695c177..ab578d6a0d 100644 --- a/packages/sync-service/lib/electric/stack_config.ex +++ b/packages/sync-service/lib/electric/stack_config.ex @@ -24,8 +24,7 @@ defmodule Electric.StackConfig do snapshot_timeout_to_first_data: :timer.seconds(30), shape_hibernate_after: Electric.Config.default(:shape_hibernate_after), shape_enable_suspend?: Electric.Config.default(:shape_enable_suspend?), - chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(), - feature_flags: [] + chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold() ] end diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index 8a76028c96..b33d271eb2 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -100,7 +100,7 @@ defmodule Electric.StackSupervisor do type: :pos_integer, default: LogChunker.default_chunk_size_threshold() ], - feature_flags: [type: {:list, :string}, default: []], + feature_flags: [type: {:list, :atom}, default: []], tweaks: [ type: :keyword_list, required: false, @@ -306,11 +306,12 @@ defmodule Electric.StackSupervisor do shape_changes_registry_name = registry_name(stack_id) shape_hibernate_after = Keyword.fetch!(config.tweaks, :shape_hibernate_after) - shape_enable_suspend? = Keyword.fetch!(config.tweaks, :shape_enable_suspend?) - shape_cache_opts = [ - stack_id: stack_id - ] + # The feature_flags check is temporary. Once we're sure in the correct functioning of shape + # customer suspension, we'll leave just the :shape_enable_suspend? config. + shape_enable_suspend? = + Keyword.fetch!(config.tweaks, :shape_enable_suspend?) and + Electric.Config.feature_flag_suspend_consumers() in config.feature_flags shape_log_collector = Electric.Replication.ShapeLogCollector.name(stack_id) @@ -330,13 +331,7 @@ defmodule Electric.StackSupervisor do timeline_opts: [ stack_id: stack_id, persistent_kv: config.persistent_kv - ], - persistent_kv: config.persistent_kv, - shape_cache_opts: shape_cache_opts, - inspector: inspector, - max_shapes: config.max_shapes, - tweaks: config.tweaks, - manual_table_publishing?: config.manual_table_publishing? + ] ] registry_partitions = @@ -354,9 +349,11 @@ defmodule Electric.StackSupervisor do chunk_bytes_threshold: config.chunk_bytes_threshold, snapshot_timeout_to_first_data: config.tweaks[:snapshot_timeout_to_first_data], inspector: inspector, + persistent_kv: config.persistent_kv, shape_hibernate_after: shape_hibernate_after, shape_enable_suspend?: shape_enable_suspend?, - feature_flags: Map.get(config, :feature_flags, []) + manual_table_publishing?: config.manual_table_publishing?, + tweaks: config.tweaks ]}, {Electric.AsyncDeleter, stack_id: stack_id, @@ -369,7 +366,12 @@ defmodule Electric.StackSupervisor do stack_id: stack_id, pool: metadata_db_pool, persistent_kv: config.persistent_kv}, {Electric.ShapeCache.ShapeStatusOwner, [stack_id: stack_id, storage: storage]}, {Electric.MonitoredCoreSupervisor, - stack_id: stack_id, connection_manager_opts: connection_manager_opts} + stack_id: stack_id, + connection_manager_opts: connection_manager_opts, + inspector: inspector, + persistent_kv: config.persistent_kv, + manual_table_publishing?: config.manual_table_publishing?, + tweaks: config.tweaks} ] |> Enum.reject(&is_nil/1) diff --git a/packages/sync-service/lib/electric/status_monitor.ex b/packages/sync-service/lib/electric/status_monitor.ex index e3e98c3fd7..9425f538a9 100644 --- a/packages/sync-service/lib/electric/status_monitor.ex +++ b/packages/sync-service/lib/electric/status_monitor.ex @@ -5,7 +5,7 @@ defmodule Electric.StatusMonitor do require Logger @type status() :: %{ - conn: :waiting_on_lock | :starting | :up | :sleeping, + conn: :waiting_on_lock | :waiting_on_integrity_checks | :starting | :up | :sleeping, shape: :starting | :up } @@ -16,7 +16,7 @@ defmodule Electric.StatusMonitor do :snapshot_connection_pool_ready, :shape_log_collector_ready, :supervisor_processes_ready, - :integrety_checks_passed + :integrity_checks_passed ] @default_results for condition <- @conditions, into: %{}, do: {condition, {false, %{}}} @@ -60,9 +60,10 @@ defmodule Electric.StatusMonitor do replication_client_ready: {true, _}, admin_connection_pool_ready: {true, _}, snapshot_connection_pool_ready: {true, _}, - integrety_checks_passed: {true, _} - }), - do: :up + integrity_checks_passed: {integrity_checks_passed?, _} + }) do + if integrity_checks_passed?, do: :up, else: :waiting_on_integrity_checks + end defp conn_status_from_results(_), do: :starting @@ -106,8 +107,8 @@ defmodule Electric.StatusMonitor do mark_condition_met(stack_id, :supervisor_processes_ready, canary_pid) end - def mark_integrety_checks_passed(stack_id, connection_manager_pid) do - mark_condition_met(stack_id, :integrety_checks_passed, connection_manager_pid) + def mark_integrity_checks_passed(stack_id, connection_manager_pid) do + mark_condition_met(stack_id, :integrity_checks_passed, connection_manager_pid) end def mark_pg_lock_as_errored(stack_id, message) when is_binary(message) do @@ -377,8 +378,8 @@ defmodule Electric.StatusMonitor do %{supervisor_processes_ready: {false, details}} -> "Timeout waiting for stack restart" <> format_details(details) - %{integrety_checks_passed: {false, details}} -> - "Timeout waiting for integrety checks" <> format_details(details) + %{integrity_checks_passed: {false, details}} -> + "Timeout waiting for integrity checks" <> format_details(details) end end diff --git a/packages/sync-service/test/electric/connection/manager_test.exs b/packages/sync-service/test/electric/connection/manager_test.exs index 127e0b1de6..139e5f16e2 100644 --- a/packages/sync-service/test/electric/connection/manager_test.exs +++ b/packages/sync-service/test/electric/connection/manager_test.exs @@ -43,20 +43,17 @@ defmodule Electric.Connection.ConnectionManagerTest do pool_opts: [pool_size: 2], connection_backoff: Connection.Manager.ConnectionBackoff.init(50, 50), timeline_opts: [stack_id: stack_id, persistent_kv: ctx.persistent_kv], - inspector: ctx.inspector, - shape_cache_opts: [ - stack_id: stack_id - ], - tweaks: [], - max_shapes: nil, - persistent_kv: ctx.persistent_kv, stack_events_registry: stack_events_registry ] core_sup = start_link_supervised!( {Electric.CoreSupervisor, - stack_id: stack_id, connection_manager_opts: connection_manager_opts}, + stack_id: stack_id, + connection_manager_opts: connection_manager_opts, + inspector: ctx.inspector, + persistent_kv: ctx.persistent_kv, + tweaks: []}, # The test supervisor under which this one is started has `auto_shutdown` set to # `:never`, so we need to make sure the core supervisor is not a significant # child, otherwise we'd get the following error: diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index fde5108005..6508a9f256 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -70,12 +70,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do :ok end) - shape_cache_opts = - [ - stack_id: stack_id - ] - - shape_cache_pid = start_link_supervised!({Electric.ShapeCache, shape_cache_opts}) + shape_cache_pid = start_link_supervised!({Electric.ShapeCache, stack_id: stack_id}) assert_receive :shape_log_collector_ready, 1000 diff --git a/packages/sync-service/test/electric/shape_cache/expiry_manager_test.exs b/packages/sync-service/test/electric/shape_cache/expiry_manager_test.exs index 6b0242c66e..0cc2ad00d0 100644 --- a/packages/sync-service/test/electric/shape_cache/expiry_manager_test.exs +++ b/packages/sync-service/test/electric/shape_cache/expiry_manager_test.exs @@ -21,8 +21,10 @@ defmodule Electric.ExpiryManagerTest do @max_shapes 10 setup %{stack_id: stack_id} do + Electric.StackConfig.put(stack_id, :max_shapes, @max_shapes) + expiry_manager = - start_supervised!({ExpiryManager, max_shapes: @max_shapes, period: 1, stack_id: stack_id}) + start_supervised!({ExpiryManager, period: 1, stack_id: stack_id}) Repatch.patch(ShapeCleaner, :remove_shapes, [mode: :shared], fn stack_id, shape_handles -> Enum.each(shape_handles, &ShapeStatus.remove_shape(stack_id, &1)) diff --git a/packages/sync-service/test/electric/shapes/partitioned_tables_test.exs b/packages/sync-service/test/electric/shapes/partitioned_tables_test.exs index f141b10219..c1a15497bd 100644 --- a/packages/sync-service/test/electric/shapes/partitioned_tables_test.exs +++ b/packages/sync-service/test/electric/shapes/partitioned_tables_test.exs @@ -159,6 +159,8 @@ defmodule Electric.Shapes.PartitionedTablesTest do end test "truncation of partition truncates the partition root", ctx do + Support.ComponentSetup.put_shape_cleaner_callback(ctx) + {:ok, shape} = Shape.new("public.partitioned_items", inspector: ctx.inspector) {:ok, partition_shape} = Shape.new("public.partitioned_items_100", inspector: ctx.inspector) @@ -211,6 +213,8 @@ defmodule Electric.Shapes.PartitionedTablesTest do end test "truncation of partition root truncates all partitions", ctx do + Support.ComponentSetup.put_shape_cleaner_callback(ctx) + {:ok, shape} = Shape.new("public.partitioned_items", inspector: ctx.inspector) {:ok, partition_shape} = Shape.new("public.partitioned_items_100", inspector: ctx.inspector) diff --git a/packages/sync-service/test/electric/status_monitor_test.exs b/packages/sync-service/test/electric/status_monitor_test.exs index 72b15152b8..9b48de8d64 100644 --- a/packages/sync-service/test/electric/status_monitor_test.exs +++ b/packages/sync-service/test/electric/status_monitor_test.exs @@ -32,12 +32,14 @@ defmodule Electric.StatusMonitorTest do StatusMonitor.mark_connection_pool_ready(stack_id, :snapshot, self()) StatusMonitor.mark_shape_log_collector_ready(stack_id, self()) StatusMonitor.mark_supervisor_processes_ready(stack_id, self()) - StatusMonitor.mark_integrety_checks_passed(stack_id, self()) + StatusMonitor.mark_integrity_checks_passed(stack_id, self()) StatusMonitor.wait_for_messages_to_be_processed(stack_id) assert StatusMonitor.status(stack_id) == %{conn: :up, shape: :up} end - test "when integrety checks not passed, returns :starting", %{stack_id: stack_id} do + test "when integrity checks not passed, returns :waiting_on_integrity_checks", %{ + stack_id: stack_id + } do start_link_supervised!({StatusMonitor, stack_id: stack_id}) StatusMonitor.mark_pg_lock_acquired(stack_id, self()) StatusMonitor.mark_replication_client_ready(stack_id, self()) @@ -46,7 +48,7 @@ defmodule Electric.StatusMonitorTest do StatusMonitor.mark_shape_log_collector_ready(stack_id, self()) StatusMonitor.mark_supervisor_processes_ready(stack_id, self()) StatusMonitor.wait_for_messages_to_be_processed(stack_id) - assert StatusMonitor.status(stack_id) == %{conn: :starting, shape: :up} + assert StatusMonitor.status(stack_id) == %{conn: :waiting_on_integrity_checks, shape: :up} end test "when replication client not ready, returns :starting", %{stack_id: stack_id} do @@ -77,7 +79,7 @@ defmodule Electric.StatusMonitorTest do StatusMonitor.mark_connection_pool_ready(stack_id, :snapshot, self()) StatusMonitor.mark_replication_client_ready(stack_id, self()) StatusMonitor.mark_supervisor_processes_ready(stack_id, self()) - StatusMonitor.mark_integrety_checks_passed(stack_id, self()) + StatusMonitor.mark_integrity_checks_passed(stack_id, self()) StatusMonitor.wait_for_messages_to_be_processed(stack_id) assert StatusMonitor.status(stack_id) == %{conn: :up, shape: :starting} end @@ -89,7 +91,7 @@ defmodule Electric.StatusMonitorTest do StatusMonitor.mark_connection_pool_ready(stack_id, :snapshot, self()) StatusMonitor.mark_replication_client_ready(stack_id, self()) StatusMonitor.mark_shape_log_collector_ready(stack_id, self()) - StatusMonitor.mark_integrety_checks_passed(stack_id, self()) + StatusMonitor.mark_integrity_checks_passed(stack_id, self()) StatusMonitor.wait_for_messages_to_be_processed(stack_id) assert StatusMonitor.status(stack_id) == %{conn: :up, shape: :starting} end @@ -120,7 +122,7 @@ defmodule Electric.StatusMonitorTest do StatusMonitor.mark_connection_pool_ready(stack_id, :snapshot, self()) StatusMonitor.mark_shape_log_collector_ready(stack_id, self()) StatusMonitor.mark_supervisor_processes_ready(stack_id, self()) - StatusMonitor.mark_integrety_checks_passed(stack_id, self()) + StatusMonitor.mark_integrity_checks_passed(stack_id, self()) StatusMonitor.wait_for_messages_to_be_processed(stack_id) assert StatusMonitor.status(stack_id) == %{conn: :up, shape: :up} @@ -152,7 +154,7 @@ defmodule Electric.StatusMonitorTest do StatusMonitor.mark_connection_pool_ready(stack_id, :admin, self()) StatusMonitor.mark_connection_pool_ready(stack_id, :snapshot, self()) StatusMonitor.mark_supervisor_processes_ready(stack_id, self()) - StatusMonitor.mark_integrety_checks_passed(stack_id, self()) + StatusMonitor.mark_integrity_checks_passed(stack_id, self()) refute_receive :active, 20 assert StatusMonitor.mark_shape_log_collector_ready(stack_id, self()) == :ok @@ -175,7 +177,7 @@ defmodule Electric.StatusMonitorTest do StatusMonitor.mark_connection_pool_ready(stack_id, :admin, self()) StatusMonitor.mark_connection_pool_ready(stack_id, :snapshot, self()) StatusMonitor.mark_supervisor_processes_ready(stack_id, self()) - StatusMonitor.mark_integrety_checks_passed(stack_id, self()) + StatusMonitor.mark_integrity_checks_passed(stack_id, self()) refute_receive :active, 20 assert StatusMonitor.mark_shape_log_collector_ready(stack_id, self()) == :ok @@ -248,7 +250,7 @@ defmodule Electric.StatusMonitorTest do {:error, "Timeout waiting for shape data to be loaded"} end - test "returns error on timeout waiting for integrety checks", %{stack_id: stack_id} do + test "returns error on timeout waiting for integrity checks", %{stack_id: stack_id} do start_link_supervised!({StatusMonitor, stack_id: stack_id}) StatusMonitor.mark_pg_lock_acquired(stack_id, self()) StatusMonitor.mark_replication_client_ready(stack_id, self()) @@ -259,7 +261,7 @@ defmodule Electric.StatusMonitorTest do StatusMonitor.wait_for_messages_to_be_processed(stack_id) assert StatusMonitor.wait_until_active(stack_id, timeout: 1) == - {:error, "Timeout waiting for integrety checks"} + {:error, "Timeout waiting for integrity checks"} end test "returns explicit error on timeout when supplied", %{ diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index fc83883606..f72c56d631 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -112,8 +112,7 @@ defmodule Support.ComponentSetup do inspector: Map.get(ctx, :inspector, nil), shape_changes_registry: Map.get(ctx, :registry, Electric.StackSupervisor.registry_name(stack_id)), - shape_hibernate_after: Map.get(ctx, :shape_hibernate_after, 1_000), - feature_flags: Electric.Config.get_env(:feature_flags) + shape_hibernate_after: Map.get(ctx, :shape_hibernate_after, 1_000) ], seed_config )} @@ -198,9 +197,10 @@ defmodule Support.ComponentSetup do end def with_shape_cleaner(ctx) do + put_shape_cleaner_callback(ctx) + start_supervised!( - {Electric.ShapeCache.ShapeCleaner.CleanupTaskSupervisor, - Keyword.merge(shape_cleaner_opts(ctx), stack_id: ctx.stack_id)} + {Electric.ShapeCache.ShapeCleaner.CleanupTaskSupervisor, stack_id: ctx.stack_id} ) :ok @@ -391,17 +391,6 @@ defmodule Support.ComponentSetup do %{} end - def shape_cleaner_opts(ctx) do - parent = self() - - on_cleanup = - Map.get(ctx, :on_shape_cleanup, fn handle -> - send(parent, {Electric.ShapeCache.ShapeCleaner, :cleanup, handle}) - end) - - [on_cleanup: on_cleanup] - end - def with_complete_stack(ctx) do stack_id = full_test_name(ctx) @@ -453,10 +442,7 @@ defmodule Support.ComponentSetup do max_restarts: 0, pool_size: 2 ], - tweaks: [ - registry_partitions: 1, - shape_cleaner_opts: shape_cleaner_opts(ctx) - ], + tweaks: [registry_partitions: 1], manual_table_publishing?: Map.get(ctx, :manual_table_publishing?, false), feature_flags: Electric.Config.get_env(:feature_flags)}, restart: :temporary, @@ -492,6 +478,7 @@ defmodule Support.ComponentSetup do max_age: 60, stale_age: 300, allow_shape_deletion: true, + allow_subqueries?: true, secret: ctx[:secret] ] |> Keyword.merge( @@ -506,4 +493,23 @@ defmodule Support.ComponentSetup do |> Keyword.merge(overrides) |> Electric.Shapes.Api.plug_opts() end + + # In tests we want to know when shape cleaning occurs. This puts a callback function into + # StackConfig, exactly where Electric.ShapeCache.ShapeCleaner.CleanupTaskSupervisor expects + # to find it. + def put_shape_cleaner_callback(ctx) do + Electric.StackConfig.put( + ctx.stack_id, + {Electric.ShapeCache.ShapeCleaner, :on_cleanup}, + shape_cleaner_callback(ctx) + ) + end + + def shape_cleaner_callback(ctx) do + parent = self() + + Map.get(ctx, :on_shape_cleanup, fn handle -> + send(parent, {Electric.ShapeCache.ShapeCleaner, :cleanup, handle}) + end) + end end diff --git a/packages/sync-service/test/support/test_utils.ex b/packages/sync-service/test/support/test_utils.ex index a29bd66247..9c1a7b76e1 100644 --- a/packages/sync-service/test/support/test_utils.ex +++ b/packages/sync-service/test/support/test_utils.ex @@ -77,7 +77,7 @@ defmodule Support.TestUtils do Electric.StatusMonitor.mark_connection_pool_ready(stack_id, :snapshot, self()) Electric.StatusMonitor.mark_shape_log_collector_ready(stack_id, self()) Electric.StatusMonitor.mark_supervisor_processes_ready(stack_id, self()) - Electric.StatusMonitor.mark_integrety_checks_passed(stack_id, self()) + Electric.StatusMonitor.mark_integrity_checks_passed(stack_id, self()) Electric.StatusMonitor.wait_for_messages_to_be_processed(stack_id) end