Skip to content
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Include W3C TraceFlags (bits 0–7) in the OTLP `Span.Flags` field in `go.opentelemetry.io/exporters/otlp/otlptrace/otlptracehttp` and `go.opentelemetry.io/exporters/otlp/otlptrace/otlptracegrpc`. (#7438)
- The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types.
If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442)
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427)
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427, #7474)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
7 changes: 4 additions & 3 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,13 @@ func (b Builder[N]) ExplicitBucketHistogram(
boundaries []float64,
noMinMax, noSum bool,
) (Measure[N], ComputeAggregation) {
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
h := newDeltaHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
return b.filter(h.measure), h.collect
default:
return b.filter(h.measure), h.cumulative
h := newCumulativeHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
return b.filter(h.measure), h.collect
}
}

Expand Down
91 changes: 91 additions & 0 deletions sdk/metric/internal/aggregate/atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,97 @@ func (n *atomicCounter[N]) add(value N) {
}
}

// reset resets the internal state, and is not safe to call concurrently.
//
// It is intended to be called by a reader that has exclusive access to
// the counter (e.g. between collection cycles), not by concurrent writers.
func (n *atomicCounter[N]) reset() {
	// Zero both accumulators so the counter reads as zero again.
	// (nFloatBits and nInt are declared on atomicCounter above.)
	n.nFloatBits.Store(0)
	n.nInt.Store(0)
}

// atomicN is a generic atomic number value. The value is held in a
// single atomic.Uint64: int64 values use their two's-complement bits
// and float64 values use their IEEE-754 bit pattern.
type atomicN[N int64 | float64] struct {
	val atomic.Uint64
}

// numToBits returns the 64-bit representation used to store v.
func numToBits[N int64 | float64](v N) uint64 {
	switch c := any(v).(type) {
	case int64:
		return uint64(c)
	case float64:
		return math.Float64bits(c)
	}
	panic("unsupported type")
}

// numFromBits reconstructs a value of type N from its stored bits.
func numFromBits[N int64 | float64](bits uint64) N {
	var zero N
	switch any(zero).(type) {
	case int64:
		return N(bits)
	case float64:
		return N(math.Float64frombits(bits))
	}
	panic("unsupported type")
}

// Load atomically returns the current value.
func (a *atomicN[N]) Load() N {
	return numFromBits[N](a.val.Load())
}

// Store atomically replaces the current value with v.
func (a *atomicN[N]) Store(v N) {
	a.val.Store(numToBits(v))
}

// CompareAndSwap atomically replaces the stored value with newN if it
// currently equals oldN, reporting whether the swap was performed.
func (a *atomicN[N]) CompareAndSwap(oldN, newN N) bool {
	return a.val.CompareAndSwap(numToBits(oldN), numToBits(newN))
}

// atomicMinMax records the minimum and maximum of concurrently-observed
// values. The first observation is initialized under a mutex; all later
// updates are lock-free compare-and-swap loops.
type atomicMinMax[N int64 | float64] struct {
	// minimum and maximum hold the smallest and largest observed values.
	minimum, maximum atomicN[N]
	// set reports whether minimum and maximum have been initialized
	// with an observed value.
	set atomic.Bool
	// mu serializes the one-time initialization performed by init.
	mu sync.Mutex
}

// init returns true if the value was used to initialize min and max.
func (s *atomicMinMax[N]) init(val N) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.set.Load() {
		// Deferred after Unlock was deferred, so (LIFO) it runs first:
		// set becomes true before the mutex is released, and any reader
		// that observes set==true also observes the stores below.
		defer s.set.Store(true)
		s.minimum.Store(val)
		s.maximum.Store(val)
		return true
	}
	return false
}

// Update records val, lowering minimum and/or raising maximum as needed.
// It is safe to call concurrently.
func (s *atomicMinMax[N]) Update(val N) {
	// The first observation initializes both min and max. If another
	// goroutine won the race inside init, fall through and treat val as
	// a regular update.
	if !s.set.Load() && s.init(val) {
		return
	}

	// Lower the minimum if val is smaller, retrying on CAS contention.
	old := s.minimum.Load()
	for val < old {
		if s.minimum.CompareAndSwap(old, val) {
			// val < old <= maximum, so the maximum cannot need updating.
			return
		}
		old = s.minimum.Load()
	}

	// Raise the maximum if val is larger, retrying on CAS contention.
	old = s.maximum.Load()
	for old < val {
		if s.maximum.CompareAndSwap(old, val) {
			return
		}
		old = s.maximum.Load()
	}
}

