Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/eleven-nails-argue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/electric-telemetry': patch
---

Add binary memory, average number of off-heap binaries and their ref counts to top processes by memory metric.
5 changes: 5 additions & 0 deletions .changeset/forty-pillows-laugh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Include stack_id in otel opts for stack metrics. It had been omitted by mistake before.
10 changes: 10 additions & 0 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
- uses: actions/checkout@v4

- uses: erlef/setup-beam@v1
id: setup_beam
with:
version-type: strict
version-file: '.tool-versions'
Expand Down Expand Up @@ -63,6 +64,15 @@ jobs:
run: mix compile
working-directory: packages/sync-service

- name: Cache lux
uses: actions/cache@v4
with:
path: integration-tests/lux
key: '${{ runner.os }}-lux-${{ steps.setup_beam.outputs.otp-version }}'
restore-keys: |
${{ runner.os }}-lux-${{ steps.setup_beam.outputs.otp-version }}
${{ runner.os }}-lux

- name: Setup lux
run: make

Expand Down
169 changes: 169 additions & 0 deletions integration-tests/tests/otel-export.lux
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
[doc Verify that application and stack metrics are correctly exported via Otel]

[include _macros.luxinc]

[global pg_container_name=otel-export__pg]

###

[invoke setup_otel_collector]

[invoke setup_pg]

[invoke setup_electric_with_env "ELECTRIC_OTLP_ENDPOINT=http://localhost:4318 ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL=1s ELECTRIC_OTEL_EXPORT_PERIOD=2s DO_NOT_START_CONN_MAN_PING=1 ELECTRIC_LOG_LEVEL=info OTEL_RESOURCE_ATTRIBUTES=custom.attr=electric.val"]

# Spawn a process containing off-heap binary references and ensure it's in the top 5 by memory footprint.
# Otel Collector sorts metrics in its debug output, so the process label for our process needs
# to be first lexicographically for easier output matching further down.
[shell electric]
"""!
_pid = spawn_link(fn ->
Process.set_label(:A_memory_hog)

on_heap_strings =
Enum.map(1..1_000_000, fn _ -> String.duplicate("on heap", :random.uniform(8)) end)

off_heap_strings =
Enum.map(1..100_000, fn i -> String.duplicate("1234567890", 70 * i) end)

receive do
pid -> send(pid, {on_heap_strings, off_heap_strings})
end
end)
"""

??#PID

[shell otel_collector]
# Verify that the Collector receives expected application metrics from Electric
"""?
info ResourceMetrics #0
Resource SchemaURL:
Resource attributes:
-> custom.attr: Str\(electric.val\)
-> instance.id: Str\([-a-f0-9]+\)
-> name: Str\(metrics\)
-> service.name: Str\(electric\)
-> service.version: Str\([0-9.]+\)
ScopeMetrics #0
"""

# Verify the presence of process.memory.* metrics
"""?
Metric #[0-9]+
Descriptor:
-> Name: process\.memory\.avg_bin_count
-> Description:
-> Unit:
-> DataType: Gauge
NumberDataPoints #0
Data point attributes:
-> process_type: Str\(A_memory_hog\)
StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Value: [.0-9]+
"""

"""?
Metric #[0-9]+
Descriptor:
-> Name: process\.memory\.avg_ref_count
-> Description:
-> Unit:
-> DataType: Gauge
NumberDataPoints #0
Data point attributes:
-> process_type: Str\(A_memory_hog\)
StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Value: 1\.000000
"""

"""?
Metric #[0-9]+
Descriptor:
-> Name: process\.memory\.binary
-> Description:
-> Unit: By
-> DataType: Gauge
NumberDataPoints #0
Data point attributes:
-> process_type: Str\(A_memory_hog\)
StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Value: [0-9]+
"""

"""?
Metric #[0-9]+
Descriptor:
-> Name: process\.memory\.total
-> Description:
-> Unit: By
-> DataType: Gauge
NumberDataPoints #0
Data point attributes:
-> process_type: Str\(A_memory_hog\)
StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Value: [0-9]+
"""

# Verify that the Collector receives stack metrics from Electric with expected resource attributes
"""?
info ResourceMetrics #0
Resource SchemaURL:
Resource attributes:
-> custom.attr: Str\(electric.val\)
-> instance.id: Str\([-a-f0-9]+\)
-> name: Str\(metrics\)
-> service.name: Str\(electric\)
-> service.version: Str\([0-9.]+\)
-> stack_id: Str\(single_stack\)
ScopeMetrics #0
"""

# Verify that LSN metrics are exported
"""?
Metric #[0-9]+
Descriptor:
-> Name: electric\.postgres\.replication\.pg_wal_offset
-> Description:
-> Unit:
-> DataType: Gauge
NumberDataPoints #0
StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Value: [-+.0-9]+
"""

"""?
Metric #[0-9]+
Descriptor:
-> Name: electric\.postgres\.replication\.slot_confirmed_flush_lsn_lag
-> Description:
-> Unit: By
-> DataType: Gauge
NumberDataPoints #0
StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Value: [-+.0-9]+
"""

"""?
Metric #[0-9]+
Descriptor:
-> Name: electric\.postgres\.replication\.slot_retained_wal_size
-> Description:
-> Unit: By
-> DataType: Gauge
NumberDataPoints #0
StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC
Value: [-+.0-9]+
"""

###

