Skip to content

Commit 1ac9dba

Browse files
christopherzlialecrajeev
authored andcommitted
add pantheon migration state metrics (#4)
* support az aware hashring and multiple sts in one hashring (#129) * support az aware hashring * Update receive-controller.json * support multiple statefulsets in 1 hashring * add more logs * style * fix lint issue * debug * return when encountering error * remove whitespace * Fix k8s permissions (#133) * Fix k8s permissions * fix ci * fix ci * sync * add pantheon migration state * Revert "Fix k8s permissions (#133)" This reverts commit e545b83. --------- Co-authored-by: Alec Rajeev <[email protected]> Signed-off-by: Yi Jin <[email protected]>
1 parent f8de577 commit 1ac9dba

File tree

3 files changed

+190
-6
lines changed

3 files changed

+190
-6
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ vendor
44
jsonnetfile.lock.json
55
tmp
66
.buildxcache
7+
.idea/

main.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ import (
4141
type label = string
4242

4343
const (
44-
defaultPort = 10901
44+
defaultPort = 10901
45+
defaultReplicaFactor = 3
4546

4647
resyncPeriod = 5 * time.Minute
4748
defaultScaleTimeout = 5 * time.Second
@@ -76,6 +77,7 @@ type CmdConfig struct {
7677
ScaleTimeout time.Duration
7778
useAzAwareHashRing bool
7879
podAzAnnotationKey string
80+
migrationState string
7981
}
8082

8183
func parseFlags() CmdConfig {
@@ -98,6 +100,7 @@ func parseFlags() CmdConfig {
98100
flag.DurationVar(&config.ScaleTimeout, "scale-timeout", defaultScaleTimeout, "A timeout to wait for receivers to really start after they report healthy")
99101
flag.BoolVar(&config.useAzAwareHashRing, "use-az-aware-hashring", false, "A boolean to use az aware hashring to comply with Thanos v0.32+")
100102
flag.StringVar(&config.podAzAnnotationKey, "pod-az-annotation-key", "", "pod annotation key for AZ Info, If not specified or key not found, will use sts name as AZ key")
103+
flag.StringVar(&config.migrationState, "migration-state", "no-state", "[Databricks Internal] internal pantheon migration state info")
101104
flag.Parse()
102105

103106
return config
@@ -160,7 +163,9 @@ func main() {
160163
scaleTimeout: config.ScaleTimeout,
161164
useAzAwareHashRing: config.useAzAwareHashRing,
162165
podAzAnnotationKey: config.podAzAnnotationKey,
166+
migrationState: config.migrationState,
163167
}
168+
164169
c := newController(klient, logger, opt)
165170
c.registerMetrics(reg)
166171
done := make(chan struct{})
@@ -346,6 +351,7 @@ type options struct {
346351
scaleTimeout time.Duration
347352
useAzAwareHashRing bool
348353
podAzAnnotationKey string
354+
migrationState string
349355
}
350356

351357
type controller struct {
@@ -368,6 +374,7 @@ type controller struct {
368374
configmapLastSuccessfulChangeTime prometheus.Gauge
369375
hashringNodes *prometheus.GaugeVec
370376
hashringTenants *prometheus.GaugeVec
377+
pantheonMigrationState *prometheus.GaugeVec
371378
}
372379

373380
func newController(klient kubernetes.Interface, logger log.Logger, o *options) *controller {
@@ -432,6 +439,13 @@ func newController(klient kubernetes.Interface, logger log.Logger, o *options) *
432439
},
433440
[]string{"name"},
434441
),
442+
pantheonMigrationState: prometheus.NewGaugeVec(
443+
prometheus.GaugeOpts{
444+
Name: "thanos_receive_controller_pantheon_migration_state",
445+
Help: "pantheon migration state",
446+
},
447+
[]string{"migration_state"},
448+
),
435449
}
436450
}
437451

@@ -446,6 +460,7 @@ func (c *controller) registerMetrics(reg *prometheus.Registry) {
446460
c.configmapChangeErrors.WithLabelValues(create).Add(0)
447461
c.configmapChangeErrors.WithLabelValues(update).Add(0)
448462
c.configmapChangeErrors.WithLabelValues(other).Add(0)
463+
c.pantheonMigrationState.WithLabelValues(c.options.migrationState).Add(1)
449464
reg.MustRegister(
450465
c.reconcileAttempts,
451466
c.reconcileErrors,
@@ -455,6 +470,7 @@ func (c *controller) registerMetrics(reg *prometheus.Registry) {
455470
c.configmapLastSuccessfulChangeTime,
456471
c.hashringNodes,
457472
c.hashringTenants,
473+
c.pantheonMigrationState,
458474
)
459475
}
460476
}
@@ -533,6 +549,32 @@ func (c *controller) worker(ctx context.Context) {
533549
}
534550
}
535551

552+
func (c *controller) isProvisioned(statefulsets map[string][]*appsv1.StatefulSet) bool {
553+
_, ok, err := c.cmapInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", c.options.namespace, c.options.configMapGeneratedName))
554+
if ok && err == nil {
555+
level.Warn(c.logger).Log("msg", "could not fetch ConfigMap", "err", err)
556+
// if the generated configmap is already present, we don't need to do anything
557+
return true
558+
}
559+
if len(statefulsets) == 0 {
560+
return false
561+
}
562+
for group, stsList := range statefulsets {
563+
level.Info(c.logger).Log("msg", "checking statefulsets group", "group", group)
564+
// at least 3 statefulsets need to be ready during provision per replication group
565+
if len(stsList) < defaultReplicaFactor {
566+
for _, sts := range stsList {
567+
level.Info(c.logger).Log("msg", "not enough statefulsets found during provision < 3",
568+
"sts", sts.Name,
569+
"replicas", sts.Spec.Replicas,
570+
"ready", sts.Status.ReadyReplicas)
571+
}
572+
return false
573+
}
574+
}
575+
return true
576+
}
577+
536578
func (c *controller) sync(ctx context.Context) {
537579
c.reconcileAttempts.Inc()
538580
configMap, ok, err := c.cmapInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", c.options.namespace, c.options.configMapName))
@@ -613,6 +655,11 @@ func (c *controller) sync(ctx context.Context) {
613655
time.Sleep(c.options.scaleTimeout) // Give some time for all replicas before they receive hundreds req/s
614656
}
615657

658+
if !c.isProvisioned(statefulsets) {
659+
level.Error(c.logger).Log("msg", "not enough statefulsets found during provision")
660+
return
661+
}
662+
616663
c.populate(ctx, hashrings, statefulsets)
617664
level.Info(c.logger).Log("msg", "hashring populated", "hashring", fmt.Sprintf("%+v", hashrings))
618665

@@ -632,7 +679,7 @@ func (c *controller) sync(ctx context.Context) {
632679
}
633680
}
634681

635-
func (c controller) waitForPod(ctx context.Context, name string) error {
682+
func (c *controller) waitForPod(ctx context.Context, name string) error {
636683
//nolint:staticcheck
637684
return wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
638685
pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, name, metav1.GetOptions{})

0 commit comments

Comments
 (0)