// hotColdWaitGroup is a synchronization primitive which enables lockless
// writes for concurrent writers and enables a reader to acquire exclusive
// access to a snapshot of state including only completed operations.
Expand Down
174 changes: 174 additions & 0 deletions sdk/metric/internal/aggregate/atomic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,33 @@ func TestAtomicSumAddIntConcurrentSafe(t *testing.T) {
assert.Equal(t, int64(15), aSum.load())
}

func BenchmarkAtomicCounter(b *testing.B) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You added a benchmark but I didn't see any results posted on PR or anywhere.

Is this a useful benchmark then? Or is there some policy to add benchmarks for all low-level things just in case? (reminds me of YAGNI)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I see the rationale: #7474 (comment)

Tests are great.

Just my 2c, but adding benchmarks without ever (realistically) planning to use them is the same as adding dead code. They could be added when we actually want to execute and measure. Are they used anywhere now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We run benchmarks as part of CI, and also do a diff on push: https://github.com/open-telemetry/opentelemetry-go/blob/main/.github/workflows/benchmark.yml. Example: https://github.com/open-telemetry/opentelemetry-go/actions/runs/18314631364 from my PR for improving sum measure performance.

So if someone did make a change, we would at least be able to tell if it made performance significantly worse afterwards...

b.Run("Int64", benchmarkAtomicCounter[int64])
b.Run("Float64", benchmarkAtomicCounter[float64])
}

// benchmarkAtomicCounter measures concurrent add and load throughput of
// atomicCounter for a single numeric type N.
func benchmarkAtomicCounter[N int64 | float64](b *testing.B) {
	b.Run("add", func(b *testing.B) {
		var counter atomicCounter[N]
		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				counter.add(2)
			}
		})
	})
	b.Run("load", func(b *testing.B) {
		var counter atomicCounter[N]
		counter.add(2)
		var got N
		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				got = counter.load()
			}
		})
		// Sanity-check the result so the loads cannot be elided.
		assert.Equal(b, N(2), got)
	})
}

func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
hcwg := &hotColdWaitGroup{}
Expand All @@ -76,3 +103,150 @@ func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
}
wg.Wait()
}

// TestAtomicN runs the atomicN unit tests for each supported type.
func TestAtomicN(t *testing.T) {
	subtests := []struct {
		name string
		run  func(*testing.T)
	}{
		{name: "Int64", run: testAtomicN[int64]},
		{name: "Float64", run: testAtomicN[float64]},
	}
	for _, st := range subtests {
		t.Run(st.name, st.run)
	}
}

// testAtomicN verifies Load, Store, and CompareAndSwap round-trips on a
// zero-value atomicN for type N.
func testAtomicN[N int64 | float64](t *testing.T) {
	var n atomicN[N]
	// The zero value must read as numeric zero.
	assert.Equal(t, N(0), n.Load())
	// A CAS from the zero value succeeds exactly once.
	assert.True(t, n.CompareAndSwap(0, 6))
	assert.Equal(t, N(6), n.Load())
	assert.False(t, n.CompareAndSwap(0, 6))
	// Store overwrites unconditionally.
	n.Store(22)
	assert.Equal(t, N(22), n.Load())
}

// TestAtomicNConcurrentSafe runs the atomicN concurrency tests for each
// supported type.
func TestAtomicNConcurrentSafe(t *testing.T) {
	subtests := []struct {
		name string
		run  func(*testing.T)
	}{
		{name: "Int64", run: testAtomicNConcurrentSafe[int64]},
		{name: "Float64", run: testAtomicNConcurrentSafe[float64]},
	}
	for _, st := range subtests {
		t.Run(st.name, st.run)
	}
}

// testAtomicNConcurrentSafe exercises Load, Store, and CompareAndSwap
// from concurrent goroutines; run with -race to detect data races.
// Every written value is a multiple of 6, so every load must observe a
// multiple of 6 as well.
func testAtomicNConcurrentSafe[N int64 | float64](t *testing.T) {
	var v atomicN[N]
	var wg sync.WaitGroup

	ops := []func(){
		func() {
			got := v.Load()
			assert.Equal(t, int64(0), int64(got)%6)
		},
		func() { v.Store(12) },
		func() { v.CompareAndSwap(0, 6) },
	}
	for range 2 {
		for _, op := range ops {
			wg.Add(1)
			go func() {
				defer wg.Done()
				op()
			}()
		}
	}
	wg.Wait()
}

