diff --git a/pkg/kubernetes/manager.go b/pkg/kubernetes/manager.go
index 2ba8970e..6851ae35 100644
--- a/pkg/kubernetes/manager.go
+++ b/pkg/kubernetes/manager.go
@@ -5,8 +5,11 @@ import (
 	"errors"
 	"fmt"
 	"os"
+	"sort"
 	"strconv"
 	"strings"
 
 	"github.com/containers/kubernetes-mcp-server/pkg/config"
 	"github.com/fsnotify/fsnotify"
+	"sync"
+	"time"
@@ -23,10 +26,38 @@ type Manager struct {
 	staticConfig *config.StaticConfig
 
 	CloseWatchKubeConfig CloseWatchKubeConfig
+
+	clusterWatcher *clusterStateWatcher
+}
+
+// clusterState represents the cached state of the cluster
+type clusterState struct {
+	apiGroups   []string
+	isOpenShift bool
+}
+
+// clusterStateWatcher monitors cluster state changes and triggers debounced reloads
+type clusterStateWatcher struct {
+	manager        *Manager
+	pollInterval   time.Duration
+	debounceWindow time.Duration
+	lastKnownState clusterState
+	reloadCallback func() error
+	debounceTimer  *time.Timer
+	mu             sync.Mutex
+	stopCh         chan struct{}
+	stoppedCh      chan struct{}
 }
 
 var _ Openshift = (*Manager)(nil)
 
+const (
+	// DefaultClusterStatePollInterval is the default interval for polling cluster state changes
+	DefaultClusterStatePollInterval = 30 * time.Second
+	// DefaultClusterStateDebounceWindow is the default debounce window for cluster state changes
+	DefaultClusterStateDebounceWindow = 5 * time.Second
+)
+
 var (
 	ErrorKubeconfigInClusterNotAllowed = errors.New("kubeconfig manager cannot be used in in-cluster deployments")
 	ErrorInClusterNotInCluster         = errors.New("in-cluster manager cannot be used outside of a cluster")
@@ -154,6 +185,9 @@ func (m *Manager) Close() {
 	if m.CloseWatchKubeConfig != nil {
 		_ = m.CloseWatchKubeConfig()
 	}
+	if m.clusterWatcher != nil {
+		m.clusterWatcher.stop()
+	}
 }
 
 func (m *Manager) VerifyToken(ctx context.Context, token, audience string) (*authenticationv1api.UserInfo, []string, error) {
@@ -249,3 +283,117 @@ func applyRateLimitFromEnv(cfg *rest.Config) {
 		}
 	}
 }
+
+// WatchClusterState starts a background watcher that periodically polls for cluster state changes
+// and triggers a debounced reload when changes are detected.
+func (m *Manager) WatchClusterState(pollInterval, debounceWindow time.Duration, onClusterStateChange func() error) {
+	if m.clusterWatcher != nil {
+		m.clusterWatcher.stop()
+	}
+
+	watcher := &clusterStateWatcher{
+		manager:        m,
+		pollInterval:   pollInterval,
+		debounceWindow: debounceWindow,
+		reloadCallback: onClusterStateChange,
+		stopCh:         make(chan struct{}),
+		stoppedCh:      make(chan struct{}),
+	}
+
+	captureState := func() clusterState {
+		state := clusterState{apiGroups: []string{}}
+		if groups, err := m.accessControlClientset.DiscoveryClient().ServerGroups(); err == nil {
+			for _, group := range groups.Groups {
+				state.apiGroups = append(state.apiGroups, group.Name)
+			}
+			sort.Strings(state.apiGroups)
+		}
+		state.isOpenShift = m.IsOpenShift(context.Background())
+		return state
+	}
+	watcher.lastKnownState = captureState()
+
+	m.clusterWatcher = watcher
+
+	// Start background monitoring
+	go func() {
+		defer close(watcher.stoppedCh)
+		ticker := time.NewTicker(pollInterval)
+		defer ticker.Stop()
+
+		klog.V(2).Infof("Started cluster state watcher (poll interval: %v, debounce: %v)", pollInterval, debounceWindow)
+
+		for {
+			select {
+			case <-watcher.stopCh:
+				klog.V(2).Info("Stopping cluster state watcher")
+				return
+			case <-ticker.C:
+				// Invalidate discovery cache to get fresh API groups
+				m.accessControlClientset.DiscoveryClient().Invalidate()
+
+				watcher.mu.Lock()
+				current := captureState()
+				klog.V(3).Infof("Polled cluster state: %d API groups, OpenShift=%v", len(current.apiGroups), current.isOpenShift)
+
+				changed := current.isOpenShift != watcher.lastKnownState.isOpenShift ||
+					len(current.apiGroups) != len(watcher.lastKnownState.apiGroups)
+
+				if !changed {
+					for i := range current.apiGroups {
+						if current.apiGroups[i] != watcher.lastKnownState.apiGroups[i] {
+							changed = true
+							break
+						}
+					}
+				}
+
+				if changed {
+					klog.V(2).Info("Cluster state changed, scheduling debounced reload")
+					if watcher.debounceTimer != nil {
+						watcher.debounceTimer.Stop()
+					}
+					watcher.debounceTimer = time.AfterFunc(debounceWindow, func() {
+						klog.V(2).Info("Debounce window expired, triggering reload")
+						if err := onClusterStateChange(); err != nil {
+							klog.Errorf("Failed to reload: %v", err)
+						} else {
+							watcher.mu.Lock()
+							watcher.lastKnownState = captureState()
+							watcher.mu.Unlock()
+							klog.V(2).Info("Reload completed")
+						}
+					})
+				}
+				watcher.mu.Unlock()
+			}
+		}
+	}()
+}
+
+// stop stops the cluster state watcher
+func (w *clusterStateWatcher) stop() {
+	if w == nil {
+		return
+	}
+
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	if w.debounceTimer != nil {
+		w.debounceTimer.Stop()
+	}
+
+	if w.stopCh == nil || w.stoppedCh == nil {
+		return
+	}
+
+	select {
+	case <-w.stopCh:
+		// Already closed or stopped
+		return
+	default:
+		close(w.stopCh)
+		<-w.stoppedCh
+	}
+}
diff --git a/pkg/kubernetes/manager_test.go b/pkg/kubernetes/manager_test.go
index aeed934e..4f54b299 100644
--- a/pkg/kubernetes/manager_test.go
+++ b/pkg/kubernetes/manager_test.go
@@ -228,6 +228,49 @@ func (s *ManagerTestSuite) TestNewManager() {
 	})
 }
 
