diff --git a/.changeset/little-seas-guess.md b/.changeset/little-seas-guess.md new file mode 100644 index 0000000000..b435e40782 --- /dev/null +++ b/.changeset/little-seas-guess.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Add two new configuration options for periodic retained WAL size checks in scaled-down mode: ELECTRIC_REPLICATION_IDLE_WAL_SIZE_CHECK_PERIOD and ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD. diff --git a/integration-tests/tests/wal-size-check-while-scaled-down.lux b/integration-tests/tests/wal-size-check-while-scaled-down.lux new file mode 100644 index 0000000000..1e3d69db1d --- /dev/null +++ b/integration-tests/tests/wal-size-check-while-scaled-down.lux @@ -0,0 +1,79 @@ +[doc Verify Electric periodically checks the retained WAL size while its database connections are scaled down and wakes up when the size exceeds the threshold] + +[include _macros.luxinc] + +[global pg_container_name=wal-size-check-while-scaled-down__pg] +[global database_url=postgresql://electric_test:password@localhost:$pg_host_port/electric?sslmode=disable] + +### + +# Start Postgres and create an additional role that will be used by Electric in this test +[invoke setup_pg_with_shell_name \ "pg" \ "-e INIT_DB_SQL=\"\ CREATE ROLE electric_test LOGIN PASSWORD 'password' REPLICATION;\ GRANT CREATE ON DATABASE electric TO electric_test\"" \ "" \ "-v $(realpath ../scripts/init_db.sh):/docker-entrypoint-initdb.d/initdb-init_db.sh"] + +# Create a table for subsequent shape requests +[invoke start_psql] + +[shell psql] + !create table items(val text); + ??CREATE + + !insert into items values ('1'), ('2'); + ??INSERT + + !alter table items owner to electric_test; + ??ALTER TABLE + +# Start Electric and wait for it to finish initialization. +# Have to disable the ConnectionManagerPing process, otherwise it will error when +# Connection.Manager itself goes down as part of the test scenario. +[invoke setup_electric_with_env "DO_NOT_START_CONN_MAN_PING=true ELECTRIC_REPLICATION_IDLE_WAL_SIZE_CHECK_PERIOD=1s ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD=10000"] +[shell electric] + [timeout 10] + ??[debug] Replication client started streaming + +# Force Electric to scale down by sending the :check_if_idle message to the replication client. +# Normally the client performs the check once a minute, so we're speeding it up here for +# testing purposes. +[shell electric] + !Electric.Postgres.ReplicationClient.name("single_stack") |> GenServer.whereis() |> send(:check_if_idle) + ??[notice] Closing all database connections after the replication stream has been idle for + + ??[debug] Terminating connection manager with reason :shutdown. + +# Confirm that Postgres no longer sees any open connections for the electric user +[shell psql] + [sleep 2] + + !select datname, usename, backend_type, query from pg_stat_activity where usename = 'electric_test'; + ??(0 rows) + + +# Sleep until it's time to verify that Electric has checked retained WAL size in Postgres after ELECTRIC_REPLICATION_IDLE_WAL_SIZE_CHECK_PERIOD +[shell electric] + [sleep 2] + + ??Opening a database connection to check the retained WAL size + ?Retained WAL size [0-9]+B is below the threshold of ~10\.0KB\. 
Scheduling the next check to take place after 1sec + +# Write to the database enough data to force the retained WAL size to go over ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD +[shell psql] + !insert into items select * from generate_series(1,1000); + ??INSERT + +# Verify that Electric wakes up after checking the WAL size and seeing it has exceeded the threshold +[shell electric] + ??Opening a database connection to check the retained WAL size + ?Retained WAL size ~[0-9.]+KB has exceeded the threshold of ~10\.0KB\. Time to wake up the connection subsystem + + ??[info] Acquiring lock from postgres + ??[info] Starting replication from postgres + ??[debug] Replication client started streaming + +[cleanup] + [invoke teardown] diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 17b34d2e4d..e49d2871a9 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -272,6 +272,18 @@ config :electric, "ELECTRIC_REPLICATION_IDLE_TIMEOUT", &Electric.Config.parse_human_readable_time!/1, nil + ), + idle_wal_size_check_period: + env!( + "ELECTRIC_REPLICATION_IDLE_WAL_SIZE_CHECK_PERIOD", + &Electric.Config.parse_human_readable_time!/1, + nil + ), + idle_wal_size_threshold: + env!( + "ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD", + &Electric.Config.parse_human_readable_size!/1, + nil ) if Electric.telemetry_enabled?() do diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 7ed4262612..9b401bda02 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -143,7 +143,9 @@ defmodule Electric.Application do shape_hibernate_after: get_env(opts, :shape_hibernate_after), shape_enable_suspend?: get_env(opts, :shape_enable_suspend?), conn_max_requests: get_env(opts, :conn_max_requests), - process_spawn_opts: get_env(opts, :process_spawn_opts) + process_spawn_opts: get_env(opts, :process_spawn_opts), + idle_wal_size_check_period: get_env(opts, :idle_wal_size_check_period), + idle_wal_size_threshold: get_env(opts, :idle_wal_size_threshold) ], manual_table_publishing?: get_env(opts, :manual_table_publishing?) ) diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index ad6b48398e..113b3b8fd1 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -50,6 +50,13 @@ defmodule Electric.Config do max_txn_size: 250 * 1024 * 1024, # Scaling down on idle is disabled by default replication_idle_timeout: 0, + # If the database provider scales down after 5 min and provided that the + # replication_idle_timeout is about a minute or less, checking WAL size once an hour + # ends up using about 10% of the compute on an otherwise idle database. + idle_wal_size_check_period: 3_600_000, + # We want to wake up and process any transactions that have accumulated in the WAL, hence + # the low threshold. 
+ idle_wal_size_threshold: 100 * 1024 * 1024, manual_table_publishing?: false, ## HTTP API # set enable_http_api: false to turn off the HTTP server totally @@ -426,9 +433,9 @@ defmodule Electric.Config do end end - @time_units ~w[ms msec s sec m min] + @time_units ~w[ms msec s sec m min h hr] - @spec parse_human_readable_time(binary | nil) :: {:ok, pos_integer} | {:error, binary} + @spec parse_human_readable_time(binary) :: {:ok, pos_integer} | {:error, binary} def parse_human_readable_time(str) do with {num, suffix} <- Float.parse(str), @@ -445,6 +452,7 @@ defmodule Electric.Config do defp time_multiplier(millisecond) when millisecond in ["ms", "msec"], do: 1 defp time_multiplier(second) when second in ["s", "sec"], do: 1000 defp time_multiplier(minute) when minute in ["m", "min"], do: 1000 * 60 + defp time_multiplier(hour) when hour in ["h", "hr"], do: 1000 * 60 * 60 def parse_human_readable_time!(str) do case parse_human_readable_time(str) do @@ -453,6 +461,68 @@ defmodule Electric.Config do end end + @doc """ + Parse human-readable memory/storage size string into bytes. + + ## Examples + + iex> parse_human_readable_size("1GiB") + {:ok, #{1024 * 1024 * 1024}} + + iex> parse_human_readable_size("2.23GB") + {:ok, 2_230_000_000} + + iex> parse_human_readable_size("256MiB") + {:ok, #{256 * 1024 * 1024}} + + iex> parse_human_readable_size("377MB") + {:ok, 377_000_000} + + iex> parse_human_readable_size("430KiB") + {:ok, #{430 * 1024}} + + iex> parse_human_readable_size("142888KB") + {:ok, 142_888_000} + + iex> parse_human_readable_size("123456789") + {:ok, 123_456_789} + + iex> parse_human_readable_size("") + {:error, ~S'invalid size unit: "". Must be one of ["KB", "KiB", "MB", "MiB", "GB", "GiB"]'} + + iex> parse_human_readable_size("foo") + {:error, ~S'invalid size unit: "foo". Must be one of ["KB", "KiB", "MB", "MiB", "GB", "GiB"]'} + """ + @spec parse_human_readable_size(binary) :: {:ok, pos_integer} | {:error, binary} + + @size_units ~w[KB KiB MB MiB GB GiB] + + def parse_human_readable_size(str) do + with {num, suffix} <- Float.parse(str), + true <- num > 0, + suffix = String.trim(suffix), + true <- suffix == "" or suffix in @size_units do + {:ok, trunc(num * size_multiplier(suffix))} + else + _ -> {:error, "invalid size unit: #{inspect(str)}. 
Must be one of #{inspect(@size_units)}"} end end + + defp size_multiplier(""), do: 1 + defp size_multiplier("KB"), do: 1_000 + defp size_multiplier("KiB"), do: 1024 + defp size_multiplier("MB"), do: 1_000_000 + defp size_multiplier("MiB"), do: 1024 * 1024 + defp size_multiplier("GB"), do: 1_000_000_000 + defp size_multiplier("GiB"), do: 1024 * 1024 * 1024 + + def parse_human_readable_size!(str) do + case parse_human_readable_size(str) do + {:ok, result} -> result + {:error, message} -> raise Dotenvy.Error, message: message + end + end + def validate_security_config!(secret, insecure) do cond do insecure && secret != nil -> diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 0a2e516b28..3db72ab629 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -118,7 +118,9 @@ defmodule Electric.Connection.Manager do end use GenServer, shutdown: :infinity - alias Electric.Postgres.LockBreakerConnection + + import Electric.Utils, only: [quote_string: 1] + alias Electric.Connection.Manager.ConnectionBackoff alias Electric.Connection.Manager.ConnectionResolver alias Electric.DbConnectionError @@ -146,6 +148,8 @@ defmodule Electric.Connection.Manager do # the same reconnection period rather than a new one. @replication_liveness_confirmation_duration 5_000 + @validated_conn_opts_config_key {__MODULE__, :validated_connection_opts} + def child_spec(init_arg) do %{ id: __MODULE__, @@ -181,6 +185,12 @@ defmodule Electric.Connection.Manager do pool_name(Access.fetch!(opts, :stack_id), Access.fetch!(opts, :role)) end + def validated_connection_opts(stack_id, type) do + stack_id + |> Electric.StackConfig.lookup!(@validated_conn_opts_config_key) + |> Map.fetch!(type) + end + def drop_replication_slot_on_stop(manager) do GenServer.cast(manager, :drop_replication_slot_on_stop) end @@ -291,14 +301,15 @@ defmodule Electric.Connection.Manager do manual_table_publishing?: Keyword.get(opts, :manual_table_publishing?, false), max_shapes: Keyword.fetch!(opts, :max_shapes) } - |> initialize_connection_opts(opts) + |> init_connection_opts(opts) + |> init_validated_connection_opts() # Wait for the connection resolver to start before continuing with # connection setup. {:ok, state} end - defp initialize_connection_opts(state, opts) do + defp init_connection_opts(state, opts) do connection_opts = Keyword.fetch!(opts, :connection_opts) replication_opts = @@ -310,14 +321,27 @@ defmodule Electric.Connection.Manager do %{state | connection_opts: connection_opts, replication_opts: replication_opts} end + defp init_validated_connection_opts(%{stack_id: stack_id} = state) do + # Wipe any previously stored validated connection opts on startup to avoid persisting a + # configuration that is no longer valid. We always start the connection manager with the + # options specified by the user to avoid getting into an unrecoverable failure state. 
+ Electric.StackConfig.erase(stack_id, @validated_conn_opts_config_key) + state + end + + defp update_validated_connection_opts(%{stack_id: stack_id} = state, type, validated_opts) do + validated_connection_opts = Map.put(state.validated_connection_opts, type, validated_opts) + Electric.StackConfig.put(stack_id, @validated_conn_opts_config_key, validated_connection_opts) + %{state | validated_connection_opts: validated_connection_opts} + end + defp validate_connection(conn_opts, type, state) do - if opts = state.validated_connection_opts[type] do + if opts = Map.get(state.validated_connection_opts, type) do {:ok, opts, state} else try do with {:ok, validated_opts} <- ConnectionResolver.validate(state.stack_id, conn_opts) do - {:ok, validated_opts, - Map.update!(state, :validated_connection_opts, &Map.put(&1, type, validated_opts))} + {:ok, validated_opts, update_validated_connection_opts(state, type, validated_opts)} end catch :exit, {:killed, _} -> {:error, :killed} @@ -543,28 +567,24 @@ defmodule Electric.Connection.Manager do def handle_continue( :check_lock_not_abandoned, - %State{replication_pg_backend_pid: pg_backend_pid} = state - ) do - if state.current_step == {:start_replication_client, :acquiring_lock} and - not is_nil(pg_backend_pid) do - with {:ok, conn_opts, state} <- - validate_connection(pooled_connection_opts(state), :pool, state), - {:ok, breaker_pid} <- - LockBreakerConnection.start(connection_opts: conn_opts, stack_id: state.stack_id) do - lock_name = Keyword.fetch!(state.replication_opts, :slot_name) - - LockBreakerConnection.stop_backends_and_close(breaker_pid, lock_name, pg_backend_pid) - else - {:error, reason} -> - # no-op, this is a one-shot attempt at fixing a lock - Logger.warning("Failed try and break stuck lock connection: #{inspect(reason)}") - :ok - end - end + %State{ + current_step: {:start_replication_client, :acquiring_lock}, + replication_pg_backend_pid: pg_backend_pid + } = state + ) + when not is_nil(pg_backend_pid) do + pooled_conn_opts = pooled_connection_opts(state) - {:noreply, state} + with {:ok, conn_opts, state} <- validate_connection(pooled_conn_opts, :pool, state) do + run_lock_breaker_query(conn_opts, state) + {:noreply, state} + else + _ -> {:noreply, state} + end end + def handle_continue(:check_lock_not_abandoned, state), do: {:noreply, state} + @impl true def handle_info( {:timeout, tref, {:check_status, :replication_lock}}, @@ -1275,4 +1295,67 @@ defmodule Electric.Connection.Manager do do: %{state | replication_configuration_timer: nil} defp nillify_timer(state, _tref), do: state + + # Electric takes out a session-level advisory lock on a separate connection to better manage the + # ownership of the replication slot. Unfortunately, we have seen instances (especially on Neon), + # where Electric disconnects, but the lock is not auto-released. + # + # For these cases, this breaker exists - it'll connect to the database, and check that for + # a given lock name, if that lock is taken, there also exists an active replication slot with the + # same name. If not, it'll terminate the backend that is holding the lock, under the assumption + # that it's one of the abandoned locks. 
+ def run_lock_breaker_query(conn_opts, state) do + database = Keyword.fetch!(conn_opts, :database) + lock_name = Keyword.fetch!(state.replication_opts, :slot_name) + query = lock_breaker_query(database, lock_name, state.replication_pg_backend_pid) + opts = [stack_id: state.stack_id, label: :lock_breaker_connection, connection_opts: conn_opts] + + retval = Electric.Postgres.OneOffConnection.query(query, opts) + + case retval do + {:ok, %Postgrex.Result{columns: ["pg_terminate_backend"], num_rows: 0}} -> + Logger.debug("No stuck backends found") + + {:ok, %Postgrex.Result{columns: ["pg_terminate_backend"]}} -> + Logger.notice( + "Terminated a stuck backend to free the lock #{lock_name} because slot with same name was inactive" + ) + + {:error, reason} -> + # No retries here since this is a one-shot attempt at fixing a lock + Logger.warning("Failed try and break stuck lock connection: #{inspect(reason)}") + end + + retval + end + + defp lock_breaker_query(database, lock_name, lock_connection_pg_backend_pid) + when is_integer(lock_connection_pg_backend_pid) or is_nil(lock_connection_pg_backend_pid) do + # We're using a `WITH` clause to execute all this in one statement + # - See if there are existing but inactive replication slots with the given name + # - Find all backends that are holding locks with the same name + # - Terminate those backends + # + # It's generally impossible for this to return more than one row + + """ + WITH inactive_slots AS ( + select slot_name + from pg_replication_slots + where active = false and database = #{quote_string(database)} and slot_name = #{quote_string(lock_name)} + ), + stuck_backends AS ( + select pid + from pg_locks, inactive_slots + where + hashtext(slot_name) = (classid::bigint << 32) | objid::bigint + and locktype = 'advisory' + and objsubid = 1 + and database = (select oid from pg_database where datname = #{quote_string(database)}) + and granted + and pid != #{lock_connection_pg_backend_pid || 0} + ) + SELECT pg_terminate_backend(pid) FROM stuck_backends; + """ + end end diff --git a/packages/sync-service/lib/electric/connection/manager/connection_resolver.ex b/packages/sync-service/lib/electric/connection/manager/connection_resolver.ex index 1f0b6a63a1..6addd252f2 100644 --- a/packages/sync-service/lib/electric/connection/manager/connection_resolver.ex +++ b/packages/sync-service/lib/electric/connection/manager/connection_resolver.ex @@ -4,21 +4,6 @@ defmodule Electric.Connection.Manager.ConnectionResolver do require Logger - defmodule Connection do - @moduledoc false - @behaviour Postgrex.SimpleConnection - - def init(stack_id) do - Logger.metadata(stack_id: stack_id, is_connection_process?: true) - Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) - {:ok, []} - end - - def notify(_channel, _payload, _state) do - :ok - end - end - def name(stack_ref) do Electric.ProcessRegistry.name(stack_ref, __MODULE__) end @@ -29,29 +14,20 @@ defmodule Electric.Connection.Manager.ConnectionResolver do end end - def validate(stack_id, db_connection) do - GenServer.call(name(stack_id), {:validate, db_connection}, :infinity) + def validate(stack_id, conn_opts) do + GenServer.call(name(stack_id), {:validate, conn_opts}, :infinity) end @impl GenServer def init(opts) do stack_id = Keyword.fetch!(opts, :stack_id) - # ignore exits from connection processes that fail to start due to - # connection errors or from us killing the connection after we're - # done - Process.flag(:trap_exit, true) - Process.set_label({:connection_resolver, stack_id}) metadata = 
[is_connection_process?: true, stack_id: stack_id] Logger.metadata(metadata) Electric.Telemetry.Sentry.set_tags_context(metadata) - {_m, _f, _a} = - connection_mod = - Keyword.get(opts, :connection_mod, {Postgrex.SimpleConnection, :start_link, []}) - - {:ok, %{connection_mod: connection_mod, stack_id: stack_id}, {:continue, :notify_ready}} + {:ok, %{stack_id: stack_id}, {:continue, :notify_ready}} end @impl GenServer @@ -62,11 +38,11 @@ defmodule Electric.Connection.Manager.ConnectionResolver do end @impl GenServer - def handle_call({:validate, connection}, _from, state) do + def handle_call({:validate, conn_opts}, _from, state) do # convert to postgrex style for return to conn.manager - connection = populate_connection_opts(connection) + conn_opts = populate_connection_opts(conn_opts) - result = attempt_connection({:cont, connection}, state) + result = attempt_connection({:cont, conn_opts}, state) {:reply, result, state, :hibernate} end @@ -80,22 +56,10 @@ defmodule Electric.Connection.Manager.ConnectionResolver do def handle_info({:EXIT, _pid, _reason}, state), do: {:noreply, state} defp attempt_connection({:cont, conn_opts}, state) do - %{ - connection_mod: {connection_mod, connection_fun, connection_args} - } = state - - connection_opts = - Keyword.merge(Electric.Utils.deobfuscate_password(conn_opts), - auto_reconnect: false, - sync_connect: true - ) - - args = [Connection, state.stack_id, connection_opts | connection_args] - - case apply(connection_mod, connection_fun, args) do - {:ok, conn} -> - Process.exit(conn, :kill) + opts = [stack_id: state.stack_id, label: :connection_resolver, connection_opts: conn_opts] + case Electric.Postgres.OneOffConnection.attempt_connection(opts) do + :success -> {:ok, conn_opts} {:error, error} -> diff --git a/packages/sync-service/lib/electric/connection/restarter.ex b/packages/sync-service/lib/electric/connection/restarter.ex index 5205207fe0..ed95e9a424 100644 --- a/packages/sync-service/lib/electric/connection/restarter.ex +++ b/packages/sync-service/lib/electric/connection/restarter.ex @@ -10,12 +10,18 @@ defmodule Electric.Connection.Restarter do - publication manager - schema reconciler + Once the connection subsystem is scaled down, Restarter starts a timer to check the retained + WAL size periodically. If the size exceeds the configured threshold, Restarter will restart the + connection subsystem. Both the period and the size threshold are passed to `start_link/1`. + """ use GenServer alias Electric.StatusMonitor + require Logger + def name(stack_ref) do Electric.ProcessRegistry.name(stack_ref, __MODULE__) end @@ -75,30 +81,42 @@ defmodule Electric.Connection.Restarter do %{ stack_id: Keyword.fetch!(opts, :stack_id), stack_events_registry: Keyword.fetch!(opts, :stack_events_registry), - wait_until_conn_up_ref: nil + slot_name: Keyword.fetch!(opts, :slot_name), + # NOTE(alco): These defaults are really only used in testing when these config options + # aren't set. The actual real-world defaults are defined in Electric.Config. 
+ wal_size_check_period: Keyword.get(opts, :wal_size_check_period, 3600 * 1000), + wal_size_threshold: Keyword.get(opts, :wal_size_threshold, 100 * 1024 * 1024), + wait_until_conn_up_ref: nil, + wal_size_check_timer: nil }} end - def handle_cast(:stop_connection_subsystem, state) do - StatusMonitor.database_connections_going_to_sleep(state.stack_id) - Electric.Connection.Manager.Supervisor.stop_connection_manager(stack_id: state.stack_id) + def handle_cast(:stop_connection_subsystem, %{stack_id: stack_id} = state) do + StatusMonitor.database_connections_going_to_sleep(stack_id) + Electric.Connection.Manager.Supervisor.stop_connection_manager(stack_id: stack_id) Electric.StackSupervisor.dispatch_stack_event( state.stack_events_registry, - state.stack_id, + stack_id, :scaled_down_database_connections ) + state = schedule_wal_size_check(state) + {:noreply, state} end def handle_cast(:restore_connection_subsystem, %{wait_until_conn_up_ref: nil} = state) do - StatusMonitor.database_connections_waking_up(state.stack_id) - Electric.Connection.Manager.Supervisor.restart(stack_id: state.stack_id) + %{stack_id: stack_id} = state + + StatusMonitor.database_connections_waking_up(stack_id) + Electric.Connection.Manager.Supervisor.restart(stack_id: stack_id) - ref = StatusMonitor.wait_until_conn_up_async(state.stack_id) + ref = StatusMonitor.wait_until_conn_up_async(stack_id) - {:noreply, %{state | wait_until_conn_up_ref: ref}} + if timer = state.wal_size_check_timer, do: Process.cancel_timer(timer) + + {:noreply, %{state | wait_until_conn_up_ref: ref, wal_size_check_timer: nil}} end def handle_cast(:restore_connection_subsystem, state) do @@ -108,11 +126,92 @@ defmodule Electric.Connection.Restarter do end def handle_call(:restart_connection_subsystem, _from, state) do - :ok = Electric.Connection.Manager.Supervisor.restart(stack_id: state.stack_id) + :ok = do_restart_connection_subsystem(state.stack_id) {:reply, :ok, state} end def handle_info({ref, :ok}, %{wait_until_conn_up_ref: ref} = state) do {:noreply, %{state | wait_until_conn_up_ref: nil}} end + + # The timer has already been cancelled and reset, ignore this message. + def handle_info(:check_wal_size, %{wal_size_check_timer: nil} = state) do + {:noreply, state} + end + + def handle_info(:check_wal_size, state) do + state = %{state | wal_size_check_timer: nil} + + wal_size = query_retained_wal_size(state) + formatted_wal_size = Electric.Utils.format_bytes_to_human_readable_size(wal_size) + + formatted_threshold = + Electric.Utils.format_bytes_to_human_readable_size(state.wal_size_threshold) + + formatted_period = + Electric.Utils.format_milliseconds_to_human_readable_interval(state.wal_size_check_period) + + state = + if wal_size >= state.wal_size_threshold do + Logger.info( + "Retained WAL size #{formatted_wal_size} has exceeded the threshold of #{formatted_threshold}. Time to wake up the connection subsystem." + ) + + :ok = do_restart_connection_subsystem(state.stack_id) + state + else + Logger.info( + "Retained WAL size #{formatted_wal_size} is below the threshold of #{formatted_threshold}. 
Scheduling the next check to take place after #{formatted_period}" + ) + + schedule_wal_size_check(state) + end + + {:noreply, state} + end + + defp do_restart_connection_subsystem(stack_id) do + Electric.Connection.Manager.Supervisor.restart(stack_id: stack_id) + end + + defp schedule_wal_size_check( + %{wal_size_check_timer: nil, wal_size_check_period: period} = state + ) + when is_integer(period) and period > 0 do + timer = Process.send_after(self(), :check_wal_size, period) + %{state | wal_size_check_timer: timer} + end + + defp schedule_wal_size_check(state), do: state + + @retained_wal_size_query """ + SELECT + pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_wal_size + FROM + pg_replication_slots + WHERE + slot_name = + """ + + defp query_retained_wal_size(state) do + Logger.info("Opening a database connection to check the retained WAL size") + + query = @retained_wal_size_query <> Electric.Utils.quote_string(state.slot_name) + + opts = [ + stack_id: state.stack_id, + label: :retained_wal_size_query, + connection_opts: + Electric.Connection.Manager.validated_connection_opts(state.stack_id, :pool) + ] + + case Electric.Postgres.OneOffConnection.query(query, opts) do + {:ok, %Postgrex.Result{columns: ["retained_wal_size"], rows: [[wal_bytes_str]]}} -> + String.to_integer(wal_bytes_str) + + error -> + Logger.info("Error querying retained WAL size: #{inspect(error)}") + 0 + end + end end diff --git a/packages/sync-service/lib/electric/connection/supervisor.ex b/packages/sync-service/lib/electric/connection/supervisor.ex index ea32f196f4..5b0df11bdc 100644 --- a/packages/sync-service/lib/electric/connection/supervisor.ex +++ b/packages/sync-service/lib/electric/connection/supervisor.ex @@ -57,10 +57,12 @@ defmodule Electric.Connection.Supervisor do Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) + restarter_opts = Keyword.fetch!(opts, :restarter_opts) + connection_manager_opts = Keyword.fetch!(opts, :connection_manager_opts) + children = [ - {Electric.Connection.Restarter, - stack_id: stack_id, stack_events_registry: Keyword.fetch!(opts, :stack_events_registry)}, - {Electric.Connection.Manager.Supervisor, opts} + {Electric.Connection.Restarter, restarter_opts}, + {Electric.Connection.Manager.Supervisor, connection_manager_opts} ] Supervisor.init(children, strategy: :rest_for_one) diff --git a/packages/sync-service/lib/electric/core_supervisor.ex b/packages/sync-service/lib/electric/core_supervisor.ex index 52eb3ab3b4..b3290c46a6 100644 --- a/packages/sync-service/lib/electric/core_supervisor.ex +++ b/packages/sync-service/lib/electric/core_supervisor.ex @@ -24,10 +24,8 @@ defmodule Electric.CoreSupervisor do Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) - connection_manager_opts = Keyword.fetch!(opts, :connection_manager_opts) - children = [ - {Electric.Connection.Supervisor, connection_manager_opts} + {Electric.Connection.Supervisor, opts} ] Supervisor.init(children, strategy: :one_for_one, auto_shutdown: :any_significant) diff --git a/packages/sync-service/lib/electric/postgres/lock_breaker_connection.ex b/packages/sync-service/lib/electric/postgres/lock_breaker_connection.ex deleted file mode 100644 index eb2427bb18..0000000000 --- a/packages/sync-service/lib/electric/postgres/lock_breaker_connection.ex +++ /dev/null @@ -1,133 +0,0 @@ -defmodule Electric.Postgres.LockBreakerConnection do - @moduledoc """ - A Postgres connection that is used to break an abandoned lock. 
- - Electric takes out a session-level advisory lock on a separate connection to better manage the - ownership of the replication slot. Unfortunately, we have seen instances (especially on Neon), - where the Electric disconnects, but the lock is not auto-released. - - For these cases, this breaker exists - it'll connect to the database, and check that for - a given lock name, if that lock is taken, there also exists an active replication slot with the - same name. If not, it'll terminate the backend that is holding the lock, under the assumption - that it's one of the abandoned locks. - """ - require Logger - - import Electric.Utils, only: [quote_string: 1] - - @behaviour Postgrex.SimpleConnection - - @type option :: - {:connection_opts, Keyword.t()} - | {:stack_id, String.t()} - - @type options :: [option] - - @spec start(options()) :: {:ok, pid()} | {:error, Postgrex.Error.t() | term()} - def start(opts) do - {connection_opts, init_opts} = Keyword.pop(opts, :connection_opts) - - connection_opts = Electric.Utils.deobfuscate_password(connection_opts) - - with {:ok, pid} <- - Postgrex.SimpleConnection.start_link( - __MODULE__, - init_opts |> Keyword.put(:database, connection_opts[:database]), - [ - auto_reconnect: false, - sync_connect: true - ] ++ - connection_opts - ) do - # unlink the lock breaker so that if it crashes it does not affect the caller, - # since it is a one shot fix attempt anyway - Process.unlink(pid) - {:ok, pid} - end - end - - def stop_backends_and_close(server, lock_name, lock_connection_pg_backend_pid \\ nil) do - send(server, {:stop_backends_and_close, lock_name, lock_connection_pg_backend_pid}) - end - - @impl true - def init(opts) do - opts = Map.new(opts) - - Process.set_label({:lock_breaker_connection, opts.stack_id}) - - metadata = [ - is_connection_process?: true, - stack_id: opts.stack_id - ] - - Logger.metadata(metadata) - Electric.Telemetry.Sentry.set_tags_context(metadata) - - {:ok, opts} - end - - @impl true - def handle_connect(state) do - {:noreply, state} - end - - @impl true - def handle_info( - {:stop_backends_and_close, lock_name, lock_connection_pg_backend_pid}, - state - ) do - {:query, lock_breaker_query(lock_name, lock_connection_pg_backend_pid, state.database), - Map.put(state, :lock_name, lock_name)} - end - - @impl true - def handle_result([%Postgrex.Result{columns: ["pg_terminate_backend"]} = result], state) do - if result.num_rows == 0 do - Logger.debug("No stuck backends found") - else - Logger.notice( - "Terminated a stuck backend to free the lock #{state.lock_name} because slot with same name was inactive" - ) - end - - exit(:shutdown) - end - - def handle_result(%Postgrex.Error{} = error, _) do - raise error - end - - @impl true - def notify(_, _, _), do: :ok - - defp lock_breaker_query(lock_name, lock_connection_pg_backend_pid, database) - when is_integer(lock_connection_pg_backend_pid) or is_nil(lock_connection_pg_backend_pid) do - # We're using a `WITH` clause to execute all this in one statement - # - See if there are existing but inactive replication slots with the given name - # - Find all backends that are holding locks with the same name - # - Terminate those backends - # - # It's generally impossible for this to return more than one row - - """ - WITH inactive_slots AS ( - select slot_name - from pg_replication_slots - where active = false and database = #{quote_string(database)} and slot_name = #{quote_string(lock_name)} - ), - stuck_backends AS ( - select pid - from pg_locks, inactive_slots - where - hashtext(slot_name) = 
(classid::bigint << 32) | objid::bigint - and locktype = 'advisory' - and objsubid = 1 - and database = (select oid from pg_database where datname = #{quote_string(database)}) - and granted - and pid != #{lock_connection_pg_backend_pid || 0} - ) - SELECT pg_terminate_backend(pid) FROM stuck_backends; - """ - end -end diff --git a/packages/sync-service/lib/electric/postgres/one_off_connection.ex b/packages/sync-service/lib/electric/postgres/one_off_connection.ex new file mode 100644 index 0000000000..d7dbfdce2c --- /dev/null +++ b/packages/sync-service/lib/electric/postgres/one_off_connection.ex @@ -0,0 +1,137 @@ +defmodule Electric.Postgres.OneOffConnection do + @moduledoc """ + A wrapper around Postgrex.SimpleConnection that provides synchronous API for querying the + database. + """ + + @behaviour Postgrex.SimpleConnection + + @default_timeout 5000 + + @doc """ + Attempt a database connection using the given connection options. + + This function is useful to verify that a database connection can be established using the + provided connection options. Once a connection has been established, the connection process + shuts down synchronously before this function returns. + """ + @spec attempt_connection(keyword()) :: :success | {:error, Postgrex.Error.t()} + def attempt_connection(kwopts) do + connect_and_maybe_query(kwopts, &handle_connection/1) + end + + @doc """ + Open a one-off database connection and execute a simple query. + + Once a connection has been established, the query is executed, the connection process shuts + down and the query result is returned from the function. + """ + @spec query(String.t(), keyword()) :: + {:ok, Postgrex.Result.t()} | {:error, Postgrex.Error.t() | :timeout} + def query(query, kwopts) do + timeout = Keyword.get(kwopts, :timeout, @default_timeout) + connect_and_maybe_query([query: query] ++ kwopts, &handle_query_result(&1, timeout)) + end + + ### + + defp connect_and_maybe_query(kwopts, on_connected_callback_fn) do + {connection_opts, kwopts} = Keyword.pop(kwopts, :connection_opts) + + connection_opts = + connection_opts + |> Electric.Utils.deobfuscate_password() + |> Keyword.merge(auto_reconnect: false, sync_connect: true) + + trap_exit_val = Process.flag(:trap_exit, true) + + result = + with {:ok, pid} <- + Postgrex.SimpleConnection.start_link( + __MODULE__, + [parent_pid: self()] ++ kwopts, + connection_opts + ) do + on_connected_callback_fn.(pid) + end + + Process.flag(:trap_exit, trap_exit_val) + + result + end + + # Callback executed after Postgrex.SimpleConnection has successfully connected and there's no query to run. + defp handle_connection(pid) do + exit_connection_process(pid) + :success + end + + # Callback executed after Postgrex.SimpleConnection has successfully connected and sent off a query to the database. 
+ defp handle_query_result(pid, timeout) do + mon = Process.monitor(pid) + + result = + receive do + {^pid, %Postgrex.Result{} = result} -> {:ok, result} + {^pid, %Postgrex.Error{} = error} -> {:error, error} + {:DOWN, ^mon, :process, ^pid, reason} -> {:error, reason} + after + timeout -> {:error, :timeout} + end + + Process.demonitor(mon, [:flush]) + exit_connection_process(pid) + + result + end + + defp exit_connection_process(pid) do + Process.exit(pid, :shutdown) + + receive do + {:EXIT, ^pid, _reason} -> :ok + end + end + + ### + + @impl true + def init(kwopts) do + config = + kwopts + |> Map.new() + + %{stack_id: stack_id} = config + + Process.set_label({config.label, stack_id}) + Logger.metadata(stack_id: stack_id, is_connection_process?: true) + Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) + + {:ok, config} + end + + @impl true + def handle_connect(state) do + if query = state[:query] do + {:query, query, state} + else + {:noreply, state} + end + end + + @impl true + def handle_result([%Postgrex.Result{} = result], state) do + send(state.parent_pid, {self(), result}) + {:noreply, state} + end + + def handle_result(%Postgrex.Error{} = error, state) do + send(state.parent_pid, {self(), error}) + {:noreply, state} + end + + @impl true + def notify(_channel, _payload, _state) do + :ok + end +end diff --git a/packages/sync-service/lib/electric/stack_config.ex b/packages/sync-service/lib/electric/stack_config.ex index cc614cd055..a7935dc90b 100644 --- a/packages/sync-service/lib/electric/stack_config.ex +++ b/packages/sync-service/lib/electric/stack_config.ex @@ -9,12 +9,6 @@ defmodule Electric.StackConfig do :ets.lookup_element(table(stack_id), key, 2, default) end - def spawn_opts(stack_id, process_name) do - stack_id - |> lookup(:process_spawn_opts, %{}) - |> Map.get(process_name, []) - end - def lookup!(stack_id, key) do :ets.lookup_element(table(stack_id), key, 2) rescue @@ -23,6 +17,18 @@ defmodule Electric.StackConfig do message: "stack config value #{inspect(key)} is missing for stack #{stack_id}" end + def erase(stack_id, key) do + :ets.delete(table(stack_id), key) + rescue + ArgumentError -> :ok + end + + def spawn_opts(stack_id, process_name) do + stack_id + |> lookup(:process_spawn_opts, %{}) + |> Map.get(process_name, []) + end + @doc false # Should provide all required values not defined dynamically at stack init def default_seed_config do diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index e23ed6b9f8..a141e61a88 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -136,7 +136,15 @@ defmodule Electric.StackSupervisor do type: :pos_integer, default: Electric.Config.default(:conn_max_requests) ], - process_spawn_opts: [type: :map, default: %{}] + process_spawn_opts: [type: :map, default: %{}], + idle_wal_size_check_period: [ + type: :integer, + default: Electric.Config.default(:idle_wal_size_check_period) + ], + idle_wal_size_threshold: [ + type: :integer, + default: Electric.Config.default(:idle_wal_size_threshold) + ] ] ], manual_table_publishing?: [ @@ -347,6 +355,14 @@ defmodule Electric.StackSupervisor do manual_table_publishing?: config.manual_table_publishing? 
] + restarter_opts = [ + stack_id: stack_id, + stack_events_registry: config.stack_events_registry, + slot_name: Keyword.fetch!(config.replication_opts, :slot_name), + wal_size_check_period: Keyword.fetch!(config.tweaks, :idle_wal_size_check_period), + wal_size_threshold: Keyword.fetch!(config.tweaks, :idle_wal_size_threshold) + ] + registry_partitions = Keyword.get(config.tweaks, :registry_partitions, System.schedulers_online()) @@ -377,7 +393,9 @@ defmodule Electric.StackSupervisor do {Electric.Postgres.Inspector.EtsInspector, stack_id: stack_id, pool: metadata_db_pool, persistent_kv: config.persistent_kv}, {Electric.MonitoredCoreSupervisor, - stack_id: stack_id, connection_manager_opts: connection_manager_opts} + stack_id: stack_id, + connection_manager_opts: connection_manager_opts, + restarter_opts: restarter_opts} ] |> Enum.reject(&is_nil/1) diff --git a/packages/sync-service/lib/electric/utils.ex b/packages/sync-service/lib/electric/utils.ex index 878770f7d1..d93df53629 100644 --- a/packages/sync-service/lib/electric/utils.ex +++ b/packages/sync-service/lib/electric/utils.ex @@ -599,4 +599,80 @@ defmodule Electric.Utils do Map.new([{prefix, nested} | rest]) end end + + @doc """ + Format the given milliseconds into a human-readable time interval string. + + ## Examples + + iex> format_milliseconds_to_human_readable_interval(100) + "100ms" + + iex> format_milliseconds_to_human_readable_interval(13500) + "13sec 500ms" + + iex> format_milliseconds_to_human_readable_interval(960001) + "16min 1ms" + + iex> format_milliseconds_to_human_readable_interval(3630000) + "1hr 30sec" + """ + def format_milliseconds_to_human_readable_interval(millisec) do + hours = div(millisec, 3600 * 1000) + remainder = millisec - hours * 3600 * 1000 + + minutes = div(remainder, 60 * 1000) + remainder = remainder - minutes * 60 * 1000 + + seconds = div(remainder, 1000) + remainder = remainder - seconds * 1000 + + hrs_unit = if hours > 1, do: "hrs", else: "hr" + + [{hours, hrs_unit}, {minutes, "min"}, {seconds, "sec"}, {remainder, "ms"}] + |> Enum.reject(fn {num, _} -> num == 0 end) + |> Enum.map(fn {num, units} -> to_string(num) <> units end) + |> Enum.join(" ") + end + + @doc """ + Format the given bytes into a human-readable size string. 
+ + ## Examples + + iex> format_bytes_to_human_readable_size(100) + "100B" + + iex> format_bytes_to_human_readable_size(999) + "999B" + + iex> format_bytes_to_human_readable_size(1400) + "~1.4KB" + + iex> format_bytes_to_human_readable_size(123444) + "~123.4KB" + + iex> format_bytes_to_human_readable_size(98_800_431) + "~98.8MB" + + iex> format_bytes_to_human_readable_size(999_999_999) + "~1000.0MB" + + iex> format_bytes_to_human_readable_size(1_234_567_890) + "~1.2GB" + """ + def format_bytes_to_human_readable_size(bytes) do + cond do + bytes >= 1_000_000_000 -> "~#{round_decimal(bytes / 1_000_000_000)}GB" + bytes >= 1_000_000 -> "~#{round_decimal(bytes / 1_000_000)}MB" + bytes >= 1_000 -> "~#{round_decimal(bytes / 1_000)}KB" + true -> "#{bytes}B" + end + end + + defp round_decimal(float) do + float + |> Decimal.from_float() + |> Decimal.round(1) + end end diff --git a/packages/sync-service/test/electric/connection/manager/connection_resolver_test.exs b/packages/sync-service/test/electric/connection/manager/connection_resolver_test.exs index 44563af061..225ae8fafe 100644 --- a/packages/sync-service/test/electric/connection/manager/connection_resolver_test.exs +++ b/packages/sync-service/test/electric/connection/manager/connection_resolver_test.exs @@ -7,40 +7,16 @@ defmodule Electric.Connection.Manager.ConnectionResolverTest do import Support.ComponentSetup, only: [with_stack_id_from_test: 1] import Support.DbSetup - defp start_connection_resolver!(ctx, connection_mod \\ nil) do - opts = [stack_id: ctx.stack_id] - - opts = - if connection_mod do - Keyword.put(opts, :connection_mod, connection_mod) - else - opts - end - - start_supervised!({ConnectionResolver, opts}) - end - setup [ :with_unique_db, - :with_stack_id_from_test + :with_stack_id_from_test, + :start_connection_resolver ] - defmodule ErrorConnection do - def start_link(_handler, _args, conn_opts, match_fun) do - match_fun.(conn_opts) - end - end - - defp assert_obfuscated_password(conn_opts) do - assert is_function(Keyword.get(conn_opts, :password), 0) - end - # actually connect to make sure we can do that # overwrite :connection_mod with custom modules that implement start_link but exit with some pre-defined postgres error # need to assert that the connection options are mutated between attempts test "valid connection opts", ctx do - start_connection_resolver!(ctx) - db_config = Keyword.put(ctx.db_config, :sslmode, :disable) assert {:ok, resolved_db_config} = ConnectionResolver.validate(ctx.stack_id, db_config) @@ -55,8 +31,6 @@ defmodule Electric.Connection.Manager.ConnectionResolverTest do end test "fallback to no-ssl works with ssmodle: :prefer", ctx do - start_connection_resolver!(ctx) - db_config = Keyword.put(ctx.db_config, :sslmode, :prefer) assert {:ok, resolved_db_config} = ConnectionResolver.validate(ctx.stack_id, db_config) @@ -71,34 +45,13 @@ defmodule Electric.Connection.Manager.ConnectionResolverTest do end test "sslmode: :require with no ssl returns error", ctx do - start_connection_resolver!(ctx) db_config = Keyword.put(ctx.db_config, :sslmode, :require) assert {:error, %Postgrex.Error{message: "ssl not available"}} = ConnectionResolver.validate(ctx.stack_id, db_config) end - test "fly connection can fallback to no ssl", ctx do - conn = spawn(fn -> Process.sleep(:infinity) end) - - start_connection_resolver!( - ctx, - {ErrorConnection, :start_link, - [ - fn conn_opts -> - if Keyword.get(conn_opts, :ssl) do - {:error, - %DBConnection.ConnectionError{ - message: "ssl connect: closed", - severity: :error - }} - 
else - {:ok, conn} - end - end - ]} - ) - + test "connection can fallback to no ssl", ctx do db_config = Keyword.put(ctx.db_config, :sslmode, :prefer) assert {:ok, resolved_db_config} = ConnectionResolver.validate(ctx.stack_id, db_config) @@ -113,8 +66,6 @@ defmodule Electric.Connection.Manager.ConnectionResolverTest do end test "fallback to ipv4 works", ctx do - start_connection_resolver!(ctx) - db_config = Keyword.merge(ctx.db_config, ipv6: true, hostname: "local-ipv4-only.electric-sql.dev") @@ -130,27 +81,8 @@ defmodule Electric.Connection.Manager.ConnectionResolverTest do end test "fallback to ipv4 handles various error results", ctx do - conn = spawn(fn -> Process.sleep(:infinity) end) - - start_connection_resolver!( - ctx, - {ErrorConnection, :start_link, - [ - fn conn_opts -> - if Keyword.get(conn_opts, :ipv6, true) do - {:error, - %DBConnection.ConnectionError{ - message: ipv6_error_message("localhost"), - severity: :error - }} - else - {:ok, conn} - end - end - ]} - ) - - db_config = Keyword.put(ctx.db_config, :ipv6, true) + # Use an IPv4 address for the hostname to ensure that connection attempts with socket_options: [:inet6] fail. + db_config = Keyword.merge(ctx.db_config, hostname: "127.0.0.1", ipv6: true) assert {:ok, resolved_db_config} = ConnectionResolver.validate(ctx.stack_id, db_config) @@ -163,12 +95,12 @@ defmodule Electric.Connection.Manager.ConnectionResolverTest do assert_obfuscated_password(resolved_db_config) end - defp ipv6_error_message(hostname) do - "tcp connect (#{hostname}): " <> - Enum.random([ - "non-existing domain - :nxdomain", - "host is unreachable - :ehostunreach", - "network is unreachable - :enetunreach" - ]) + defp start_connection_resolver(ctx) do + _pid = start_supervised!({ConnectionResolver, stack_id: ctx.stack_id}) + :ok + end + + defp assert_obfuscated_password(conn_opts) do + assert is_function(Keyword.get(conn_opts, :password), 0) end end diff --git a/packages/sync-service/test/electric/connection/manager_test.exs b/packages/sync-service/test/electric/connection/manager_test.exs index d2c006dbf9..c83825a090 100644 --- a/packages/sync-service/test/electric/connection/manager_test.exs +++ b/packages/sync-service/test/electric/connection/manager_test.exs @@ -5,8 +5,8 @@ defmodule Electric.Connection.ConnectionManagerTest do import Support.ComponentSetup import Support.DbSetup - alias Electric.Replication.ShapeLogCollector alias Electric.Connection + alias Electric.Replication.ShapeLogCollector alias Electric.StatusMonitor setup [ @@ -54,10 +54,18 @@ defmodule Electric.Connection.ConnectionManagerTest do stack_events_registry: stack_events_registry ] + restarter_opts = [ + stack_id: stack_id, + stack_events_registry: stack_events_registry, + slot_name: ctx.slot_name + ] + core_sup = start_link_supervised!( {Electric.CoreSupervisor, - stack_id: stack_id, connection_manager_opts: connection_manager_opts}, + stack_id: stack_id, + connection_manager_opts: connection_manager_opts, + restarter_opts: restarter_opts}, # The test supervisor under which this one is started has `auto_shutdown` set to # `:never`, so we need to make sure the core supervisor is not a significant # child, otherwise we'd get the following error: @@ -412,6 +420,103 @@ defmodule Electric.Connection.ConnectionManagerTest do end end + describe "lock breaker query" do + test "should break an abandoned lock if slot is inactive", ctx do + Postgrex.query!( + ctx.db_conn, + "SELECT pg_create_logical_replication_slot('#{ctx.slot_name}', 'pgoutput')" + ) + + test_pid = self() + + # Start 
a task that will hold the lock until the end of the test + start_supervised!({ + Task, + fn -> + DBConnection.run(ctx.db_conn, fn conn -> + Postgrex.query!(conn, "SELECT pg_advisory_lock(hashtext('#{ctx.slot_name}'))") + + send(test_pid, :lock_acquired) + + Process.sleep(:infinity) + end) + end + }) + + assert_receive :lock_acquired + + # Verify there's an entry for the acquired lock in pg_locks + assert %Postgrex.Result{rows: [[pg_backend_pid]], num_rows: 1} = query_lock_status(ctx) + + # Passing a pg_backend_pid to the run_lock_breaker_query() protects that backend pid from termination + conn_man_state = %{ + stack_id: ctx.stack_id, + replication_opts: [slot_name: ctx.slot_name], + replication_pg_backend_pid: pg_backend_pid + } + + assert {:ok, %Postgrex.Result{columns: ["pg_terminate_backend"], num_rows: 0}} = + Electric.Connection.Manager.run_lock_breaker_query(ctx.db_config, conn_man_state) + + # Verify the lock is still held + assert %Postgrex.Result{rows: [[^pg_backend_pid]], num_rows: 1} = query_lock_status(ctx) + + # Make sure we can stop the lock connection above, so we're passing nil for replication_pg_backend_pid + conn_man_state = %{ + stack_id: ctx.stack_id, + replication_opts: [slot_name: ctx.slot_name], + replication_pg_backend_pid: nil + } + + assert {:ok, + %Postgrex.Result{columns: ["pg_terminate_backend"], num_rows: 1, rows: [["t"]]}} = + Electric.Connection.Manager.run_lock_breaker_query(ctx.db_config, conn_man_state) + + # Verify that the pg_locks entry is gone + assert %Postgrex.Result{rows: [], num_rows: 0} = query_lock_status(ctx) + end + + test "doesn't break the lock if it's taken from expected lock connection", ctx do + Postgrex.query!( + ctx.db_conn, + "SELECT pg_create_logical_replication_slot('#{ctx.slot_name}', 'pgoutput')" + ) + + {:ok, replication_client_pid} = + start_supervised( + {Electric.Postgres.ReplicationClient, + stack_id: ctx.stack_id, + replication_opts: [ + connection_opts: ctx.db_config, + stack_id: ctx.stack_id, + publication_name: ctx.slot_name, + try_creating_publication?: false, + slot_name: ctx.slot_name, + handle_event: nil, + connection_manager: self() + ]} + ) + + replication_client_monitor = Process.monitor(replication_client_pid) + + assert_receive {:"$gen_cast", {:pg_info_obtained, %{pg_backend_pid: pg_backend_pid}}} + assert_receive {:"$gen_cast", :replication_client_lock_acquired} + + conn_man_state = %{ + stack_id: ctx.stack_id, + replication_opts: [slot_name: ctx.slot_name], + replication_pg_backend_pid: pg_backend_pid + } + + assert {:ok, %Postgrex.Result{columns: ["pg_terminate_backend"], num_rows: 0}} = + Electric.Connection.Manager.run_lock_breaker_query(ctx.db_config, conn_man_state) + + refute_received {:DOWN, ^replication_client_monitor, :process, _pid, _reason} + + stop_supervised(Electric.Postgres.ReplicationClient) + end + end + defp wait_until_active(stack_id) do assert_receive {:stack_status, _, :waiting_for_connection_lock} assert_receive {:stack_status, _, :connection_lock_acquired} @@ -426,4 +531,12 @@ defmodule Electric.Connection.ConnectionManagerTest do |> GenServer.whereis() |> Process.monitor() end + + defp query_lock_status(ctx) do + Postgrex.query!( + ctx.db_conn, + "SELECT pid FROM pg_locks WHERE (classid::bigint << 32) | objid::bigint = hashtext($1) AND locktype = 'advisory'", + [ctx.slot_name] + ) + end end diff --git a/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs b/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs deleted file mode 100644 
index 18fb168f20..0000000000 --- a/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs +++ /dev/null @@ -1,116 +0,0 @@ -defmodule Electric.Postgres.LockBreakerConnectionTest do - use ExUnit.Case, async: true - import Support.DbSetup, except: [with_publication: 1] - import Support.ComponentSetup - - alias Electric.Postgres.LockBreakerConnection - alias Electric.Postgres.ReplicationClient - - setup [ - :with_unique_db, - :with_stack_id_from_test, - :with_lsn_tracker, - :with_slot_name - ] - - test "should break an abandoned lock if slot is inactive", ctx do - Postgrex.query!( - ctx.db_conn, - "SELECT pg_create_logical_replication_slot('#{ctx.slot_name}', 'pgoutput')" - ) - - test_pid = self() - - start_supervised!({ - Task, - fn -> - DBConnection.run(ctx.db_conn, fn conn -> - Postgrex.query!( - conn, - "SELECT pg_advisory_lock(hashtext('#{ctx.slot_name}'))", - [] - ) - - send(test_pid, :lock_acquired) - - Process.sleep(:infinity) - end) - end - }) - - {:ok, lock_breaker_pid} = - Electric.Postgres.LockBreakerConnection.start( - connection_opts: ctx.db_config, - stack_id: ctx.stack_id - ) - - lock_breaker_monitor = Process.monitor(lock_breaker_pid) - - assert_receive :lock_acquired - - # Verify there's an entry for the acquired lock in pg_locks - assert %Postgrex.Result{rows: [_pg_backend_pid], num_rows: 1} = - Postgrex.query!( - ctx.db_conn, - "SELECT pid FROM pg_locks WHERE objid::bigint = hashtext($1) AND locktype = 'advisory'", - [ctx.slot_name] - ) - - # Make sure we can stop the lock connection above, so we're not specifying current pid - LockBreakerConnection.stop_backends_and_close(lock_breaker_pid, ctx.slot_name) - - assert_receive {:DOWN, ^lock_breaker_monitor, :process, ^lock_breaker_pid, :shutdown} - - # Verify that the pg_locks entry is gone - assert %Postgrex.Result{rows: [], num_rows: 0} = - Postgrex.query!( - ctx.db_conn, - "SELECT pid FROM pg_locks WHERE objid::bigint = hashtext($1) AND locktype = 'advisory'", - [ctx.slot_name] - ) - end - - test "doesn't break the lock if it's taken from expected lock connection", ctx do - Postgrex.query!( - ctx.db_conn, - "SELECT pg_create_logical_replication_slot('#{ctx.slot_name}', 'pgoutput')" - ) - - {:ok, replication_client_pid} = - start_supervised( - {ReplicationClient, - stack_id: ctx.stack_id, - replication_opts: [ - connection_opts: ctx.db_config, - stack_id: ctx.stack_id, - publication_name: ctx.slot_name, - try_creating_publication?: false, - slot_name: ctx.slot_name, - handle_event: nil, - connection_manager: self() - ]} - ) - - replication_client_monitor = Process.monitor(replication_client_pid) - - {:ok, lock_breaker_pid} = - start_supervised(%{ - id: :lock_breaker, - start: - {Electric.Postgres.LockBreakerConnection, :start, - [[connection_opts: ctx.db_config, stack_id: ctx.stack_id]]} - }) - - lock_breaker_monitor = Process.monitor(lock_breaker_pid) - - assert_receive {:"$gen_cast", {:pg_info_obtained, %{pg_backend_pid: pg_backend_pid}}} - assert_receive {:"$gen_cast", :replication_client_lock_acquired} - - LockBreakerConnection.stop_backends_and_close(lock_breaker_pid, ctx.slot_name, pg_backend_pid) - - assert_receive {:DOWN, ^lock_breaker_monitor, :process, ^lock_breaker_pid, :shutdown} - refute_received {:DOWN, ^replication_client_monitor, :process, _pid, _reason} - - stop_supervised(ReplicationClient) - end -end