Skip to content
This repository was archived by the owner on Jul 31, 2023. It is now read-only.

Commit 1901b56

Browse files
Allow custom view.Meters to export metrics for other Resources (#1212)
* Remove call to time.Now() on worker thread when handling record reqs (#1210) Time is already recorded on the client side and stored in the currently unused recordReq.t field. Avoiding these repeated calls to time.Now while the worker is blocked can significantly reduce worker contention. * Update Meter to track and report Resource for metric data. Co-authored-by: Ian Milligan <[email protected]>
1 parent 785d899 commit 1901b56

File tree

4 files changed

+94
-37
lines changed

4 files changed

+94
-37
lines changed

stats/view/view_to_metric.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package view
1818
import (
1919
"time"
2020

21+
"go.opencensus.io/resource"
22+
2123
"go.opencensus.io/metric/metricdata"
2224
"go.opencensus.io/stats"
2325
)
@@ -125,7 +127,7 @@ func rowToTimeseries(v *viewInternal, row *Row, now time.Time, startTime time.Ti
125127
}
126128
}
127129

128-
func viewToMetric(v *viewInternal, now time.Time, startTime time.Time) *metricdata.Metric {
130+
func viewToMetric(v *viewInternal, r *resource.Resource, now time.Time, startTime time.Time) *metricdata.Metric {
129131
if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 ||
130132
v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 {
131133
startTime = time.Time{}
@@ -144,6 +146,7 @@ func viewToMetric(v *viewInternal, now time.Time, startTime time.Time) *metricda
144146
m := &metricdata.Metric{
145147
Descriptor: *v.metricDescriptor,
146148
TimeSeries: ts,
149+
Resource: r,
147150
}
148151
return m
149152
}

stats/view/view_to_metric_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ func Test_ViewToMetric(t *testing.T) {
447447
tc.vi.addSample(tag.FromContext(ctx), v, nil, now)
448448
}
449449

450-
gotMetric := viewToMetric(tc.vi, now, startTime)
450+
gotMetric := viewToMetric(tc.vi, nil, now, startTime)
451451
if !cmp.Equal(gotMetric, tc.wantMetric) {
452452
// JSON format is strictly for checking the content when test fails. Do not use JSON
453453
// format to determine if the two values are same as it doesn't differentiate between
@@ -509,7 +509,7 @@ func TestUnitConversionForAggCount(t *testing.T) {
509509

510510
for _, tc := range tests {
511511
tc.vi.addSample(tag.FromContext(context.Background()), 5.0, nil, now)
512-
gotMetric := viewToMetric(tc.vi, now, startTime)
512+
gotMetric := viewToMetric(tc.vi, nil, now, startTime)
513513
gotUnit := gotMetric.Descriptor.Unit
514514
if !cmp.Equal(gotUnit, tc.wantUnit) {
515515
t.Errorf("Verify Unit: %s: Got:%v Want:%v", tc.name, gotUnit, tc.wantUnit)

stats/view/worker.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"sync"
2121
"time"
2222

23+
"go.opencensus.io/resource"
24+
2325
"go.opencensus.io/metric/metricdata"
2426
"go.opencensus.io/metric/metricproducer"
2527
"go.opencensus.io/stats"
@@ -47,6 +49,7 @@ type worker struct {
4749
c chan command
4850
quit, done chan bool
4951
mu sync.RWMutex
52+
r *resource.Resource
5053

5154
exportersMu sync.RWMutex
5255
exporters map[Exporter]struct{}
@@ -91,6 +94,10 @@ type Meter interface {
9194
RegisterExporter(Exporter)
9295
// UnregisterExporter unregisters an exporter.
9396
UnregisterExporter(Exporter)
97+
// SetResource may be used to set the Resource associated with this registry.
98+
// This is intended to be used in cases where a single process exports metrics
99+
// for multiple Resources, typically in a multi-tenant situation.
100+
SetResource(*resource.Resource)
94101

95102
// Start causes the Meter to start processing Record calls and aggregating
96103
// statistics as well as exporting data.
@@ -249,6 +256,14 @@ func NewMeter() Meter {
249256
}
250257
}
251258

259+
// SetResource associates all data collected by this Meter with the specified
260+
// resource. This resource is reported when using metricexport.ReadAndExport;
261+
// it is not provided when used with ExportView/RegisterExporter, because that
262+
// interface does not provide a means for reporting the Resource.
263+
func (w *worker) SetResource(r *resource.Resource) {
264+
w.r = r
265+
}
266+
252267
func (w *worker) Start() {
253268
go w.start()
254269
}
@@ -371,7 +386,7 @@ func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric {
371386
startTime = w.startTimes[v]
372387
}
373388

374-
return viewToMetric(v, now, startTime)
389+
return viewToMetric(v, w.r, now, startTime)
375390
}
376391

377392
// Read reads all view data and returns them as metrics.

stats/view/worker_test.go

Lines changed: 72 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ package view
1818
import (
1919
"context"
2020
"errors"
21+
"sort"
2122
"sync"
2223
"testing"
2324
"time"
2425

26+
"go.opencensus.io/resource"
27+
2528
"go.opencensus.io/metric/metricdata"
2629
"go.opencensus.io/metric/metricexport"
2730
"go.opencensus.io/stats"
@@ -123,8 +126,13 @@ func Test_Worker_MultiExport(t *testing.T) {
123126

124127
// This test reports the same data for the default worker and a secondary
125128
// worker, and ensures that the stats are kept independently.
129+
extraResource := resource.Resource{
130+
Type: "additional",
131+
Labels: map[string]string{"key1": "value1", "key2": "value2"},
132+
}
126133
worker2 := NewMeter().(*worker)
127134
worker2.Start()
135+
worker2.SetResource(&extraResource)
128136

129137
m := stats.Float64("Test_Worker_MultiExport/MF1", "desc MF1", "unit")
130138
key := tag.MustNewKey(("key"))
@@ -162,50 +170,62 @@ func Test_Worker_MultiExport(t *testing.T) {
162170
}
163171
}
164172

165-
wantRows := []struct {
166-
w Meter
167-
view string
168-
rows []*Row
169-
}{{
170-
view: count.Name,
171-
rows: []*Row{
173+
makeKey := func(r *resource.Resource, view string) string {
174+
if r == nil {
175+
r = &resource.Resource{}
176+
}
177+
return resource.EncodeLabels(r.Labels) + "/" + view
178+
}
179+
180+
// Format is Resource.Labels encoded as string, then
181+
wantPartialData := map[string][]*Row{
182+
makeKey(nil, count.Name): []*Row{
172183
{[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}},
173184
{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
174185
},
175-
}, {
176-
view: sum.Name,
177-
rows: []*Row{
178-
{nil, &SumData{Value: 7.5}}},
179-
}, {
180-
w: worker2,
181-
view: count.Name,
182-
rows: []*Row{
186+
makeKey(nil, sum.Name): []*Row{
187+
{nil, &SumData{Value: 7.5}},
188+
},
189+
makeKey(&extraResource, count.Name): []*Row{
183190
{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
184191
},
185-
}}
192+
}
186193

187-
for _, wantRow := range wantRows {
188-
retrieve := RetrieveData
189-
if wantRow.w != nil {
190-
retrieve = wantRow.w.(*worker).RetrieveData
191-
}
192-
gotRows, err := retrieve(wantRow.view)
193-
if err != nil {
194-
t.Fatalf("RetrieveData(%v), got error %v", wantRow.view, err)
194+
te := &testExporter{}
195+
metricexport.NewReader().ReadAndExport(te)
196+
for _, m := range te.metrics {
197+
key := makeKey(m.Resource, m.Descriptor.Name)
198+
want, ok := wantPartialData[key]
199+
if !ok {
200+
t.Errorf("Unexpected data for %q: %v", key, m)
201+
continue
195202
}
196-
for _, got := range gotRows {
197-
if !containsRow(wantRow.rows, got) {
198-
t.Errorf("%s: got row %#v; want none", wantRow.view, got)
199-
break
203+
gotTs := m.TimeSeries
204+
sort.Sort(byLabel(gotTs))
205+
206+
for i, ts := range gotTs {
207+
for j, label := range ts.LabelValues {
208+
if want[i].Tags[j].Value != label.Value {
209+
t.Errorf("Mismatched tag values (want %q, got %q) for %v in %q", want[i].Tags[j].Value, label.Value, ts, key)
210+
}
200211
}
201-
}
202-
for _, want := range wantRow.rows {
203-
if !containsRow(gotRows, want) {
204-
t.Errorf("%s: got none, want %#v", wantRow.view, want)
205-
break
212+
switch wantValue := want[i].Data.(type) {
213+
case *CountData:
214+
got := ts.Points[0].Value.(int64)
215+
if wantValue.Value != got {
216+
t.Errorf("Mismatched value (want %d, got %d) for %v in %q", wantValue, got, ts, key)
217+
}
218+
case *SumData:
219+
got := ts.Points[0].Value.(float64)
220+
if wantValue.Value != got {
221+
t.Errorf("Mismatched value (want %f, got %f) for %v in %q", wantValue, got, ts, key)
222+
}
223+
default:
224+
t.Errorf("Unexpected type of data: %T for %v in %q", wantValue, want[i], key)
206225
}
207226
}
208227
}
228+
209229
// Verify that worker has not been computing sum:
210230
got, err := worker2.RetrieveData(sum.Name)
211231
if err == nil {
@@ -577,9 +597,11 @@ func TestWorkerRace(t *testing.T) {
577597
}
578598

579599
type testExporter struct {
600+
metrics []*metricdata.Metric
580601
}
581602

582603
func (te *testExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
604+
te.metrics = metrics
583605
return nil
584606
}
585607

@@ -619,3 +641,20 @@ func restart() {
619641
defaultWorker = NewMeter().(*worker)
620642
go defaultWorker.start()
621643
}
644+
645+
// byTag implements sort.Interface for *metricdata.TimeSeries by Labels.
646+
type byLabel []*metricdata.TimeSeries
647+
648+
func (ts byLabel) Len() int { return len(ts) }
649+
func (ts byLabel) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
650+
func (ts byLabel) Less(i, j int) bool {
651+
if len(ts[i].LabelValues) != len(ts[j].LabelValues) {
652+
return len(ts[i].LabelValues) < len(ts[j].LabelValues)
653+
}
654+
for k := range ts[i].LabelValues {
655+
if ts[i].LabelValues[k].Value != ts[j].LabelValues[k].Value {
656+
return ts[i].LabelValues[k].Value < ts[j].LabelValues[k].Value
657+
}
658+
}
659+
return false
660+
}

0 commit comments

Comments
 (0)