+func (s *ManagerTestSuite) TestClusterStateWatcherStop() {
+	s.Run("stop() on nil watcher", func() {
+		var watcher *clusterStateWatcher
+		// Should not panic
+		watcher.stop()
+	})
+
+	s.Run("stop() on uninitialized watcher (nil channels)", func() {
+		watcher := &clusterStateWatcher{}
+		// Should not panic even with nil channels
+		watcher.stop()
+	})
+
+	s.Run("stop() on initialized watcher", func() {
+		watcher := &clusterStateWatcher{
+			stopCh:    make(chan struct{}),
+			stoppedCh: make(chan struct{}),
+		}
+		// Close the stoppedCh to simulate a running goroutine
+		go func() {
+			<-watcher.stopCh
+			close(watcher.stoppedCh)
+		}()
+		// Should not panic and should stop cleanly
+		watcher.stop()
+	})
+
+	s.Run("stop() called multiple times", func() {
+		watcher := &clusterStateWatcher{
+			stopCh:    make(chan struct{}),
+			stoppedCh: make(chan struct{}),
+		}
+		go func() {
+			<-watcher.stopCh
+			close(watcher.stoppedCh)
+		}()
+		// First stop
+		watcher.stop()
+		// Second stop should not panic
+		watcher.stop()
+	})
+}
+
 func TestManager(t *testing.T) {
 	suite.Run(t, new(ManagerTestSuite))
 }
