Skip to content

Commit a0a20fe

Browse files
authored
Merge pull request #533 from networkservicemesh/fix-find-corrner-cases
fix nse/ns Find(watch=true) corner cases on CI
2 parents a140a6b + 16788b6 commit a0a20fe

File tree

4 files changed

+53
-77
lines changed

4 files changed

+53
-77
lines changed

pkg/registry/etcd/ns_server.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ import (
2222
"container/list"
2323
"context"
2424
"io"
25-
"sync"
2625
"time"
2726

27+
"github.com/edwarnicke/serialize"
2828
"github.com/golang/protobuf/ptypes/empty"
2929

3030
"github.com/pkg/errors"
@@ -46,8 +46,8 @@ type etcdNSRegistryServer struct {
4646
client versioned.Interface
4747
ns string
4848

49-
subscribers *list.List
50-
subscribersMutex sync.Mutex
49+
subscribers *list.List
50+
subscribersExecutor serialize.Executor
5151

5252
updateChannelSize int
5353
}
@@ -111,22 +111,19 @@ func (n *etcdNSRegistryServer) watchRemoteStorage() {
111111
NetworkService: item,
112112
Deleted: deleted,
113113
}
114-
n.sendEvent(resp)
114+
n.subscribersExecutor.AsyncExec(func() {
115+
n.sendEvent(resp)
116+
})
115117
}
116118
}
117119
watcher.Stop()
118120
}
119121
}
120122

121123
func (n *etcdNSRegistryServer) sendEvent(resp *registry.NetworkServiceResponse) {
122-
n.subscribersMutex.Lock()
123124
for curr := n.subscribers.Front(); curr != nil; curr = curr.Next() {
124-
select {
125-
case curr.Value.(chan *registry.NetworkServiceResponse) <- resp:
126-
default:
127-
}
125+
curr.Value.(chan *registry.NetworkServiceResponse) <- resp
128126
}
129-
n.subscribersMutex.Unlock()
130127
}
131128