[cleanup]
[invoke teardown]
68 changes: 0 additions & 68 deletions integration-tests/tests/stack-telemetry.lux

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ defmodule ElectricTelemetry.ApplicationTelemetry do
def metrics(telemetry_opts) do
[
last_value("process.memory.total", tags: [:process_type], unit: :byte),
last_value("process.memory.binary", tags: [:process_type], unit: :byte),
last_value("process.memory.avg_bin_count", tags: [:process_type]),
last_value("process.memory.avg_ref_count", tags: [:process_type]),
last_value("system.cpu.core_count"),
last_value("system.cpu.utilization.total"),
last_value("system.load_percent.avg1"),
Expand Down Expand Up @@ -170,9 +173,17 @@ defmodule ElectricTelemetry.ApplicationTelemetry do
end

# Emits one `[:process, :memory]` telemetry event for every entry returned by
# `ElectricTelemetry.Processes.top_memory_by_type/1`, asking it for
# `top_process_count` entries.
#
# Measurements: `total`, `binary`, `avg_bin_count` and `avg_ref_count`.
# Metadata: `process_type`, the entry's type converted to a string.
#
# Returns the list of `:telemetry.execute/3` results, one per entry.
def process_memory(%{intervals_and_thresholds: %{top_process_count: process_count}}) do
  process_count
  |> ElectricTelemetry.Processes.top_memory_by_type()
  |> Enum.map(fn stats ->
    measurements = %{
      total: stats.proc_mem,
      binary: stats.binary_mem,
      avg_bin_count: stats.avg_bin_count,
      avg_ref_count: stats.avg_ref_count
    }

    metadata = %{process_type: to_string(stats.type)}

    :telemetry.execute([:process, :memory], measurements, metadata)
  end)
end

Expand Down
55 changes: 47 additions & 8 deletions packages/electric-telemetry/lib/electric/telemetry/processes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,38 @@ defmodule ElectricTelemetry.Processes do
process_list
|> Enum.map(&type_and_memory/1)
|> Enum.reject(&(&1.type == :dead))
|> Enum.group_by(& &1.type, & &1.memory)
|> Enum.map(fn {type, memory} -> %{type: type, memory: Enum.sum(memory)} end)
|> Enum.sort_by(&(-&1.memory))
|> Enum.group_by(& &1.type)
|> Enum.map(fn {type, list_of_maps} ->
{proc_mem, binary_mem, ref_count_sum, num_binaries, num_procs} =
Enum.reduce(list_of_maps, {0, 0, 0, 0, 0}, fn map,
{proc_mem, binary_mem, ref_count_sum,
num_binaries, num_procs} ->
{proc_mem + map.proc_mem, binary_mem + map.binary_mem,
ref_count_sum + map.ref_count_sum, num_binaries + map.num_binaries, num_procs + 1}
end)

%{
type: type,
proc_mem: proc_mem,
binary_mem: binary_mem,
avg_bin_count: num_binaries / num_procs,
avg_ref_count: if(num_binaries == 0, do: 0, else: ref_count_sum / num_binaries)
}
end)
|> Enum.sort_by(&(-&1.proc_mem))
|> Enum.take(count)
end

# Builds the stats map for a single process: the memory figures computed by
# `memory_from_info/1`, plus a `:type` key holding the result of `proc_type/2`.
defp type_and_memory(pid) do
  process_info = info(pid)
  stats = memory_from_info(process_info)

  Map.put(stats, :type, proc_type(pid, process_info))
end

# Fetches the process-info items the rest of this module reads: dictionary,
# initial call, label, memory and the list of referenced binaries.
defp info(pid) do
  items = [:dictionary, :initial_call, :label, :memory, :binary]

  Process.info(pid, items)
end

defp proc_type(pid, info) do
Expand Down Expand Up @@ -66,9 +85,29 @@ defmodule ElectricTelemetry.Processes do
end

# Extracts memory statistics from a process-info result.
#
# `info` only needs to support `info[:memory]` and `info[:binary]` lookups
# (e.g. a keyword list, or `nil`).
#
# Always returns a map with the same four keys, so that callers can aggregate
# entries with `map.key` access without risking a `KeyError`:
#
#   * `:proc_mem`      - `info[:memory]` when it is an integer, otherwise 0
#   * `:binary_mem`    - sum of the sizes of the entries in `info[:binary]`
#   * `:ref_count_sum` - sum of the reference counts of those entries
#   * `:num_binaries`  - number of entries in `info[:binary]`
#
# When `info[:binary]` is not a list, the three binary-related values are 0.
defp memory_from_info(info) do
  proc_mem =
    case info[:memory] do
      bytes when is_integer(bytes) -> bytes
      _ -> 0
    end

  # Treat a missing/unexpected :binary value as "no binaries" rather than
  # returning a map with fewer keys.
  binaries =
    case info[:binary] do
      list when is_list(list) -> list
      _ -> []
    end

  # Each entry is matched as a {reference, size, ref_count} tuple.
  {binary_mem, ref_count_sum, num_binaries} =
    Enum.reduce(binaries, {0, 0, 0}, fn {_reference, size, ref_count}, {mem, refs, count} ->
      {mem + size, refs + ref_count, count + 1}
    end)

  %{
    proc_mem: proc_mem,
    binary_mem: binary_mem,
    ref_count_sum: ref_count_sum,
    num_binaries: num_binaries
  }
end
end
Loading