diff --git a/pkg/kubernetes/provider_kubeconfig.go b/pkg/kubernetes/provider_kubeconfig.go
index e70f0a6c..b46740e1 100644
--- a/pkg/kubernetes/provider_kubeconfig.go
+++ b/pkg/kubernetes/provider_kubeconfig.go
@@ -120,8 +120,8 @@ func (p *kubeConfigClusterProvider) GetDefaultTarget() string {
 
 func (p *kubeConfigClusterProvider) WatchTargets(onKubeConfigChanged func() error) {
 	m := p.managers[p.defaultContext]
-	m.WatchKubeConfig(onKubeConfigChanged)
+	m.WatchClusterState(DefaultClusterStatePollInterval, DefaultClusterStateDebounceWindow, onKubeConfigChanged)
 }
 
 func (p *kubeConfigClusterProvider) Close() {
diff --git a/pkg/kubernetes/provider_single.go b/pkg/kubernetes/provider_single.go
index 3693d639..1e663f67 100644
--- a/pkg/kubernetes/provider_single.go
+++ b/pkg/kubernetes/provider_single.go
@@ -87,6 +87,7 @@ func (p *singleClusterProvider) GetTargetParameterName() string {
 
 func (p *singleClusterProvider) WatchTargets(watch func() error) {
 	p.manager.WatchKubeConfig(watch)
+	p.manager.WatchClusterState(DefaultClusterStatePollInterval, DefaultClusterStateDebounceWindow, watch)
 }
 
 func (p *singleClusterProvider) Close() {
diff --git a/pkg/mcp/mcp.go b/pkg/mcp/mcp.go
index 6a4a6d2f..8fee520f 100644
--- a/pkg/mcp/mcp.go
+++ b/pkg/mcp/mcp.go
@@ -98,31 +98,44 @@ func NewServer(configuration Configuration) (*Server, error) {
 
 func (s *Server) reloadKubernetesClusterProvider() error {
 	ctx := context.Background()
-	p, err := internalk8s.NewProvider(s.configuration.StaticConfig)
+
+	newProvider, err := internalk8s.NewProvider(s.configuration.StaticConfig)
+	if err != nil {
+		return err
+	}
+
+	targets, err := newProvider.GetTargets(ctx)
 	if err != nil {
+		newProvider.Close()
 		return err
 	}
 
-	// close the old provider
 	if s.p != nil {
 		s.p.Close()
 	}
-	s.p = p
+	s.p = newProvider
 
-	targets, err := p.GetTargets(ctx)
-	if err != nil {
+	if err := s.rebuildTools(targets); err != nil {
 		return err
 	}
 
+	s.p.WatchTargets(s.reloadKubernetesClusterProvider)
+
+	return nil
+}
+
+// rebuildTools rebuilds the MCP tool registry based on the current provider and targets.
+// This is called after the provider has been successfully validated and set.
+func (s *Server) rebuildTools(targets []string) error {
 	filter := CompositeFilter(
 		s.configuration.isToolApplicable,
-		ShouldIncludeTargetListTool(p.GetTargetParameterName(), targets),
+		ShouldIncludeTargetListTool(s.p.GetTargetParameterName(), targets),
 	)
 
 	mutator := WithTargetParameter(
-		p.GetDefaultTarget(),
-		p.GetTargetParameterName(),
+		s.p.GetDefaultTarget(),
+		s.p.GetTargetParameterName(),
 		targets,
 	)
@@ -136,7 +149,7 @@ func (s *Server) reloadKubernetesClusterProvider() error {
 	applicableTools := make([]api.ServerTool, 0)
 	s.enabledTools = make([]string, 0)
 	for _, toolset := range s.configuration.Toolsets() {
-		for _, tool := range toolset.GetTools(p) {
+		for _, tool := range toolset.GetTools(s.p) {
 			tool := mutator(tool)
 			if !filter(tool) {
 				continue
@@ -157,6 +170,7 @@ func (s *Server) reloadKubernetesClusterProvider() error {
 	}
 	s.server.RemoveTools(toolsToRemove...)
 
+	// Add new tools
 	for _, tool := range applicableTools {
 		goSdkTool, goSdkToolHandler, err := ServerToolToGoSdkTool(s, tool)
 		if err != nil {
@@ -165,8 +179,6 @@ func (s *Server) reloadKubernetesClusterProvider() error {
 		s.server.AddTool(goSdkTool, goSdkToolHandler)
 	}
 
-	// start new watch
-	s.p.WatchTargets(s.reloadKubernetesClusterProvider)
 
 	return nil
 }