132129
func (n *etcdNSRegistryServer) Register(ctx context.Context, request *registry.NetworkService) (*registry.NetworkService, error) {
@@ -186,7 +183,9 @@ func (n *etcdNSRegistryServer) Find(query *registry.NetworkServiceQuery, s regis
186183
}
187184
}
188185
if query.Watch {
189-
if err := n.watch(s.Context(), query, s); err != nil && !errors.Is(err, io.EOF) {
186+
var watchCtx, cancel = context.WithCancel(s.Context())
187+
defer cancel()
188+
if err := n.watch(watchCtx, query, s); err != nil && !errors.Is(err, io.EOF) {
190189
return err
191190
}
192191
}
@@ -211,19 +210,18 @@ func (n *etcdNSRegistryServer) Unregister(ctx context.Context, request *registry
211210

212211
func (n *etcdNSRegistryServer) subscribeOnEvents(ctx context.Context) <-chan *registry.NetworkServiceResponse {
213212
var ret = make(chan *registry.NetworkServiceResponse, n.updateChannelSize)
213+
var node *list.Element
214214

215-
n.subscribersMutex.Lock()
216-
var node = n.subscribers.PushBack(ret)
217-
n.subscribersMutex.Unlock()
215+
n.subscribersExecutor.AsyncExec(func() {
216+
node = n.subscribers.PushBack(ret)
217+
})
218218

219219
go func() {
220220
<-ctx.Done()
221-
222-
n.subscribersMutex.Lock()
223-
n.subscribers.Remove(node)
224-
n.subscribersMutex.Unlock()
225-
226-
close(ret)
221+
n.subscribersExecutor.AsyncExec(func() {
222+
n.subscribers.Remove(node)
223+
close(ret)
224+
})
227225
}()
228226

229227
return ret

pkg/registry/etcd/ns_server_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/stretchr/testify/require"
3232
"google.golang.org/protobuf/proto"
3333
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34+
"k8s.io/apimachinery/pkg/watch"
3435

3536
"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
3637

@@ -199,8 +200,8 @@ func Test_NSHighloadWatch_ShouldNotFail(t *testing.T) {
199200
defer cancel()
200201

201202
const clinetCount = 20
202-
const updateCount int32 = 200
203203

204+
var updateCount = watch.DefaultChanSize
204205
var actual atomic.Int32
205206
var myClientset = fake.NewSimpleClientset()
206207

@@ -219,8 +220,9 @@ func Test_NSHighloadWatch_ShouldNotFail(t *testing.T) {
219220
NetworkService: &registry.NetworkService{},
220221
Watch: true,
221222
})
223+
ch := registry.ReadNetworkServiceChannel(stream)
222224
startWg.Done()
223-
for range registry.ReadNetworkServiceChannel(stream) {
225+
for range ch {
224226
actual.Add(1)
225227
}
226228
}()

pkg/registry/etcd/nse_server.go

Lines changed: 29 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"container/list"
2323
"context"
2424
"io"
25-
"sync"
2625
"time"
2726

2827
"github.com/edwarnicke/serialize"
@@ -46,11 +45,10 @@ type etcdNSERegistryServer struct {
4645
chainContext context.Context
4746
deleteExecutor serialize.Executor
4847
client versioned.Interface
49-
versions sync.Map
5048
ns string
5149

52-
subscribers *list.List
53-
subscribersMutex sync.Mutex
50+
subscribers *list.List
51+
subscribersExecutor serialize.Executor
5452

5553
updateChannelSize int
5654
}
@@ -114,9 +112,8 @@ func (n *etcdNSERegistryServer) watchRemoteStorage() {
114112
NetworkServiceEndpoint: item,
115113
Deleted: deleted,
116114
}
117-
n.sendEvent(resp)
115+
n.subscribersExecutor.AsyncExec(func() { n.sendEvent(resp) })
118116
if !deleted && item.ExpirationTime != nil && item.ExpirationTime.AsTime().Local().Before(time.Now()) {
119-
n.versions.Delete(item.GetName())
120117
n.deleteExecutor.AsyncExec(func() {
121118
_ = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(n.chainContext, item.GetName(), metav1.DeleteOptions{
122119
Preconditions: &metav1.Preconditions{
@@ -132,14 +129,9 @@ func (n *etcdNSERegistryServer) watchRemoteStorage() {
132129
}
133130

134131
func (n *etcdNSERegistryServer) sendEvent(resp *registry.NetworkServiceEndpointResponse) {
135-
n.subscribersMutex.Lock()
136132
for curr := n.subscribers.Front(); curr != nil; curr = curr.Next() {
137-
select {
138-
case curr.Value.(chan *registry.NetworkServiceEndpointResponse) <- resp:
139-
default:
140-
}
133+
curr.Value.(chan *registry.NetworkServiceEndpointResponse) <- resp
141134
}
142-
n.subscribersMutex.Unlock()
143135
}
144136

145137
func (n *etcdNSERegistryServer) Register(ctx context.Context, request *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
@@ -167,20 +159,11 @@ func (n *etcdNSERegistryServer) Register(ctx context.Context, request *registry.
167159
if nse != nil {
168160
nse.Spec = *(*v1.NetworkServiceEndpointSpec)(request)
169161
apiResp, err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Update(ctx, nse, metav1.UpdateOptions{})
170-
if err != nil {
171-
return nil, errors.Wrapf(err, "failed to update a pod %s in a namespace %s", nse.Name, n.ns)
172-
}
173-
174-
n.versions.Store(apiResp.Spec.Name, apiResp.ResourceVersion)
175-
ctx = withNSEVersion(ctx, apiResp.ResourceVersion)
176-
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, request)
177162
}
178163
}
179164
if err != nil {
180165
return nil, err
181166
}
182-
183-
n.versions.Store(apiResp.Spec.Name, apiResp.ResourceVersion)
184167
ctx = withNSEVersion(ctx, apiResp.ResourceVersion)
185168
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, request)
186169
}
@@ -214,7 +197,9 @@ func (n *etcdNSERegistryServer) Find(query *registry.NetworkServiceEndpointQuery
214197
}
215198
}
216199
if query.Watch {
217-
if err := n.watch(s.Context(), query, s); err != nil && !errors.Is(err, io.EOF) {
200+
var watchCtx, cancel = context.WithCancel(s.Context())
201+
defer cancel()
202+
if err := n.watch(watchCtx, query, s); err != nil && !errors.Is(err, io.EOF) {
218203
return err
219204
}
220205
}
@@ -226,51 +211,41 @@ func (n *etcdNSERegistryServer) Unregister(ctx context.Context, request *registr
226211
if err != nil {
227212
return nil, errors.WithStack(err)
228213
}
229-
230-
if _, ok := nseVersionFromContext(ctx); !ok {
231-
err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(
232-
ctx,
233-
request.Name,
234-
metav1.DeleteOptions{})
235-
if err != nil {
236-
return nil, errors.Wrapf(err, "failed to delete a NetworkServiceEndpoints %s in a namespace %s", request.Name, n.ns)
237-
}
238-
return resp, nil
214+
var version *string
215+
if v, ok := nseVersionFromContext(ctx); ok {
216+
version = &v
239217
}
240218

241-
if v, ok := n.versions.Load(request.Name); ok {
242-
version := v.(string)
243-
err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(
244-
ctx,
245-
request.Name,
246-
metav1.DeleteOptions{
247-
Preconditions: &metav1.Preconditions{
248-
ResourceVersion: &version,
249-
},
250-
})
251-
if err != nil {
252-
log.FromContext(ctx).Warnf("failed to delete a NetworkServiceEndpoints %s in a namespace %s, cause: %v", request.Name, n.ns, err.Error())
253-
}
254-
n.versions.Delete(request.GetName())
219+
err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(
220+
ctx,
221+
request.Name,
222+
metav1.DeleteOptions{
223+
Preconditions: &metav1.Preconditions{
224+
ResourceVersion: version,
225+
},
226+
})
227+
if err != nil {
228+
log.FromContext(ctx).Warnf("failed to delete a NetworkServiceEndpoints %s in a namespace %s, cause: %v", request.Name, n.ns, err.Error())
255229
}
230+
256231
return resp, nil
257232
}
258233

259234
func (n *etcdNSERegistryServer) subscribeOnEvents(ctx context.Context) <-chan *registry.NetworkServiceEndpointResponse {
260235
var ret = make(chan *registry.NetworkServiceEndpointResponse, n.updateChannelSize)
236+
var node *list.Element
261237

262-
n.subscribersMutex.Lock()
263-
var node = n.subscribers.PushBack(ret)
264-
n.subscribersMutex.Unlock()
238+
n.subscribersExecutor.AsyncExec(func() {
239+
node = n.subscribers.PushBack(ret)
240+
})
265241

266242
go func() {
267243
<-ctx.Done()
268244

269-
n.subscribersMutex.Lock()
270-
n.subscribers.Remove(node)
271-
n.subscribersMutex.Unlock()
272-
273-
close(ret)
245+
n.subscribersExecutor.AsyncExec(func() {
246+
n.subscribers.Remove(node)
247+
close(ret)
248+
})
274249
}()
275250

276251
return ret

pkg/registry/etcd/nse_server_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"google.golang.org/protobuf/proto"
3535
"google.golang.org/protobuf/types/known/timestamppb"
3636
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37+
"k8s.io/apimachinery/pkg/watch"
3738

3839
"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
3940

@@ -197,8 +198,8 @@ func Test_NSEHighloadWatch_ShouldNotFail(t *testing.T) {
197198
defer cancel()
198199

199200
const clinetCount = 20
200-
const updateCount int32 = 200
201201

202+
var updateCount = watch.DefaultChanSize
202203
var actual atomic.Int32
203204
var myClientset = fake.NewSimpleClientset()
204205

0 commit comments

Comments
 (0)