Skip to content

Commit 9633a0b

Browse files
authored
🌱 add grpc subscribers metric. (#148)
* add grpc subscribers metric. Signed-off-by: morvencao <[email protected]> rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED * remove origin source label for source side metrics. Signed-off-by: morvencao <[email protected]> rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED
1 parent 023fcb3 commit 9633a0b

File tree

6 files changed

+101
-59
lines changed

6 files changed

+101
-59
lines changed

pkg/cloudevents/generic/metrics_collector.go

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,11 @@ var cloudeventsReceivedByClientMetricsLabels = []string{
4747

4848
// cloudeventsSentFromSourceMetricsLabels - Array of labels added to cloudevents sent from source metrics:
4949
var cloudeventsSentFromSourceMetricsLabels = []string{
50-
metricsSourceLabel, // source
51-
metricsOriginalSourceLabel, // original source, if no, set to "none"
52-
metricsConsumerLabel, // consumer
53-
metricsDataTypeLabel, // data type, e.g. manifests, manifestbundles
54-
metricsSubResourceLabel, // subresource, eg, spec or status
55-
metricsActionLabel, // action, eg, create, update, delete, resync_request, resync_response
50+
metricsSourceLabel, // source
51+
metricsConsumerLabel, // consumer
52+
metricsDataTypeLabel, // data type, e.g. manifests, manifestbundles
53+
metricsSubResourceLabel, // subresource, eg, spec or status
54+
metricsActionLabel, // action, eg, create, update, delete, resync_request, resync_response
5655
}
5756

5857
// cloudeventsSentFromClientMetricsLabels - Array of labels added to cloudevents sent from client metrics:
@@ -125,7 +124,7 @@ var cloudeventsReceivedByClientCounterMetric = prometheus.NewCounterVec(
125124
// The cloudevents sent from source counter metric is a counter with a base metric name of 'sent_from_source_total'
126125
// and a help string of 'The total number of CloudEvents sent from source.'
127126
// For example, 1 cloudevent sent from source1 to consumer1 with data type manifestbundles for resource spec create would result in the following metrics:
128-
// cloudevents_sent_total{source="source1",original_source="none",consumer="consumer1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",subresource="spec",action="create"} 1
127+
// cloudevents_sent_total{source="source1",consumer="consumer1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",subresource="spec",action="create"} 1
129128
var cloudeventsSentFromSourceCounterMetric = prometheus.NewCounterVec(
130129
prometheus.CounterOpts{
131130
Subsystem: cloudeventsMetricsSubsystem,
@@ -294,17 +293,13 @@ func increaseCloudEventsReceivedByAgentCounter(source, dataType, subresource, ac
294293
}
295294

296295
// increaseCloudEventsSentFromSourceCounter increases the cloudevents sent from source counter metric:
297-
func increaseCloudEventsSentFromSourceCounter(source, originalSource, consumer, dataType, subresource, action string) {
298-
if originalSource == "" {
299-
originalSource = noneOriginalSource
300-
}
296+
func increaseCloudEventsSentFromSourceCounter(source, consumer, dataType, subresource, action string) {
301297
labels := prometheus.Labels{
302-
metricsSourceLabel: source,
303-
metricsOriginalSourceLabel: originalSource,
304-
metricsConsumerLabel: consumer,
305-
metricsDataTypeLabel: dataType,
306-
metricsSubResourceLabel: subresource,
307-
metricsActionLabel: action,
298+
metricsSourceLabel: source,
299+
metricsConsumerLabel: consumer,
300+
metricsDataTypeLabel: dataType,
301+
metricsSubResourceLabel: subresource,
302+
metricsActionLabel: action,
308303
}
309304
cloudeventsSentFromSourceCounterMetric.With(labels).Inc()
310305
}

pkg/cloudevents/generic/metrics_collector_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func TestCloudEventsMetrics(t *testing.T) {
100100
time.Sleep(time.Second)
101101

102102
// ensure metrics are updated
103-
sentTotal := cloudeventsSentFromSourceCounterMetric.WithLabelValues(c.sourceID, noneOriginalSource, c.clusterName, c.dataType.String(), string(c.subresource), string(c.action))
103+
sentTotal := cloudeventsSentFromSourceCounterMetric.WithLabelValues(c.sourceID, c.clusterName, c.dataType.String(), string(c.subresource), string(c.action))
104104
require.Equal(t, len(c.resources), int(toFloat64Counter(sentTotal)))
105105
receivedTotal := cloudeventsReceivedByClientCounterMetric.WithLabelValues(c.sourceID, c.dataType.String(), string(c.subresource), string(c.action))
106106
require.Equal(t, len(c.resources), int(toFloat64Counter(receivedTotal)))
@@ -239,7 +239,7 @@ func TestResyncSpecMetrics(t *testing.T) {
239239
require.Greater(t, sum, 0.0)
240240
require.Less(t, sum, 1.0)
241241

242-
sentTotal := cloudeventsSentFromSourceCounterMetric.WithLabelValues(c.sourceID, noneOriginalSource, c.clusterName, c.dataType.String(), string(types.SubResourceSpec), string(types.ResyncResponseAction))
242+
sentTotal := cloudeventsSentFromSourceCounterMetric.WithLabelValues(c.sourceID, c.clusterName, c.dataType.String(), string(types.SubResourceSpec), string(types.ResyncResponseAction))
243243
require.Equal(t, len(c.resources), int(toFloat64Counter(sentTotal)))
244244
}
245245

pkg/cloudevents/generic/sourceclient.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (c *CloudEventSourceClient[T]) Resync(ctx context.Context, clusterName stri
105105
return err
106106
}
107107

108-
increaseCloudEventsSentFromSourceCounter(evt.Source(), "", clusterName, c.codec.EventDataType().String(), string(eventType.SubResource), string(eventType.Action))
108+
increaseCloudEventsSentFromSourceCounter(evt.Source(), clusterName, c.codec.EventDataType().String(), string(eventType.SubResource), string(eventType.Action))
109109

110110
return nil
111111
}
@@ -130,7 +130,7 @@ func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types
130130
}
131131

132132
clusterName := evt.Context.GetExtensions()[types.ExtensionClusterName].(string)
133-
increaseCloudEventsSentFromSourceCounter(evt.Source(), "", clusterName, eventType.CloudEventsDataType.String(), string(eventType.SubResource), string(eventType.Action))
133+
increaseCloudEventsSentFromSourceCounter(evt.Source(), clusterName, eventType.CloudEventsDataType.String(), string(eventType.SubResource), string(eventType.Action))
134134

135135
return nil
136136
}
@@ -290,7 +290,7 @@ func (c *CloudEventSourceClient[T]) respondResyncSpecRequest(
290290
if err := c.publish(ctx, evt); err != nil {
291291
return err
292292
}
293-
increaseCloudEventsSentFromSourceCounter(evt.Source(), "", fmt.Sprintf("%s", clusterName), evtDataType.String(), string(eventType.SubResource), string(eventType.Action))
293+
increaseCloudEventsSentFromSourceCounter(evt.Source(), fmt.Sprintf("%s", clusterName), evtDataType.String(), string(eventType.SubResource), string(eventType.Action))
294294
}
295295

296296
return nil

pkg/cloudevents/server/grpc/broker.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/heartbeat"
87
"sync"
98
"time"
109

10+
"open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/heartbeat"
11+
"open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/metrics"
12+
1113
"k8s.io/apimachinery/pkg/api/errors"
1214

1315
cloudevents "github.com/cloudevents/sdk-go/v2"
@@ -134,6 +136,7 @@ func (bkr *GRPCBroker) register(
134136
}
135137

136138
klog.V(4).Infof("register a subscriber %s (cluster name = %s)", id, clusterName)
139+
metrics.IncGRPCCESubscribersMetric(clusterName, dataType.String())
137140

138141
return id, errChan
139142
}
@@ -147,6 +150,7 @@ func (bkr *GRPCBroker) unregister(id string) {
147150
if sub, exists := bkr.subscribers[id]; exists {
148151
close(sub.errChan)
149152
delete(bkr.subscribers, id)
153+
metrics.DecGRPCCESubscribersMetric(sub.clusterName, sub.dataType.String())
150154
}
151155
}
152156

pkg/cloudevents/server/grpc/metrics/metrics.go

Lines changed: 69 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"google.golang.org/grpc/status"
1010

1111
"github.com/cloudevents/sdk-go/v2/binding"
12+
cetypes "github.com/cloudevents/sdk-go/v2/types"
1213
pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1"
1314
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol"
1415
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
@@ -21,37 +22,51 @@ const grpcCEMetricsSubsystem = "grpc_server_ce"
2122

2223
// Names of the labels added to metrics:
2324
const (
25+
grpcCEMetricsClusterLabel = "consumer"
2426
grpcCEMetricsDataTypeLabel = "data_type"
2527
grpcCEMetricsMethodLabel = "method"
2628
grpcMetricsCodeLabel = "grpc_code"
2729
)
2830

29-
// grpcCEMetricsLabels - Array of labels added to grpc server metrics for cloudevents:
30-
var grpcCEMetricsLabels = []string{
31+
// grpcCEMetricsCommonLabels - Array of common labels added to grpc server metrics for cloudevents:
32+
var grpcCEMetricsCommonLabels = []string{
33+
grpcCEMetricsClusterLabel,
3134
grpcCEMetricsDataTypeLabel,
32-
grpcCEMetricsMethodLabel,
3335
}
3436

37+
// grpcCEMetricsHandlerLabels - Array of handler labels added to grpc server metrics for cloudevents:
38+
var grpcCEMetricsHandlerLabels = append(grpcCEMetricsCommonLabels, grpcCEMetricsMethodLabel)
39+
3540
// grpcCEMetricsAllLabels - Array of all labels added to grpc server metrics for cloudevents:
36-
var grpcCEMetricsAllLabels = append(grpcCEMetricsLabels, grpcMetricsCodeLabel)
41+
var grpcCEMetricsAllLabels = append(grpcCEMetricsHandlerLabels, grpcMetricsCodeLabel)
3742

3843
// Names of the grpc server metrics for cloudevents:
3944
const (
45+
subscribersMetric = "subscribers"
4046
calledCountMetric = "called_total"
4147
processedCountMetric = "processed_total"
4248
processingDurationMetric = "processing_duration_seconds"
4349
messageReceivedCountMetric = "msg_received_total"
4450
messageSentCountMetric = "msg_sent_total"
4551
)
4652

53+
// grpcCESubscribersMetric is a gauge metric that tracks the number of registered
54+
// subscribers for cloudevents on the gRPC server.
55+
var grpcCESubscribersMetric = k8smetrics.NewGaugeVec(&k8smetrics.GaugeOpts{
56+
Subsystem: grpcCEMetricsSubsystem,
57+
Name: subscribersMetric,
58+
StabilityLevel: k8smetrics.ALPHA,
59+
Help: "Number of registered subscribers for cloudevents on the grpc server.",
60+
}, grpcCEMetricsCommonLabels)
61+
4762
// grpcCECalledCountMetric is a counter metric that tracks the total number of
4863
// RPC requests for cloudevents called on the gRPC server.
4964
var grpcCECalledCountMetric = k8smetrics.NewCounterVec(&k8smetrics.CounterOpts{
5065
Subsystem: grpcCEMetricsSubsystem,
5166
Name: calledCountMetric,
5267
StabilityLevel: k8smetrics.ALPHA,
5368
Help: "Total number of RPC requests for cloudevents called on the grpc server.",
54-
}, grpcCEMetricsLabels)
69+
}, grpcCEMetricsHandlerLabels)
5570

5671
// grpcCEMessageReceivedCountMetric is a counter metric that tracks the total number of
5772
// messages for cloudevents received on the gRPC server.
@@ -60,7 +75,7 @@ var grpcCEMessageReceivedCountMetric = k8smetrics.NewCounterVec(&k8smetrics.Coun
6075
Name: messageReceivedCountMetric,
6176
StabilityLevel: k8smetrics.ALPHA,
6277
Help: "Total number of messages for cloudevents received on the gRPC server.",
63-
}, grpcCEMetricsLabels)
78+
}, grpcCEMetricsHandlerLabels)
6479

