2 changes: 1 addition & 1 deletion pkg/statistics/metrics.go
@@ -47,7 +47,7 @@ var (
Subsystem: "cluster",
Name: "status",
Help: "Status of the cluster.",
}, []string{"type", "engine"})
}, []string{"type", "engine", "store"})
Contributor
Do we need to update metrics/grafana/pd.json?

@SerjKol80 (Author), Nov 4, 2025
@lhy1024
I don't think so. All queries for the pd_cluster_status metric already aggregate it with sum(), so there should not be any changes needed there. The description of that metric should be updated, but I wasn't able to find a doc describing the metrics in this repo.
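A minimal sketch of why the sum() aggregation makes the extra label transparent to existing dashboards. The gauge below is only a stand-in for pd_cluster_status, and the store IDs are hypothetical:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Stand-in for pd_cluster_status after this change: a per-store
	// "store" label is added next to "type" and "engine".
	clusterStatus := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "pd",
		Subsystem: "cluster",
		Name:      "status",
		Help:      "Status of the cluster.",
	}, []string{"type", "engine", "store"})

	// Two TiKV stores now report "up" individually.
	clusterStatus.WithLabelValues("store_up_count", "tikv", "1").Set(1)
	clusterStatus.WithLabelValues("store_up_count", "tikv", "2").Set(1)

	// A dashboard query such as
	//   sum(pd_cluster_status{type="store_up_count", engine="tikv"})
	// sums over the new store label and still returns 2, the same total
	// the pre-change aggregated counter would have reported.
	total := testutil.ToFloat64(clusterStatus.WithLabelValues("store_up_count", "tikv", "1")) +
		testutil.ToFloat64(clusterStatus.WithLabelValues("store_up_count", "tikv", "2"))
	fmt.Println(total) // 2
}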


placementStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
248 changes: 72 additions & 176 deletions pkg/statistics/store_collection.go
@@ -19,6 +19,7 @@ import (
"strconv"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/prometheus/client_golang/prometheus"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
@@ -30,116 +31,89 @@ import (
const (
unknown = "unknown"
labelType = "label"
)

var (
// tikv status counters.
tikvUpCounter = clusterStatusGauge.WithLabelValues("store_up_count", "tikv")
tikvDiconnectedCounter = clusterStatusGauge.WithLabelValues("store_disconnected_count", "tikv")
tikvDownCounter = clusterStatusGauge.WithLabelValues("store_down_count", "tikv")
tikvUnhealthCounter = clusterStatusGauge.WithLabelValues("store_unhealth_count", "tikv")
tikvOfflineCounter = clusterStatusGauge.WithLabelValues("store_offline_count", "tikv")
tikvTombstoneCounter = clusterStatusGauge.WithLabelValues("store_tombstone_count", "tikv")
tikvLowSpaceCounter = clusterStatusGauge.WithLabelValues("store_low_space_count", "tikv")
tikvPreparingCounter = clusterStatusGauge.WithLabelValues("store_preparing_count", "tikv")
tikvServingCounter = clusterStatusGauge.WithLabelValues("store_serving_count", "tikv")
tikvRemovingCounter = clusterStatusGauge.WithLabelValues("store_removing_count", "tikv")
tikvRemovedCounter = clusterStatusGauge.WithLabelValues("store_removed_count", "tikv")

// tiflash status counters.
tiflashUpCounter = clusterStatusGauge.WithLabelValues("store_up_count", "tiflash")
tiflashDiconnectedCounter = clusterStatusGauge.WithLabelValues("store_disconnected_count", "tiflash")
tiflashDownCounter = clusterStatusGauge.WithLabelValues("store_down_count", "tiflash")
tiflashUnhealthCounter = clusterStatusGauge.WithLabelValues("store_unhealth_count", "tiflash")
tiflashOfflineCounter = clusterStatusGauge.WithLabelValues("store_offline_count", "tiflash")
tiflashTombstoneCounter = clusterStatusGauge.WithLabelValues("store_tombstone_count", "tiflash")
tiflashLowSpaceCounter = clusterStatusGauge.WithLabelValues("store_low_space_count", "tiflash")
tiflashPreparingCounter = clusterStatusGauge.WithLabelValues("store_preparing_count", "tiflash")
tiflashServingCounter = clusterStatusGauge.WithLabelValues("store_serving_count", "tiflash")
tiflashRemovingCounter = clusterStatusGauge.WithLabelValues("store_removing_count", "tiflash")
tiflashRemovedCounter = clusterStatusGauge.WithLabelValues("store_removed_count", "tiflash")

// Store status metrics.
storeRegionCountGauge = clusterStatusGauge.WithLabelValues("region_count", "all")
storeLeaderCountGauge = clusterStatusGauge.WithLabelValues("leader_count", "all")
storeWitnessCountGauge = clusterStatusGauge.WithLabelValues("witness_count", "all")
storeLearnerCountGauge = clusterStatusGauge.WithLabelValues("learner_count", "all")
storeStorageSizeGauge = clusterStatusGauge.WithLabelValues("storage_size", "all")
storeStorageCapacityGauge = clusterStatusGauge.WithLabelValues("storage_capacity", "all")
ClusterStatusStoreUpCount = "store_up_count"
ClusterStatusStoreDisconnectedCount = "store_disconnected_count"
ClusterStatusStoreSlowCount = "store_slow_count"
ClusterStatusStoreDownCount = "store_down_count"
ClusterStatusStoreUnhealthCount = "store_unhealth_count"
ClusterStatusStoreOfflineCount = "store_offline_count"
ClusterStatusStoreTombstoneCount = "store_tombstone_count"
ClusterStatusStoreLowSpaceCount = "store_low_space_count"
ClusterStatusStorePreparingCount = "store_preparing_count"
ClusterStatusStoreServingCount = "store_serving_count"
ClusterStatusStoreRemovingCount = "store_removing_count"
ClusterStatusStoreRemovedCount = "store_removed_count"

ClusterStatusRegionCount = "region_count"
ClusterStatusLeaderCount = "leader_count"
ClusterStatusWitnessCount = "witness_count"
ClusterStatusLearnerCount = "learner_count"
ClusterStatusStorageSize = "storage_size"
ClusterStatusStorageCapacity = "storage_capacity"
)

type storeStatistics struct {
opt config.ConfProvider
StorageSize uint64
StorageCapacity uint64
RegionCount int
LeaderCount int
LearnerCount int
WitnessCount int
LabelCounter map[string][]uint64

engineStatistics map[string]*storeStatusStatistics
opt config.ConfProvider
LabelCounter map[string][]uint64
}

type storeStatusStatistics struct {
opt config.ConfProvider
Up int
Disconnect int
Unhealthy int
Down int
Offline int
Tombstone int
LowSpace int
Slow int
Preparing int
Serving int
Removing int
Removed int
func newStoreStatistics(opt config.ConfProvider) *storeStatistics {
return &storeStatistics{
opt: opt,
LabelCounter: make(map[string][]uint64),
}
}

func (s *storeStatusStatistics) observe(store *core.StoreInfo) {
func (s *storeStatistics) observeStoreStatus(store *core.StoreInfo) map[string]float64 {
result := map[string]float64{
ClusterStatusStoreUpCount: 0,
ClusterStatusStoreDisconnectedCount: 0,
ClusterStatusStoreSlowCount: 0,
ClusterStatusStoreDownCount: 0,
ClusterStatusStoreUnhealthCount: 0,
ClusterStatusStoreOfflineCount: 0,
ClusterStatusStoreTombstoneCount: 0,
ClusterStatusStoreLowSpaceCount: 0,
ClusterStatusStorePreparingCount: 0,
ClusterStatusStoreServingCount: 0,
ClusterStatusStoreRemovingCount: 0,
ClusterStatusStoreRemovedCount: 0,
}

// Store state.
isDown := false
switch store.GetNodeState() {
case metapb.NodeState_Preparing, metapb.NodeState_Serving:
if store.DownTime() >= s.opt.GetMaxStoreDownTime() {
isDown = true
s.Down++
result[ClusterStatusStoreDownCount]++
} else if store.IsUnhealthy() {
s.Unhealthy++
result[ClusterStatusStoreUnhealthCount]++
} else if store.IsDisconnected() {
s.Disconnect++
result[ClusterStatusStoreDisconnectedCount]++
} else if store.IsSlow() {
s.Slow++
result[ClusterStatusStoreSlowCount]++
} else {
s.Up++
result[ClusterStatusStoreUpCount]++
}
if store.IsPreparing() {
s.Preparing++
result[ClusterStatusStorePreparingCount]++
} else {
s.Serving++
result[ClusterStatusStoreServingCount]++
}
case metapb.NodeState_Removing:
s.Offline++
s.Removing++
result[ClusterStatusStoreOfflineCount]++
result[ClusterStatusStoreRemovingCount]++
case metapb.NodeState_Removed:
s.Tombstone++
s.Removed++
return
result[ClusterStatusStoreTombstoneCount]++
result[ClusterStatusStoreRemovedCount]++
return result
}
if !isDown && store.IsLowSpace(s.opt.GetLowSpaceRatio()) {
s.LowSpace++
}
}

func newStoreStatistics(opt config.ConfProvider) *storeStatistics {
statistics := make(map[string]*storeStatusStatistics, 1)
statistics[core.EngineTiKV] = &storeStatusStatistics{opt: opt}
return &storeStatistics{
opt: opt,
LabelCounter: make(map[string][]uint64),
engineStatistics: statistics,
result[ClusterStatusStoreLowSpaceCount]++
}
return result
}

func (s *storeStatistics) observe(store *core.StoreInfo) {
@@ -156,31 +130,28 @@ func (s *storeStatistics) observe(store *core.StoreInfo) {
}
storeAddress := store.GetAddress()
id := strconv.FormatUint(store.GetID(), 10)
// Store state.
var statistics *storeStatusStatistics
var engine string
if store.IsTiFlash() {
statistics = s.engineStatistics[core.EngineTiFlash]
if statistics == nil {
s.engineStatistics[core.EngineTiFlash] = &storeStatusStatistics{opt: s.opt}
statistics = s.engineStatistics[core.EngineTiFlash]
}
engine = core.EngineTiFlash
} else {
// tikv statistics has been initialized in newStoreStatistics.
statistics = s.engineStatistics[core.EngineTiKV]
engine = core.EngineTiKV
}
storeStatusStats := s.observeStoreStatus(store)
for statusType, value := range storeStatusStats {
clusterStatusGauge.WithLabelValues(statusType, engine, id).Set(value)
}
statistics.observe(store)
// skip tombstone stores to avoid overwriting metrics
if store.GetNodeState() == metapb.NodeState_Removed {
return
}

// Store stats.
s.StorageSize += store.StorageSize()
s.StorageCapacity += store.GetCapacity()
s.RegionCount += store.GetRegionCount()
s.LeaderCount += store.GetLeaderCount()
s.WitnessCount += store.GetWitnessCount()
s.LearnerCount += store.GetLearnerCount()
clusterStatusGauge.WithLabelValues(ClusterStatusStorageSize, engine, id).Set(float64(store.StorageSize()))
clusterStatusGauge.WithLabelValues(ClusterStatusStorageCapacity, engine, id).Set(float64(store.GetCapacity()))
clusterStatusGauge.WithLabelValues(ClusterStatusRegionCount, engine, id).Set(float64(store.GetRegionCount()))
clusterStatusGauge.WithLabelValues(ClusterStatusLeaderCount, engine, id).Set(float64(store.GetLeaderCount()))
clusterStatusGauge.WithLabelValues(ClusterStatusWitnessCount, engine, id).Set(float64(store.GetWitnessCount()))
clusterStatusGauge.WithLabelValues(ClusterStatusLearnerCount, engine, id).Set(float64(store.GetLearnerCount()))
limit, ok := store.GetStoreLimit().(*storelimit.SlidingWindows)
if ok {
cap := limit.GetCap()
@@ -247,46 +218,6 @@ func ObserveHotStat(store *core.StoreInfo, stats *StoresStats) {
func (s *storeStatistics) collect() {
placementStatusGauge.Reset()

// tikv store status metrics.
tikvStatistics, ok := s.engineStatistics[core.EngineTiKV]
if ok {
tikvUpCounter.Set(float64(tikvStatistics.Up))
tikvDiconnectedCounter.Set(float64(tikvStatistics.Disconnect))
tikvDownCounter.Set(float64(tikvStatistics.Down))
tikvUnhealthCounter.Set(float64(tikvStatistics.Unhealthy))
tikvOfflineCounter.Set(float64(tikvStatistics.Offline))
tikvTombstoneCounter.Set(float64(tikvStatistics.Tombstone))
tikvLowSpaceCounter.Set(float64(tikvStatistics.LowSpace))
tikvPreparingCounter.Set(float64(tikvStatistics.Preparing))
tikvServingCounter.Set(float64(tikvStatistics.Serving))
tikvRemovingCounter.Set(float64(tikvStatistics.Removing))
tikvRemovedCounter.Set(float64(tikvStatistics.Removed))
}

// tiflash store status metrics.
tiflashStatistics, ok := s.engineStatistics[core.EngineTiFlash]
if ok {
tiflashUpCounter.Set(float64(tiflashStatistics.Up))
tiflashDiconnectedCounter.Set(float64(tiflashStatistics.Disconnect))
tiflashDownCounter.Set(float64(tiflashStatistics.Down))
tiflashUnhealthCounter.Set(float64(tiflashStatistics.Unhealthy))
tiflashOfflineCounter.Set(float64(tiflashStatistics.Offline))
tiflashTombstoneCounter.Set(float64(tiflashStatistics.Tombstone))
tiflashLowSpaceCounter.Set(float64(tiflashStatistics.LowSpace))
tiflashPreparingCounter.Set(float64(tiflashStatistics.Preparing))
tiflashServingCounter.Set(float64(tiflashStatistics.Serving))
tiflashRemovingCounter.Set(float64(tiflashStatistics.Removing))
tiflashRemovedCounter.Set(float64(tiflashStatistics.Removed))
}

// Store status metrics.
storeRegionCountGauge.Set(float64(s.RegionCount))
storeLeaderCountGauge.Set(float64(s.LeaderCount))
storeWitnessCountGauge.Set(float64(s.WitnessCount))
storeLearnerCountGauge.Set(float64(s.LearnerCount))
storeStorageSizeGauge.Set(float64(s.StorageSize))
storeStorageCapacityGauge.Set(float64(s.StorageCapacity))

// Current scheduling configurations of the cluster
configs := make(map[string]float64)
configs["leader-schedule-limit"] = float64(s.opt.GetLeaderScheduleLimit())
@@ -374,6 +305,7 @@ func ResetStoreStatistics(storeAddress string, id string) {
for _, m := range metrics {
storeStatusGauge.DeleteLabelValues(storeAddress, id, m)
}
clusterStatusGauge.DeletePartialMatch(prometheus.Labels{"store": id})
}

type storeStatisticsMap struct {
@@ -403,44 +335,8 @@ func (m *storeStatisticsMap) Collect() {
func Reset() {
storeStatusGauge.Reset()
placementStatusGauge.Reset()
ResetClusterStatusMetrics()
clusterStatusGauge.Reset()
ResetRegionStatsMetrics()
ResetLabelStatsMetrics()
ResetHotCacheStatusMetrics()
}

// ResetClusterStatusMetrics resets the cluster status metrics.
func ResetClusterStatusMetrics() {
tikvUpCounter.Set(0)
tikvDiconnectedCounter.Set(0)
tikvDownCounter.Set(0)
tikvUnhealthCounter.Set(0)
tikvOfflineCounter.Set(0)
tikvTombstoneCounter.Set(0)
tikvLowSpaceCounter.Set(0)
tikvPreparingCounter.Set(0)
tikvServingCounter.Set(0)
tikvRemovingCounter.Set(0)
tikvRemovedCounter.Set(0)

// tiflash status counters.
tiflashUpCounter.Set(0)
tiflashDiconnectedCounter.Set(0)
tiflashDownCounter.Set(0)
tiflashUnhealthCounter.Set(0)
tiflashOfflineCounter.Set(0)
tiflashTombstoneCounter.Set(0)
tiflashLowSpaceCounter.Set(0)
tiflashPreparingCounter.Set(0)
tiflashServingCounter.Set(0)
tiflashRemovingCounter.Set(0)
tiflashRemovedCounter.Set(0)

// Store status metrics.
storeRegionCountGauge.Set(0)
storeLeaderCountGauge.Set(0)
storeWitnessCountGauge.Set(0)
storeLearnerCountGauge.Set(0)
storeStorageSizeGauge.Set(0)
storeStorageCapacityGauge.Set(0)
}
16 changes: 0 additions & 16 deletions pkg/statistics/store_collection_test.go
@@ -76,22 +76,7 @@ func TestStoreStatistics(t *testing.T) {
ObserveHotStat(store, storesStats)
}
stats := storeStats.stats
tikvStats := stats.engineStatistics[core.EngineTiKV]
Contributor
Do we need to add a new test for it?

@SerjKol80 (Author)
@lhy1024
Since we no longer have the aggregation logic, the test for that part was removed.
The only thing that could still be tested in the new logic is whether the actual metric is emitted. I looked through the existing tests and didn't find any that validate actual metric emission, so no new tests were added.
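For reference, if direct metric emission ever needs to be asserted, prometheus/client_golang's testutil package can read a gauge child back. A rough sketch of such a check (hypothetical, not part of this PR): it assumes a TiKV store with ID 1 has already been observed as up, that the test lives in the statistics package since clusterStatusGauge is unexported, and that testing, testify/require, and prometheus/testutil are imported.

// Hypothetical test: asserts the emitted per-store value directly.
func TestClusterStatusEmission(t *testing.T) {
	re := require.New(t)
	up := testutil.ToFloat64(
		clusterStatusGauge.WithLabelValues(ClusterStatusStoreUpCount, core.EngineTiKV, "1"))
	re.Equal(float64(1), up)
}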


re.Equal(6, tikvStats.Up)
re.Equal(7, tikvStats.Preparing)
re.Equal(0, tikvStats.Serving)
re.Equal(1, tikvStats.Removing)
re.Equal(1, tikvStats.Removed)
re.Equal(1, tikvStats.Down)
re.Equal(1, tikvStats.Offline)
re.Equal(0, stats.RegionCount)
re.Equal(0, stats.WitnessCount)
re.Equal(0, tikvStats.Unhealthy)
re.Equal(0, tikvStats.Disconnect)
re.Equal(1, tikvStats.Tombstone)
re.Equal(1, tikvStats.LowSpace)
re.Equal(1, stats.engineStatistics[core.EngineTiFlash].Up)
re.Len(stats.LabelCounter["zone:z1"], 2)
re.Equal([]uint64{1, 2}, stats.LabelCounter["zone:z1"])
re.Len(stats.LabelCounter["zone:z2"], 2)
@@ -100,7 +85,6 @@ func TestStoreStatistics(t *testing.T) {
re.Equal([]uint64{1, 3, 5, 7}, stats.LabelCounter["host:h1"])
re.Len(stats.LabelCounter["host:h2"], 4)
re.Len(stats.LabelCounter["zone:unknown"], 2)
re.Equal(0, stats.LeaderCount)
}

func TestSummaryStoreInfos(t *testing.T) {