Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/cloud-node-manager/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type Config struct {
// Specifies if node information is retrieved via IMDS or ARM.
UseInstanceMetadata bool

// EnableNodeEventChecker if enabled will also run the NodeEventChecker. Can only be set to true if UseInstanceMetadata is also true.
EnableNodeEventChecker bool

// WindowsService should be set to true if cloud-node-manager is running as a service on Windows.
// Its corresponding flag only gets registered in Windows builds
WindowsService bool
Expand Down
15 changes: 15 additions & 0 deletions cmd/cloud-node-manager/app/nodemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import (

cloudnodeconfig "sigs.k8s.io/cloud-provider-azure/cmd/cloud-node-manager/app/config"
"sigs.k8s.io/cloud-provider-azure/cmd/cloud-node-manager/app/options"
"sigs.k8s.io/cloud-provider-azure/pkg/consts"
nodeprovider "sigs.k8s.io/cloud-provider-azure/pkg/node"
"sigs.k8s.io/cloud-provider-azure/pkg/nodemanager"
"sigs.k8s.io/cloud-provider-azure/pkg/provider"
"sigs.k8s.io/cloud-provider-azure/pkg/version"
"sigs.k8s.io/cloud-provider-azure/pkg/version/verflag"
)
Expand Down Expand Up @@ -136,6 +138,19 @@ func startControllers(ctx context.Context, c *cloudnodeconfig.Config, healthzHan
c.EnableDeprecatedBetaTopologyLabels)

go nodeController.Run(ctx)
if c.EnableNodeEventChecker {
imdsService, err := provider.NewInstanceMetadataService(consts.ImdsServer)
if err != nil {
return fmt.Errorf("failed to create instance metadata service: %w", err)
}
eventChecker := nodemanager.NewEventChecker(
c.NodeName,
c.Client,
imdsService,
2*time.Second,
)
go eventChecker.Run(ctx)
}

check := controllerhealthz.NamedPingChecker(c.NodeName)
healthzHandler.AddHealthChecker(check)
Expand Down
4 changes: 4 additions & 0 deletions cmd/cloud-node-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type CloudNodeManagerOptions struct {

UseInstanceMetadata bool

EnableNodeEventChecker bool

// WindowsService should be set to true if cloud-node-manager is running as a service on Windows.
// Its corresponding flag only gets registered in Windows builds
WindowsService bool
Expand Down Expand Up @@ -135,6 +137,7 @@ func (o *CloudNodeManagerOptions) Flags() cliflag.NamedFlagSets {
fs.Int32Var(&o.ClientConnection.Burst, "kube-api-burst", 30, "Burst to use while talking with kubernetes apiserver.")
fs.BoolVar(&o.WaitForRoutes, "wait-routes", false, "Whether the nodes should wait for routes created on Azure route table. It should be set to true when using kubenet plugin.")
fs.BoolVar(&o.UseInstanceMetadata, "use-instance-metadata", true, "Should use Instance Metadata Service for fetching node information; if false will use ARM instead.")
fs.BoolVar(&o.EnableNodeEventChecker, "enable-node-event-checker", true, "Should enable the NodeEventChecker to check for Azure scheduled events. Can only be set to true if --use-instance-metadata is also true. If false, the NodeEventChecker will not run and no events will be recorded in the node status.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we default to false, and enable as needed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. we'd need user to enable this feature by the new flag

fs.StringVar(&o.CloudConfigFilePath, "cloud-config", o.CloudConfigFilePath, "The path to the cloud config file to be used when using ARM to fetch node information.")
fs.BoolVar(&o.EnableDeprecatedBetaTopologyLabels, "enable-deprecated-beta-topology-labels", o.EnableDeprecatedBetaTopologyLabels, "DEPRECATED: This flag will be removed in a future release. If true, the node will apply beta topology labels.")
return fss
Expand Down Expand Up @@ -195,6 +198,7 @@ func (o *CloudNodeManagerOptions) ApplyTo(c *cloudnodeconfig.Config, userAgent s
}))
c.NodeStatusUpdateFrequency = o.NodeStatusUpdateFrequency
c.UseInstanceMetadata = o.UseInstanceMetadata
c.EnableNodeEventChecker = c.UseInstanceMetadata && o.EnableNodeEventChecker
c.CloudConfigFilePath = o.CloudConfigFilePath

c.WindowsService = o.WindowsService
Expand Down
6 changes: 4 additions & 2 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,13 +425,15 @@ const (
// ImdsInstanceAPIVersion is the imds instance api version
ImdsInstanceAPIVersion = "2021-10-01"
// ImdsLoadBalancerAPIVersion is the imds load balancer api version
ImdsLoadBalancerAPIVersion = "2020-10-01"
ImdsLoadBalancerAPIVersion = "2020-10-01"
ImdsScheduledEventsAPIVersion = "2020-07-01"
// ImdsServer is the imds server endpoint
ImdsServer = "http://169.254.169.254"
// ImdsInstanceURI is the imds instance uri
ImdsInstanceURI = "/metadata/instance"
// ImdsLoadBalancerURI is the imds load balancer uri
ImdsLoadBalancerURI = "/metadata/loadbalancer"
ImdsLoadBalancerURI = "/metadata/loadbalancer"
ImdsScheduledEventsURI = "/metadata/scheduledevents"
)

// routes
Expand Down
139 changes: 139 additions & 0 deletions pkg/nodemanager/eventchecker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
Copyright 2019 The Kubernetes Authors.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be 2025, no?

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodemanager

import (
"context"
"fmt"
"strings"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

"sigs.k8s.io/cloud-provider-azure/pkg/provider"
utilnode "sigs.k8s.io/cloud-provider-azure/pkg/util/node"
)

const (
conditionType = "ScheduledEvent"
)

// EventChecker periodically checks for Azure scheduled events and updates the Kubernetes node status accordingly.
type EventChecker struct {
nodeName string
kubeClient clientset.Interface
recorder record.EventRecorder
imds *provider.InstanceMetadataService

updateFrequency time.Duration
}

// NewEventChecker creates a new EventChecker instance
func NewEventChecker(nodeName string, kubeClient clientset.Interface, metadataService *provider.InstanceMetadataService, updateFrequency time.Duration) *EventChecker {
return &EventChecker{
imds: metadataService,
nodeName: nodeName,
kubeClient: kubeClient,
updateFrequency: updateFrequency,
}
}

// Run starts the event checker loop that periodically checks for Azure scheduled events
func (ec *EventChecker) Run(ctx context.Context) {
ticker := time.NewTicker(ec.updateFrequency)
defer ticker.Stop()

klog.Infof("Starting Azure scheduled event checker for node %s with update frequency %s", ec.nodeName, ec.updateFrequency)
for {
select {
case <-ctx.Done():
klog.Infof("Stopping Azure scheduled event checker for node %s", ec.nodeName)
return
case <-ticker.C:
if err := ec.CheckAzureScheduledEvents(context.Background()); err != nil {
klog.Errorf("Failed to check Azure scheduled events: %v", err)
}
}
}
}

// CheckAzureScheduledEvents queries the Azure metadata service for scheduled events
// and updates the Kubernetes node with a custom condition "NodeEvent".
func (ec *EventChecker) CheckAzureScheduledEvents(ctx context.Context) error {
eventResponse, err := ec.imds.GetScheduledEvents()
if err != nil {
return err
}

node, err := ec.kubeClient.CoreV1().Nodes().Get(ctx, ec.nodeName, metav1.GetOptions{})
if err != nil {
return err
}

currentCondition := ec.GetNodeEventCondition(node.Status.Conditions)
if len(eventResponse.Events) == 0 {
targetCondition := v1.NodeCondition{
Type: conditionType,
Status: v1.ConditionFalse,
Reason: "NoScheduledEvents",
Message: "No scheduled events found",
}
if currentCondition == nil || targetCondition.Status != currentCondition.Status {
klog.Infof("No scheduled events found for node %s, updating condition to %s", ec.nodeName, targetCondition.Status)
return utilnode.SetNodeCondition(ec.kubeClient, types.NodeName(ec.nodeName), targetCondition)
}

return nil // No events to process, and no change in condition
}

targetCondition := ScheduledEventCondition(*eventResponse)
if currentCondition == nil || targetCondition.Message != currentCondition.Message {
klog.Infof("Scheduled events found for node %s, updating condition to %s", ec.nodeName, targetCondition.Status)
return utilnode.SetNodeCondition(ec.kubeClient, types.NodeName(ec.nodeName), targetCondition)
}
return nil
}

func (ec *EventChecker) GetNodeEventCondition(conditions []v1.NodeCondition) *v1.NodeCondition {
for _, condition := range conditions {
if condition.Type == conditionType {
return &condition
}
}

return nil
}

func ScheduledEventCondition(event provider.EventResponse) v1.NodeCondition {
nodeCondition := v1.NodeCondition{
Type: conditionType,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: time.Now()},
Reason: "ScheduledEventExists",
}
var messages []string
for _, ev := range event.Events {
messages = append(messages, fmt.Sprintf("EventID: %s, EventSource: %s, EventType: %s, NotBefore: %s, Duration: %d seconds, Description: %s", ev.EventID, ev.EventSource, ev.EventType, ev.NotBefore, ev.DurationInSeconds, ev.Description))
}
nodeCondition.Message = strings.Join(messages, "; ")
return nodeCondition
}
53 changes: 53 additions & 0 deletions pkg/provider/azure_instance_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,23 @@ type InstanceMetadataService struct {
imsCache azcache.Resource
}

type Event struct {
Description string `json:"Description,omitempty"`
DurationInSeconds int `json:"DurationInSeconds,omitempty"`
EventID string `json:"EventId,omitempty"`
EventSource string `json:"EventSource,omitempty"`
EventStatus string `json:"EventStatus,omitempty"`
EventType string `json:"EventType,omitempty"`
NotBefore string `json:"NotBefore,omitempty"`
ResourceType string `json:"ResourceType,omitempty"`
Resources []string `json:"Resources,omitempty"`
}

type EventResponse struct {
DocumentIncarnation int `json:"DocumentIncarnation"`
Events []Event `json:"Events"`
}

// NewInstanceMetadataService creates an instance of the InstanceMetadataService accessor object.
func NewInstanceMetadataService(imdsServer string) (*InstanceMetadataService, error) {
ims := &InstanceMetadataService{
Expand Down Expand Up @@ -255,6 +272,42 @@ func (ims *InstanceMetadataService) getLoadBalancerMetadata() (*LoadBalancerMeta
return &obj, nil
}

func (ims *InstanceMetadataService) GetScheduledEvents() (*EventResponse, error) {
req, err := http.NewRequest("GET", ims.imdsServer+consts.ImdsScheduledEventsURI, nil)
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would the request fail if there are no scheduled events? And would it fail during node bootstrap?

I'm wondering whether we should skip the errors in the caller side as failures here may block node lifecycle logic

return nil, err
}
req.Header.Add("Metadata", "True")
req.Header.Add("User-Agent", "golang/kubernetes-cloud-provider")
q := req.URL.Query()
q.Add("format", "json")
q.Add("api-version", consts.ImdsScheduledEventsAPIVersion)
req.URL.RawQuery = q.Encode()

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failure of getting scheduled events with response %q", resp.Status)
}

data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

er := EventResponse{}
if err := json.Unmarshal(data, &er); err != nil {
return nil, err
}
return &er, nil

}

// GetMetadata gets instance metadata from cache.
// crt determines if we can get data from stalled cache/need fresh if cache expired.
func (ims *InstanceMetadataService) GetMetadata(ctx context.Context, crt azcache.AzureCacheReadType) (*InstanceMetadata, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/provider/config/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"

"sigs.k8s.io/cloud-provider-azure/pkg/azclient/configloader"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this file?

"sigs.k8s.io/cloud-provider-azure/pkg/consts"
)

Expand Down Expand Up @@ -133,7 +134,7 @@ type Config struct {
// `nodeIP`: vm private IPs will be attached to the inbound backend pool of the load balancer;
// `podIP`: pod IPs will be attached to the inbound backend pool of the load balancer (not supported yet).
LoadBalancerBackendPoolConfigurationType string `json:"loadBalancerBackendPoolConfigurationType,omitempty" yaml:"loadBalancerBackendPoolConfigurationType,omitempty"`
// PutVMSSVMBatchSize defines how many requests the client send concurrently when putting the VMSS VMs.
// PutVMSSVMBatchSize defines how many reque hssts the client send concurrently when putting the VMSS VMs.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔

// If it is smaller than or equal to one, the request will be sent one by one in sequence (default).
PutVMSSVMBatchSize int `json:"putVMSSVMBatchSize" yaml:"putVMSSVMBatchSize"`
// PrivateLinkServiceResourceGroup determines the specific resource group of the private link services user want to use
Expand Down