Skip to content

Commit 2bfaa32

Browse files
committed
thanos/receive: add feature flag to put ext labels into TSDB
Enable streaming by putting external labels into the TSDB. If the external labels ever change then the head is pruned during startup. Signed-off-by: Giedrius Statkevičius <[email protected]>
1 parent 80d53a8 commit 2bfaa32

File tree

10 files changed

+337
-21
lines changed

10 files changed

+337
-21
lines changed

cmd/thanos/receive.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import (
6060
const (
6161
compressionNone = "none"
6262
metricNamesFilter = "metric-names-filter"
63+
extLabelsInTSDB = "ext-labels-in-tsdb"
6364
)
6465

6566
func registerReceive(app *extkingpin.App) {
@@ -152,6 +153,11 @@ func runReceive(
152153
multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled())
153154
level.Info(logger).Log("msg", "metric name filter feature enabled")
154155
}
156+
157+
if feature == extLabelsInTSDB {
158+
multiTSDBOptions = append(multiTSDBOptions, receive.WithExternalLabelsInTSDB())
159+
level.Info(logger).Log("msg", "external labels in TSDB feature enabled. This will make Receive dump the head block if external labels are changed.")
160+
}
155161
}
156162

157163
rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA, conf.rwServerTlsMinVersion)
@@ -1100,7 +1106,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
11001106
cmd.Flag("receive.otlp-enable-target-info", "Enables target information in OTLP metrics ingested by Receive. If enabled, it converts the resource to the target info metric").Default("true").BoolVar(&rc.otlpEnableTargetInfo)
11011107
cmd.Flag("receive.otlp-promote-resource-attributes", "(Repeatable) Resource attributes to include in OTLP metrics ingested by Receive.").Default("").StringsVar(&rc.otlpResourceAttributes)
11021108

1103-
rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
1109+
rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+","+extLabelsInTSDB+".").Default("").Strings()
11041110

11051111
cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
11061112
Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses)

