Skip to content

Commit 5701fcb

Browse files
committed
WiP: adaptation: implement plugin update.
Start treating plugins which register with the same index and name as multiple instances of the same plugin. When a plugin registers, shut down the previous instance of the same plugin once the later one gets synchronized. Signed-off-by: Krisztian Litkey <[email protected]>
1 parent bc7f1ed commit 5701fcb

File tree

3 files changed

+86
-21
lines changed

3 files changed

+86
-21
lines changed

pkg/adaptation/adaptation.go

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func (r *Adaptation) ShutdownPlugins(reason, pattern string) {
242242
func (r *Adaptation) RunPodSandbox(ctx context.Context, req *RunPodSandboxRequest) error {
243243
r.Lock()
244244
defer r.Unlock()
245-
defer r.removeClosedPlugins()
245+
defer r.removeClosedPlugins(nil)
246246

247247
for _, plugin := range r.plugins {
248248
_, err := plugin.runPodSandbox(ctx, req)
@@ -258,7 +258,7 @@ func (r *Adaptation) RunPodSandbox(ctx context.Context, req *RunPodSandboxReques
258258
func (r *Adaptation) UpdatePodSandbox(ctx context.Context, req *UpdatePodSandboxRequest) (*UpdatePodSandboxResponse, error) {
259259
r.Lock()
260260
defer r.Unlock()
261-
defer r.removeClosedPlugins()
261+
defer r.removeClosedPlugins(nil)
262262

263263
for _, plugin := range r.plugins {
264264
_, err := plugin.updatePodSandbox(ctx, req)
@@ -274,7 +274,7 @@ func (r *Adaptation) UpdatePodSandbox(ctx context.Context, req *UpdatePodSandbox
274274
func (r *Adaptation) PostUpdatePodSandbox(ctx context.Context, req *PostUpdatePodSandboxRequest) error {
275275
r.Lock()
276276
defer r.Unlock()
277-
defer r.removeClosedPlugins()
277+
defer r.removeClosedPlugins(nil)
278278

279279
for _, plugin := range r.plugins {
280280
_, err := plugin.postUpdatePodSandbox(ctx, req)
@@ -290,7 +290,7 @@ func (r *Adaptation) PostUpdatePodSandbox(ctx context.Context, req *PostUpdatePo
290290
func (r *Adaptation) StopPodSandbox(ctx context.Context, req *StopPodSandboxRequest) error {
291291
r.Lock()
292292
defer r.Unlock()
293-
defer r.removeClosedPlugins()
293+
defer r.removeClosedPlugins(nil)
294294

295295
for _, plugin := range r.plugins {
296296
_, err := plugin.stopPodSandbox(ctx, req)
@@ -306,7 +306,7 @@ func (r *Adaptation) StopPodSandbox(ctx context.Context, req *StopPodSandboxRequ
306306
func (r *Adaptation) RemovePodSandbox(ctx context.Context, req *RemovePodSandboxRequest) error {
307307
r.Lock()
308308
defer r.Unlock()
309-
defer r.removeClosedPlugins()
309+
defer r.removeClosedPlugins(nil)
310310

311311
for _, plugin := range r.plugins {
312312
_, err := plugin.removePodSandbox(ctx, req)
@@ -322,7 +322,7 @@ func (r *Adaptation) RemovePodSandbox(ctx context.Context, req *RemovePodSandbox
322322
func (r *Adaptation) CreateContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) {
323323
r.Lock()
324324
defer r.Unlock()
325-
defer r.removeClosedPlugins()
325+
defer r.removeClosedPlugins(nil)
326326

327327
var (
328328
result = collectCreateContainerResult(req)
@@ -357,7 +357,7 @@ func (r *Adaptation) CreateContainer(ctx context.Context, req *CreateContainerRe
357357
func (r *Adaptation) PostCreateContainer(ctx context.Context, req *PostCreateContainerRequest) error {
358358
r.Lock()
359359
defer r.Unlock()
360-
defer r.removeClosedPlugins()
360+
defer r.removeClosedPlugins(nil)
361361

362362
for _, plugin := range r.plugins {
363363
_, err := plugin.postCreateContainer(ctx, req)
@@ -373,7 +373,7 @@ func (r *Adaptation) PostCreateContainer(ctx context.Context, req *PostCreateCon
373373
func (r *Adaptation) StartContainer(ctx context.Context, req *StartContainerRequest) error {
374374
r.Lock()
375375
defer r.Unlock()
376-
defer r.removeClosedPlugins()
376+
defer r.removeClosedPlugins(nil)
377377

378378
for _, plugin := range r.plugins {
379379
_, err := plugin.startContainer(ctx, req)
@@ -389,7 +389,7 @@ func (r *Adaptation) StartContainer(ctx context.Context, req *StartContainerRequ
389389
func (r *Adaptation) PostStartContainer(ctx context.Context, req *PostStartContainerRequest) error {
390390
r.Lock()
391391
defer r.Unlock()
392-
defer r.removeClosedPlugins()
392+
defer r.removeClosedPlugins(nil)
393393

394394
for _, plugin := range r.plugins {
395395
_, err := plugin.postStartContainer(ctx, req)
@@ -405,7 +405,7 @@ func (r *Adaptation) PostStartContainer(ctx context.Context, req *PostStartConta
405405
func (r *Adaptation) UpdateContainer(ctx context.Context, req *UpdateContainerRequest) (*UpdateContainerResponse, error) {
406406
r.Lock()
407407
defer r.Unlock()
408-
defer r.removeClosedPlugins()
408+
defer r.removeClosedPlugins(nil)
409409

410410
result := collectUpdateContainerResult(req)
411411
for _, plugin := range r.plugins {
@@ -426,7 +426,7 @@ func (r *Adaptation) UpdateContainer(ctx context.Context, req *UpdateContainerRe
426426
func (r *Adaptation) PostUpdateContainer(ctx context.Context, req *PostUpdateContainerRequest) error {
427427
r.Lock()
428428
defer r.Unlock()
429-
defer r.removeClosedPlugins()
429+
defer r.removeClosedPlugins(nil)
430430

431431
for _, plugin := range r.plugins {
432432
_, err := plugin.postUpdateContainer(ctx, req)
@@ -442,7 +442,7 @@ func (r *Adaptation) PostUpdateContainer(ctx context.Context, req *PostUpdateCon
442442
func (r *Adaptation) StopContainer(ctx context.Context, req *StopContainerRequest) (*StopContainerResponse, error) {
443443
r.Lock()
444444
defer r.Unlock()
445-
defer r.removeClosedPlugins()
445+
defer r.removeClosedPlugins(nil)
446446

447447
result := collectStopContainerResult()
448448
for _, plugin := range r.plugins {
@@ -463,7 +463,7 @@ func (r *Adaptation) StopContainer(ctx context.Context, req *StopContainerReques
463463
func (r *Adaptation) RemoveContainer(ctx context.Context, req *RemoveContainerRequest) error {
464464
r.Lock()
465465
defer r.Unlock()
466-
defer r.removeClosedPlugins()
466+
defer r.removeClosedPlugins(nil)
467467

468468
for _, plugin := range r.plugins {
469469
_, err := plugin.removeContainer(ctx, req)
@@ -592,7 +592,7 @@ func (r *Adaptation) startPlugins() (retErr error) {
592592
}
593593

594594
r.plugins = plugins
595-
r.sortPlugins()
595+
r.sortPlugins(nil)
596596
return nil
597597
}
598598

@@ -606,12 +606,19 @@ func (r *Adaptation) stopPlugins() {
606606
r.plugins = nil
607607
}
608608

609-
func (r *Adaptation) removeClosedPlugins() {
610-
var active, closed, validators []*plugin
609+
func (r *Adaptation) removeClosedPlugins(newPlugin *plugin) *plugin {
610+
var (
611+
active, closed, validators []*plugin
612+
old *plugin
613+
)
614+
611615
for _, p := range r.plugins {
612-
if p.isClosed() {
616+
switch {
617+
case p.isClosed():
613618
closed = append(closed, p)
614-
} else {
619+
case isSameExternalInstance(p, newPlugin):
620+
old = p
621+
default:
615622
active = append(active, p)
616623
if p.isContainerAdjustmentValidator() {
617624
validators = append(validators, p)
@@ -629,6 +636,7 @@ func (r *Adaptation) removeClosedPlugins() {
629636

630637
r.plugins = active
631638
r.validators = validators
639+
return old
632640
}
633641

634642
func (r *Adaptation) startListener() error {
@@ -695,8 +703,11 @@ func (r *Adaptation) acceptPluginConnections(l net.Listener) error {
695703
if p.isContainerAdjustmentValidator() {
696704
r.validators = append(r.validators, p)
697705
}
698-
r.sortPlugins()
706+
old := r.sortPlugins(p)
699707
r.Unlock()
708+
if old != nil {
709+
old.shutdown(api.ShutdownByOtherInstance)
710+
}
700711
log.Infof(ctx, "plugin %q connected and synchronized", p.name())
701712
}
702713

@@ -759,8 +770,8 @@ func (r *Adaptation) discoverPlugins() ([]string, []string, []string, error) {
759770
return indices, plugins, configs, nil
760771
}
761772

762-
func (r *Adaptation) sortPlugins() {
763-
r.removeClosedPlugins()
773+
func (r *Adaptation) sortPlugins(newPlugin *plugin) *plugin {
774+
oldInstance := r.removeClosedPlugins(newPlugin)
764775
sort.Slice(r.plugins, func(i, j int) bool {
765776
return r.plugins[i].idx < r.plugins[j].idx
766777
})
@@ -779,6 +790,20 @@ func (r *Adaptation) sortPlugins() {
779790
log.Infof(noCtx, " %q (%s)", p.name(), p.qualifiedName())
780791
}
781792
}
793+
return oldInstance
794+
}
795+
796+
func isSameExternalInstance(p1, p2 *plugin) bool {
797+
if p1 == nil || p2 == nil {
798+
return false
799+
}
800+
if !p1.isExternal() || !p2.isExternal() {
801+
return false
802+
}
803+
if p1.idx != p2.idx || p1.base != p2.base {
804+
return false
805+
}
806+
return p1 != p2
782807
}
783808

784809
func (r *Adaptation) hasValidators() bool {

pkg/adaptation/adaptation_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3388,6 +3388,42 @@ func TestContainerAdjustmentValidation(t *testing.T) {
33883388
}
33893389
}
33903390

3391+
func TestPluginInstanceUpdate(t *testing.T) {
3392+
var (
3393+
sut *Suite
3394+
evt *EventCollector
3395+
tc = &testbase{
3396+
plugins: map[string][]PluginOption{
3397+
"00-test": {},
3398+
},
3399+
}
3400+
)
3401+
3402+
sut, evt = tc.Setup(t)
3403+
sut.Start(WithWaitForPluginsToStart())
3404+
3405+
p := NewPlugin(t, sut.Dir(), evt.Channel(),
3406+
WithPluginIndex("00"),
3407+
WithPluginName("test"),
3408+
)
3409+
3410+
p.Start()
3411+
_, started := evt.Search(
3412+
EventOccurred(PluginSynchronized(p.ID(), nil, nil)),
3413+
UntilTimeout(1*time.Second),
3414+
)
3415+
require.True(t, started, "start new plugin instance")
3416+
3417+
_, shutdown := evt.Search(
3418+
EventOccurred(PluginShutdown("00-test", api.ShutdownByOtherInstance)),
3419+
UntilTimeout(1*time.Second),
3420+
)
3421+
require.True(t, shutdown, "old instance shut down by new instance")
3422+
3423+
sut.Stop()
3424+
evt.Stop()
3425+
}
3426+
33913427
func protoDiff(a, b proto.Message) string {
33923428
return cmp.Diff(a, b, protocmp.Transform())
33933429
}

pkg/api/v1beta1/plugin.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ const (
3030
PluginNameEnvVar = "NRI_PLUGIN_NAME"
3131
// PluginIdxEnvVar is used to inform NRI-launched plugins about their ID.
3232
PluginIdxEnvVar = "NRI_PLUGIN_IDX"
33+
34+
// ShutdownByOtherInstance indicates that the plugin was shut down because
35+
// another instance of the same plugin was registered.
36+
ShutdownByOtherInstance = "other plugin instance registered"
3337
)
3438

3539
// ParsePluginName parses the (file)name of a plugin into an index and a base.

0 commit comments

Comments
 (0)