Skip to content

Commit 9821e2b

Browse files
Joshua RutherfordJoshua Rutherford
andauthored
cache: fix per-response caching and switch to pointers for requests and responses (#340)
* cache: fix per response caching and switch to pointers for response Signed-off-by: Joshua Rutherford <[email protected]> * cache: ensure cached discovery response logic is platform agnostic Signed-off-by: Joshua Rutherford <[email protected]> Co-authored-by: Joshua Rutherford <[email protected]>
1 parent 9c7e2b2 commit 9821e2b

23 files changed

+356
-308
lines changed

build/integration.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ echo Envoy log: ${ENVOY_LOG}
3737
ENVOY_PID=$!
3838

3939
function cleanup() {
40-
kill ${ENVOY_PID}
41-
kill ${UPSTREAM_PID}
40+
kill ${ENVOY_PID} ${UPSTREAM_PID}
41+
wait ${ENVOY_PID} ${UPSTREAM_PID} 2> /dev/null || true
4242
}
4343
trap cleanup EXIT
4444

pkg/cache/v2/cache.go

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package cache
1818
import (
1919
"context"
2020
"fmt"
21+
"sync/atomic"
2122

2223
discovery "github.com/envoyproxy/go-control-plane/envoy/api/v2"
2324
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
@@ -43,15 +44,15 @@ type ConfigWatcher interface {
4344
//
4445
// Cancel is an optional function to release resources in the producer. If
4546
// provided, the consumer may call this function multiple times.
46-
CreateWatch(Request) (value chan Response, cancel func())
47+
CreateWatch(*Request) (value chan Response, cancel func())
4748
}
4849

4950
// Cache is a generic config cache with a watcher.
5051
type Cache interface {
5152
ConfigWatcher
5253

5354
// Fetch implements the polling method of the config cache using a non-empty request.
54-
Fetch(context.Context, Request) (Response, error)
55+
Fetch(context.Context, *Request) (Response, error)
5556
}
5657

5758
// Response is a wrapper around Envoy's DiscoveryResponse.
@@ -70,7 +71,7 @@ type Response interface {
7071
// be included in the final Discovery Response.
7172
type RawResponse struct {
7273
// Request is the original request.
73-
Request discovery.DiscoveryRequest
74+
Request *discovery.DiscoveryRequest
7475

7576
// Version of the resources as tracked by the cache for the given type.
7677
// Proxy responds with this version as an acknowledgement.
@@ -79,21 +80,16 @@ type RawResponse struct {
7980
// Resources to be included in the response.
8081
Resources []types.Resource
8182

82-
// isResourceMarshaled indicates whether the resources have been marshaled.
83-
// This is internally maintained by go-control-plane to prevent future
84-
// duplication in marshaling efforts.
85-
isResourceMarshaled bool
86-
87-
// marshaledResponse holds the serialized discovery response.
88-
marshaledResponse *discovery.DiscoveryResponse
83+
// marshaledResponse holds an atomic reference to the serialized discovery response.
84+
marshaledResponse atomic.Value
8985
}
9086

9187
var _ Response = &RawResponse{}
9288

9389
// PassthroughResponse is a pre constructed xDS response that need not go through marshalling transformations.
9490
type PassthroughResponse struct {
9591
// Request is the original request.
96-
Request discovery.DiscoveryRequest
92+
Request *discovery.DiscoveryRequest
9793

9894
// The discovery response that needs to be sent as is, without any marshalling transformations.
9995
DiscoveryResponse *discovery.DiscoveryResponse
@@ -104,55 +100,59 @@ var _ Response = &PassthroughResponse{}
104100
// GetDiscoveryResponse performs the marshalling the first time its called and uses the cached response subsequently.
105101
// This is necessary because the marshalled response does not change across the calls.
106102
// This caching behavior is important in high throughput scenarios because grpc marshalling has a cost and it drives the cpu utilization under load.
107-
func (r RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
108-
if r.isResourceMarshaled {
109-
return r.marshaledResponse, nil
110-
}
103+
func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
104+
105+
marshaledResponse := r.marshaledResponse.Load()
106+
107+
if marshaledResponse == nil {
111108

112-
marshaledResources := make([]*any.Any, len(r.Resources))
109+
marshaledResources := make([]*any.Any, len(r.Resources))
113110

114-
for i, resource := range r.Resources {
115-
marshaledResource, err := MarshalResource(resource)
116-
if err != nil {
117-
return nil, err
111+
for i, resource := range r.Resources {
112+
marshaledResource, err := MarshalResource(resource)
113+
if err != nil {
114+
return nil, err
115+
}
116+
marshaledResources[i] = &any.Any{
117+
TypeUrl: r.Request.TypeUrl,
118+
Value: marshaledResource,
119+
}
118120
}
119-
marshaledResources[i] = &any.Any{
120-
TypeUrl: r.Request.TypeUrl,
121-
Value: marshaledResource,
121+
122+
marshaledResponse = &discovery.DiscoveryResponse{
123+
VersionInfo: r.Version,
124+
Resources: marshaledResources,
125+
TypeUrl: r.Request.TypeUrl,
122126
}
123-
}
124127

125-
r.isResourceMarshaled = true
128+
r.marshaledResponse.Store(marshaledResponse)
129+
}
126130

127-
return &discovery.DiscoveryResponse{
128-
VersionInfo: r.Version,
129-
Resources: marshaledResources,
130-
TypeUrl: r.Request.TypeUrl,
131-
}, nil
131+
return marshaledResponse.(*discovery.DiscoveryResponse), nil
132132
}
133133

134134
// GetRequest returns the original Discovery Request.
135-
func (r RawResponse) GetRequest() *discovery.DiscoveryRequest {
136-
return &r.Request
135+
func (r *RawResponse) GetRequest() *discovery.DiscoveryRequest {
136+
return r.Request
137137
}
138138

139139
// GetVersion returns the response version.
140-
func (r RawResponse) GetVersion() (string, error) {
140+
func (r *RawResponse) GetVersion() (string, error) {
141141
return r.Version, nil
142142
}
143143

144144
// GetDiscoveryResponse returns the final passthrough Discovery Response.
145-
func (r PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
145+
func (r *PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
146146
return r.DiscoveryResponse, nil
147147
}
148148

149149
// GetRequest returns the original Discovery Request
150-
func (r PassthroughResponse) GetRequest() *discovery.DiscoveryRequest {
151-
return &r.Request
150+
func (r *PassthroughResponse) GetRequest() *discovery.DiscoveryRequest {
151+
return r.Request
152152
}
153153

154154
// GetVersion returns the response version.
155-
func (r PassthroughResponse) GetVersion() (string, error) {
155+
func (r *PassthroughResponse) GetVersion() (string, error) {
156156
if r.DiscoveryResponse != nil {
157157
return r.DiscoveryResponse.VersionInfo, nil
158158
}

pkg/cache/v2/cache_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ const (
2020
func TestResponseGetDiscoveryResponse(t *testing.T) {
2121
routes := []types.Resource{&route.RouteConfiguration{Name: resourceName}}
2222
resp := cache.RawResponse{
23-
Request: discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
23+
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
2424
Version: "v",
2525
Resources: routes,
2626
}
@@ -30,6 +30,10 @@ func TestResponseGetDiscoveryResponse(t *testing.T) {
3030
assert.Equal(t, discoveryResponse.VersionInfo, resp.Version)
3131
assert.Equal(t, len(discoveryResponse.Resources), 1)
3232

33+
cachedResponse, err := resp.GetDiscoveryResponse()
34+
assert.Nil(t, err)
35+
assert.Same(t, discoveryResponse, cachedResponse)
36+
3337
r := &route.RouteConfiguration{}
3438
err = ptypes.UnmarshalAny(discoveryResponse.Resources[0], r)
3539
assert.Nil(t, err)
@@ -46,7 +50,7 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) {
4650
VersionInfo: "v",
4751
}
4852
resp := cache.PassthroughResponse{
49-
Request: discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
53+
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
5054
DiscoveryResponse: dr,
5155
}
5256

pkg/cache/v2/linear.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string)
8484
}
8585
}
8686
}
87-
value <- RawResponse{
88-
Request: Request{TypeUrl: cache.typeURL},
87+
value <- &RawResponse{
88+
Request: &Request{TypeUrl: cache.typeURL},
8989
Resources: resources,
9090
Version: strconv.FormatUint(cache.version, 10),
9191
}
@@ -141,7 +141,7 @@ func (cache *LinearCache) DeleteResource(name string) error {
141141
return nil
142142
}
143143

144-
func (cache *LinearCache) CreateWatch(request Request) (chan Response, func()) {
144+
func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func()) {
145145
value := make(chan Response, 1)
146146
if request.TypeUrl != cache.typeURL {
147147
close(value)
@@ -207,7 +207,7 @@ func (cache *LinearCache) CreateWatch(request Request) (chan Response, func()) {
207207
}
208208
}
209209

210-
func (cache *LinearCache) Fetch(ctx context.Context, request Request) (Response, error) {
210+
func (cache *LinearCache) Fetch(ctx context.Context, request *Request) (Response, error) {
211211
return nil, errors.New("not implemented")
212212
}
213213

pkg/cache/v2/linear_test.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ func mustBlock(t *testing.T, w <-chan Response) {
7171

7272
func TestLinearInitialResources(t *testing.T) {
7373
c := NewLinearCache(testType, map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})
74-
w, _ := c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType})
74+
w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType})
7575
verifyResponse(t, w, "0", 1)
76-
w, _ = c.CreateWatch(Request{TypeUrl: testType})
76+
w, _ = c.CreateWatch(&Request{TypeUrl: testType})
7777
verifyResponse(t, w, "0", 2)
7878
}
7979

@@ -84,7 +84,7 @@ func TestLinearCornerCases(t *testing.T) {
8484
t.Error("expected error on nil resource")
8585
}
8686
// create an incorrect type URL request
87-
w, _ := c.CreateWatch(Request{TypeUrl: "test"})
87+
w, _ := c.CreateWatch(&Request{TypeUrl: "test"})
8888
select {
8989
case _, more := <-w:
9090
if more {
@@ -99,9 +99,9 @@ func TestLinearBasic(t *testing.T) {
9999
c := NewLinearCache(testType, nil)
100100

101101
// Create watches before a resource is ready
102-
w1, _ := c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
102+
w1, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
103103
mustBlock(t, w1)
104-
w, _ := c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "0"})
104+
w, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
105105
mustBlock(t, w)
106106
checkWatchCount(t, c, "a", 2)
107107
checkWatchCount(t, c, "b", 1)
@@ -112,44 +112,44 @@ func TestLinearBasic(t *testing.T) {
112112
verifyResponse(t, w, "1", 1)
113113

114114
// Request again, should get same response
115-
w, _ = c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
115+
w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
116116
checkWatchCount(t, c, "a", 0)
117117
verifyResponse(t, w, "1", 1)
118-
w, _ = c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "0"})
118+
w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
119119
checkWatchCount(t, c, "a", 0)
120120
verifyResponse(t, w, "1", 1)
121121

122122
// Add another element and update the first, response should be different
123123
c.UpdateResource("b", testResource("b"))
124124
c.UpdateResource("a", testResource("aa"))
125-
w, _ = c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
125+
w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
126126
verifyResponse(t, w, "3", 1)
127-
w, _ = c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "0"})
127+
w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
128128
verifyResponse(t, w, "3", 2)
129129
}
130130

131131
func TestLinearDeletion(t *testing.T) {
132132
c := NewLinearCache(testType, map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})
133-
w, _ := c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
133+
w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
134134
mustBlock(t, w)
135135
checkWatchCount(t, c, "a", 1)
136136
c.DeleteResource("a")
137137
verifyResponse(t, w, "1", 0)
138138
checkWatchCount(t, c, "a", 0)
139-
w, _ = c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "0"})
139+
w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
140140
verifyResponse(t, w, "1", 1)
141141
checkWatchCount(t, c, "b", 0)
142142
c.DeleteResource("b")
143-
w, _ = c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "1"})
143+
w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
144144
verifyResponse(t, w, "2", 0)
145145
checkWatchCount(t, c, "b", 0)
146146
}
147147

148148
func TestLinearWatchTwo(t *testing.T) {
149149
c := NewLinearCache(testType, map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})
150-
w, _ := c.CreateWatch(Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"})
150+
w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"})
151151
mustBlock(t, w)
152-
w1, _ := c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "0"})
152+
w1, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
153153
mustBlock(t, w1)
154154
c.UpdateResource("a", testResource("aa"))
155155
// should only get the modified resource
@@ -162,24 +162,24 @@ func TestLinearCancel(t *testing.T) {
162162
c.UpdateResource("a", testResource("a"))
163163

164164
// cancel watch-all
165-
w, cancel := c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "1"})
165+
w, cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
166166
mustBlock(t, w)
167167
checkWatchCount(t, c, "a", 1)
168168
cancel()
169169
checkWatchCount(t, c, "a", 0)
170170