docs/components/receive.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,18 @@ func (h *Handler) writeQuorum() int {
388388

389389
So, if the replication factor is 2 then at least one write must succeed. With RF=3, two writes must succeed, and so on.
390390

391+
## Feature Flags
392+
393+
### metric-names-filter
394+
395+
If enabled then every 15 seconds Receiver will query all available metric names in each tenant and build a bloom filter from them.
396+
397+
This allows filtering out certain tenants from queriers and thus it will not require spawning a Go routine for them.
398+
399+
### ext-labels-in-tsdb
400+
401+
If enabled then it will put the current external labels as "normal" labels inside of the TSDB. This also adds a special marker to the meta files in blocks so that it would be known whether external labels are part of the series inside of the TSDB.
402+
391403
## Flags
392404

393405
```$ mdox-exec="thanos receive --help"
@@ -664,7 +676,7 @@ Flags:
664676
OTLP metrics ingested by Receive.
665677
--enable-feature= ... Comma separated experimental feature names
666678
to enable. The current list of features is
667-
metric-names-filter.
679+
metric-names-filter,ext-labels-in-tsdb.
668680
--receive.lazy-retrieval-max-buffered-responses=20
669681
The lazy retrieval strategy can buffer up to
670682
this number of responses. This is to limit the

pkg/block/metadata/meta.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ const (
5959
// that the block has been migrated to parquet format and can be safely ignored
6060
// by store gateways.
6161
ParquetMigratedExtensionKey = "parquet_migrated"
62+
63+
// ExtLabelsInTSDBKey is the key used in block extensions to indicate that
64+
// external labels have been put in the TSDB and the Series() API can
65+
// stream.
66+
ExtLabelsInTSDBKey = "ext_labels_in_tsdb"
6267
)
6368

6469
// Meta describes the a block's meta. It wraps the known TSDB meta structure and

pkg/receive/multitsdb.go

Lines changed: 138 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type MultiTSDB struct {
7272
exemplarClients map[string]*exemplars.TSDB
7373

7474
metricNameFilterEnabled bool
75+
extLabelsInTSDB bool
7576

7677
headExpandedPostingsCacheSize uint64
7778
blockExpandedPostingsCacheSize uint64
@@ -80,6 +81,14 @@ type MultiTSDB struct {
8081
// MultiTSDBOption is a functional option for MultiTSDB.
8182
type MultiTSDBOption func(mt *MultiTSDB)
8283

84+
// WithExternalLabelsInTSDB enables putting external labels in the TSDB.
85+
// This permits streaming from the TSDB to the querier.
86+
func WithExternalLabelsInTSDB() MultiTSDBOption {
87+
return func(s *MultiTSDB) {
88+
s.extLabelsInTSDB = true
89+
}
90+
}
91+
8392
// WithMetricNameFilterEnabled enables metric name filtering on TSDB clients.
8493
func WithMetricNameFilterEnabled() MultiTSDBOption {
8594
return func(s *MultiTSDB) {
@@ -302,10 +311,13 @@ func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
302311
return deletable
303312
}
304313

305-
func newTenant() *tenant {
314+
func newTenant(extLabels labels.Labels, addExtLabels bool) *tenant {
306315
return &tenant{
307-
readyS: &ReadyStorage{},
308-
mtx: &sync.RWMutex{},
316+
readyS: &ReadyStorage{
317+
extLabels: extLabels,
318+
addExtLabels: addExtLabels,
319+
},
320+
mtx: &sync.RWMutex{},
309321
}
310322
}
311323

@@ -705,6 +717,71 @@ func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ..
705717
return result
706718
}
707719

720+
func (t *MultiTSDB) getLastBlockPath(dataDir string, s *tsdb.DB) string {
721+
bls := s.Blocks()
722+
if len(bls) == 0 {
723+
return ""
724+
}
725+
726+
sort.Slice(bls, func(i, j int) bool {
727+
return bls[i].MinTime() > bls[j].MinTime()
728+
})
729+
730+
lastBlock := bls[0]
731+
732+
return path.Join(dataDir, lastBlock.Meta().ULID.String())
733+
734+
}
735+
736+
func (t *MultiTSDB) maybePruneHead(dataDir, tenantID, lastMetaPath string, curLset labels.Labels, pruneHead func() error) error {
737+
if !t.extLabelsInTSDB {
738+
return nil
739+
}
740+
741+
if lastMetaPath == "" {
742+
return nil
743+
}
744+
745+
m, err := metadata.ReadFromDir(lastMetaPath)
746+
if err != nil {
747+
return fmt.Errorf("reading meta %s: %w", lastMetaPath, err)
748+
}
749+
750+
oldLset := labels.FromMap(m.Thanos.Labels)
751+
if labels.Equal(oldLset, curLset) {
752+
return nil
753+
}
754+
755+
level.Info(t.logger).Log("msg", "changed external labelset detected, dumping the head block", "newLset", curLset.String(), "oldLset", oldLset.String())
756+
757+
if err := pruneHead(); err != nil {
758+
return fmt.Errorf("flushing head: %w", err)
759+
}
760+
761+
if t.bucket != nil {
762+
logger := log.With(t.logger, "tenant", tenantID, "oldLset", oldLset.String())
763+
reg := NewUnRegisterer(prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg))
764+
765+
ship := shipper.New(
766+
t.bucket,
767+
dataDir,
768+
shipper.WithLogger(logger),
769+
shipper.WithRegisterer(reg),
770+
shipper.WithSource(metadata.ReceiveSource),
771+
shipper.WithHashFunc(t.hashFunc),
772+
shipper.WithMetaFileName(shipper.DefaultMetaFilename),
773+
shipper.WithLabels(func() labels.Labels { return oldLset }),
774+
shipper.WithAllowOutOfOrderUploads(t.allowOutOfOrderUpload),
775+
shipper.WithSkipCorruptedBlocks(t.skipCorruptedBlocks),
776+
)
777+
if _, err := ship.Sync(context.Background()); err != nil {
778+
return fmt.Errorf("syncing head for old label set: %w", err)
779+
}
780+
}
781+
782+
return nil
783+
}
784+
708785
func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error {
709786
reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg)
710787
reg = NewUnRegisterer(reg)
@@ -754,19 +831,35 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
754831
t.removeTenantLocked(tenantID)
755832
return err
756833
}
834+
835+
if err := t.maybePruneHead(dataDir, tenantID, t.getLastBlockPath(dataDir, s), lset, func() error { return t.flushHead(s) }); err != nil {
836+
return err
837+
}
838+
757839
var ship *shipper.Shipper
758840
if t.bucket != nil {
759-
ship = shipper.New(
760-
t.bucket,
761-
dataDir,
762-
shipper.WithLogger(logger),
841+
shipperOpts := []shipper.Option{}
842+
843+
shipperOpts = append(shipperOpts, shipper.WithLogger(logger),
763844
shipper.WithRegisterer(reg),
764845
shipper.WithSource(metadata.ReceiveSource),
765846
shipper.WithHashFunc(t.hashFunc),
766847
shipper.WithMetaFileName(shipper.DefaultMetaFilename),
767848
shipper.WithLabels(func() labels.Labels { return lset }),
768849
shipper.WithAllowOutOfOrderUploads(t.allowOutOfOrderUpload),
769-
shipper.WithSkipCorruptedBlocks(t.skipCorruptedBlocks),
850+
shipper.WithSkipCorruptedBlocks(t.skipCorruptedBlocks))
851+
852+
if t.extLabelsInTSDB {
853+
shipperOpts = append(shipperOpts, shipper.WithExtensions(
854+
map[string]any{
855+
metadata.ExtLabelsInTSDBKey: "",
856+
},
857+
))
858+
}
859+
ship = shipper.New(
860+
t.bucket,
861+
dataDir,
862+
shipperOpts...,
770863
)
771864
}
772865
var options []store.TSDBStoreOption
@@ -776,7 +869,10 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
776869
if t.matcherCache != nil {
777870
options = append(options, store.WithMatcherCacheInstance(t.matcherCache))
778871
}
872+
options = append(options, store.WithExtLabelsInTSDB(t.extLabelsInTSDB))
873+
779874
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset), reg.(*UnRegisterer))
875+
780876
t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil
781877
level.Info(logger).Log("msg", "TSDB is now ready")
782878
return nil
@@ -805,7 +901,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
805901
return tenant, nil
806902
}
807903

808-
tenant = newTenant()
904+
tenant = newTenant(t.labels, t.extLabelsInTSDB)
809905
t.addTenantUnlocked(tenantID, tenant)
810906
t.mtx.Unlock()
811907

@@ -866,10 +962,12 @@ var ErrNotReady = errors.New("TSDB not ready")
866962

867963
// ReadyStorage implements the Storage interface while allowing to set the actual
868964
// storage at a later point in time.
869-
// TODO: Replace this with upstream Prometheus implementation when it is exposed.
870965
type ReadyStorage struct {
871966
mtx sync.RWMutex
872967
a *adapter
968+
969+
extLabels labels.Labels
970+
addExtLabels bool
873971
}
874972

875973
// Set the storage.
@@ -920,9 +1018,39 @@ func (s *ReadyStorage) ExemplarQuerier(ctx context.Context) (storage.ExemplarQue
9201018
return nil, ErrNotReady
9211019
}
9221020

1021+
type wrappingAppender struct {
1022+
addLabels labels.Labels
1023+
storage.Appender
1024+
gr storage.GetRef
1025+
}
1026+
1027+
var _ storage.Appender = (*wrappingAppender)(nil)
1028+
var _ storage.GetRef = (*wrappingAppender)(nil)
1029+
1030+
func (w *wrappingAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
1031+
return w.gr.GetRef(labelpb.ExtendSortedLabels(lset, w.addLabels), hash)
1032+
}
1033+
1034+
func (w *wrappingAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
1035+
l = labelpb.ExtendSortedLabels(l, w.addLabels)
1036+
return w.Appender.Append(ref, l, t, v)
1037+
}
1038+
9231039
// Appender implements the Storage interface.
9241040
func (s *ReadyStorage) Appender(ctx context.Context) (storage.Appender, error) {
9251041
if x := s.get(); x != nil {
1042+
if s.addExtLabels {
1043+
app, err := x.Appender(ctx)
1044+
if err != nil {
1045+
return nil, err
1046+
}
1047+
1048+
return &wrappingAppender{
1049+
Appender: app,
1050+
gr: app.(storage.GetRef),
1051+
addLabels: s.extLabels,
1052+
}, nil
1053+
}
9261054
return x.Appender(ctx)
9271055
}
9281056
return nil, ErrNotReady

pkg/receive/multitsdb_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package receive
55

66
import (
77
"context"
8+
"encoding/json"
89
"fmt"
910
"io"
1011
"math"
@@ -959,3 +960,69 @@ func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) {
959960
}, tenant.blocksToDelete(nil))
960961
})
961962
}
963+
964+
func TestDumpsIfUnequalLabels(t *testing.T) {
965+
var dumped = false
966+
967+
dumpHead := func() error {
968+
dumped = true
969+
return nil
970+
}
971+
972+
td := t.TempDir()
973+
974+
ul := ulid.MustNewDefault(time.Now())
975+
require.NoError(t, os.MkdirAll(
976+
path.Join(td, ul.String()), os.ModePerm,
977+
))
978+
979+
m := metadata.Meta{
980+
BlockMeta: tsdb.BlockMeta{
981+
ULID: ul,
982+
Version: 1,
983+
},
984+
Thanos: metadata.Thanos{
985+
Version: 1,
986+
Labels: map[string]string{
987+
"foo": "bar",
988+
},
989+
},
990+
}
991+
992+
mm, err := json.Marshal(m)
993+
require.NoError(t, err)
994+
995+
require.NoError(t, os.WriteFile(path.Join(td, ul.String(), metadata.MetaFilename), mm, os.ModePerm))
996+
997+
t.Run("unequal labels", func(t *testing.T) {
998+
m := &MultiTSDB{
999+
extLabelsInTSDB: true,
1000+
logger: log.NewNopLogger(),
1001+
}
1002+
1003+
require.NoError(t, m.maybePruneHead(td, "foo", path.Join(td, ul.String()), labels.FromStrings("aa", "bb"), dumpHead))
1004+
require.True(t, dumped)
1005+
1006+
m.extLabelsInTSDB = false
1007+
dumped = false
1008+
1009+
require.NoError(t, m.maybePruneHead(td, "foo", path.Join(td, ul.String()), labels.FromStrings("aa", "bb"), dumpHead))
1010+
require.False(t, dumped)
1011+
})
1012+
1013+
t.Run("equal labels", func(t *testing.T) {
1014+
m := &MultiTSDB{
1015+
extLabelsInTSDB: true,
1016+
logger: log.NewNopLogger(),
1017+
}
1018+
1019+
dumped = false
1020+
require.NoError(t, m.maybePruneHead(td, "foo", path.Join(td, ul.String()), labels.FromStrings("foo", "bar"), dumpHead))
1021+
require.False(t, dumped)
1022+
1023+
m.extLabelsInTSDB = false
1024+
require.NoError(t, m.maybePruneHead(td, "foo", path.Join(td, ul.String()), labels.FromStrings("foo", "bar"), dumpHead))
1025+
require.False(t, dumped)
1026+
})
1027+
1028+
}

0 commit comments

Comments
 (0)