-
Notifications
You must be signed in to change notification settings - Fork 301
Introduce scheduled event checking for Azure VMs via instance metadata #9170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,139 @@ | ||
| /* | ||
| Copyright 2019 The Kubernetes Authors. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should be 2025, no? |
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package nodemanager | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "strings" | ||
| "time" | ||
|
|
||
| v1 "k8s.io/api/core/v1" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/apimachinery/pkg/types" | ||
| clientset "k8s.io/client-go/kubernetes" | ||
| "k8s.io/client-go/tools/record" | ||
| "k8s.io/klog/v2" | ||
|
|
||
| "sigs.k8s.io/cloud-provider-azure/pkg/provider" | ||
| utilnode "sigs.k8s.io/cloud-provider-azure/pkg/util/node" | ||
| ) | ||
|
|
||
| const ( | ||
| conditionType = "ScheduledEvent" | ||
| ) | ||
|
|
||
| // EventChecker periodically checks for Azure scheduled events and updates the Kubernetes node status accordingly. | ||
| type EventChecker struct { | ||
| nodeName string | ||
| kubeClient clientset.Interface | ||
| recorder record.EventRecorder | ||
| imds *provider.InstanceMetadataService | ||
|
|
||
| updateFrequency time.Duration | ||
| } | ||
|
|
||
| // NewEventChecker creates a new EventChecker instance | ||
| func NewEventChecker(nodeName string, kubeClient clientset.Interface, metadataService *provider.InstanceMetadataService, updateFrequency time.Duration) *EventChecker { | ||
| return &EventChecker{ | ||
| imds: metadataService, | ||
| nodeName: nodeName, | ||
| kubeClient: kubeClient, | ||
| updateFrequency: updateFrequency, | ||
| } | ||
| } | ||
|
|
||
| // Run starts the event checker loop that periodically checks for Azure scheduled events | ||
| func (ec *EventChecker) Run(ctx context.Context) { | ||
| ticker := time.NewTicker(ec.updateFrequency) | ||
| defer ticker.Stop() | ||
|
|
||
| klog.Infof("Starting Azure scheduled event checker for node %s with update frequency %s", ec.nodeName, ec.updateFrequency) | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| klog.Infof("Stopping Azure scheduled event checker for node %s", ec.nodeName) | ||
| return | ||
| case <-ticker.C: | ||
| if err := ec.CheckAzureScheduledEvents(context.Background()); err != nil { | ||
| klog.Errorf("Failed to check Azure scheduled events: %v", err) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // CheckAzureScheduledEvents queries the Azure metadata service for scheduled events | ||
| // and updates the Kubernetes node with a custom condition "ScheduledEvent". | ||
| func (ec *EventChecker) CheckAzureScheduledEvents(ctx context.Context) error { | ||
| eventResponse, err := ec.imds.GetScheduledEvents() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| node, err := ec.kubeClient.CoreV1().Nodes().Get(ctx, ec.nodeName, metav1.GetOptions{}) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| currentCondition := ec.GetNodeEventCondition(node.Status.Conditions) | ||
| if len(eventResponse.Events) == 0 { | ||
| targetCondition := v1.NodeCondition{ | ||
| Type: conditionType, | ||
| Status: v1.ConditionFalse, | ||
| Reason: "NoScheduledEvents", | ||
| Message: "No scheduled events found", | ||
| } | ||
| if currentCondition == nil || targetCondition.Status != currentCondition.Status { | ||
| klog.Infof("No scheduled events found for node %s, updating condition to %s", ec.nodeName, targetCondition.Status) | ||
| return utilnode.SetNodeCondition(ec.kubeClient, types.NodeName(ec.nodeName), targetCondition) | ||
| } | ||
|
|
||
| return nil // No events to process, and no change in condition | ||
| } | ||
|
|
||
| targetCondition := ScheduledEventCondition(*eventResponse) | ||
| if currentCondition == nil || targetCondition.Message != currentCondition.Message { | ||
| klog.Infof("Scheduled events found for node %s, updating condition to %s", ec.nodeName, targetCondition.Status) | ||
| return utilnode.SetNodeCondition(ec.kubeClient, types.NodeName(ec.nodeName), targetCondition) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (ec *EventChecker) GetNodeEventCondition(conditions []v1.NodeCondition) *v1.NodeCondition { | ||
| for _, condition := range conditions { | ||
| if condition.Type == conditionType { | ||
| return &condition | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func ScheduledEventCondition(event provider.EventResponse) v1.NodeCondition { | ||
| nodeCondition := v1.NodeCondition{ | ||
| Type: conditionType, | ||
| Status: v1.ConditionTrue, | ||
| LastTransitionTime: metav1.Time{Time: time.Now()}, | ||
| Reason: "ScheduledEventExists", | ||
| } | ||
| var messages []string | ||
| for _, ev := range event.Events { | ||
| messages = append(messages, fmt.Sprintf("EventID: %s, EventSource: %s, EventType: %s, NotBefore: %s, Duration: %d seconds, Description: %s", ev.EventID, ev.EventSource, ev.EventType, ev.NotBefore, ev.DurationInSeconds, ev.Description)) | ||
| } | ||
| nodeCondition.Message = strings.Join(messages, "; ") | ||
| return nodeCondition | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -108,6 +108,23 @@ type InstanceMetadataService struct { | |
| imsCache azcache.Resource | ||
| } | ||
|
|
||
| type Event struct { | ||
| Description string `json:"Description,omitempty"` | ||
| DurationInSeconds int `json:"DurationInSeconds,omitempty"` | ||
| EventID string `json:"EventId,omitempty"` | ||
| EventSource string `json:"EventSource,omitempty"` | ||
| EventStatus string `json:"EventStatus,omitempty"` | ||
| EventType string `json:"EventType,omitempty"` | ||
| NotBefore string `json:"NotBefore,omitempty"` | ||
| ResourceType string `json:"ResourceType,omitempty"` | ||
| Resources []string `json:"Resources,omitempty"` | ||
| } | ||
|
|
||
| type EventResponse struct { | ||
| DocumentIncarnation int `json:"DocumentIncarnation"` | ||
| Events []Event `json:"Events"` | ||
| } | ||
|
|
||
| // NewInstanceMetadataService creates an instance of the InstanceMetadataService accessor object. | ||
| func NewInstanceMetadataService(imdsServer string) (*InstanceMetadataService, error) { | ||
| ims := &InstanceMetadataService{ | ||
|
|
@@ -255,6 +272,42 @@ func (ims *InstanceMetadataService) getLoadBalancerMetadata() (*LoadBalancerMeta | |
| return &obj, nil | ||
| } | ||
|
|
||
| func (ims *InstanceMetadataService) GetScheduledEvents() (*EventResponse, error) { | ||
| req, err := http.NewRequest("GET", ims.imdsServer+consts.ImdsScheduledEventsURI, nil) | ||
| if err != nil { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. would the request fail if there are no scheduled events? And would it fail during node bootstrap? I'm wondering whether we should skip the errors in the caller side as failures here may block node lifecycle logic |
||
| return nil, err | ||
| } | ||
| req.Header.Add("Metadata", "True") | ||
| req.Header.Add("User-Agent", "golang/kubernetes-cloud-provider") | ||
| q := req.URL.Query() | ||
| q.Add("format", "json") | ||
| q.Add("api-version", consts.ImdsScheduledEventsAPIVersion) | ||
| req.URL.RawQuery = q.Encode() | ||
|
|
||
| client := &http.Client{} | ||
| resp, err := client.Do(req) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| defer resp.Body.Close() | ||
|
|
||
| if resp.StatusCode != http.StatusOK { | ||
| return nil, fmt.Errorf("failure of getting scheduled events with response %q", resp.Status) | ||
| } | ||
|
|
||
| data, err := io.ReadAll(resp.Body) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| er := EventResponse{} | ||
| if err := json.Unmarshal(data, &er); err != nil { | ||
| return nil, err | ||
| } | ||
| return &er, nil | ||
|
|
||
| } | ||
|
|
||
| // GetMetadata gets instance metadata from cache. | ||
| // crt determines if we can get data from stalled cache/need fresh if cache expired. | ||
| func (ims *InstanceMetadataService) GetMetadata(ctx context.Context, crt azcache.AzureCacheReadType) (*InstanceMetadata, error) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ import ( | |
| "strings" | ||
|
|
||
| "sigs.k8s.io/cloud-provider-azure/pkg/azclient/configloader" | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. revert this file? |
||
| "sigs.k8s.io/cloud-provider-azure/pkg/consts" | ||
| ) | ||
|
|
||
|
|
@@ -133,7 +134,7 @@ type Config struct { | |
| // `nodeIP`: vm private IPs will be attached to the inbound backend pool of the load balancer; | ||
| // `podIP`: pod IPs will be attached to the inbound backend pool of the load balancer (not supported yet). | ||
| LoadBalancerBackendPoolConfigurationType string `json:"loadBalancerBackendPoolConfigurationType,omitempty" yaml:"loadBalancerBackendPoolConfigurationType,omitempty"` | ||
| // PutVMSSVMBatchSize defines how many requests the client send concurrently when putting the VMSS VMs. | ||
| // PutVMSSVMBatchSize defines how many requests the client send concurrently when putting the VMSS VMs. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🤔 |
||
| // If it is smaller than or equal to one, the request will be sent one by one in sequence (default). | ||
| PutVMSSVMBatchSize int `json:"putVMSSVMBatchSize" yaml:"putVMSSVMBatchSize"` | ||
| // PrivateLinkServiceResourceGroup determines the specific resource group of the private link services user want to use | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we default to false, and enable as needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. we'd need user to enable this feature by the new flag