6580
// grpcCEMessageSentCountMetric is a counter metric that tracks the total number of
6681
// messages for cloudevents sent by the gRPC server.
@@ -69,7 +84,7 @@ var grpcCEMessageSentCountMetric = k8smetrics.NewCounterVec(&k8smetrics.CounterO
6984
Name: messageSentCountMetric,
7085
StabilityLevel: k8smetrics.ALPHA,
7186
Help: "Total number of messages for cloudevents sent by the gRPC server.",
72-
}, grpcCEMetricsLabels)
87+
}, grpcCEMetricsHandlerLabels)
7388

7489
// grpcCEProcessedCountMetric is a counter metric that tracks the total number of
7590
// RPC requests for cloudevents processed on the server, regardless of success or failure.
@@ -106,61 +121,72 @@ func NewCloudEventsMetricsUnaryInterceptor() grpc.UnaryServerInterceptor {
106121
}
107122

108123
// initialize defaults for error cases
124+
cluster := "unknown"
109125
dataType := "unknown"
110126

111127
pubReq, ok := req.(*pbv1.PublishRequest)
112128
if !ok {
113129
err := fmt.Errorf("invalid request type for Publish method")
114-
recordCloudEventsMetrics(dataType, method, err, startTime)
130+
recordCloudEventsMetrics(cluster, dataType, method, err, startTime)
115131
return nil, err
116132
}
117133
// convert the request to cloudevent and extract the source
118134
evt, err := binding.ToEvent(ctx, protocol.NewMessage(pubReq.Event))
119135
if err != nil {
120136
err = fmt.Errorf("failed to convert to cloudevent: %v", err)
121-
recordCloudEventsMetrics(dataType, method, err, startTime)
137+
recordCloudEventsMetrics(cluster, dataType, method, err, startTime)
138+
return nil, err
139+
}
140+
141+
// extract the cluster name from event extensions
142+
clusterVal, err := cetypes.ToString(evt.Context.GetExtensions()[types.ExtensionClusterName])
143+
if err != nil {
144+
err = fmt.Errorf("failed to get clustername extension: %v", err)
145+
recordCloudEventsMetrics(cluster, dataType, method, err, startTime)
122146
return nil, err
123147
}
148+
cluster = clusterVal
124149