// BenchmarkAtomicN measures atomicN operation throughput for each
// supported numeric type.
func BenchmarkAtomicN(b *testing.B) {
	benches := []struct {
		name string
		run  func(*testing.B)
	}{
		{name: "Int64", run: benchmarkAtomicN[int64]},
		{name: "Float64", run: benchmarkAtomicN[float64]},
	}
	for _, bm := range benches {
		b.Run(bm.name, bm.run)
	}
}

// benchmarkAtomicN benchmarks Load, Store, and CompareAndSwap under
// parallel access for a single numeric type N.
func benchmarkAtomicN[N int64 | float64](b *testing.B) {
	b.Run("Load", func(b *testing.B) {
		var n atomicN[N]
		n.Store(2)
		var got N
		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				got = n.Load()
			}
		})
		// Sanity-check the result so the loads cannot be elided.
		assert.Equal(b, N(2), got)
	})
	b.Run("Store", func(b *testing.B) {
		var n atomicN[N]
		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				n.Store(3)
			}
		})
	})
	b.Run("CompareAndSwap", func(b *testing.B) {
		var n atomicN[N]
		b.RunParallel(func(pb *testing.PB) {
			swapBack := false
			for pb.Next() {
				// Make sure we swap back and forth, in-case that matters.
				if swapBack {
					n.CompareAndSwap(1, 0)
				} else {
					n.CompareAndSwap(0, 1)
				}
				swapBack = !swapBack
			}
		})
	})
}

// TestAtomicMinMaxConcurrentSafe runs the atomicMinMax concurrency
// tests for each supported type.
func TestAtomicMinMaxConcurrentSafe(t *testing.T) {
	subtests := []struct {
		name string
		run  func(*testing.T)
	}{
		{name: "Int64", run: testAtomicMinMaxConcurrentSafe[int64]},
		{name: "Float64", run: testAtomicMinMaxConcurrentSafe[float64]},
	}
	for _, st := range subtests {
		t.Run(st.name, st.run)
	}
}

// testAtomicMinMaxConcurrentSafe feeds atomicMinMax.Update from
// concurrent goroutines and checks the final extrema; run with -race to
// detect data races.
func testAtomicMinMaxConcurrentSafe[N int64 | float64](t *testing.T) {
	var (
		wg sync.WaitGroup
		mm atomicMinMax[N]
	)

	assert.False(t, mm.set.Load())
	inputs := []float64{2, 4, 6, 8, -3, 0, 8, 0}
	for _, in := range inputs {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mm.Update(N(in))
		}()
	}
	wg.Wait()

	assert.True(t, mm.set.Load())
	assert.Equal(t, N(-3), mm.minimum.Load())
	assert.Equal(t, N(8), mm.maximum.Load())
}

// BenchmarkAtomicMinMax measures atomicMinMax.Update throughput for
// each supported numeric type.
func BenchmarkAtomicMinMax(b *testing.B) {
	benches := []struct {
		name string
		run  func(*testing.B)
	}{
		{name: "Int64", run: benchmarkAtomicMinMax[int64]},
		{name: "Float64", run: benchmarkAtomicMinMax[float64]},
	}
	for _, bm := range benches {
		b.Run(bm.name, bm.run)
	}
}

// benchmarkAtomicMinMax benchmarks Update under parallel access with
// increasing, decreasing, and constant value streams for type N.
func benchmarkAtomicMinMax[N int64 | float64](b *testing.B) {
	b.Run("UpdateIncreasing", func(b *testing.B) {
		var mm atomicMinMax[N]
		b.RunParallel(func(pb *testing.PB) {
			// Rising values repeatedly exercise the maximum CAS loop.
			next := 0
			for pb.Next() {
				mm.Update(N(next))
				next++
			}
		})
	})
	b.Run("UpdateDecreasing", func(b *testing.B) {
		var mm atomicMinMax[N]
		b.RunParallel(func(pb *testing.PB) {
			// Falling values repeatedly exercise the minimum CAS loop.
			next := 0
			for pb.Next() {
				mm.Update(N(next))
				next--
			}
		})
	})
	b.Run("UpdateConstant", func(b *testing.B) {
		var mm atomicMinMax[N]
		b.RunParallel(func(pb *testing.PB) {
			// A constant value falls through both CAS loops after init.
			for pb.Next() {
				mm.Update(N(5))
			}
		})
	})
}
Loading
Loading