Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
95bb206
Store the result of connection opts validation in StackConfig
alco Oct 12, 2025
e2a20ba
Implement periodic retained WAL size check in Restarter
alco Oct 13, 2025
8ffaf6c
Implement ELECTRIC_IDLE_WAL_SIZE_CHECK_PERIOD and ELECTRIC_IDLE_WAL_S…
alco Oct 13, 2025
6e62d86
Add changeset
alco Oct 13, 2025
546f19d
Add Electric.Postgres.OneOffConnection module to run one-off DB queries
alco Dec 11, 2025
81ec971
Reimplement Electric.Postgres.LockBreakerConnection as a one-off query
alco Dec 11, 2025
7f00c73
Remove Electric.Connection.Manager.ConnectionResolver's internal Simp…
alco Dec 11, 2025
aca04e8
Split OneOffConnection's API into separate attempt_connection() and q…
alco Dec 18, 2025
599eb07
Query retained WAL size in Restarter periodically
alco Dec 11, 2025
dcefc73
Pass the user config for periodic WAL size checks to Restarter
alco Dec 18, 2025
5f12c5f
Pretty-print retained WAL size and threshold when logging Restarter n…
alco Dec 18, 2025
1fdacf2
Set a more sensible default for WAL size threshold
alco Dec 18, 2025
f2f0ec9
Remove superfluous logging from OneOffConnection
alco Dec 18, 2025
2101152
Do not persist validation connection opts across conn man restarts
alco Dec 20, 2025
7314c01
fixup! Do not persist validation connection opts across conn man rest…
alco Dec 21, 2025
9d32a33
Use consistent prefix ELECTRIC_REPLICATION_IDLE for the new configura…
alco Dec 21, 2025
0c93d47
Cleanups for the new Lux test
alco Dec 27, 2025
3d53437
Parse WAL size threshold as human-readable size value
alco Dec 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/little-seas-guess.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Add two new configuration options for periodic retained WAL size checks in scaled down mode: ELECTRIC_REPLICATION_IDLE_WAL_SIZE_CHECK_PERIOD and ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD.
79 changes: 79 additions & 0 deletions integration-tests/tests/wal-size-check-while-scaled-down.lux
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
[doc Verify Electric can scale down database connections after a lull in stream transactions]

[include _macros.luxinc]

[global pg_container_name=wal-size-check-while-scaled-down__pg]
[global database_url=postgresql://electric_test:password@localhost:$pg_host_port/electric?sslmode=disable]

###

# Start Postgres and create an additional role that will be used by Electric in this test
[invoke setup_pg_with_shell_name \
"pg" \
"-e INIT_DB_SQL=\"\
CREATE ROLE electric_test LOGIN PASSWORD 'password' REPLICATION;\
GRANT CREATE ON DATABASE electric TO electric_test\"" \
"" \
"-v $(realpath ../scripts/init_db.sh):/docker-entrypoint-initdb.d/initdb-init_db.sh"]

# Create a table for subsequent shape requests
[invoke start_psql]

[shell psql]
!create table items(val text);
??CREATE

!insert into items values ('1'), ('2');
??INSERT

!alter table items owner to electric_test;
??ALTER TABLE

# Start Electric and wait for it to finish initialization.
# Have to disable the ConnectionManagerPing process, otherwise it will error when
# Connection.Manager itself goes down as part of the test scenario.
[invoke setup_electric_with_env "DO_NOT_START_CONN_MAN_PING=true ELECTRIC_REPLICATION_IDLE_WAL_SIZE_CHECK_PERIOD=1s ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD=10000"]
[shell electric]
[timeout 10]
??[debug] Replication client started streaming

# Force Electric to scale down by sending the :idle_check message to the replication client.
# Normally the client performs the check once a minute, so we're speeding it up here for
# testing purposes.
[shell electric]
!Electric.Postgres.ReplicationClient.name("single_stack") |> GenServer.whereis() |> send(:check_if_idle)
??[notice] Closing all database connections after the replication stream has been idle for

??[debug] Terminating connection manager with reason :shutdown.

# Confirm that Postgres no longer sees any open connections for the electric user
[shell psql]
[sleep 2]

!select datname, usename, backend_type, query from pg_stat_activity where usename = 'electric_test';
??(0 rows)


# Sleep until it's time to verify that Electric has checked retained WAL size in Postgres after ELECTRIC_REPLICATION_IDLE_WAL_SIZE_CHECK_PERIOD
[shell electric]
[sleep 2]

??Opening a database connection to check the retained WAL size
?Retained WAL size [0-9]+B is below the threshold of ~10\.0KB\. Scheduling the next check to take place after 1sec

# Write to the database enough data to force the retained WAL size to go over ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD
[shell psql]
!insert into items select * from generate_series(1,1000);
??INSERT

# Verify that Electric wakes up after checking the WAL size and seeing it has exceeded the threshold
[shell electric]
??Opening a database connection to check the retained WAL size
?Retained WAL size ~[0-9.]+KB has exceeded the threshold of ~10\.0KB\. Time to wake up the connection subsystem

??[info] Acquiring lock from postgres
??[info] Starting replication from postgres
??[debug] Replication client started streaming

[cleanup]
[invoke teardown]
12 changes: 12 additions & 0 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,18 @@ config :electric,
"ELECTRIC_REPLICATION_IDLE_TIMEOUT",
&Electric.Config.parse_human_readable_time!/1,
nil
),
idle_wal_size_check_period:
env!(
"ELECTRIC_REPLICATION_IDLE_WAL_SIZE_CHECK_PERIOD",
&Electric.Config.parse_human_readable_time!/1,
nil
),
idle_wal_size_threshold:
env!(
"ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD",
&Electric.Config.parse_human_readable_size!/1,
nil
)

