From 1e500c42a82c514221135b858c678f15a8e8e957 Mon Sep 17 00:00:00 2001 From: Enno Richter Date: Mon, 15 Sep 2025 12:45:36 +0200 Subject: [PATCH] feat(compact): Add Pushgateway metrics support for one-shot runs This change introduces the ability for the `thanos compact` command to push its metrics to a Prometheus Pushgateway when running in one-shot mode (without the `--wait` flag). This is particularly useful for monitoring the compactor when it is run as a periodic batch job, such as a cron job. The following changes have been made: - Refactored progress metric calculation to run in one-shot mode. - Added `--push-gateway.url` and `--push-gateway.job` flags to configure the Pushgateway endpoint and job name. - Implemented the logic to push all registered metrics upon successful completion of a one-shot compaction run. Signed-off-by: Enno Richter --- CHANGELOG.md | 1 + cmd/thanos/compact.go | 164 ++++++++++++++++++++++++++++-------------- 2 files changed, 113 insertions(+), 52 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c4edac0e83..bf311406a75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added +- [#8488](https://github.com/thanos-io/thanos/pull/8488) Compact: Add ability to push metrics to a Pushgateway for one-shot runs, including metrics for failed runs. - [#8366](https://github.com/thanos-io/thanos/pull/8366) Store: optionally ignore Parquet migrated blocks - [#8359](https://github.com/thanos-io/thanos/pull/8359) Tools: add `--shipper.upload-compacted` flag for uploading compacted blocks to bucket upload-blocks diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 22a8dbbf2f3..55231b4d24b 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -22,6 +22,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/push" "github.com/prometheus/common/model" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/storage" @@ -445,6 +446,58 @@ func runCompact( return nil } + ps := compact.NewCompactionProgressCalculator(reg, tsdbPlanner) + rs := compact.NewRetentionProgressCalculator(reg, retentionByResolution) + var ds *compact.DownsampleProgressCalculator + if !conf.disableDownsampling { + ds = compact.NewDownsampleProgressCalculator(reg) + } + progressCalcFn := func() error { + if err := sy.SyncMetas(ctx); err != nil { + // The RetryError signals that we hit an retriable error (transient error, no connection). + // You should alert on this being triggered too frequently. + if compact.IsRetryError(err) { + level.Error(logger).Log("msg", "retriable error", "err", err) + compactMetrics.retried.Inc() + + return nil + } + + return errors.Wrapf(err, "could not sync metas") + } + + metas := sy.Metas() + groups, err := grouper.Groups(metas) + if err != nil { + return errors.Wrapf(err, "could not group metadata for compaction") + } + + if err = ps.ProgressCalculate(ctx, groups); err != nil { + return errors.Wrapf(err, "could not calculate compaction progress") + } + + retGroups, err := grouper.Groups(metas) + if err != nil { + return errors.Wrapf(err, "could not group metadata for retention") + } + + if err = rs.ProgressCalculate(ctx, retGroups); err != nil { + return errors.Wrapf(err, "could not calculate retention progress") + } + + if !conf.disableDownsampling { + groups, err = grouper.Groups(metas) + if err != nil { + return errors.Wrapf(err, "could not group metadata into downsample groups") + } + if err := ds.ProgressCalculate(ctx, groups); err != nil { + return errors.Wrapf(err, "could not calculate downsampling progress") + } + } + + return nil + } + compactMainFn := func() error { if err := compactor.Compact(ctx); err != nil { return errors.Wrap(err, "compaction") @@ -535,7 +588,57 @@ func runCompact( defer runutil.CloseWithLogOnErr(logger, insBkt, "bucket client") if !conf.wait { - return compactMainFn() + pushMetrics := func() error { + if conf.pushGatewayURL == "" { + return nil + } + level.Info(logger).Log("msg", "pushing metrics to Pushgateway", "url", conf.pushGatewayURL) + + job := conf.pushGatewayJob + if job == "" { + job = component.String() + } + + pusher := push.New(conf.pushGatewayURL, job).Gatherer(reg) + + hostname, err := os.Hostname() + if err != nil { + level.Warn(logger).Log("msg", "failed to get hostname for pushgateway grouping key", "err", err) + } else { + pusher = pusher.Grouping("instance", hostname) + } + + if err := pusher.Push(); err != nil { + return errors.Wrap(err, "failed to push metrics to Pushgateway") + } + + level.Info(logger).Log("msg", "successfully pushed metrics to Pushgateway") + return nil + } + + err := compactMainFn() + if err != nil { + if compact.IsHaltError(err) { + level.Error(logger).Log("msg", "critical error detected; halting", "err", err) + compactMetrics.halted.Set(1) + } else if compact.IsRetryError(err) { + level.Error(logger).Log("msg", "retriable error", "err", err) + compactMetrics.retried.Inc() + } + + if pushErr := pushMetrics(); pushErr != nil { + level.Error(logger).Log("msg", "failed to push metrics on error", "push_err", pushErr) + } + return err // Always exit with an error code. + } + + compactMetrics.iterations.Inc() + + if err := progressCalcFn(); err != nil { + level.Warn(logger).Log("msg", "failed to calculate progress metrics", "err", err) + } + + return pushMetrics() } // --wait=true is specified. @@ -643,58 +746,8 @@ func runCompact( // Periodically calculate the progress of compaction, downsampling and retention. if conf.progressCalculateInterval > 0 { g.Add(func() error { - ps := compact.NewCompactionProgressCalculator(reg, tsdbPlanner) - rs := compact.NewRetentionProgressCalculator(reg, retentionByResolution) - var ds *compact.DownsampleProgressCalculator - if !conf.disableDownsampling { - ds = compact.NewDownsampleProgressCalculator(reg) - } - return runutil.Repeat(conf.progressCalculateInterval, ctx.Done(), func() error { - - if err := sy.SyncMetas(ctx); err != nil { - // The RetryError signals that we hit an retriable error (transient error, no connection). - // You should alert on this being triggered too frequently. - if compact.IsRetryError(err) { - level.Error(logger).Log("msg", "retriable error", "err", err) - compactMetrics.retried.Inc() - - return nil - } - - return errors.Wrapf(err, "could not sync metas") - } - - metas := sy.Metas() - groups, err := grouper.Groups(metas) - if err != nil { - return errors.Wrapf(err, "could not group metadata for compaction") - } - - if err = ps.ProgressCalculate(ctx, groups); err != nil { - return errors.Wrapf(err, "could not calculate compaction progress") - } - - retGroups, err := grouper.Groups(metas) - if err != nil { - return errors.Wrapf(err, "could not group metadata for retention") - } - - if err = rs.ProgressCalculate(ctx, retGroups); err != nil { - return errors.Wrapf(err, "could not calculate retention progress") - } - - if !conf.disableDownsampling { - groups, err = grouper.Groups(metas) - if err != nil { - return errors.Wrapf(err, "could not group metadata into downsample groups") - } - if err := ds.ProgressCalculate(ctx, groups); err != nil { - return errors.Wrapf(err, "could not calculate downsampling progress") - } - } - - return nil + return progressCalcFn() }) }, func(err error) { cancel() @@ -742,6 +795,8 @@ type compactConfig struct { progressCalculateInterval time.Duration filterConf *store.FilterConfig disableAdminOperations bool + pushGatewayURL string + pushGatewayJob string } func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -855,5 +910,10 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&cc.label) + cmd.Flag("push-gateway.url", "Prometheus Pushgateway URL to push metrics to. If not empty, compactor will push metrics on exit when in one-shot mode (--wait=false)."). + Default("").StringVar(&cc.pushGatewayURL) + cmd.Flag("push-gateway.job", "Job name to use when pushing metrics to Prometheus Pushgateway."). + Default("thanos-compact").StringVar(&cc.pushGatewayJob) + cmd.Flag("disable-admin-operations", "Disable UI/API admin operations like marking blocks for deletion and no compaction.").Default("false").BoolVar(&cc.disableAdminOperations) }