diff --git a/.changeset/eleven-nails-argue.md b/.changeset/eleven-nails-argue.md new file mode 100644 index 0000000000..147520356d --- /dev/null +++ b/.changeset/eleven-nails-argue.md @@ -0,0 +1,5 @@ +--- +'@core/electric-telemetry': patch +--- + +Add binary memory, average number of off-heap binaries and their ref counts to top processes by memory metric. diff --git a/.changeset/forty-pillows-laugh.md b/.changeset/forty-pillows-laugh.md new file mode 100644 index 0000000000..1ebb0e06a6 --- /dev/null +++ b/.changeset/forty-pillows-laugh.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Include stack_id in otel opts for stack metrics. It had been omitted by mistake before. diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index 28c03cda4d..d980111e73 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -29,6 +29,7 @@ jobs: - uses: actions/checkout@v4 - uses: erlef/setup-beam@v1 + id: setup_beam with: version-type: strict version-file: '.tool-versions' @@ -63,6 +64,15 @@ jobs: run: mix compile working-directory: packages/sync-service + - name: Cache lux + uses: actions/cache@v4 + with: + path: integration-tests/lux + key: '${{ runner.os }}-lux-${{ steps.setup_beam.outputs.otp-version }}' + restore-keys: | + ${{ runner.os }}-lux-${{ steps.setup_beam.outputs.otp-version }} + ${{ runner.os }}-lux + - name: Setup lux run: make diff --git a/integration-tests/tests/otel-export.lux b/integration-tests/tests/otel-export.lux new file mode 100644 index 0000000000..400dd5e856 --- /dev/null +++ b/integration-tests/tests/otel-export.lux @@ -0,0 +1,169 @@ +[doc Verify that application and stack metrics are correctly exported via Otel] + +[include _macros.luxinc] + +[global pg_container_name=otel-export__pg] + +### + +[invoke setup_otel_collector] + +[invoke setup_pg] + +[invoke setup_electric_with_env "ELECTRIC_OTLP_ENDPOINT=http://localhost:4318 ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL=1s ELECTRIC_OTEL_EXPORT_PERIOD=2s DO_NOT_START_CONN_MAN_PING=1 ELECTRIC_LOG_LEVEL=info OTEL_RESOURCE_ATTRIBUTES=custom.attr=electric.val"] + +# Span a process containing off-heap binary references and ensure its in the top 5 by memory footprint. +# Otel Collector sorts metrics in its debug output, so the process label for our process needs +# to be first lexicographically for easier output matching further down. +[shell electric] + """! + _pid = spawn_link(fn -> + Process.set_label(:A_memory_hog) + + on_heap_strings = + Enum.map(1..1_000_000, fn _ -> String.duplicate("on heap", :random.uniform(8)) end) + + off_heap_strings = + Enum.map(1..100_000, fn i -> String.duplicate("1234567890", 70 * i) end) + + receive do + pid -> send(pid, {on_heap_strings, off_heap_strings}) + end + end) + """ + + ??#PID + +[shell otel_collector] + # Verify that the Collector receives expected application metrics from Electric + """? + info ResourceMetrics #0 + Resource SchemaURL: + Resource attributes: + -> custom.attr: Str\(electric.val\) + -> instance.id: Str\([-a-f0-9]+\) + -> name: Str\(metrics\) + -> service.name: Str\(electric\) + -> service.version: Str\([0-9.]+\) + ScopeMetrics #0 + """ + + # Verify the presence of process.memory.* metrics + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.memory\.avg_bin_count + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [.0-9]+ + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.memory\.avg_ref_count + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: 1\.000000 + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.memory\.binary + -> Description: + -> Unit: By + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [0-9]+ + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.memory\.total + -> Description: + -> Unit: By + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [0-9]+ + """ + + # Verify that the Collector receives stack metrics from Electric with expected resource attributes + """? + info ResourceMetrics #0 + Resource SchemaURL: + Resource attributes: + -> custom.attr: Str\(electric.val\) + -> instance.id: Str\([-a-f0-9]+\) + -> name: Str\(metrics\) + -> service.name: Str\(electric\) + -> service.version: Str\([0-9.]+\) + -> stack_id: Str\(single_stack\) + ScopeMetrics #0 + """ + + # Verify that LSN metrics are exported + """? + Metric #[0-9]+ + Descriptor: + -> Name: electric\.postgres\.replication\.pg_wal_offset + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [-+.0-9]+ + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: electric\.postgres\.replication\.slot_confirmed_flush_lsn_lag + -> Description: + -> Unit: By + -> DataType: Gauge + NumberDataPoints #0 + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [-+.0-9]+ + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: electric\.postgres\.replication\.slot_retained_wal_size + -> Description: + -> Unit: By + -> DataType: Gauge + NumberDataPoints #0 + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [-+.0-9]+ + """ + +### + +[cleanup] + [invoke teardown] diff --git a/integration-tests/tests/stack-telemetry.lux b/integration-tests/tests/stack-telemetry.lux deleted file mode 100644 index c9be1a5b20..0000000000 --- a/integration-tests/tests/stack-telemetry.lux +++ /dev/null @@ -1,68 +0,0 @@ -[doc Verify that all of the expected stack metrics are exported via Otel] - -[include _macros.luxinc] - -[global pg_container_name=stack-telemetry__pg] - -### - -[invoke setup_otel_collector] - -[invoke setup_pg] - -[invoke setup_electric_with_env "ELECTRIC_OTLP_ENDPOINT=http://localhost:4318 ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL=1s ELECTRIC_OTEL_EXPORT_PERIOD=2s DO_NOT_START_CONN_MAN_PING=1 ELECTRIC_LOG_LEVEL=info OTEL_RESOURCE_ATTRIBUTES=custom.attr=electric.val"] - -[shell otel_collector] - # Verify that the Collector receives metrics from Electric with expected resource attributes - ??info ResourceMetrics - ??Resource attributes: - ?? -> custom.attr: Str(electric.val) - ?? -> instance.id: Str( - ?? -> name: Str(metrics) - ?? -> service.name: Str(electric) - ?? -> service.version: Str( - - # Verify that LSN metrics are exported - """? - Metric #[0-9]+ - Descriptor: - -> Name: electric\.postgres\.replication\.pg_wal_offset - -> Description: - -> Unit: - -> DataType: Gauge - NumberDataPoints #0 - StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Value: [-+.0-9]+ - """ - - """? - Metric #[0-9]+ - Descriptor: - -> Name: electric\.postgres\.replication\.slot_confirmed_flush_lsn_lag - -> Description: - -> Unit: By - -> DataType: Gauge - NumberDataPoints #0 - StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Value: [-+.0-9]+ - """ - - """? - Metric #[0-9]+ - Descriptor: - -> Name: electric\.postgres\.replication\.slot_retained_wal_size - -> Description: - -> Unit: By - -> DataType: Gauge - NumberDataPoints #0 - StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Value: [-+.0-9]+ - """ - -### - -[cleanup] - [invoke teardown] diff --git a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex index 8a379790c8..a79af922d3 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex @@ -86,6 +86,9 @@ defmodule ElectricTelemetry.ApplicationTelemetry do def metrics(telemetry_opts) do [ last_value("process.memory.total", tags: [:process_type], unit: :byte), + last_value("process.memory.binary", tags: [:process_type], unit: :byte), + last_value("process.memory.avg_bin_count", tags: [:process_type]), + last_value("process.memory.avg_ref_count", tags: [:process_type]), last_value("system.cpu.core_count"), last_value("system.cpu.utilization.total"), last_value("system.load_percent.avg1"), @@ -170,9 +173,17 @@ defmodule ElectricTelemetry.ApplicationTelemetry do end def process_memory(%{intervals_and_thresholds: %{top_process_count: process_count}}) do - for %{type: type, memory: memory} <- - ElectricTelemetry.Processes.top_memory_by_type(process_count) do - :telemetry.execute([:process, :memory], %{total: memory}, %{process_type: to_string(type)}) + for map <- ElectricTelemetry.Processes.top_memory_by_type(process_count) do + :telemetry.execute( + [:process, :memory], + %{ + total: map.proc_mem, + binary: map.binary_mem, + avg_bin_count: map.avg_bin_count, + avg_ref_count: map.avg_ref_count + }, + %{process_type: to_string(map.type)} + ) end end diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index 01bfe44639..942ae786aa 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -20,19 +20,38 @@ defmodule ElectricTelemetry.Processes do process_list |> Enum.map(&type_and_memory/1) |> Enum.reject(&(&1.type == :dead)) - |> Enum.group_by(& &1.type, & &1.memory) - |> Enum.map(fn {type, memory} -> %{type: type, memory: Enum.sum(memory)} end) - |> Enum.sort_by(&(-&1.memory)) + |> Enum.group_by(& &1.type) + |> Enum.map(fn {type, list_of_maps} -> + {proc_mem, binary_mem, ref_count_sum, num_binaries, num_procs} = + Enum.reduce(list_of_maps, {0, 0, 0, 0, 0}, fn map, + {proc_mem, binary_mem, ref_count_sum, + num_binaries, num_procs} -> + {proc_mem + map.proc_mem, binary_mem + map.binary_mem, + ref_count_sum + map.ref_count_sum, num_binaries + map.num_binaries, num_procs + 1} + end) + + %{ + type: type, + proc_mem: proc_mem, + binary_mem: binary_mem, + avg_bin_count: num_binaries / num_procs, + avg_ref_count: if(num_binaries == 0, do: 0, else: ref_count_sum / num_binaries) + } + end) + |> Enum.sort_by(&(-&1.proc_mem)) |> Enum.take(count) end defp type_and_memory(pid) do info = info(pid) - %{type: proc_type(pid, info), memory: memory_from_info(info)} + + info + |> memory_from_info() + |> Map.put(:type, proc_type(pid, info)) end defp info(pid) do - Process.info(pid, [:dictionary, :initial_call, :label, :memory]) + Process.info(pid, [:dictionary, :initial_call, :label, :memory, :binary]) end defp proc_type(pid, info) do @@ -66,9 +85,29 @@ defmodule ElectricTelemetry.Processes do end defp memory_from_info(info) do - case info[:memory] do - bytes when is_integer(bytes) -> bytes - _ -> 0 + memory = + case info[:memory] do + bytes when is_integer(bytes) -> bytes + _ -> 0 + end + + case info[:binary] do + list when is_list(list) -> + {binary_mem, {ref_sum, num_entries}} = + Enum.reduce(list, {0, {0, 0}}, fn {_reference, size, ref_count}, + {total_size, {ref_sum, num_entries}} -> + {total_size + size, {ref_sum + ref_count, num_entries + 1}} + end) + + %{ + proc_mem: memory, + binary_mem: binary_mem, + ref_count_sum: ref_sum, + num_binaries: num_entries + } + + _ -> + %{proc_mem: memory} end end end diff --git a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs index 8e314d0d8d..26391c3788 100644 --- a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs +++ b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs @@ -27,25 +27,33 @@ defmodule ElectricTelemetry.ProcessesTest do refute Process.alive?(pid1) - assert [%{memory: memory, type: :erlang}] = top_memory_by_type([pid1, pid2]) + assert [ + %{ + proc_mem: memory, + binary_mem: _, + avg_bin_count: _, + avg_ref_count: _, + type: :erlang + } + ] = top_memory_by_type([pid1, pid2]) assert is_integer(memory) end test "defaults to top 5 of all processes" do assert [ - %{memory: _, type: _}, - %{memory: _, type: _}, - %{memory: _, type: _}, - %{memory: _, type: _}, - %{memory: _, type: _} + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _} ] = top_memory_by_type() end test "allows for setting limit" do assert [ - %{memory: _, type: _}, - %{memory: _, type: _} + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _} ] = top_memory_by_type(2) end end diff --git a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex index 7a3bb4dfa5..9f516517c6 100644 --- a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex +++ b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex @@ -20,9 +20,14 @@ defmodule Electric.StackSupervisor.Telemetry do if Code.ensure_loaded?(ElectricTelemetry.StackTelemetry) do def child_spec(config) when is_map(config) do + otel_opts = + Keyword.get(config.telemetry_opts, :otel_opts, []) + |> Keyword.put_new(:resource, %{stack_id: config.stack_id}) + telemetry_opts = config.telemetry_opts |> Keyword.put(:stack_id, config.stack_id) + |> Keyword.put(:otel_opts, otel_opts) # Use user-provided periodic measurements or default ones otherwise |> Keyword.update( :periodic_measurements,