if Electric.telemetry_enabled?() do
Expand Down
4 changes: 3 additions & 1 deletion packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ defmodule Electric.Application do
shape_hibernate_after: get_env(opts, :shape_hibernate_after),
shape_enable_suspend?: get_env(opts, :shape_enable_suspend?),
conn_max_requests: get_env(opts, :conn_max_requests),
process_spawn_opts: get_env(opts, :process_spawn_opts)
process_spawn_opts: get_env(opts, :process_spawn_opts),
idle_wal_size_check_period: get_env(opts, :idle_wal_size_check_period),
idle_wal_size_threshold: get_env(opts, :idle_wal_size_threshold)
],
manual_table_publishing?: get_env(opts, :manual_table_publishing?)
)
Expand Down
74 changes: 72 additions & 2 deletions packages/sync-service/lib/electric/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ defmodule Electric.Config do
max_txn_size: 250 * 1024 * 1024,
# Scaling down on idle is disabled by default
replication_idle_timeout: 0,
# If the database provider scales down after 5 min and provided that the
# replication_idle_timeout is about a minute or less, checking WAL size once an hour
# ends up using about 10% of the compute on an otherwise idle database.
idle_wal_size_check_period: 3_600_000,
# We want to wake up and process any transactions that have accumulated in the WAL, hence
# the low threshold.
idle_wal_size_threshold: 100 * 1024 * 1024,
manual_table_publishing?: false,
## HTTP API
# set enable_http_api: false to turn off the HTTP server totally
Expand Down Expand Up @@ -426,9 +433,9 @@ defmodule Electric.Config do
end
end

@time_units ~w[ms msec s sec m min]
@time_units ~w[ms msec s sec m min h hr]

@spec parse_human_readable_time(binary | nil) :: {:ok, pos_integer} | {:error, binary}
@spec parse_human_readable_time(binary) :: {:ok, pos_integer} | {:error, binary}

def parse_human_readable_time(str) do
with {num, suffix} <- Float.parse(str),
Expand All @@ -445,6 +452,7 @@ defmodule Electric.Config do
defp time_multiplier(millisecond) when millisecond in ["ms", "msec"], do: 1
defp time_multiplier(second) when second in ["s", "sec"], do: 1000
defp time_multiplier(minute) when minute in ["m", "min"], do: 1000 * 60
defp time_multiplier(hour) when hour in ["h", "hr"], do: 1000 * 60 * 60

def parse_human_readable_time!(str) do
case parse_human_readable_time(str) do
Expand All @@ -453,6 +461,68 @@ defmodule Electric.Config do
end
end

@doc """
Parse human-readable memory/storage size string into bytes.

## Examples

iex> parse_human_readable_size("1GiB")
{:ok, #{1024 * 1024 * 1024}}

iex> parse_human_readable_size("2.23GB")
{:ok, 2_230_000_000}

iex> parse_human_readable_size("256MiB")
{:ok, #{256 * 1024 * 1024}}

iex> parse_human_readable_size("377MB")
{:ok, 377_000_000}

iex> parse_human_readable_size("430KiB")
{:ok, #{430 * 1024}}

iex> parse_human_readable_size("142888KB")
{:ok, 142_888_000}

iex> parse_human_readable_size("123456789")
{:ok, 123_456_789}

iex> parse_human_readable_size("")
{:error, ~S'invalid size unit: "". Must be one of ["KB", "KiB", "MB", "MiB", "GB", "GiB"]'}

iex> parse_human_readable_size("foo")
{:error, ~S'invalid size unit: "foo". Must be one of ["KB", "KiB", "MB", "MiB", "GB", "GiB"]'}
"""
@spec parse_human_readable_size(binary) :: {:ok, pos_integer} | {:error, binary}

@size_units ~w[KB KiB MB MiB GB GiB]

def parse_human_readable_size(str) do
with {num, suffix} <- Float.parse(str),
true <- num > 0,
suffix = String.trim(suffix),
true <- suffix == "" or suffix in @size_units do
{:ok, trunc(num * size_multiplier(suffix))}
else
_ -> {:error, "invalid size unit: #{inspect(str)}. Must be one of #{inspect(@size_units)}"}
end
end

defp size_multiplier(""), do: 1
defp size_multiplier("KB"), do: 1_000
defp size_multiplier("KiB"), do: 1024
defp size_multiplier("MB"), do: 1_000_000
defp size_multiplier("MiB"), do: 1024 * 1024
defp size_multiplier("GB"), do: 1_000_000_000
defp size_multiplier("GiB"), do: 1024 * 1024 * 1024

def parse_human_readable_size!(str) do
case parse_human_readable_size(str) do
{:ok, result} -> result
{:error, message} -> raise Dotenvy.Error, message: message
end
end

def validate_security_config!(secret, insecure) do
cond do
insecure && secret != nil ->
Expand Down
Loading