125150
// extract the data type from event type
126151
eventType, err := types.ParseCloudEventsType(evt.Type())
127152
if err != nil {
128153
err = fmt.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
129-
recordCloudEventsMetrics(dataType, method, err, startTime)
154+
recordCloudEventsMetrics(cluster, dataType, method, err, startTime)
130155
return nil, err
131156
}
132157
dataType = eventType.CloudEventsDataType.String()
133158

134-
grpcCECalledCountMetric.WithLabelValues(dataType, method).Inc()
135-
grpcCEMessageReceivedCountMetric.WithLabelValues(dataType, method).Inc()
159+
grpcCECalledCountMetric.WithLabelValues(cluster, dataType, method).Inc()
160+
grpcCEMessageReceivedCountMetric.WithLabelValues(cluster, dataType, method).Inc()
136161
// call rpc handler to handle RPC request
137162
resp, err := handler(ctx, req)
138163
duration := time.Since(startTime).Seconds()
139-
grpcCEMessageSentCountMetric.WithLabelValues(dataType, method).Inc()
164+
grpcCEMessageSentCountMetric.WithLabelValues(cluster, dataType, method).Inc()
140165

141166
// get status code from error
142167
status := statusFromError(err)
143168
code := status.Code()
144-
grpcCEProcessedCountMetric.WithLabelValues(dataType, method, code.String()).Inc()
145-
grpcCEProcessingDurationMetric.WithLabelValues(dataType, method, code.String()).Observe(duration)
169+
grpcCEProcessedCountMetric.WithLabelValues(cluster, dataType, method, code.String()).Inc()
170+
grpcCEProcessingDurationMetric.WithLabelValues(cluster, dataType, method, code.String()).Observe(duration)
146171