171171
// cancel watch for "a"
172-
w, cancel = c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
172+
w, cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
173173
mustBlock(t, w)
174174
checkWatchCount(t, c, "a", 1)
175175
cancel()
176176
checkWatchCount(t, c, "a", 0)
177177

178178
// open four watches for "a" and "b" and two for all, cancel one of each, make sure the second one is unaffected
179-
w, cancel = c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
180-
w2, cancel2 := c.CreateWatch(Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"})
181-
w3, cancel3 := c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "1"})
182-
w4, cancel4 := c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "1"})
179+
w, cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
180+
w2, cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"})
181+
w3, cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
182+
w4, cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
183183
mustBlock(t, w)
184184
mustBlock(t, w2)
185185
mustBlock(t, w3)
@@ -212,7 +212,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) {
212212
} else {
213213
id2 := fmt.Sprintf("%d", i-1)
214214
t.Logf("request resources %q and %q", id, id2)
215-
value, _ := c.CreateWatch(Request{
215+
value, _ := c.CreateWatch(&Request{
216216
// Only expect one to become stale
217217
ResourceNames: []string{id, id2},
218218
VersionInfo: "0",

pkg/cache/v2/mux.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ type MuxCache struct {
3434

3535
var _ Cache = &MuxCache{}
3636

37-
func (mux *MuxCache) CreateWatch(request Request) (chan Response, func()) {
38-
key := mux.Classify(request)
37+
func (mux *MuxCache) CreateWatch(request *Request) (chan Response, func()) {
38+
key := mux.Classify(*request)
3939
cache, exists := mux.Caches[key]
4040
if !exists {
4141
value := make(chan Response, 0)
@@ -45,6 +45,6 @@ func (mux *MuxCache) CreateWatch(request Request) (chan Response, func()) {
4545
return cache.CreateWatch(request)
4646
}
4747

48-
func (mux *MuxCache) Fetch(ctx context.Context, request Request) (Response, error) {
48+
func (mux *MuxCache) Fetch(ctx context.Context, request *Request) (Response, error) {
4949
return nil, errors.New("not implemented")
5050
}

0 commit comments

Comments
 (0)