diff --git a/cmd/cloud-node-manager/app/config/config.go b/cmd/cloud-node-manager/app/config/config.go index 47371195f7..b91900d5c1 100644 --- a/cmd/cloud-node-manager/app/config/config.go +++ b/cmd/cloud-node-manager/app/config/config.go @@ -78,6 +78,9 @@ type Config struct { // Specifies if node information is retrieved via IMDS or ARM. UseInstanceMetadata bool + // EnableNodeEventChecker if enabled will also run the NodeEventChecker. Can only be set to true if UseInstanceMetadata is also true. + EnableNodeEventChecker bool + // WindowsService should be set to true if cloud-node-manager is running as a service on Windows. // Its corresponding flag only gets registered in Windows builds WindowsService bool diff --git a/cmd/cloud-node-manager/app/nodemanager.go b/cmd/cloud-node-manager/app/nodemanager.go index 5d7b5f0420..f29ba16458 100644 --- a/cmd/cloud-node-manager/app/nodemanager.go +++ b/cmd/cloud-node-manager/app/nodemanager.go @@ -34,8 +34,10 @@ import ( cloudnodeconfig "sigs.k8s.io/cloud-provider-azure/cmd/cloud-node-manager/app/config" "sigs.k8s.io/cloud-provider-azure/cmd/cloud-node-manager/app/options" + "sigs.k8s.io/cloud-provider-azure/pkg/consts" nodeprovider "sigs.k8s.io/cloud-provider-azure/pkg/node" "sigs.k8s.io/cloud-provider-azure/pkg/nodemanager" + "sigs.k8s.io/cloud-provider-azure/pkg/provider" "sigs.k8s.io/cloud-provider-azure/pkg/version" "sigs.k8s.io/cloud-provider-azure/pkg/version/verflag" ) @@ -136,6 +138,19 @@ func startControllers(ctx context.Context, c *cloudnodeconfig.Config, healthzHan c.EnableDeprecatedBetaTopologyLabels) go nodeController.Run(ctx) + if c.EnableNodeEventChecker { + imdsService, err := provider.NewInstanceMetadataService(consts.ImdsServer) + if err != nil { + return fmt.Errorf("failed to create instance metadata service: %w", err) + } + eventChecker := nodemanager.NewEventChecker( + c.NodeName, + c.Client, + imdsService, + 2*time.Second, + ) + go eventChecker.Run(ctx) + } check := controllerhealthz.NamedPingChecker(c.NodeName) healthzHandler.AddHealthChecker(check) diff --git a/cmd/cloud-node-manager/app/options/options.go b/cmd/cloud-node-manager/app/options/options.go index a8b3f081cf..46ebf70b67 100644 --- a/cmd/cloud-node-manager/app/options/options.go +++ b/cmd/cloud-node-manager/app/options/options.go @@ -83,6 +83,8 @@ type CloudNodeManagerOptions struct { UseInstanceMetadata bool + EnableNodeEventChecker bool + // WindowsService should be set to true if cloud-node-manager is running as a service on Windows. // Its corresponding flag only gets registered in Windows builds WindowsService bool @@ -135,6 +137,7 @@ func (o *CloudNodeManagerOptions) Flags() cliflag.NamedFlagSets { fs.Int32Var(&o.ClientConnection.Burst, "kube-api-burst", 30, "Burst to use while talking with kubernetes apiserver.") fs.BoolVar(&o.WaitForRoutes, "wait-routes", false, "Whether the nodes should wait for routes created on Azure route table. It should be set to true when using kubenet plugin.") fs.BoolVar(&o.UseInstanceMetadata, "use-instance-metadata", true, "Should use Instance Metadata Service for fetching node information; if false will use ARM instead.") + fs.BoolVar(&o.EnableNodeEventChecker, "enable-node-event-checker", true, "Should enable the NodeEventChecker to check for Azure scheduled events. Can only be set to true if --use-instance-metadata is also true. If false, the NodeEventChecker will not run and no events will be recorded in the node status.") fs.StringVar(&o.CloudConfigFilePath, "cloud-config", o.CloudConfigFilePath, "The path to the cloud config file to be used when using ARM to fetch node information.") fs.BoolVar(&o.EnableDeprecatedBetaTopologyLabels, "enable-deprecated-beta-topology-labels", o.EnableDeprecatedBetaTopologyLabels, "DEPRECATED: This flag will be removed in a future release. If true, the node will apply beta topology labels.") return fss @@ -195,6 +198,7 @@ func (o *CloudNodeManagerOptions) ApplyTo(c *cloudnodeconfig.Config, userAgent s })) c.NodeStatusUpdateFrequency = o.NodeStatusUpdateFrequency c.UseInstanceMetadata = o.UseInstanceMetadata + c.EnableNodeEventChecker = c.UseInstanceMetadata && o.EnableNodeEventChecker c.CloudConfigFilePath = o.CloudConfigFilePath c.WindowsService = o.WindowsService diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 658b7fd1fc..b40999d1be 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -425,13 +425,15 @@ const ( // ImdsInstanceAPIVersion is the imds instance api version ImdsInstanceAPIVersion = "2021-10-01" // ImdsLoadBalancerAPIVersion is the imds load balancer api version - ImdsLoadBalancerAPIVersion = "2020-10-01" + ImdsLoadBalancerAPIVersion = "2020-10-01" + ImdsScheduledEventsAPIVersion = "2020-07-01" // ImdsServer is the imds server endpoint ImdsServer = "http://169.254.169.254" // ImdsInstanceURI is the imds instance uri ImdsInstanceURI = "/metadata/instance" // ImdsLoadBalancerURI is the imds load balancer uri - ImdsLoadBalancerURI = "/metadata/loadbalancer" + ImdsLoadBalancerURI = "/metadata/loadbalancer" + ImdsScheduledEventsURI = "/metadata/scheduledevents" ) // routes diff --git a/pkg/nodemanager/eventchecker.go b/pkg/nodemanager/eventchecker.go new file mode 100644 index 0000000000..ad73fbf145 --- /dev/null +++ b/pkg/nodemanager/eventchecker.go @@ -0,0 +1,139 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodemanager + +import ( + "context" + "fmt" + "strings" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + + "sigs.k8s.io/cloud-provider-azure/pkg/provider" + utilnode "sigs.k8s.io/cloud-provider-azure/pkg/util/node" +) + +const ( + conditionType = "ScheduledEvent" +) + +// EventChecker periodically checks for Azure scheduled events and updates the Kubernetes node status accordingly. +type EventChecker struct { + nodeName string + kubeClient clientset.Interface + recorder record.EventRecorder + imds *provider.InstanceMetadataService + + updateFrequency time.Duration +} + +// NewEventChecker creates a new EventChecker instance +func NewEventChecker(nodeName string, kubeClient clientset.Interface, metadataService *provider.InstanceMetadataService, updateFrequency time.Duration) *EventChecker { + return &EventChecker{ + imds: metadataService, + nodeName: nodeName, + kubeClient: kubeClient, + updateFrequency: updateFrequency, + } +} + +// Run starts the event checker loop that periodically checks for Azure scheduled events +func (ec *EventChecker) Run(ctx context.Context) { + ticker := time.NewTicker(ec.updateFrequency) + defer ticker.Stop() + + klog.Infof("Starting Azure scheduled event checker for node %s with update frequency %s", ec.nodeName, ec.updateFrequency) + for { + select { + case <-ctx.Done(): + klog.Infof("Stopping Azure scheduled event checker for node %s", ec.nodeName) + return + case <-ticker.C: + if err := ec.CheckAzureScheduledEvents(context.Background()); err != nil { + klog.Errorf("Failed to check Azure scheduled events: %v", err) + } + } + } +} + +// CheckAzureScheduledEvents queries the Azure metadata service for scheduled events +// and updates the Kubernetes node with a custom condition "NodeEvent". +func (ec *EventChecker) CheckAzureScheduledEvents(ctx context.Context) error { + eventResponse, err := ec.imds.GetScheduledEvents() + if err != nil { + return err + } + + node, err := ec.kubeClient.CoreV1().Nodes().Get(ctx, ec.nodeName, metav1.GetOptions{}) + if err != nil { + return err + } + + currentCondition := ec.GetNodeEventCondition(node.Status.Conditions) + if len(eventResponse.Events) == 0 { + targetCondition := v1.NodeCondition{ + Type: conditionType, + Status: v1.ConditionFalse, + Reason: "NoScheduledEvents", + Message: "No scheduled events found", + } + if currentCondition == nil || targetCondition.Status != currentCondition.Status { + klog.Infof("No scheduled events found for node %s, updating condition to %s", ec.nodeName, targetCondition.Status) + return utilnode.SetNodeCondition(ec.kubeClient, types.NodeName(ec.nodeName), targetCondition) + } + + return nil // No events to process, and no change in condition + } + + targetCondition := ScheduledEventCondition(*eventResponse) + if currentCondition == nil || targetCondition.Message != currentCondition.Message { + klog.Infof("Scheduled events found for node %s, updating condition to %s", ec.nodeName, targetCondition.Status) + return utilnode.SetNodeCondition(ec.kubeClient, types.NodeName(ec.nodeName), targetCondition) + } + return nil +} + +func (ec *EventChecker) GetNodeEventCondition(conditions []v1.NodeCondition) *v1.NodeCondition { + for _, condition := range conditions { + if condition.Type == conditionType { + return &condition + } + } + + return nil +} + +func ScheduledEventCondition(event provider.EventResponse) v1.NodeCondition { + nodeCondition := v1.NodeCondition{ + Type: conditionType, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Time{Time: time.Now()}, + Reason: "ScheduledEventExists", + } + var messages []string + for _, ev := range event.Events { + messages = append(messages, fmt.Sprintf("EventID: %s, EventSource: %s, EventType: %s, NotBefore: %s, Duration: %d seconds, Description: %s", ev.EventID, ev.EventSource, ev.EventType, ev.NotBefore, ev.DurationInSeconds, ev.Description)) + } + nodeCondition.Message = strings.Join(messages, "; ") + return nodeCondition +} diff --git a/pkg/provider/azure_instance_metadata.go b/pkg/provider/azure_instance_metadata.go index 239e95a81f..c03de22932 100644 --- a/pkg/provider/azure_instance_metadata.go +++ b/pkg/provider/azure_instance_metadata.go @@ -108,6 +108,23 @@ type InstanceMetadataService struct { imsCache azcache.Resource } +type Event struct { + Description string `json:"Description,omitempty"` + DurationInSeconds int `json:"DurationInSeconds,omitempty"` + EventID string `json:"EventId,omitempty"` + EventSource string `json:"EventSource,omitempty"` + EventStatus string `json:"EventStatus,omitempty"` + EventType string `json:"EventType,omitempty"` + NotBefore string `json:"NotBefore,omitempty"` + ResourceType string `json:"ResourceType,omitempty"` + Resources []string `json:"Resources,omitempty"` +} + +type EventResponse struct { + DocumentIncarnation int `json:"DocumentIncarnation"` + Events []Event `json:"Events"` +} + // NewInstanceMetadataService creates an instance of the InstanceMetadataService accessor object. func NewInstanceMetadataService(imdsServer string) (*InstanceMetadataService, error) { ims := &InstanceMetadataService{ @@ -255,6 +272,42 @@ func (ims *InstanceMetadataService) getLoadBalancerMetadata() (*LoadBalancerMeta return &obj, nil } +func (ims *InstanceMetadataService) GetScheduledEvents() (*EventResponse, error) { + req, err := http.NewRequest("GET", ims.imdsServer+consts.ImdsScheduledEventsURI, nil) + if err != nil { + return nil, err + } + req.Header.Add("Metadata", "True") + req.Header.Add("User-Agent", "golang/kubernetes-cloud-provider") + q := req.URL.Query() + q.Add("format", "json") + q.Add("api-version", consts.ImdsScheduledEventsAPIVersion) + req.URL.RawQuery = q.Encode() + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("failure of getting scheduled events with response %q", resp.Status) + } + + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + er := EventResponse{} + if err := json.Unmarshal(data, &er); err != nil { + return nil, err + } + return &er, nil + +} + // GetMetadata gets instance metadata from cache. // crt determines if we can get data from stalled cache/need fresh if cache expired. func (ims *InstanceMetadataService) GetMetadata(ctx context.Context, crt azcache.AzureCacheReadType) (*InstanceMetadata, error) { diff --git a/pkg/provider/config/azure.go b/pkg/provider/config/azure.go index 9d8801b9c3..0df04663cb 100644 --- a/pkg/provider/config/azure.go +++ b/pkg/provider/config/azure.go @@ -20,6 +20,7 @@ import ( "strings" "sigs.k8s.io/cloud-provider-azure/pkg/azclient/configloader" + "sigs.k8s.io/cloud-provider-azure/pkg/consts" ) @@ -133,7 +134,7 @@ type Config struct { // `nodeIP`: vm private IPs will be attached to the inbound backend pool of the load balancer; // `podIP`: pod IPs will be attached to the inbound backend pool of the load balancer (not supported yet). LoadBalancerBackendPoolConfigurationType string `json:"loadBalancerBackendPoolConfigurationType,omitempty" yaml:"loadBalancerBackendPoolConfigurationType,omitempty"` - // PutVMSSVMBatchSize defines how many requests the client send concurrently when putting the VMSS VMs. + // PutVMSSVMBatchSize defines how many reque hssts the client send concurrently when putting the VMSS VMs. // If it is smaller than or equal to one, the request will be sent one by one in sequence (default). PutVMSSVMBatchSize int `json:"putVMSSVMBatchSize" yaml:"putVMSSVMBatchSize"` // PrivateLinkServiceResourceGroup determines the specific resource group of the private link services user want to use