147172
return resp, err
148173
}
149174
}
150175

151-
func recordCloudEventsMetrics(dataType, method string, err error, startTime time.Time) {
176+
func recordCloudEventsMetrics(cluster, dataType, method string, err error, startTime time.Time) {
152177
duration := time.Since(startTime).Seconds()
153178
status := statusFromError(err)
154179
code := status.Code()
155-
grpcCEProcessedCountMetric.WithLabelValues(dataType, method, code.String()).Inc()
156-
grpcCEProcessingDurationMetric.WithLabelValues(dataType, method, code.String()).Observe(duration)
180+
grpcCEProcessedCountMetric.WithLabelValues(cluster, dataType, method, code.String()).Inc()
181+
grpcCEProcessingDurationMetric.WithLabelValues(cluster, dataType, method, code.String()).Observe(duration)
157182
}
158183

159184
// wrappedCloudEventsMetricsStream wraps a grpc.ServerStream, capturing the request source
160185
// emitting metrics for the stream interceptor.
161186
type wrappedCloudEventsMetricsStream struct {
162-
dataType *string
163-
method string
187+
clusterName *string
188+
dataType *string
189+
method string
164190
grpc.ServerStream
165191
ctx context.Context
166192
}
@@ -178,10 +204,11 @@ func (w *wrappedCloudEventsMetricsStream) RecvMsg(m interface{}) error {
178204
return fmt.Errorf("invalid request type for Subscribe method")
179205
}
180206

181-
if w.dataType != nil {
207+
if w.clusterName != nil && w.dataType != nil {
208+
*w.clusterName = subReq.ClusterName
182209
*w.dataType = subReq.DataType
183-
grpcCECalledCountMetric.WithLabelValues(*w.dataType, w.method).Inc()
184-
grpcCEMessageReceivedCountMetric.WithLabelValues(*w.dataType, w.method).Inc()
210+
grpcCECalledCountMetric.WithLabelValues(*w.clusterName, *w.dataType, w.method).Inc()
211+
grpcCEMessageReceivedCountMetric.WithLabelValues(*w.clusterName, *w.dataType, w.method).Inc()
185212
}
186213

187214
return nil
@@ -194,16 +221,16 @@ func (w *wrappedCloudEventsMetricsStream) SendMsg(m interface{}) error {
194221
return err
195222
}
196223

197-
if w.dataType != nil && *w.dataType != "" {
198-
grpcCEMessageSentCountMetric.WithLabelValues(*w.dataType, w.method).Inc()
224+
if w.clusterName != nil && w.dataType != nil && *w.clusterName != "" && *w.dataType != "" {
225+
grpcCEMessageSentCountMetric.WithLabelValues(*w.clusterName, *w.dataType, w.method).Inc()
199226
}
200227

201228
return nil
202229
}
203230

204-
// newWrappedCloudEventsMetricsStream creates a wrappedCloudEventsMetricsStream with the specified type reference.
205-
func newWrappedCloudEventsMetricsStream(dataType *string, method string, ctx context.Context, ss grpc.ServerStream) grpc.ServerStream {
206-
return &wrappedCloudEventsMetricsStream{dataType, method, ss, ctx}
231+
// newWrappedCloudEventsMetricsStream creates a wrappedCloudEventsMetricsStream with the specified type and cluster reference.
232+
func newWrappedCloudEventsMetricsStream(clusterName, dataType *string, method string, ctx context.Context, ss grpc.ServerStream) grpc.ServerStream {
233+
return &wrappedCloudEventsMetricsStream{clusterName, dataType, method, ss, ctx}
207234
}
208235

209236
// NewCloudEventsMetricsStreamInterceptor creates a stream server interceptor for server metrics.
@@ -217,15 +244,16 @@ func NewCloudEventsMetricsStreamInterceptor() grpc.StreamServerInterceptor {
217244
}
218245

219246
dataType := ""
247+
cluster := ""
220248
// create a wrapped stream to capture the source and emit metrics
221-
wrappedCEMetricsStream := newWrappedCloudEventsMetricsStream(&dataType, method, stream.Context(), stream)
249+
wrappedCEMetricsStream := newWrappedCloudEventsMetricsStream(&cluster, &dataType, method, stream.Context(), stream)
222250
// call rpc handler to handle RPC request
223251
err := handler(srv, wrappedCEMetricsStream)
224252

225253
// get status code from error
226254
status := statusFromError(err)
227255
code := status.Code()
228-
grpcCEProcessedCountMetric.WithLabelValues(dataType, method, code.String()).Inc()
256+
grpcCEProcessedCountMetric.WithLabelValues(cluster, dataType, method, code.String()).Inc()
229257

230258
return err
231259
}
@@ -264,10 +292,21 @@ func SplitMethod(fullMethod string) (service, method string) {
264292
// Register all the grpc server metrics for cloudevents.
265293
func CloudEventsGRPCMetrics() []k8smetrics.Registerable {
266294
return []k8smetrics.Registerable{
295+
grpcCESubscribersMetric,
267296
grpcCECalledCountMetric,
268297
grpcCEProcessedCountMetric,
269298
grpcCEProcessingDurationMetric,
270299
grpcCEMessageReceivedCountMetric,
271300
grpcCEMessageSentCountMetric,
272301
}
273302
}
303+
304+
// IncGRPCCESubscribersMetric increments the grpcCESubscribersMetric by 1 for the given cluster and dataType.
305+
func IncGRPCCESubscribersMetric(cluster, dataType string) {
306+
grpcCESubscribersMetric.WithLabelValues(cluster, dataType).Inc()
307+
}
308+
309+
// DecGRPCCESubscribersMetric decrements the grpcCESubscribersMetric by 1 for the given cluster and dataType.
310+
func DecGRPCCESubscribersMetric(cluster, dataType string) {
311+
grpcCESubscribersMetric.WithLabelValues(cluster, dataType).Dec()
312+
}

0 commit comments

Comments
 (0)