-
Notifications
You must be signed in to change notification settings - Fork 41
🚀 Speculative agent autoscaling #664
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,10 @@ import ( | |
| appv1alpha2 "github.com/hashicorp/hcp-terraform-operator/api/v1alpha2" | ||
| ) | ||
|
|
||
| type AgentPoolControllerAutoscaling interface { | ||
| pendingWorkspaceRuns(ctx context.Context, ap *agentPoolInstance) (int32, error) | ||
| } | ||
|
|
||
| // userInteractionRunStatuses contains run statuses that require user interaction. | ||
| var userInteractionRunStatuses = map[tfc.RunStatus]struct{}{ | ||
| tfc.RunCostEstimated: {}, | ||
|
|
@@ -61,9 +65,10 @@ func matchWildcardName(wildcard string, str string) bool { | |
| } | ||
| } | ||
|
|
||
| // pendingWorkspaceRuns returns the number of workspaces with pending runs for a given agent pool. | ||
| // pendingWorkspaceRuns returns the number of agents needed to execute current pending runs for a given agent pool. | ||
| // If there are no plan-only runs in the list of current pending runs for a workspace, this function returns the number of workspaces. | ||
| // This function is compatible with HCP Terraform and TFE version v202409-1 and later. | ||
| func pendingWorkspaceRuns(ctx context.Context, ap *agentPoolInstance) (int32, error) { | ||
| func (ap *agentPoolInstance) pendingWorkspaceRuns(ctx context.Context) (int32, error) { | ||
| runs := map[string]struct{}{} | ||
| awaitingUserInteractionRuns := map[string]int{} // Track runs awaiting user interaction by status for future metrics | ||
| listOpts := &tfc.RunListForOrganizationOptions{ | ||
|
|
@@ -74,8 +79,9 @@ func pendingWorkspaceRuns(ctx context.Context, ap *agentPoolInstance) (int32, er | |
| PageNumber: initPageNumber, | ||
| }, | ||
| } | ||
|
|
||
| planOnlyRunCount := 0 | ||
| for { | ||
| ap.log.Info("Fetching runs for organization", "org", ap.instance.Spec.Organization, "page", listOpts.PageNumber) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's remove this message; it seems to be redundant here. |
||
| runsList, err := ap.tfClient.Client.Runs.ListForOrganization(ctx, ap.instance.Spec.Organization, listOpts) | ||
| if err != nil { | ||
| return 0, err | ||
|
|
@@ -87,6 +93,11 @@ func pendingWorkspaceRuns(ctx context.Context, ap *agentPoolInstance) (int32, er | |
| awaitingUserInteractionRuns[string(run.Status)]++ | ||
| continue | ||
| } | ||
| // Count plan-only runs separately so agents can scale up and execute runs in parallel | ||
| if run.PlanOnly { | ||
| planOnlyRunCount++ | ||
| continue | ||
| } | ||
| runs[run.Workspace.ID] = struct{}{} | ||
| } | ||
| if runsList.NextPage == 0 { | ||
|
|
@@ -97,13 +108,14 @@ func pendingWorkspaceRuns(ctx context.Context, ap *agentPoolInstance) (int32, er | |
|
|
||
| // TODO: | ||
| // Add metric(s) for runs awaiting user interaction | ||
|
|
||
| return int32(len(runs)), nil | ||
| agentsCount := len(runs) + planOnlyRunCount | ||
| ap.log.Info("Workspaces and plan-only runs count", "msg", fmt.Sprintf("Workspaces: %+v Plan-only runs: %d Total agents: %d", runs, planOnlyRunCount, agentsCount)) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For all log messages, we keep a context. For instance, for the autoscaling part, all info and error messages should follow the following template: ap.log.Info("Reconcile Agent Autoscaling", "msg", <MESSAGE>)
ap.log.Error(err, "Reconcile Agent Autoscaling", "msg", <MESSAGE>)For simplicity, the messages could as simple as The summary is logged below once this function returns, and it currently logs the number of workspaces with pending runs. It will be worth rephrasing that message and keeping the name 'runs' rather than 'agents'. Agents are calculated later. |
||
| return int32(agentsCount), nil | ||
| } | ||
|
|
||
| // computeRequiredAgents is a legacy algorithm that is used to compute the number of agents needed. | ||
| // It is used when the TFE version is less than v202409-1. | ||
| func computeRequiredAgents(ctx context.Context, ap *agentPoolInstance) (int32, error) { | ||
| func (ap *agentPoolInstance) computeRequiredAgents(ctx context.Context) (int32, error) { | ||
| required := 0 | ||
| // NOTE: | ||
| // - Two maps are used here to simplify target workspace searching by ID, name, and wildcard. | ||
|
|
@@ -242,7 +254,7 @@ func (r *AgentPoolReconciler) reconcileAgentAutoscaling(ctx context.Context, ap | |
|
|
||
| requiredAgents, err := func() (int32, error) { | ||
| if ap.tfClient.Client.IsCloud() { | ||
| return pendingWorkspaceRuns(ctx, ap) | ||
| return ap.pendingWorkspaceRuns(ctx) | ||
| } | ||
| tfeVersion := ap.tfClient.Client.RemoteTFEVersion() | ||
| useRunsEndpoint, err := validateTFEVersion(tfeVersion) | ||
|
|
@@ -256,10 +268,10 @@ func (r *AgentPoolReconciler) reconcileAgentAutoscaling(ctx context.Context, ap | |
| // It now allows retrieving a list of runs for the organization. | ||
| if useRunsEndpoint { | ||
| ap.log.Info("Reconcile Agent Autoscaling", "msg", fmt.Sprintf("Proceeding with the new algorithm based on the detected TFE version %s", tfeVersion)) | ||
| return pendingWorkspaceRuns(ctx, ap) | ||
| return ap.pendingWorkspaceRuns(ctx) | ||
| } | ||
| ap.log.Info("Reconcile Agent Autoscaling", "msg", fmt.Sprintf("Proceeding with the legacy algorithm based on the detected TFE version %s", tfeVersion)) | ||
| return computeRequiredAgents(ctx, ap) | ||
| return ap.computeRequiredAgents(ctx) | ||
| }() | ||
| if err != nil { | ||
| ap.log.Error(err, "Reconcile Agent Autoscaling", "msg", "Failed to get agents needed") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,17 +4,25 @@ | |
| package controller | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/go-logr/logr" | ||
| tfc "github.com/hashicorp/go-tfe" | ||
| "github.com/hashicorp/go-tfe/mocks" | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This confuses me. Why do we need to import mocks from the |
||
| appv1alpha2 "github.com/hashicorp/hcp-terraform-operator/api/v1alpha2" | ||
| "github.com/hashicorp/hcp-terraform-operator/internal/pointer" | ||
| . "github.com/onsi/ginkgo/v2" | ||
| . "github.com/onsi/gomega" | ||
| gomock "go.uber.org/mock/gomock" | ||
| corev1 "k8s.io/api/core/v1" | ||
| "k8s.io/apimachinery/pkg/api/errors" | ||
| k8sapierrors "k8s.io/apimachinery/pkg/api/errors" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| ) | ||
|
|
||
| var _ = Describe("Agent Pool controller", Ordered, func() { | ||
|
|
@@ -77,7 +85,7 @@ var _ = Describe("Agent Pool controller", Ordered, func() { | |
| Expect(k8sClient.Delete(ctx, instance)).To(Succeed()) | ||
| Eventually(func() bool { | ||
| err := k8sClient.Get(ctx, namespacedName, instance) | ||
| return errors.IsNotFound(err) | ||
| return k8sapierrors.IsNotFound(err) | ||
| }).Should(BeTrue()) | ||
| }) | ||
|
|
||
|
|
@@ -194,3 +202,87 @@ var _ = Describe("Agent Pool controller", Ordered, func() { | |
| }) | ||
| }) | ||
| }) | ||
|
|
||
| func TestPendingWorkspaceRuns(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| mockRuns []*tfc.Run | ||
| mockErr error | ||
| expectedCount int32 | ||
| expectError bool | ||
| }{ | ||
| { | ||
| name: "returns error from client", | ||
| mockErr: errors.New("api error"), | ||
| expectedCount: 0, | ||
| expectError: true, | ||
| }, | ||
| { | ||
| name: "counts plan-only runs", | ||
| mockRuns: []*tfc.Run{ | ||
| {ID: "run1", PlanOnly: true, Status: tfc.RunPlanning, Workspace: &tfc.Workspace{ID: "ws1"}}, | ||
| {ID: "run2", PlanOnly: true, Status: tfc.RunPlanning, Workspace: &tfc.Workspace{ID: "ws2"}}, | ||
| }, | ||
| expectedCount: 2, | ||
| expectError: false, | ||
| }, | ||
| { | ||
| name: "skips user interaction runs", | ||
| mockRuns: []*tfc.Run{ | ||
| {ID: "run1", PlanOnly: false, Status: tfc.RunPlanned, Workspace: &tfc.Workspace{ID: "ws1"}}, | ||
| {ID: "run2", PlanOnly: false, Status: tfc.RunPolicyOverride, Workspace: &tfc.Workspace{ID: "ws2"}}, | ||
| }, | ||
| expectedCount: 0, | ||
| expectError: false, | ||
| }, | ||
| { | ||
| name: "counts normal pending runs", | ||
| mockRuns: []*tfc.Run{ | ||
| {ID: "run1", PlanOnly: false, Status: tfc.RunPlanning, Workspace: &tfc.Workspace{ID: "ws1"}}, | ||
| {ID: "run2", PlanOnly: false, Status: tfc.RunPlanning, Workspace: &tfc.Workspace{ID: "ws2"}}, | ||
| }, | ||
| expectedCount: 2, | ||
| expectError: false, | ||
| }, | ||
| { | ||
| name: "mix of plan-only and normal runs", | ||
| mockRuns: []*tfc.Run{ | ||
| {ID: "run1", PlanOnly: true, Status: tfc.RunPlanning, Workspace: &tfc.Workspace{ID: "ws1"}}, | ||
| {ID: "run2", PlanOnly: false, Status: tfc.RunPlanning, Workspace: &tfc.Workspace{ID: "ws2"}}, | ||
| }, | ||
| expectedCount: 2, | ||
| expectError: false, | ||
| }, | ||
| } | ||
|
|
||
| for _, tt := range tests { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
| ctrl := gomock.NewController(t) | ||
| defer ctrl.Finish() | ||
|
|
||
| mockRuns := mocks.NewMockRuns(ctrl) | ||
| mockRuns.EXPECT(). | ||
| ListForOrganization(gomock.Any(), "test-org", gomock.Any()). | ||
| Return(&tfc.RunList{Items: tt.mockRuns, Pagination: &tfc.Pagination{NextPage: 0}}, tt.mockErr) | ||
|
|
||
| ap := &agentPoolInstance{ | ||
| tfClient: HCPTerraformClient{Client: &tfc.Client{Runs: mockRuns}}, | ||
| instance: appv1alpha2.AgentPool{ | ||
| Spec: appv1alpha2.AgentPoolSpec{ | ||
| Name: "test-pool", | ||
| Organization: "test-org", | ||
| }, | ||
| }, | ||
| log: logr.Logger{}, | ||
| } | ||
|
|
||
| count, err := ap.pendingWorkspaceRuns(context.Background()) | ||
| if tt.expectError { | ||
| assert.Error(t, err) | ||
| } else { | ||
| assert.NoError(t, err) | ||
| assert.Equal(t, tt.expectedCount, count) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we stick to the number of pending runs only in the description? I would suggest either removing the second line or rephrasing it, as it’s confusing in its current form.