Skip to content

Commit 0b4bf14

Browse files
committed
feat(compact): Add Pushgateway metrics support for one-shot runs
This change introduces the ability for the `thanos compact` command to push its metrics to a Prometheus Pushgateway when running in one-shot mode (without the `--wait` flag). This is particularly useful for monitoring the compactor when it is run as a periodic batch job, such as a cron job. The following changes have been made: - Refactored progress metric calculation to run in one-shot mode. - Added `--push-gateway.url` and `--push-gateway.job` flags to configure the Pushgateway endpoint and job name. - Implemented the logic to push all registered metrics upon successful completion of a one-shot compaction run. Signed-off-by: Enno Richter <[email protected]>
1 parent 3f26e05 commit 0b4bf14

File tree

1 file changed

+112
-52
lines changed

1 file changed

+112
-52
lines changed

cmd/thanos/compact.go

Lines changed: 112 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/pkg/errors"
2323
"github.com/prometheus/client_golang/prometheus"
2424
"github.com/prometheus/client_golang/prometheus/promauto"
25+
"github.com/prometheus/client_golang/prometheus/push"
2526
"github.com/prometheus/common/model"
2627
"github.com/prometheus/common/route"
2728
"github.com/prometheus/prometheus/storage"
@@ -445,6 +446,58 @@ func runCompact(
445446
return nil
446447
}
447448

