Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
const (
compressionNone = "none"
metricNamesFilter = "metric-names-filter"
extLabelsInTSDB = "ext-labels-in-tsdb"
)

func registerReceive(app *extkingpin.App) {
Expand Down Expand Up @@ -152,6 +153,11 @@ func runReceive(
multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled())
level.Info(logger).Log("msg", "metric name filter feature enabled")
}

if feature == extLabelsInTSDB {
multiTSDBOptions = append(multiTSDBOptions, receive.WithExternalLabelsInTSDB())
level.Info(logger).Log("msg", "external labels in TSDB feature enabled. This will make Receive dump the head block if external labels are changed.")
}
}

rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA, conf.rwServerTlsMinVersion)
Expand Down Expand Up @@ -1100,7 +1106,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("receive.otlp-enable-target-info", "Enables target information in OTLP metrics ingested by Receive. If enabled, it converts the resource to the target info metric").Default("true").BoolVar(&rc.otlpEnableTargetInfo)
cmd.Flag("receive.otlp-promote-resource-attributes", "(Repeatable) Resource attributes to include in OTLP metrics ingested by Receive.").Default("").StringsVar(&rc.otlpResourceAttributes)

rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+","+extLabelsInTSDB+".").Default("").Strings()

cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses)
Expand Down
14 changes: 13 additions & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,18 @@ func (h *Handler) writeQuorum() int {

So, if the replication factor is 2 then at least one write must succeed. With RF=3, two writes must succeed, and so on.

## Feature Flags

### metric-names-filter

If enabled, then every 15 seconds the Receiver queries all available metric names in each tenant and builds a Bloom filter from them.

This allows filtering out certain tenants from queries, and thus avoids spawning a goroutine for them.

### ext-labels-in-tsdb

If enabled, the current external labels are stored as "normal" labels inside the TSDB. This also adds a special marker to the meta files in blocks, so that readers can tell whether the external labels are part of the series inside the TSDB.
Copy link

@ringerc ringerc Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you check if this proposed addition to the readme for feature flags you've added looks sensible?:

...
If all blocks matched by a request had this feature flag enabled at the time they were written, Receive can stream responses to Series gRPC requests from Query instead of buffering them in memory. This lowers Receive's peak memory consumption for high-cardinality or high-sample-count requests.

However, storing external labels in the TSDB will increase TSDB index sizes because each external label must appear in the inverted index for every series. TSDB blocks will be somewhat larger and need more disk I/O to read, and HEAD chunks will require somewhat more memory.

If the configured external labels are changed, Receive will flush the current HEAD block to disk and start a new HEAD block. No data is discarded.


## Flags

```$ mdox-exec="thanos receive --help"
Expand Down Expand Up @@ -664,7 +676,7 @@ Flags:
OTLP metrics ingested by Receive.
--enable-feature= ... Comma separated experimental feature names
to enable. The current list of features is
metric-names-filter.
metric-names-filter,ext-labels-in-tsdb.
--receive.lazy-retrieval-max-buffered-responses=20
The lazy retrieval strategy can buffer up to
this number of responses. This is to limit the
Expand Down
5 changes: 5 additions & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ const (
// that the block has been migrated to parquet format and can be safely ignored
// by store gateways.
ParquetMigratedExtensionKey = "parquet_migrated"

// ExtLabelsInTSDBKey is the key used in block extensions to indicate that
// external labels have been put in the TSDB and the Series() API can
// stream.
ExtLabelsInTSDBKey = "ext_labels_in_tsdb"
)

// Meta describes the a block's meta. It wraps the known TSDB meta structure and
Expand Down
148 changes: 138 additions & 10 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type MultiTSDB struct {
exemplarClients map[string]*exemplars.TSDB

metricNameFilterEnabled bool
extLabelsInTSDB bool

headExpandedPostingsCacheSize uint64
blockExpandedPostingsCacheSize uint64
Expand All @@ -80,6 +81,14 @@ type MultiTSDB struct {
// MultiTSDBOption is a functional option for MultiTSDB.
type MultiTSDBOption func(mt *MultiTSDB)

// WithExternalLabelsInTSDB enables putting external labels in the TSDB.
// This permits streaming from the TSDB to the querier.
Comment on lines +84 to +85
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// WithExternalLabelsInTSDB enables putting external labels in the TSDB.
// This ensures that external labels and normal labels are sorted correctly, so a Series gRPC request
// can be streamed without the need for a buffer-and-re-sort pass after injecting external labels.

func WithExternalLabelsInTSDB() MultiTSDBOption {
	return func(mt *MultiTSDB) {
		mt.extLabelsInTSDB = true
	}
}

// WithMetricNameFilterEnabled enables metric name filtering on TSDB clients.
func WithMetricNameFilterEnabled() MultiTSDBOption {
return func(s *MultiTSDB) {
Expand Down Expand Up @@ -302,10 +311,13 @@ func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
return deletable
}

func newTenant() *tenant {
func newTenant(extLabels labels.Labels, addExtLabels bool) *tenant {
return &tenant{
readyS: &ReadyStorage{},
mtx: &sync.RWMutex{},
readyS: &ReadyStorage{
extLabels: extLabels,
addExtLabels: addExtLabels,
},
mtx: &sync.RWMutex{},
}
}

Expand Down Expand Up @@ -705,6 +717,71 @@ func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ..
return result
}

// getLastBlockPath returns the on-disk directory of the block in s with the
// highest MinTime (the newest persisted block), or "" if s has no blocks.
// The newest block's meta file carries the external label set that was in
// effect when it was written, which maybePruneHead compares against the
// current configuration.
func (t *MultiTSDB) getLastBlockPath(dataDir string, s *tsdb.DB) string {
	bls := s.Blocks()
	if len(bls) == 0 {
		return ""
	}

	// Single pass instead of sorting: Blocks() may hand back the DB's own
	// slice, and sorting it would reorder shared state as a side effect.
	lastBlock := bls[0]
	for _, b := range bls[1:] {
		if b.MinTime() > lastBlock.MinTime() {
			lastBlock = b
		}
	}

	return path.Join(dataDir, lastBlock.Meta().ULID.String())
}

func (t *MultiTSDB) maybePruneHead(dataDir, tenantID, lastMetaPath string, curLset labels.Labels, pruneHead func() error) error {
if !t.extLabelsInTSDB {
return nil
}

if lastMetaPath == "" {
return nil
}

m, err := metadata.ReadFromDir(lastMetaPath)
if err != nil {
return fmt.Errorf("reading meta %s: %w", lastMetaPath, err)
}

oldLset := labels.FromMap(m.Thanos.Labels)
if labels.Equal(oldLset, curLset) {
return nil
}

level.Info(t.logger).Log("msg", "changed external labelset detected, dumping the head block", "newLset", curLset.String(), "oldLset", oldLset.String())
Comment on lines +750 to +755
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider "flushing ... to disk" instead of "dumping"; the latter makes it sound like the data is being discarded.


if err := pruneHead(); err != nil {
return fmt.Errorf("flushing head: %w", err)
}

if t.bucket != nil {
logger := log.With(t.logger, "tenant", tenantID, "oldLset", oldLset.String())
reg := NewUnRegisterer(prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg))

ship := shipper.New(
t.bucket,
dataDir,
shipper.WithLogger(logger),
shipper.WithRegisterer(reg),
shipper.WithSource(metadata.ReceiveSource),
shipper.WithHashFunc(t.hashFunc),
shipper.WithMetaFileName(shipper.DefaultMetaFilename),
shipper.WithLabels(func() labels.Labels { return oldLset }),
shipper.WithAllowOutOfOrderUploads(t.allowOutOfOrderUpload),
shipper.WithSkipCorruptedBlocks(t.skipCorruptedBlocks),
)
if _, err := ship.Sync(context.Background()); err != nil {
return fmt.Errorf("syncing head for old label set: %w", err)
}
}

return nil
}

func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error {
reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg)
reg = NewUnRegisterer(reg)
Expand Down Expand Up @@ -754,19 +831,35 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.removeTenantLocked(tenantID)
return err
}

if err := t.maybePruneHead(dataDir, tenantID, t.getLastBlockPath(dataDir, s), lset, func() error { return t.flushHead(s) }); err != nil {
return err
}

var ship *shipper.Shipper
if t.bucket != nil {
ship = shipper.New(
t.bucket,
dataDir,
shipper.WithLogger(logger),
shipperOpts := []shipper.Option{}

shipperOpts = append(shipperOpts, shipper.WithLogger(logger),
shipper.WithRegisterer(reg),
shipper.WithSource(metadata.ReceiveSource),
shipper.WithHashFunc(t.hashFunc),
shipper.WithMetaFileName(shipper.DefaultMetaFilename),
shipper.WithLabels(func() labels.Labels { return lset }),
shipper.WithAllowOutOfOrderUploads(t.allowOutOfOrderUpload),
shipper.WithSkipCorruptedBlocks(t.skipCorruptedBlocks),
shipper.WithSkipCorruptedBlocks(t.skipCorruptedBlocks))

if t.extLabelsInTSDB {
shipperOpts = append(shipperOpts, shipper.WithExtensions(
map[string]any{
metadata.ExtLabelsInTSDBKey: "",
},
))
}
ship = shipper.New(
t.bucket,
dataDir,
shipperOpts...,
)
}
var options []store.TSDBStoreOption
Expand All @@ -776,7 +869,10 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
if t.matcherCache != nil {
options = append(options, store.WithMatcherCacheInstance(t.matcherCache))
}
options = append(options, store.WithExtLabelsInTSDB(t.extLabelsInTSDB))

tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset), reg.(*UnRegisterer))

t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
Expand Down Expand Up @@ -805,7 +901,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
return tenant, nil
}

tenant = newTenant()
tenant = newTenant(t.labels, t.extLabelsInTSDB)
t.addTenantUnlocked(tenantID, tenant)
t.mtx.Unlock()

Expand Down Expand Up @@ -866,10 +962,12 @@ var ErrNotReady = errors.New("TSDB not ready")

// ReadyStorage implements the Storage interface while allowing to set the actual
// storage at a later point in time.
// TODO: Replace this with upstream Prometheus implementation when it is exposed.
type ReadyStorage struct {
mtx sync.RWMutex
a *adapter

extLabels labels.Labels
addExtLabels bool
}

// Set the storage.
Expand Down Expand Up @@ -920,9 +1018,39 @@ func (s *ReadyStorage) ExemplarQuerier(ctx context.Context) (storage.ExemplarQue
return nil, ErrNotReady
}

// wrappingAppender decorates a storage.Appender so that every appended
// series also carries addLabels (the tenant's external labels). Labels are
// merged via labelpb.ExtendSortedLabels, which presumably requires both
// label sets to already be sorted — TODO confirm against its contract.
//
// NOTE(review): only Append and GetRef extend the label set here. Any other
// methods reached through the embedded Appender pass through unmodified —
// confirm whether exemplar/histogram appends (if present on the interface)
// also need the external labels merged in.
type wrappingAppender struct {
	addLabels labels.Labels
	storage.Appender
	gr storage.GetRef
}

// Compile-time checks that the wrapper satisfies both interfaces.
var _ storage.Appender = (*wrappingAppender)(nil)
var _ storage.GetRef = (*wrappingAppender)(nil)

// GetRef resolves a cached series ref for lset as it will actually be
// stored, i.e. with the external labels merged in.
func (w *wrappingAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
	return w.gr.GetRef(labelpb.ExtendSortedLabels(lset, w.addLabels), hash)
}

// Append stores the sample under l extended with the external labels.
func (w *wrappingAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
	l = labelpb.ExtendSortedLabels(l, w.addLabels)
	return w.Appender.Append(ref, l, t, v)
}

// Appender implements the Storage interface.
func (s *ReadyStorage) Appender(ctx context.Context) (storage.Appender, error) {
if x := s.get(); x != nil {
if s.addExtLabels {
app, err := x.Appender(ctx)
if err != nil {
return nil, err
}

return &wrappingAppender{
Appender: app,
gr: app.(storage.GetRef),
addLabels: s.extLabels,
}, nil
}
return x.Appender(ctx)
}
return nil, ErrNotReady
Expand Down
67 changes: 67 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package receive

import (
"context"
"encoding/json"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -959,3 +960,69 @@ func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) {
}, tenant.blocksToDelete(nil))
})
}

// TestDumpsIfUnequalLabels verifies that maybePruneHead invokes the injected
// head-flush callback only when the feature flag is on AND the configured
// external labels differ from those recorded in the newest block's meta.
func TestDumpsIfUnequalLabels(t *testing.T) {
	// flushed records whether the injected head-flush callback ran.
	flushed := false
	flushHead := func() error { //nolint:unparam
		flushed = true
		return nil
	}

	dir := t.TempDir()

	blockID := ulid.MustNewDefault(time.Now())
	blockDir := path.Join(dir, blockID.String())
	require.NoError(t, os.MkdirAll(blockDir, os.ModePerm))

	// Write a meta file recording the external label set foo=bar.
	meta := metadata.Meta{
		BlockMeta: tsdb.BlockMeta{
			ULID:    blockID,
			Version: 1,
		},
		Thanos: metadata.Thanos{
			Version: 1,
			Labels:  map[string]string{"foo": "bar"},
		},
	}
	raw, err := json.Marshal(meta)
	require.NoError(t, err)
	require.NoError(t, os.WriteFile(path.Join(blockDir, metadata.MetaFilename), raw, os.ModePerm))

	t.Run("unequal labels", func(t *testing.T) {
		mtsdb := &MultiTSDB{
			extLabelsInTSDB: true,
			logger:          log.NewNopLogger(),
		}

		// Changed label set with the feature on: the head must be flushed.
		require.NoError(t, mtsdb.maybePruneHead(dir, "foo", blockDir, labels.FromStrings("aa", "bb"), flushHead))
		require.True(t, flushed)

		// With the feature off, the same mismatch is ignored.
		mtsdb.extLabelsInTSDB = false
		flushed = false
		require.NoError(t, mtsdb.maybePruneHead(dir, "foo", blockDir, labels.FromStrings("aa", "bb"), flushHead))
		require.False(t, flushed)
	})

	t.Run("equal labels", func(t *testing.T) {
		mtsdb := &MultiTSDB{
			extLabelsInTSDB: true,
			logger:          log.NewNopLogger(),
		}

		// Identical label set: never flush, regardless of the feature flag.
		flushed = false
		require.NoError(t, mtsdb.maybePruneHead(dir, "foo", blockDir, labels.FromStrings("foo", "bar"), flushHead))
		require.False(t, flushed)

		mtsdb.extLabelsInTSDB = false
		require.NoError(t, mtsdb.maybePruneHead(dir, "foo", blockDir, labels.FromStrings("foo", "bar"), flushHead))
		require.False(t, flushed)
	})
}
Loading
Loading