diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c9a059d6..fc9067c23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Updated dependencies ## Added +- Added support for `Histogram`, `ExponentialHistogram`, and `Summary` metric + types to the OpenTelemetry metrics payload generator. Each type is generated + with valid structure (bucket invariants, quantile ordering, three-way count + partitioning) and re-randomized per tick so successive payloads carry varied + data. Histogram supports optional `min`/`max` fields; `ExponentialHistogram` + varies `zero_threshold`; `Summary` randomises across five common quantile + configurations. - Added new `!concat` generator to the `templated_json` payload generator. - Use `mise` for tooling management - Added new `StaticTimestamped` payload generator that parses timestamps from a diff --git a/ci/fingerprints/otel_metrics/fingerprint.txt b/ci/fingerprints/otel_metrics/fingerprint.txt index 061d84011..a1b314c98 100644 --- a/ci/fingerprints/otel_metrics/fingerprint.txt +++ b/ci/fingerprints/otel_metrics/fingerprint.txt @@ -1 +1 @@ -otel_metrics_grpc: 1a99c0f4baf2470e547d0038dcaa6e7f134a1a31ae9dea50460e21f5e4e3a562 entropy=6.7343 +otel_metrics_grpc: a271e739e8b7c2a1fbfd248ad4c14096a5dd0f224c4c690742b7ca4da598603a entropy=6.7762 diff --git a/lading_payload/benches/opentelemetry_metric.rs b/lading_payload/benches/opentelemetry_metric.rs index 587ebeb3a..d659bc21c 100644 --- a/lading_payload/benches/opentelemetry_metric.rs +++ b/lading_payload/benches/opentelemetry_metric.rs @@ -20,6 +20,9 @@ fn opentelemetry_metric_setup(c: &mut Criterion) { gauge: 50, sum_delta: 25, sum_cumulative: 25, + histogram: 15, + exponential_histogram: 3, + summary: 2, }, contexts: Contexts { total_contexts: ConfRange::Constant(100), @@ -49,6 +52,9 @@ fn opentelemetry_metric_throughput(c: &mut Criterion) { gauge: 50, sum_delta: 25, sum_cumulative: 25, + histogram: 
15, + exponential_histogram: 3, + summary: 2, }, contexts: Contexts { total_contexts: ConfRange::Constant(100), diff --git a/lading_payload/src/opentelemetry/metric.rs b/lading_payload/src/opentelemetry/metric.rs index 01067fa7f..fb45a2260 100644 --- a/lading_payload/src/opentelemetry/metric.rs +++ b/lading_payload/src/opentelemetry/metric.rs @@ -49,7 +49,10 @@ use crate::opentelemetry::common::templates::PoolError; use crate::{Error, common::config::ConfRange, common::strings}; use bytes::BytesMut; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; -use opentelemetry_proto::tonic::metrics::v1::{ResourceMetrics, metric::Data, number_data_point}; +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value}; +use opentelemetry_proto::tonic::metrics::v1::{ + Metric, ResourceMetrics, metric::Data, number_data_point, +}; use prost::Message; use serde::Deserialize; use templates::{Pool, ResourceTemplateGenerator}; @@ -63,6 +66,87 @@ pub const SMALLEST_PROTOBUF: usize = 31; /// Increment timestamps by 100 milliseconds (in nanoseconds) per tick const TIME_INCREMENT_NANOS: u64 = 1_000_000; +fn randomize_point_attributes(attributes: &mut [KeyValue], rng: &mut R) +where + R: rand::Rng + ?Sized, +{ + for attribute in attributes { + if attribute.key == "sample" || attribute.key == "spread" || attribute.key == "jitter" { + attribute.value = Some(AnyValue { + value: Some(any_value::Value::DoubleValue( + rng.random_range(0.0_f64..=1_000_000.0), + )), + }); + } + } +} + +fn metric_data_points_total(metric: &Metric) -> usize { + match &metric.data { + Some(Data::Gauge(gauge)) => gauge.data_points.len(), + Some(Data::Sum(sum)) => sum.data_points.len(), + Some(Data::Histogram(histogram)) => histogram.data_points.len(), + Some(Data::ExponentialHistogram(histogram)) => histogram.data_points.len(), + Some(Data::Summary(summary)) => summary.data_points.len(), + None => 0, + } +} + +fn trim_one_data_point(resource_metrics: &mut 
ResourceMetrics) -> bool { + for scope_idx in (0..resource_metrics.scope_metrics.len()).rev() { + let mut trimmed = false; + let mut remove_metric_idx = None; + let remove_scope; + + { + let scope_metrics = &mut resource_metrics.scope_metrics[scope_idx]; + for metric_idx in (0..scope_metrics.metrics.len()).rev() { + let metric = &mut scope_metrics.metrics[metric_idx]; + let did_trim = match &mut metric.data { + Some(Data::Gauge(gauge)) => gauge.data_points.pop().is_some(), + Some(Data::Sum(sum)) => sum.data_points.pop().is_some(), + Some(Data::Histogram(histogram)) => histogram.data_points.pop().is_some(), + Some(Data::ExponentialHistogram(histogram)) => { + histogram.data_points.pop().is_some() + } + Some(Data::Summary(summary)) => summary.data_points.pop().is_some(), + None => false, + }; + if did_trim { + trimmed = true; + if metric_data_points_total(metric) == 0 { + remove_metric_idx = Some(metric_idx); + } + break; + } + } + if let Some(metric_idx) = remove_metric_idx { + scope_metrics.metrics.remove(metric_idx); + } + remove_scope = scope_metrics.metrics.is_empty(); + } + + if trimmed { + if remove_scope { + resource_metrics.scope_metrics.remove(scope_idx); + } + return true; + } + } + + false +} + +fn total_data_points(resource_metrics: &ResourceMetrics) -> u64 { + let mut total = 0_u64; + for scope_metrics in &resource_metrics.scope_metrics { + for metric in &scope_metrics.metrics { + total += metric_data_points_total(metric) as u64; + } + } + total +} + /// Configure the OpenTelemetry metric payload. #[derive(Debug, Deserialize, serde::Serialize, Clone, PartialEq, Copy)] #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] @@ -109,14 +193,23 @@ pub struct MetricWeights { pub sum_delta: u8, /// The relative probability of generating a sum cumulative metric. pub sum_cumulative: u8, + /// The relative probability of generating a histogram metric. + pub histogram: u8, + /// The relative probability of generating an exponential histogram metric. 
+ pub exponential_histogram: u8, + /// The relative probability of generating a summary metric. + pub summary: u8, } impl Default for MetricWeights { fn default() -> Self { Self { - gauge: 50, // 50% - sum_delta: 25, // 25% - sum_cumulative: 25, // 25% + gauge: 40, // ~36% of total weight 110 + sum_delta: 25, // ~23% + sum_cumulative: 25, // ~23% + histogram: 15, // ~14% + exponential_histogram: 3, // ~3% + summary: 2, // ~2% } } } @@ -143,7 +236,10 @@ impl Config { // we can generate a diverse set of metrics if self.metric_weights.gauge == 0 || self.metric_weights.sum_delta == 0 || self.metric_weights.sum_cumulative == 0 + || self.metric_weights.summary == 0 + || self.metric_weights.histogram == 0 + || self.metric_weights.exponential_histogram == 0 { return Err("Metric weights cannot be 0".to_string()); } @@ -342,6 +437,7 @@ impl<'a> SizedGenerator<'a> for OpentelemetryMetrics { /// * Timestamps advance monotonically based on internal tick counter /// starting at epoch /// * Each call advances the tick counter by a random amount (1-60) + #[allow(clippy::too_many_lines)] fn generate(&'a mut self, rng: &mut R, budget: &mut usize) -> Result where R: rand::Rng + ?Sized, { @@ -350,6 +446,7 @@ impl<'a> SizedGenerator<'a> for OpentelemetryMetrics { self.incr_f += rng.random_range(1.0..=100.0); self.incr_i += rng.random_range(1_i64..=100_i64); + let budget_before_fetch = *budget; let mut tpl: ResourceMetrics = match self.pool.fetch(rng, budget) { Ok(t) => t.to_owned(), Err(PoolError::EmptyChoice) => { @@ -362,14 +459,12 @@ impl<'a> SizedGenerator<'a> for OpentelemetryMetrics { // Update data points in each metric. For gauges we use random values, // for accumulating sums we increment by a fixed amount per tick. // All timestamps are updated based on the current tick.
- let mut data_points_count = 0; for scope_metrics in &mut tpl.scope_metrics { for metric in &mut scope_metrics.metrics { if let Some(data) = &mut metric.data { match data { Data::Gauge(gauge) => { - data_points_count += gauge.data_points.len() as u64; for point in &mut gauge.data_points { point.time_unix_nano = self.tick * TIME_INCREMENT_NANOS; if let Some(value) = &mut point.value { @@ -385,7 +480,6 @@ impl<'a> SizedGenerator<'a> for OpentelemetryMetrics { } } Data::Sum(sum) => { - data_points_count += sum.data_points.len() as u64; let is_accumulating = sum.is_monotonic; for point in &mut sum.data_points { point.time_unix_nano = self.tick * TIME_INCREMENT_NANOS; @@ -419,13 +513,110 @@ impl<'a> SizedGenerator<'a> for OpentelemetryMetrics { } } } - _ => unimplemented!(), + Data::Histogram(histogram) => { + for point in &mut histogram.data_points { + randomize_point_attributes(&mut point.attributes, rng); + point.time_unix_nano = self.tick * TIME_INCREMENT_NANOS; + point.explicit_bounds = templates::random_histogram_bounds( + point.explicit_bounds.len(), + rng, + ); + let n_buckets = point.explicit_bounds.len() + 1; + let new_count: u64 = + rng.random_range(n_buckets as u64..=250_000_u64); + point.count = new_count; + point.bucket_counts = + templates::random_partition(new_count, n_buckets, rng); + point.sum = Some(rng.random_range(0.0_f64..=1_000_000.0)); + let (min, max) = templates::random_non_negative_min_max(rng); + point.min = min; + point.max = max; + } + } + Data::ExponentialHistogram(eh) => { + for point in &mut eh.data_points { + randomize_point_attributes(&mut point.attributes, rng); + point.time_unix_nano = self.tick * TIME_INCREMENT_NANOS; + + let n_pos = + point.positive.as_ref().map_or(0, |b| b.bucket_counts.len()); + let n_neg = + point.negative.as_ref().map_or(0, |b| b.bucket_counts.len()); + let partition = templates::random_partition( + rng.random_range(3_u64..=250_000_u64), + 3, + rng, + ); + let pos_count = partition[0]; + let negative_count = 
partition[1]; + let zero_count = partition[2]; + + point.count = pos_count + negative_count + zero_count; + point.zero_count = zero_count; + point.sum = Some(rng.random_range(-1_000_000.0_f64..=1_000_000.0)); + point.scale = rng.random_range(-20_i32..=20); + point.zero_threshold = rng.random_range(0.0_f64..=10_000.0); + let (min, max) = templates::random_signed_min_max(rng); + point.min = min; + point.max = max; + + if let Some(pos) = &mut point.positive { + pos.offset = rng.random_range(-256_i32..=256); + pos.bucket_counts = + templates::random_partition(pos_count, n_pos, rng); + } + if let Some(neg) = &mut point.negative { + neg.offset = rng.random_range(-256_i32..=256); + neg.bucket_counts = + templates::random_partition(negative_count, n_neg, rng); + } + } + } + Data::Summary(summary) => { + for point in &mut summary.data_points { + randomize_point_attributes(&mut point.attributes, rng); + point.time_unix_nano = self.tick * TIME_INCREMENT_NANOS; + let quantiles = templates::random_summary_quantiles( + point.quantile_values.len(), + rng, + ); + point.count = rng.random_range( + point.quantile_values.len().max(1) as u64..=100_000_u64, + ); + let mut val = rng.random_range(0.0_f64..=10_000.0); + for (qv, quantile) in + point.quantile_values.iter_mut().zip(quantiles) + { + qv.quantile = quantile; + val += rng.random_range(0.25_f64..=25_000.0); + qv.value = val; + } + point.sum = val * rng.random_range(1.0_f64..=4.0); + } + } } } } } - self.data_points_per_resource = data_points_count; + while tpl.encoded_len() > budget_before_fetch { + if !trim_one_data_point(&mut tpl) { + debug!( + budget_before_fetch, + actual_size = tpl.encoded_len(), + "metric payload could not be trimmed to fit budget" + ); + Err(PoolError::EmptyChoice)?; + } + } + + self.data_points_per_resource = total_data_points(&tpl); + + // The pool deducted the template's stored encoded size from budget, but + // mutations (timestamp, values, bucket counts) may change varint sizes. 
+ // Correct the budget so it reflects the actual bytes the caller will + // consume. + *budget = budget_before_fetch.saturating_sub(tpl.encoded_len()); Ok(tpl) } @@ -893,8 +1084,15 @@ mod test { sum.aggregation_temporality.hash(&mut hasher); sum.is_monotonic.hash(&mut hasher); } - // Add other metric types as needed - _ => "unknown".hash(&mut hasher), + Data::Histogram(h) => { + "histogram".hash(&mut hasher); + h.aggregation_temporality.hash(&mut hasher); + } + Data::ExponentialHistogram(eh) => { + "exponential_histogram".hash(&mut hasher); + eh.aggregation_temporality.hash(&mut hasher); + } + Data::Summary(_) => "summary".hash(&mut hasher), } } } @@ -1006,7 +1204,30 @@ mod test { .push(point.time_unix_nano); } }, - _ => {}, + Data::Histogram(histogram) => { + for point in &histogram.data_points { + timestamps_by_metric + .entry(id) + .or_default() + .push(point.time_unix_nano); + } + }, + Data::ExponentialHistogram(eh) => { + for point in &eh.data_points { + timestamps_by_metric + .entry(id) + .or_default() + .push(point.time_unix_nano); + } + }, + Data::Summary(summary) => { + for point in &summary.data_points { + timestamps_by_metric + .entry(id) + .or_default() + .push(point.time_unix_nano); + } + }, } } } @@ -1059,6 +1280,9 @@ mod test { gauge: 0, // Only generate sum metrics sum_delta: 50, sum_cumulative: 50, + histogram: 0, + exponential_histogram: 0, + summary: 0, }, }; @@ -1099,6 +1323,9 @@ mod test { gauge: 0, // Only generate sum metrics sum_delta: 50, sum_cumulative: 50, + histogram: 0, + exponential_histogram: 0, + summary: 0, }, }; @@ -1344,6 +1571,9 @@ mod test { gauge: 0, sum_delta: 25, sum_cumulative: 25, + histogram: 15, + exponential_histogram: 3, + summary: 2, }, ..valid_config }; @@ -1354,6 +1584,9 @@ mod test { gauge: 50, sum_delta: 0, sum_cumulative: 0, + histogram: 15, + exponential_histogram: 3, + summary: 2, }, ..valid_config }; @@ -1364,9 +1597,195 @@ mod test { gauge: 0, sum_delta: 0, sum_cumulative: 0, + histogram: 0, + 
exponential_histogram: 0, + summary: 0, }, ..valid_config }; assert!(zero_weights.valid().is_err()); } + + proptest! { + #[test] + fn histogram_data_points_are_valid( + seed: u64, + steps in 1..u8::MAX, + budget in SMALLEST_PROTOBUF..2048_usize, + ) { + let config = Config { + metric_weights: super::MetricWeights { + gauge: 0, + sum_delta: 0, + sum_cumulative: 0, + histogram: 100, + exponential_histogram: 0, + summary: 0, + }, + ..Default::default() + }; + + let mut budget = budget; + let mut rng = SmallRng::seed_from_u64(seed); + let mut otel_metrics = OpentelemetryMetrics::new(config, budget, &mut rng)?; + + for _ in 0..steps { + if let Ok(resource_metrics) = otel_metrics.generate(&mut rng, &mut budget) { + for scope_metrics in &resource_metrics.scope_metrics { + for metric in &scope_metrics.metrics { + if let Some(Data::Histogram(h)) = &metric.data { + for point in &h.data_points { + prop_assert_eq!( + point.bucket_counts.len(), + point.explicit_bounds.len() + 1, + "bucket_counts.len() must be explicit_bounds.len() + 1" + ); + let total: u64 = point.bucket_counts.iter().sum(); + prop_assert_eq!( + total, + point.count, + "sum of bucket_counts must equal count" + ); + for bounds in point.explicit_bounds.windows(2) { + prop_assert!( + bounds[0] < bounds[1], + "explicit bounds must be strictly increasing: {} >= {}", + bounds[0], + bounds[1] + ); + } + if let (Some(min), Some(max)) = (point.min, point.max) { + prop_assert!( + min <= max, + "min ({min}) must be <= max ({max})" + ); + } + } + } + } + } + } + } + } + } + + proptest! 
{ + #[test] + fn exponential_histogram_data_points_are_valid( + seed: u64, + steps in 1..u8::MAX, + budget in SMALLEST_PROTOBUF..2048_usize, + ) { + let config = Config { + metric_weights: super::MetricWeights { + gauge: 0, + sum_delta: 0, + sum_cumulative: 0, + histogram: 0, + exponential_histogram: 100, + summary: 0, + }, + ..Default::default() + }; + + let mut budget = budget; + let mut rng = SmallRng::seed_from_u64(seed); + let mut otel_metrics = OpentelemetryMetrics::new(config, budget, &mut rng)?; + + for _ in 0..steps { + if let Ok(resource_metrics) = otel_metrics.generate(&mut rng, &mut budget) { + for scope_metrics in &resource_metrics.scope_metrics { + for metric in &scope_metrics.metrics { + if let Some(Data::ExponentialHistogram(eh)) = &metric.data { + for point in &eh.data_points { + let pos_count: u64 = point + .positive + .as_ref() + .map_or(0, |b| b.bucket_counts.iter().sum()); + let negative_count: u64 = point + .negative + .as_ref() + .map_or(0, |b| b.bucket_counts.iter().sum()); + prop_assert_eq!( + pos_count + negative_count + point.zero_count, + point.count, + "pos_count + negative_count + zero_count must equal count" + ); + prop_assert!( + point.zero_threshold >= 0.0, + "zero_threshold must be non-negative: {}", + point.zero_threshold + ); + if let (Some(min), Some(max)) = (point.min, point.max) { + prop_assert!( + min <= max, + "min ({min}) must be <= max ({max})" + ); + } + } + } + } + } + } + } + } + } + + proptest! 
{ + #[test] + fn summary_quantile_values_are_valid( + seed: u64, + steps in 1..u8::MAX, + budget in SMALLEST_PROTOBUF..2048_usize, + ) { + let config = Config { + metric_weights: super::MetricWeights { + gauge: 0, + sum_delta: 0, + sum_cumulative: 0, + histogram: 0, + exponential_histogram: 0, + summary: 100, + }, + ..Default::default() + }; + + let mut budget = budget; + let mut rng = SmallRng::seed_from_u64(seed); + let mut otel_metrics = OpentelemetryMetrics::new(config, budget, &mut rng)?; + + for _ in 0..steps { + if let Ok(resource_metrics) = otel_metrics.generate(&mut rng, &mut budget) { + for scope_metrics in &resource_metrics.scope_metrics { + for metric in &scope_metrics.metrics { + if let Some(Data::Summary(s)) = &metric.data { + for point in &s.data_points { + let qvs = &point.quantile_values; + for quantile in qvs { + prop_assert!( + (0.0..=1.0).contains(&quantile.quantile), + "quantiles must stay in [0, 1]: {}", + quantile.quantile + ); + } + for i in 1..qvs.len() { + prop_assert!( + qvs[i].quantile >= qvs[i - 1].quantile, + "quantiles must be non-decreasing: {} < {}", + qvs[i].quantile, qvs[i - 1].quantile + ); + prop_assert!( + qvs[i].value >= qvs[i - 1].value, + "quantile values must be non-decreasing: {} < {}", + qvs[i].value, qvs[i - 1].value + ); + } + } + } + } + } + } + } + } + } } diff --git a/lading_payload/src/opentelemetry/metric/templates.rs b/lading_payload/src/opentelemetry/metric/templates.rs index 4a6a360ab..c4ca05aac 100644 --- a/lading_payload/src/opentelemetry/metric/templates.rs +++ b/lading_payload/src/opentelemetry/metric/templates.rs @@ -1,11 +1,13 @@ use std::{cmp, rc::Rc}; use opentelemetry_proto::tonic::{ - common::v1::InstrumentationScope, + common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value}, metrics::{ self, v1::{ - Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, metric::Data, number_data_point, + ExponentialHistogramDataPoint, HistogramDataPoint, Metric, NumberDataPoint, + ResourceMetrics, 
ScopeMetrics, SummaryDataPoint, exponential_histogram_data_point, + metric::Data, number_data_point, summary_data_point, }, }, resource, @@ -51,6 +53,62 @@ fn exponential_weighted_range(rng: &mut R, min: u32, max: u32) max } +pub(super) fn random_histogram_bounds(n_bounds: usize, rng: &mut R) -> Vec { + let mut next = rng.random_range(0.0_f64..=1_000.0); + let mut bounds = Vec::with_capacity(n_bounds); + for _ in 0..n_bounds { + next += rng.random_range(0.25_f64..=10_000.0); + bounds.push(next); + } + bounds +} + +pub(super) fn random_non_negative_min_max( + rng: &mut R, +) -> (Option, Option) { + let min = rng.random_range(0.0_f64..=100_000.0); + let max = min + rng.random_range(0.0_f64..=1_000_000.0); + (Some(min), Some(max)) +} + +pub(super) fn random_signed_min_max(rng: &mut R) -> (Option, Option) { + let min = rng.random_range(-100_000.0_f64..=100_000.0); + let max = min + rng.random_range(0.0_f64..=1_000_000.0); + (Some(min), Some(max)) +} + +pub(super) fn random_summary_quantiles( + quantile_count: usize, + rng: &mut R, +) -> Vec { + let mut quantiles: Vec = (0..quantile_count).map(|_| rng.random()).collect(); + quantiles.sort_by(f64::total_cmp); + quantiles +} + +fn point_attributes(metadata: &[KeyValue]) -> Vec { + let mut attributes: Vec = metadata.iter().take(2).cloned().collect(); + attributes.push(KeyValue { + key: "sample".to_string(), + value: Some(AnyValue { + value: Some(any_value::Value::DoubleValue(0.0)), + }), + }); + attributes.push(KeyValue { + key: "spread".to_string(), + value: Some(AnyValue { + value: Some(any_value::Value::DoubleValue(0.0)), + }), + }); + attributes.push(KeyValue { + key: "jitter".to_string(), + value: Some(AnyValue { + value: Some(any_value::Value::DoubleValue(0.0)), + }), + }); + attributes +} + struct Ndp(NumberDataPoint); impl Distribution for StandardUniform { fn sample(&self, rng: &mut R) -> Ndp @@ -112,6 +170,9 @@ impl MetricTemplateGenerator { u16::from(config.metric_weights.gauge), 
u16::from(config.metric_weights.sum_delta), u16::from(config.metric_weights.sum_cumulative), + u16::from(config.metric_weights.histogram), + u16::from(config.metric_weights.exponential_histogram), + u16::from(config.metric_weights.summary), ])?, unit_gen: UnitGenerator::new(), str_pool: Rc::clone(str_pool), @@ -124,6 +185,7 @@ impl<'a> crate::SizedGenerator<'a> for MetricTemplateGenerator { type Output = Metric; type Error = GeneratorError; + #[allow(clippy::too_many_lines)] fn generate( &'a mut self, rng: &mut R, @@ -175,6 +237,14 @@ impl<'a> crate::SizedGenerator<'a> for MetricTemplateGenerator { aggregation_temporality: 2, is_monotonic: rng.random_bool(0.5), }, + 3 => Kind::Histogram { + aggregation_temporality: 2, + }, + 4 => Kind::ExponentialHistogram { + aggregation_temporality: 1, + scale: rng.random_range(-3_i32..=10), + }, + 5 => Kind::Summary, _ => unreachable!(), }; @@ -193,6 +263,117 @@ impl<'a> crate::SizedGenerator<'a> for MetricTemplateGenerator { aggregation_temporality, is_monotonic, }), + Kind::Summary => { + let data_points = (0..total_data_points) + .map(|_| { + let quantiles = + random_summary_quantiles(rng.random_range(2_usize..=8), rng); + let mut val = rng.random_range(0.0_f64..=10_000.0); + let quantile_values = quantiles + .into_iter() + .map(|quantile| { + val += rng.random_range(0.25_f64..=25_000.0); + summary_data_point::ValueAtQuantile { + quantile, + value: val, + } + }) + .collect::>(); + SummaryDataPoint { + attributes: point_attributes(&metadata), + start_time_unix_nano: 0, + time_unix_nano: rng.random(), + count: rng + .random_range(quantile_values.len().max(1) as u64..=100_000_u64), + sum: val * rng.random_range(1.0_f64..=4.0), + quantile_values, + flags: 0, + } + }) + .collect(); + Data::Summary(metrics::v1::Summary { data_points }) + } + Kind::Histogram { + aggregation_temporality, + } => { + let data_points = (0..total_data_points as usize) + .map(|_| { + let n_bounds = rng.random_range(4_usize..=16); + let bounds = 
random_histogram_bounds(n_bounds, rng); + let n_buckets = bounds.len() + 1; + let count: u64 = rng.random_range(n_buckets as u64..=250_000_u64); + let bucket_counts = random_partition(count, n_buckets, rng); + let (min, max) = random_non_negative_min_max(rng); + + HistogramDataPoint { + attributes: point_attributes(&metadata), + start_time_unix_nano: 0, + time_unix_nano: rng.random(), + count, + sum: Some(rng.random_range(0.0_f64..=1_000_000.0)), + explicit_bounds: bounds, + bucket_counts, + exemplars: Vec::new(), + flags: 0, + min, + max, + } + }) + .collect(); + Data::Histogram(metrics::v1::Histogram { + data_points, + aggregation_temporality, + }) + } + Kind::ExponentialHistogram { + aggregation_temporality, + scale, + } => { + let data_points = (0..total_data_points as usize) + .map(|_| { + let n_buckets = rng.random_range(4_usize..=32); + let count: u64 = rng.random_range((n_buckets as u64) * 2..=250_000_u64); + let partition = random_partition(count, 3, rng); + let pos_count = partition[0]; + let neg_count = partition[1]; + let zero_count = partition[2]; + + let positive = Some(exponential_histogram_data_point::Buckets { + offset: rng.random_range(-256_i32..=256), + bucket_counts: random_partition(pos_count, n_buckets, rng), + }); + + let negative = Some(exponential_histogram_data_point::Buckets { + offset: rng.random_range(-256_i32..=256), + bucket_counts: random_partition(neg_count, n_buckets, rng), + }); + + let (min, max) = random_signed_min_max(rng); + + ExponentialHistogramDataPoint { + attributes: point_attributes(&metadata), + start_time_unix_nano: 0, + time_unix_nano: rng.random(), + count, + sum: Some(rng.random_range(-1_000_000.0_f64..=1_000_000.0)), + scale, + zero_count, + positive, + negative, + exemplars: Vec::new(), + flags: 0, + min, + max, + zero_threshold: rng.random_range(0.0_f64..=10_000.0), + } + }) + .collect(); + + Data::ExponentialHistogram(metrics::v1::ExponentialHistogram { + data_points, + aggregation_temporality, + }) + } }; let 
mut metric = Metric { name, @@ -223,6 +404,45 @@ impl<'a> crate::SizedGenerator<'a> for MetricTemplateGenerator { } } +pub(super) fn random_partition( + count: u64, + n_buckets: usize, + rng: &mut R, +) -> Vec { + if n_buckets == 0 { + return Vec::new(); + } + + let mut result = vec![0; n_buckets]; + if count == 0 { + return result; + } + + let guaranteed_non_zero = n_buckets.min(usize::try_from(count).unwrap_or(usize::MAX)); + for value in result.iter_mut().take(guaranteed_non_zero) { + *value = 1; + } + + let mut remaining = count.saturating_sub(guaranteed_non_zero as u64); + while remaining > 0 { + let bucket_idx = rng.random_range(0..guaranteed_non_zero.max(1)); + let chunk = if remaining == 1 { + 1 + } else { + rng.random_range(1..=remaining) + }; + result[bucket_idx] += chunk; + remaining -= chunk; + } + + for idx in (1..result.len()).rev() { + let swap_idx = rng.random_range(0..=idx); + result.swap(idx, swap_idx); + } + + result +} + fn data_points_total(metric: &Metric) -> usize { let data = &metric.data; match data { @@ -230,8 +450,12 @@ fn data_points_total(metric: &Metric) -> usize { Data::Gauge(metrics::v1::Gauge { data_points }) | Data::Sum(metrics::v1::Sum { data_points, .. }), ) => data_points.len(), + Some(Data::Histogram(metrics::v1::Histogram { data_points, .. })) => data_points.len(), + Some(Data::ExponentialHistogram(metrics::v1::ExponentialHistogram { + data_points, .. 
+ })) => data_points.len(), + Some(Data::Summary(metrics::v1::Summary { data_points })) => data_points.len(), None => 0, - _ => unimplemented!("only gauge/sum metrics supported"), } } @@ -261,8 +485,36 @@ fn cut_data_points(metric: Metric) -> Metric { is_monotonic, })) } + Some(Data::ExponentialHistogram(metrics::v1::ExponentialHistogram { + mut data_points, + aggregation_temporality, + })) => { + let new_len = data_points.len() / 2; + data_points.truncate(new_len); + Some(Data::ExponentialHistogram( + metrics::v1::ExponentialHistogram { + data_points, + aggregation_temporality, + }, + )) + } + Some(Data::Histogram(metrics::v1::Histogram { + mut data_points, + aggregation_temporality, + })) => { + let new_len = data_points.len() / 2; + data_points.truncate(new_len); + Some(Data::Histogram(metrics::v1::Histogram { + data_points, + aggregation_temporality, + })) + } + Some(Data::Summary(metrics::v1::Summary { mut data_points })) => { + let new_len = data_points.len() / 2; + data_points.truncate(new_len); + Some(Data::Summary(metrics::v1::Summary { data_points })) + } None => None, - _ => unimplemented!("only gauge/sum metrics supported"), }; Metric { @@ -281,6 +533,14 @@ pub(crate) enum Kind { aggregation_temporality: i32, is_monotonic: bool, }, + Histogram { + aggregation_temporality: i32, + }, + ExponentialHistogram { + aggregation_temporality: i32, + scale: i32, + }, + Summary, } #[derive(Clone, Debug)] @@ -538,6 +798,9 @@ mod test { config.metric_weights.gauge = gauge; config.metric_weights.sum_delta = sum_delta; config.metric_weights.sum_cumulative = sum_cumulative; + config.metric_weights.histogram = 0; + config.metric_weights.exponential_histogram = 0; + config.metric_weights.summary = 0; let mut rng = SmallRng::seed_from_u64(seed); @@ -563,9 +826,98 @@ mod test { _ => panic!("invalid aggregation temporality"), } } - _ => panic!("invalid metric data"), + Data::Histogram(_) | Data::ExponentialHistogram(_) | Data::Summary(_) => { + panic!("unexpected new metric 
type when weights are zero") + } + } + } + } + } + + proptest! { + #[test] + fn metric_template_generator_new_types_generate( + seed: u64, + histogram in 0..2_u8, + exponential_histogram in 0..2_u8, + summary in 0..2_u8, + ) { + if histogram == 0 && exponential_histogram == 0 && summary == 0 { + return Ok(()); + } + + let mut config = Config::default(); + config.metric_weights.gauge = 0; + config.metric_weights.sum_delta = 0; + config.metric_weights.sum_cumulative = 0; + config.metric_weights.histogram = histogram; + config.metric_weights.exponential_histogram = exponential_histogram; + config.metric_weights.summary = summary; + + let mut rng = SmallRng::seed_from_u64(seed); + let generator_result = MetricTemplateGenerator::new( + &config, + &Rc::new(strings::RandomStringPool::with_size(&mut rng, 1024)), + &mut rng, + ); + assert!(generator_result.is_ok()); + let mut generator = generator_result.unwrap(); + + for _ in 0..100 { + let result = generator.generate(&mut rng, &mut 1024); + assert!(result.is_ok()); + let metric = result.unwrap(); + assert!(metric.data.is_some()); + match metric.data.unwrap() { + Data::Histogram(_) => assert!(histogram >= 1), + Data::ExponentialHistogram(_) => assert!(exponential_histogram >= 1), + Data::Summary(_) => assert!(summary >= 1), + Data::Gauge(_) | Data::Sum(_) => { + panic!("unexpected gauge/sum when weights are zero") + } } } } } + + proptest! { + #[test] + fn random_partition_preserves_count_and_density( + seed: u64, + count in 0_u64..10_000, + n_buckets in 1_usize..128, + ) { + let mut rng = SmallRng::seed_from_u64(seed); + let partition = random_partition(count, n_buckets, &mut rng); + + prop_assert_eq!(partition.len(), n_buckets); + prop_assert_eq!(partition.iter().sum::(), count); + + if count >= n_buckets as u64 { + prop_assert!( + partition.iter().all(|value| *value > 0), + "count {count} should populate every bucket", + ); + } + } + } + + proptest! 
{ + #[test] + fn random_summary_quantiles_are_sorted_and_bounded( + seed: u64, + quantile_count in 0_usize..32, + ) { + let mut rng = SmallRng::seed_from_u64(seed); + let quantiles = random_summary_quantiles(quantile_count, &mut rng); + + prop_assert_eq!(quantiles.len(), quantile_count); + for quantile in &quantiles { + prop_assert!((0.0..=1.0).contains(quantile)); + } + for pair in quantiles.windows(2) { + prop_assert!(pair[0] <= pair[1]); + } + } + } }