449+
ps := compact.NewCompactionProgressCalculator(reg, tsdbPlanner)
450+
rs := compact.NewRetentionProgressCalculator(reg, retentionByResolution)
451+
var ds *compact.DownsampleProgressCalculator
452+
if !conf.disableDownsampling {
453+
ds = compact.NewDownsampleProgressCalculator(reg)
454+
}
455+
progressCalcFn := func() error {
456+
if err := sy.SyncMetas(ctx); err != nil {
457+
// The RetryError signals that we hit an retriable error (transient error, no connection).
458+
// You should alert on this being triggered too frequently.
459+
if compact.IsRetryError(err) {
460+
level.Error(logger).Log("msg", "retriable error", "err", err)
461+
compactMetrics.retried.Inc()
462+
463+
return nil
464+
}
465+
466+
return errors.Wrapf(err, "could not sync metas")
467+
}
468+
469+
metas := sy.Metas()
470+
groups, err := grouper.Groups(metas)
471+
if err != nil {
472+
return errors.Wrapf(err, "could not group metadata for compaction")
473+
}
474+
475+
if err = ps.ProgressCalculate(ctx, groups); err != nil {
476+
return errors.Wrapf(err, "could not calculate compaction progress")
477+
}
478+
479+
retGroups, err := grouper.Groups(metas)
480+
if err != nil {
481+
return errors.Wrapf(err, "could not group metadata for retention")
482+
}
483+
484+
if err = rs.ProgressCalculate(ctx, retGroups); err != nil {
485+
return errors.Wrapf(err, "could not calculate retention progress")
486+
}
487+
488+
if !conf.disableDownsampling {
489+
groups, err = grouper.Groups(metas)
490+
if err != nil {
491+
return errors.Wrapf(err, "could not group metadata into downsample groups")
492+
}
493+
if err := ds.ProgressCalculate(ctx, groups); err != nil {
494+
return errors.Wrapf(err, "could not calculate downsampling progress")
495+
}
496+
}
497+
498+
return nil
499+
}
500+
448501
compactMainFn := func() error {
449502
if err := compactor.Compact(ctx); err != nil {
450503
return errors.Wrap(err, "compaction")
@@ -535,7 +588,57 @@ func runCompact(
535588
defer runutil.CloseWithLogOnErr(logger, insBkt, "bucket client")
536589

537590
if !conf.wait {
538-
return compactMainFn()
591+
pushMetrics := func() error {
592+
if conf.pushGatewayURL == "" {
593+
return nil
594+
}
595+
level.Info(logger).Log("msg", "pushing metrics to Pushgateway", "url", conf.pushGatewayURL)
596+
597+
job := conf.pushGatewayJob
598+
if job == "" {
599+
job = component.String()
600+
}
601+
602+
pusher := push.New(conf.pushGatewayURL, job).Gatherer(reg)
603+
604+
hostname, err := os.Hostname()
605+
if err != nil {
606+
level.Warn(logger).Log("msg", "failed to get hostname for pushgateway grouping key", "err", err)
607+
} else {
608+
pusher = pusher.Grouping("instance", hostname)
609+
}
610+
611+
if err := pusher.Push(); err != nil {
612+
return errors.Wrap(err, "failed to push metrics to Pushgateway")
613+
}
614+
615+
level.Info(logger).Log("msg", "successfully pushed metrics to Pushgateway")
616+
return nil
617+
}
618+
619+
err := compactMainFn()
620+
if err != nil {
621+
if compact.IsHaltError(err) {
622+
level.Error(logger).Log("msg", "critical error detected; halting", "err", err)
623+
compactMetrics.halted.Set(1)
624+
} else if compact.IsRetryError(err) {
625+
level.Error(logger).Log("msg", "retriable error", "err", err)
626+
compactMetrics.retried.Inc()
627+
}
628+
629+
if pushErr := pushMetrics(); pushErr != nil {
630+
level.Error(logger).Log("msg", "failed to push metrics on error", "push_err", pushErr)
631+
}
632+
return err // Always exit with an error code.
633+
}
634+
635+
compactMetrics.iterations.Inc()
636+
637+
if err := progressCalcFn(); err != nil {
638+
level.Warn(logger).Log("msg", "failed to calculate progress metrics", "err", err)
639+
}
640+
641+
return pushMetrics()
539642
}
540643

541644
// --wait=true is specified.
@@ -643,58 +746,8 @@ func runCompact(
643746
// Periodically calculate the progress of compaction, downsampling and retention.
644747
if conf.progressCalculateInterval > 0 {
645748
g.Add(func() error {
646-
ps := compact.NewCompactionProgressCalculator(reg, tsdbPlanner)
647-
rs := compact.NewRetentionProgressCalculator(reg, retentionByResolution)
648-
var ds *compact.DownsampleProgressCalculator
649-
if !conf.disableDownsampling {
650-
ds = compact.NewDownsampleProgressCalculator(reg)
651-
}
652-
653749
return runutil.Repeat(conf.progressCalculateInterval, ctx.Done(), func() error {
654-
655-
if err := sy.SyncMetas(ctx); err != nil {
656-
// The RetryError signals that we hit an retriable error (transient error, no connection).
657-
// You should alert on this being triggered too frequently.
658-
if compact.IsRetryError(err) {
659-
level.Error(logger).Log("msg", "retriable error", "err", err)
660-
compactMetrics.retried.Inc()
661-
662-
return nil
663-
}
664-
665-
return errors.Wrapf(err, "could not sync metas")
666-
}
667-
668-
metas := sy.Metas()
669-
groups, err := grouper.Groups(metas)
670-
if err != nil {
671-
return errors.Wrapf(err, "could not group metadata for compaction")
672-
}
673-
674-
if err = ps.ProgressCalculate(ctx, groups); err != nil {
675-
return errors.Wrapf(err, "could not calculate compaction progress")
676-
}
677-
678-
retGroups, err := grouper.Groups(metas)
679-
if err != nil {
680-
return errors.Wrapf(err, "could not group metadata for retention")
681-
}
682-
683-
if err = rs.ProgressCalculate(ctx, retGroups); err != nil {
684-
return errors.Wrapf(err, "could not calculate retention progress")
685-
}
686-
687-
if !conf.disableDownsampling {
688-
groups, err = grouper.Groups(metas)
689-
if err != nil {
690-
return errors.Wrapf(err, "could not group metadata into downsample groups")
691-
}
692-
if err := ds.ProgressCalculate(ctx, groups); err != nil {
693-
return errors.Wrapf(err, "could not calculate downsampling progress")
694-
}
695-
}
696-
697-
return nil
750+
return progressCalcFn()
698751
})
699752
}, func(err error) {
700753
cancel()
@@ -742,6 +795,8 @@ type compactConfig struct {
742795
progressCalculateInterval time.Duration
743796
filterConf *store.FilterConfig
744797
disableAdminOperations bool
798+
pushGatewayURL string
799+
pushGatewayJob string
745800
}
746801

747802
func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -855,5 +910,10 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
855910

856911
cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&cc.label)
857912

913+
cmd.Flag("push-gateway.url", "Prometheus Pushgateway URL to push metrics to. If not empty, compactor will push metrics on exit when in one-shot mode (--wait=false).").
914+
Default("").StringVar(&cc.pushGatewayURL)
915+
cmd.Flag("push-gateway.job", "Job name to use when pushing metrics to Prometheus Pushgateway.").
916+
Default("thanos-compact").StringVar(&cc.pushGatewayJob)
917+
858918
cmd.Flag("disable-admin-operations", "Disable UI/API admin operations like marking blocks for deletion and no compaction.").Default("false").BoolVar(&cc.disableAdminOperations)
859919
}

0 commit comments

Comments
 (0)