diff --git a/README.md b/README.md index 7a8ba93..1dc0f62 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,24 @@ End-to-end tests for Deckhouse storage components. 5. Write your test in `tests/<test-dir>/<test-name>_test.go` (Section marked `---=== TESTS START HERE ===---`) 6. Run the test: `go test -timeout=240m -v ./tests/ -count=1` +### Run using an existing cluster (no VM creation) + +Use this mode to run tests against a cluster that is already running (faster iterations, no virtualization/VM setup). + +1. Set cluster creation mode to use existing cluster: + ```bash + export TEST_CLUSTER_CREATE_MODE=alwaysUseExisting + ``` +2. Point SSH to the **test cluster** (the Kubernetes API master you want to run tests on): + - **Direct access:** `SSH_HOST` = IP/hostname of the cluster master, `SSH_USER` = user that can run `sudo cat /etc/kubernetes/admin.conf` on that host. + - **Via jump host:** set `SSH_JUMP_HOST`, `SSH_JUMP_USER`, `SSH_JUMP_KEY_PATH` (optional); `SSH_HOST`/`SSH_USER` are the target cluster master. +3. Source the rest of your test env (e.g. `source tests/<test-dir>/test_exports`), then run: + ```bash + go test -timeout=240m -v ./tests/ -count=1 + ``` + +Kubeconfig is written to `temp/<test-name>/` (e.g. `temp/sds_node_configurator_test/kubeconfig-<master-ip>.yml`). The framework acquires a cluster lock so only one test run uses the cluster at a time. If a previous run left the lock (crash, Ctrl+C), set `TEST_CLUSTER_FORCE_LOCK_RELEASE=true` for the next run (do not use if another test might be using the cluster). + The `-count=1` flag prevents Go from using cached test results. Timeout `240m` is a global timeout for the entire testkit. Adjust it to your needs. @@ -71,6 +89,7 @@ See [pkg/FUNCTIONS_GLOSSARY.md](pkg/FUNCTIONS_GLOSSARY.md) for a full list of al - `SSH_PUBLIC_KEY` -- Path to SSH public key file, or plain-text key content. Default: `~/.ssh/id_rsa.pub` - `SSH_PASSPHRASE` -- Passphrase for the SSH private key. 
Required for non-interactive mode with encrypted keys - `SSH_VM_USER` -- SSH user for connecting to VMs deployed inside the test cluster. Default: `cloud` +- `SSH_VM_PASSWORD` -- Password for SSH to VMs (e.g. `cloud`) when connecting from jump host for lsblk checks. If set, uses `sshpass`; leave empty for key-based auth. Required when VMs accept only password auth. - `SSH_JUMP_HOST` -- Jump host address for connecting to clusters behind a bastion - `SSH_JUMP_USER` -- Jump host SSH user. Defaults to `SSH_USER` if jump host is set - `SSH_JUMP_KEY_PATH` -- Jump host SSH key path. Defaults to `SSH_PRIVATE_KEY` if jump host is set @@ -79,8 +98,10 @@ See [pkg/FUNCTIONS_GLOSSARY.md](pkg/FUNCTIONS_GLOSSARY.md) for a full list of al - `YAML_CONFIG_FILENAME` -- Filename of the cluster definition YAML. Default: `cluster_config.yml` - `TEST_CLUSTER_CLEANUP` -- Set to `true` to remove the test cluster after tests complete. Default: `false` +- `TEST_CLUSTER_RESUME` -- Set to `true` to continue from a previous failed run (only for `alwaysCreateNew`). If the test failed in the middle of cluster creation, re-run with `TEST_CLUSTER_RESUME=true`; the framework will load saved state from `temp/<test-name>/cluster-state.json` (written after step 6), restore VM hostnames, and run the remaining steps (connect to first master, add nodes, enable modules). Requires that step 6 (VMs created, VM info gathered) completed before the failure. - `TEST_CLUSTER_NAMESPACE` -- Namespace for DKP cluster deployment. Default: `e2e-test-cluster` - `KUBE_CONFIG_PATH` -- Path to a kubeconfig file. Used as fallback if SSH-based kubeconfig retrieval fails +- `KUBE_INSECURE_SKIP_TLS_VERIFY` -- Set to `true` to skip TLS certificate verification for the Kubernetes API (e.g. self-signed certs or tunnel to 127.0.0.1). Default: not set (verify TLS) - `IMAGE_PULL_POLICY` -- Image pull policy for ClusterVirtualImages: `Always` or `IfNotExists`. 
Default: `IfNotExists` ### Logging diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go index f5a8154..8aed5df 100644 --- a/internal/cluster/cluster.go +++ b/internal/cluster/cluster.go @@ -183,7 +183,8 @@ func expandPath(path string) (string, error) { // and returns a rest.Config that can be used with Kubernetes clients, along with the path to the kubeconfig file. // If sshClient is provided, it will be used instead of creating a new connection. // If sshClient is nil, a new connection will be created and closed automatically. -func GetKubeconfig(ctx context.Context, masterIP, user, keyPath string, sshClient ssh.SSHClient) (*rest.Config, string, error) { +// If kubeconfigOutputDir is non-empty, the kubeconfig file is written there; otherwise temp// is used. +func GetKubeconfig(ctx context.Context, masterIP, user, keyPath string, sshClient ssh.SSHClient, kubeconfigOutputDir string) (*rest.Config, string, error) { // Create SSH client if not provided shouldClose := false if sshClient == nil { @@ -198,23 +199,24 @@ func GetKubeconfig(ctx context.Context, masterIP, user, keyPath string, sshClien defer sshClient.Close() } - // Get the test file name from the caller - _, callerFile, _, ok := runtime.Caller(1) - if !ok { - return nil, "", fmt.Errorf("failed to get caller file information") - } - testFileName := strings.TrimSuffix(filepath.Base(callerFile), filepath.Ext(callerFile)) - - // Determine the temp directory path in the repo root - // callerFile is in tests/{test-dir}/, so we go up two levels to reach repo root - callerDir := filepath.Dir(callerFile) - repoRootPath := filepath.Join(callerDir, "..", "..") - // Resolve the .. 
parts to get absolute path - repoRoot, err := filepath.Abs(repoRootPath) - if err != nil { - return nil, "", fmt.Errorf("failed to resolve repo root path: %w", err) + var tempDir string + if kubeconfigOutputDir != "" { + tempDir = kubeconfigOutputDir + } else { + // Get the test file name from the caller (creates temp/cluster when called from pkg/cluster) + _, callerFile, _, ok := runtime.Caller(1) + if !ok { + return nil, "", fmt.Errorf("failed to get caller file information") + } + testFileName := strings.TrimSuffix(filepath.Base(callerFile), filepath.Ext(callerFile)) + callerDir := filepath.Dir(callerFile) + repoRootPath := filepath.Join(callerDir, "..", "..") + repoRoot, err := filepath.Abs(repoRootPath) + if err != nil { + return nil, "", fmt.Errorf("failed to resolve repo root path: %w", err) + } + tempDir = filepath.Join(repoRoot, "temp", testFileName) } - tempDir := filepath.Join(repoRoot, "temp", testFileName) // Create temp directory if it doesn't exist if err := os.MkdirAll(tempDir, 0755); err != nil { diff --git a/internal/config/config.go b/internal/config/config.go index ef2ed88..5dde700 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -53,7 +53,7 @@ const ( // Kubernetes operations ModuleCheckTimeout = 10 * time.Second // Timeout for checking module status NamespaceTimeout = 30 * time.Second // Timeout for creating namespace - NodeGroupTimeout = 3 * time.Second // Timeout for creating NodeGroup + NodeGroupTimeout = 2 * time.Minute // Timeout for creating NodeGroup (API can be slow right after bootstrap) SecretsWaitTimeout = 2 * time.Minute // Timeout for waiting for bootstrap secrets to appear ClusterHealthTimeout = 15 * time.Minute // Timeout for cluster health check ModuleDeployTimeout = 15 * time.Minute // Timeout for waiting for ONE module to be ready diff --git a/internal/config/env.go b/internal/config/env.go index fa51fc2..7d3826b 100644 --- a/internal/config/env.go +++ b/internal/config/env.go @@ -69,12 +69,18 @@ var 
( // SSH credentials to deploy to VM VMSSHUser = os.Getenv("SSH_VM_USER") VMSSHUserDefaultValue = "cloud" + // VMSSHPassword when set is used to SSH from jump host to VMs (cloud@vmIP) via sshpass. Leave empty for key-based auth. + VMSSHPassword = os.Getenv("SSH_VM_PASSWORD") // KubeConfigPath is the path to a kubeconfig file. If SSH retrieval fails (e.g., sudo requires password), // this path will be used as a fallback. If not set and SSH fails, the user will be notified to download // the kubeconfig manually and set this environment variable, test will fail. KubeConfigPath = os.Getenv("KUBE_CONFIG_PATH") + // KubeInsecureSkipTLSVerify when set to "true" disables TLS certificate verification for Kubernetes API + // (e.g. when using self-signed certs or connecting via tunnel to 127.0.0.1). Default: "false". + KubeInsecureSkipTLSVerify = os.Getenv("KUBE_INSECURE_SKIP_TLS_VERIFY") + // TestClusterCreateMode specifies the cluster creation mode. Must be set to either "alwaysUseExisting" or "alwaysCreateNew". If not set, test will fail. TestClusterCreateMode = os.Getenv("TEST_CLUSTER_CREATE_MODE") @@ -87,6 +93,16 @@ var ( TestClusterNamespace = os.Getenv("TEST_CLUSTER_NAMESPACE") TestClusterNamespaceDefaultValue = "e2e-test-cluster" + // TestClusterForceLockRelease when set to "true" or "True" (only for alwaysUseExisting) forces release of an + // existing cluster lock before acquiring. Use when a previous run left the lock (e.g. crash, Ctrl+C). + // Do not use if another test might be using the cluster. + TestClusterForceLockRelease = os.Getenv("TEST_CLUSTER_FORCE_LOCK_RELEASE") + + // TestClusterResume when set to "true" or "True" (only for alwaysCreateNew) tries to continue from a previous + // failed run: if state was saved after step 6 (VMs created, IPs gathered), connects to the first master and + // runs remaining steps (add nodes, enable modules). Set to "true" and re-run the test after a mid-deploy failure. 
+ TestClusterResume = os.Getenv("TEST_CLUSTER_RESUME") + // TestClusterStorageClass specifies the storage class for DKP cluster deployment TestClusterStorageClass = os.Getenv("TEST_CLUSTER_STORAGE_CLASS") //TestClusterStorageClassDefaultValue = "rsc-test-r2-local" diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index ce64f24..5ba3530 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -95,6 +95,91 @@ type TestClusterResources struct { SetupSSHClient ssh.SSHClient // Setup node SSH client (for cleanup) } +// resumeState is saved after step 6 (VMs created, IPs gathered) so a failed run can be continued with TEST_CLUSTER_RESUME=true. +type resumeState struct { + FirstMasterIP string `json:"first_master_ip"` + Namespace string `json:"namespace"` + VMNames []string `json:"vm_names"` + SetupVMName string `json:"setup_vm_name"` + MasterHostnames []string `json:"master_hostnames"` + WorkerHostnames []string `json:"worker_hostnames"` +} + +// getClusterStatePath returns temp//cluster-state.json +// (same dir as PrepareBootstrapConfig and cluster-state.json; used for both save and resume). +func getClusterStatePath(testFilePath string) (string, error) { + callerDir := filepath.Dir(testFilePath) + repoRoot, err := filepath.Abs(filepath.Join(callerDir, "..", "..")) + if err != nil { + return "", err + } + testFileName := strings.TrimSuffix(filepath.Base(testFilePath), filepath.Ext(testFilePath)) + return filepath.Join(repoRoot, "temp", testFileName, "cluster-state.json"), nil +} + +// getTestTempDirFromStack returns temp// by walking the call stack +// until a path containing "/tests/" is found. Used when testFilePath is not available (e.g. UseExistingCluster). 
+func getTestTempDirFromStack() (string, error) { + for i := 1; i <= 20; i++ { + _, file, _, ok := runtime.Caller(i) + if !ok { + break + } + if !strings.Contains(filepath.ToSlash(file), "/tests/") { + continue + } + dir := filepath.Dir(file) + for filepath.Base(dir) != "tests" { + parent := filepath.Dir(dir) + if parent == dir { + break + } + dir = parent + } + if filepath.Base(dir) != "tests" { + continue + } + repoRoot := filepath.Dir(dir) + testFileName := strings.TrimSuffix(filepath.Base(file), filepath.Ext(file)) + return filepath.Join(repoRoot, "temp", testFileName), nil + } + return "", fmt.Errorf("could not determine test temp dir from call stack") +} + +func saveClusterState(testFilePath string, state *resumeState) error { + path, err := getClusterStatePath(testFilePath) + if err != nil { + return err + } + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return err + } + data, err := json.MarshalIndent(state, "", " ") + if err != nil { + return err + } + return os.WriteFile(path, data, 0600) +} + +func loadClusterState(testFilePath string) (*resumeState, error) { + path, err := getClusterStatePath(testFilePath) + if err != nil { + return nil, err + } + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + var state resumeState + if err := json.Unmarshal(data, &state); err != nil { + return nil, err + } + if state.FirstMasterIP == "" || state.Namespace == "" || len(state.VMNames) == 0 { + return nil, fmt.Errorf("invalid cluster state: missing required fields") + } + return &state, nil +} + // loadClusterConfigFromPath loads and validates a cluster configuration from a specific file path func loadClusterConfigFromPath(configPath string) (*config.ClusterDefinition, error) { // Read the YAML file @@ -156,33 +241,62 @@ func CreateTestCluster( ) (*TestClusterResources, error) { logger.Step(1, "Loading cluster configuration from %s", yamlConfigFilename) - // Get the test file's directory (the caller of CreateTestCluster, which 
is the test file) - // runtime.Caller(1) gets the immediate caller (the test file that called CreateTestCluster) - _, callerFile, _, ok := runtime.Caller(1) - if !ok { - return nil, fmt.Errorf("failed to determine test file path") + // Find the test package directory and test file path by walking the call stack. + // CreateTestCluster is called from CreateOrConnectToTestCluster in cluster.go, so Caller(1) is not the test file. + var testDir string + var testFilePath string + for skip := 1; skip <= 10; skip++ { + _, callerFile, _, ok := runtime.Caller(skip) + if !ok { + break + } + if strings.Contains(filepath.ToSlash(callerFile), "/tests/") { + testDir = filepath.Dir(callerFile) + testFilePath = callerFile + break + } + } + if testDir == "" { + return nil, fmt.Errorf("failed to determine test directory (no caller under tests/)") } - testDir := filepath.Dir(callerFile) yamlConfigPath := filepath.Join(testDir, yamlConfigFilename) - logger.Debug("Test file directory: %s", testDir) + logger.Debug("Test directory: %s", testDir) logger.Debug("Config file path: %s", yamlConfigPath) - // Step 1: Load cluster configuration from YAML - // LoadClusterConfig uses runtime.Caller(1) which would get this function, not the test file - // So we need to load it directly from the path + // Step 1: Load cluster configuration from YAML (from test directory, e.g. tests/sds-node-configurator/cluster_config.yml) clusterDefinition, err := loadClusterConfigFromPath(yamlConfigPath) if err != nil { return nil, fmt.Errorf("failed to load cluster configuration: %w", err) } logger.StepComplete(1, "Cluster configuration loaded successfully from %s", yamlConfigPath) - // Randomize hostnames to avoid SAN initiator collisions. - // SANs remember iSCSI initiator names keyed by hostname; reusing the same hostnames - // (master-1, worker-1, etc.) across cluster recreations causes stale initiator mappings. - // Each node gets its own unique suffix to minimize collision likelihood. 
- randomizeHostnames(clusterDefinition) - logger.Info("Cluster hostnames randomized: masters=%v, workers=%v", + var resumeMode bool + if config.TestClusterResume == "true" || config.TestClusterResume == "True" { + state, loadErr := loadClusterState(testFilePath) + if loadErr == nil { + resumeMode = true + logger.Info("Resume mode: restoring hostnames from saved state (first_master_ip=%s)", state.FirstMasterIP) + for i, h := range state.MasterHostnames { + if i < len(clusterDefinition.Masters) { + clusterDefinition.Masters[i].Hostname = h + } + } + for i, h := range state.WorkerHostnames { + if i < len(clusterDefinition.Workers) { + clusterDefinition.Workers[i].Hostname = h + } + } + } else { + statePath, _ := getClusterStatePath(testFilePath) + logger.Info("TEST_CLUSTER_RESUME is set but cluster state not found or invalid: %v (tried %s)", loadErr, statePath) + } + } + if !resumeMode { + // Randomize hostnames to avoid SAN initiator collisions. + randomizeHostnames(clusterDefinition) + } + logger.Info("Cluster hostnames: masters=%v, workers=%v", func() []string { names := make([]string, len(clusterDefinition.Masters)) for i, m := range clusterDefinition.Masters { @@ -206,14 +320,36 @@ func CreateTestCluster( return nil, fmt.Errorf("failed to get SSH private key path: %w", err) } + useJumpHost := config.SSHJumpHost != "" + var jumpUser, jumpHost, jumpKeyPath string + if useJumpHost { + jumpUser = config.SSHJumpUser + if jumpUser == "" { + jumpUser = sshUser + } + jumpHost = config.SSHJumpHost + jumpKeyPath = config.SSHJumpKeyPath + if jumpKeyPath == "" { + jumpKeyPath = sshKeyPath + } + logger.Info("Using jump host %s@%s to connect to base cluster %s@%s", jumpUser, jumpHost, sshUser, sshHost) + } + logger.Step(2, "Connecting to base cluster %s@%s", sshUser, sshHost) - // Step 2: Connect to base cluster - baseClusterResources, err := ConnectToCluster(ctx, ConnectClusterOptions{ - SSHUser: sshUser, - SSHHost: sshHost, - SSHKeyPath: sshKeyPath, - UseJumpHost: false, - 
}) + // Step 2: Connect to base cluster (kubeconfig written to test temp dir to avoid temp/cluster) + var kubeconfigDir string + if path, pathErr := getClusterStatePath(testFilePath); pathErr == nil { + kubeconfigDir = filepath.Dir(path) + } + baseConnectOpts := ConnectClusterOptions{SSHUser: sshUser, SSHHost: sshHost, SSHKeyPath: sshKeyPath, UseJumpHost: useJumpHost, KubeconfigOutputDir: kubeconfigDir} + if useJumpHost { + baseConnectOpts = ConnectClusterOptions{ + SSHUser: jumpUser, SSHHost: jumpHost, SSHKeyPath: jumpKeyPath, + UseJumpHost: true, TargetUser: sshUser, TargetHost: sshHost, TargetKeyPath: sshKeyPath, + KubeconfigOutputDir: kubeconfigDir, + } + } + baseClusterResources, err := ConnectToCluster(ctx, baseConnectOpts) if err != nil { return nil, fmt.Errorf("failed to connect to base cluster: %w", err) } @@ -249,6 +385,57 @@ func CreateTestCluster( } logger.StepComplete(4, "Test namespace created") + if resumeMode { + // Resume path: cluster is already fully created and ready. Only connect to it and return resources. + // No GatherVMInfo, NodeGroup, AddNodes, EnableModules — assume "should create test cluster" already completed. 
+ state, _ := loadClusterState(testFilePath) + namespace = state.Namespace + baseKubeconfig := baseClusterResources.Kubeconfig + baseKubeconfigPath := baseClusterResources.KubeconfigPath + baseTunnelInfo := baseClusterResources.TunnelInfo + vmResources := &VMResources{VMNames: state.VMNames, SetupVMName: state.SetupVMName} + + if len(clusterDefinition.Masters) > 0 { + clusterDefinition.Masters[0].IPAddress = state.FirstMasterIP + } + firstMasterIP := state.FirstMasterIP + if firstMasterIP == "" { + baseClusterResources.SSHClient.Close() + baseClusterResources.TunnelInfo.StopFunc() + return nil, fmt.Errorf("resume: first_master_ip missing in cluster state") + } + + logger.Step(1, "Resume: connecting to existing test cluster master %s", firstMasterIP) + testConnectOpts := ConnectClusterOptions{ + SSHUser: sshUser, SSHHost: sshHost, SSHKeyPath: sshKeyPath, + UseJumpHost: true, TargetUser: config.VMSSHUser, TargetHost: firstMasterIP, TargetKeyPath: sshKeyPath, + KubeconfigOutputDir: kubeconfigDir, + } + if useJumpHost { + testConnectOpts = ConnectClusterOptions{ + SSHUser: jumpUser, SSHHost: jumpHost, SSHKeyPath: jumpKeyPath, + UseJumpHost: true, TargetUser: config.VMSSHUser, TargetHost: firstMasterIP, TargetKeyPath: sshKeyPath, + KubeconfigOutputDir: kubeconfigDir, + } + } + testClusterResources, err := ConnectToCluster(ctx, testConnectOpts) + if err != nil { + baseClusterResources.SSHClient.Close() + baseClusterResources.TunnelInfo.StopFunc() + return nil, fmt.Errorf("resume: connect to test cluster: %w", err) + } + logger.StepComplete(1, "Resume: connected to test cluster") + + testClusterResources.ClusterDefinition = clusterDefinition + testClusterResources.VMResources = vmResources + testClusterResources.BaseClusterClient = baseClusterResources.SSHClient + testClusterResources.BaseKubeconfig = baseKubeconfig + testClusterResources.BaseKubeconfigPath = baseKubeconfigPath + testClusterResources.BaseTunnelInfo = baseTunnelInfo + testClusterResources.SetupSSHClient = 
nil + return testClusterResources, nil + } + logger.Step(5, "Creating virtual machines (this may take up to %v)", config.VMCreationTimeout) // Step 5: Create virtualization client and virtual machines virtCtx, cancel := context.WithTimeout(ctx, config.VMCreationTimeout) @@ -328,7 +515,7 @@ func CreateTestCluster( logger.Step(6, "Gathering VM information") // Step 6: Gather VM information gatherCtx, cancel := context.WithTimeout(ctx, config.VMInfoTimeout) - err = GatherVMInfo(gatherCtx, virtClient, namespace, clusterDefinition, vmResources) + err = GatherVMInfo(gatherCtx, virtClient, namespace, clusterDefinition, vmResources, nil) cancel() if err != nil { baseClusterResources.SSHClient.Close() @@ -337,6 +524,26 @@ func CreateTestCluster( } logger.StepComplete(6, "VM information gathered") + // Save resume state so a failed run can be continued with TEST_CLUSTER_RESUME=true + masterHostnames := make([]string, len(clusterDefinition.Masters)) + for i, m := range clusterDefinition.Masters { + masterHostnames[i] = m.Hostname + } + workerHostnames := make([]string, len(clusterDefinition.Workers)) + for i, w := range clusterDefinition.Workers { + workerHostnames[i] = w.Hostname + } + if err := saveClusterState(testFilePath, &resumeState{ + FirstMasterIP: clusterDefinition.Masters[0].IPAddress, + Namespace: namespace, + VMNames: vmNames, + SetupVMName: vmResources.SetupVMName, + MasterHostnames: masterHostnames, + WorkerHostnames: workerHostnames, + }); err != nil { + logger.Warn("Failed to save resume state: %v", err) + } + // Step 7: Get setup node IP and wait for SSH readiness setupNode, err := GetSetupNode(clusterDefinition) if err != nil { @@ -363,11 +570,12 @@ func CreateTestCluster( logger.StepComplete(7, "SSH is ready on setup node %s", setupNodeIP) logger.Step(8, "Establishing SSH connection to setup node") - // Step 8: Establish SSH connection to setup node - setupSSHClient, err := ssh.NewClientWithJumpHost( - sshUser, sshHost, sshKeyPath, // jump host - 
config.VMSSHUser, setupNodeIP, sshKeyPath, // target host - ) + // Step 8: Establish SSH connection to setup node (first hop: jump host or base cluster) + hopUser, hopHost, hopKey := sshUser, sshHost, sshKeyPath + if useJumpHost { + hopUser, hopHost, hopKey = jumpUser, jumpHost, jumpKeyPath + } + setupSSHClient, err := ssh.NewClientWithJumpHost(hopUser, hopHost, hopKey, config.VMSSHUser, setupNodeIP, sshKeyPath) if err != nil { baseClusterResources.SSHClient.Close() baseClusterResources.TunnelInfo.StopFunc() @@ -439,16 +647,20 @@ func CreateTestCluster( baseTunnelInfo := baseClusterResources.TunnelInfo logger.Step(14, "Connecting to test cluster master %s", firstMasterIP) - // Step 14: Connect to test cluster - testClusterResources, err := ConnectToCluster(ctx, ConnectClusterOptions{ - SSHUser: sshUser, - SSHHost: sshHost, - SSHKeyPath: sshKeyPath, - UseJumpHost: true, - TargetUser: config.VMSSHUser, - TargetHost: firstMasterIP, - TargetKeyPath: sshKeyPath, - }) + // Step 14: Connect to test cluster (first hop: jump host or base cluster) + testConnectOpts := ConnectClusterOptions{ + SSHUser: sshUser, SSHHost: sshHost, SSHKeyPath: sshKeyPath, + UseJumpHost: true, TargetUser: config.VMSSHUser, TargetHost: firstMasterIP, TargetKeyPath: sshKeyPath, + KubeconfigOutputDir: kubeconfigDir, + } + if useJumpHost { + testConnectOpts = ConnectClusterOptions{ + SSHUser: jumpUser, SSHHost: jumpHost, SSHKeyPath: jumpKeyPath, + UseJumpHost: true, TargetUser: config.VMSSHUser, TargetHost: firstMasterIP, TargetKeyPath: sshKeyPath, + KubeconfigOutputDir: kubeconfigDir, + } + } + testClusterResources, err := ConnectToCluster(ctx, testConnectOpts) if err != nil { setupSSHClient.Close() baseClusterResources.SSHClient.Close() @@ -639,6 +851,10 @@ func UseExistingCluster(ctx context.Context) (*TestClusterResources, error) { logger.StepComplete(1, "Connected to existing cluster successfully") logger.Step(2, "Acquiring cluster lock") + // Optionally force-release existing lock (e.g. 
previous run left it) + if config.TestClusterForceLockRelease == "true" || config.TestClusterForceLockRelease == "True" { + _ = ForceReleaseClusterLock(ctx, clusterResources.Kubeconfig) + } // Acquire cluster lock to ensure exclusive access // Use a descriptive test name from environment or generate one testName := config.TestClusterNamespace @@ -700,6 +916,91 @@ func UseExistingCluster(ctx context.Context) (*TestClusterResources, error) { } logger.StepComplete(3, "Cluster is healthy") + // When using jump host, the jump host is the base cluster (with Deckhouse virtualization). + // Get its kubeconfig so tests can create VirtualDisks and attach to VMs. + if config.SSHJumpHost != "" { + logger.Step(4, "Getting base cluster kubeconfig from jump host (for VirtualDisk)") + jumpUser := config.SSHJumpUser + if jumpUser == "" { + jumpUser = sshUser + } + jumpKeyPath := config.SSHJumpKeyPath + if jumpKeyPath == "" { + jumpKeyPath = sshKeyPath + } + baseSSHClient, connErr := ssh.NewClient(jumpUser, config.SSHJumpHost, jumpKeyPath) + if connErr != nil { + _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) + if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { + clusterResources.TunnelInfo.StopFunc() + } + clusterResources.SSHClient.Close() + return nil, fmt.Errorf("failed to connect to base cluster (jump host %s): %w", config.SSHJumpHost, connErr) + } + baseTunnel, tunnelErr := ssh.EstablishSSHTunnel(context.Background(), baseSSHClient, "6445") + if tunnelErr != nil { + baseSSHClient.Close() + _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) + if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { + clusterResources.TunnelInfo.StopFunc() + } + clusterResources.SSHClient.Close() + return nil, fmt.Errorf("failed to establish base cluster tunnel: %w", tunnelErr) + } + kubeconfigDir, dirErr := getTestTempDirFromStack() + if dirErr != nil { + baseTunnel.StopFunc() + baseSSHClient.Close() + _ = 
ReleaseClusterLock(ctx, clusterResources.Kubeconfig) + if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { + clusterResources.TunnelInfo.StopFunc() + } + clusterResources.SSHClient.Close() + return nil, fmt.Errorf("failed to get test temp dir for kubeconfig: %w", dirErr) + } + _, baseKubeconfigPath, kubeErr := internalcluster.GetKubeconfig(ctx, config.SSHJumpHost, jumpUser, jumpKeyPath, baseSSHClient, kubeconfigDir) + if kubeErr != nil { + baseTunnel.StopFunc() + baseSSHClient.Close() + _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) + if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { + clusterResources.TunnelInfo.StopFunc() + } + clusterResources.SSHClient.Close() + return nil, fmt.Errorf("failed to get base cluster kubeconfig from jump host: %w", kubeErr) + } + if updateErr := internalcluster.UpdateKubeconfigPort(baseKubeconfigPath, baseTunnel.LocalPort); updateErr != nil { + baseTunnel.StopFunc() + baseSSHClient.Close() + _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) + if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { + clusterResources.TunnelInfo.StopFunc() + } + clusterResources.SSHClient.Close() + return nil, fmt.Errorf("failed to update base cluster kubeconfig port: %w", updateErr) + } + baseKubeconfig, buildErr := clientcmd.BuildConfigFromFlags("", baseKubeconfigPath) + if buildErr != nil { + baseTunnel.StopFunc() + baseSSHClient.Close() + _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) + if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { + clusterResources.TunnelInfo.StopFunc() + } + clusterResources.SSHClient.Close() + return nil, fmt.Errorf("failed to build base cluster rest config: %w", buildErr) + } + if config.KubeInsecureSkipTLSVerify == "true" || config.KubeInsecureSkipTLSVerify == "True" { + baseKubeconfig.TLSClientConfig.Insecure = true + } + 
configureExtendedTimeouts(baseKubeconfig) + clusterResources.BaseClusterClient = baseSSHClient + clusterResources.BaseKubeconfig = baseKubeconfig + clusterResources.BaseKubeconfigPath = baseKubeconfigPath + clusterResources.BaseTunnelInfo = baseTunnel + logger.StepComplete(4, "Base cluster kubeconfig ready (VirtualDisk operations)") + } + logger.Success("Existing cluster is ready for use") return clusterResources, nil } @@ -1457,8 +1758,12 @@ func CleanupTestCluster(ctx context.Context, resources *TestClusterResources) er logger.Step(6, "Stopping base cluster tunnel and closing SSH client") // Step 6: Stop base cluster tunnel and close base cluster SSH client - if baseTunnel != nil && baseTunnel.StopFunc != nil { - if err := baseTunnel.StopFunc(); err != nil { + baseTunnelToStop := baseTunnel + if baseTunnelToStop == nil && resources.BaseTunnelInfo != nil { + baseTunnelToStop = resources.BaseTunnelInfo // e.g. from UseExistingCluster with SSH_JUMP_HOST + } + if baseTunnelToStop != nil && baseTunnelToStop.StopFunc != nil { + if err := baseTunnelToStop.StopFunc(); err != nil { errs = append(errs, fmt.Errorf("failed to stop base cluster SSH tunnel: %w", err)) logger.Error("Failed to stop base cluster SSH tunnel: %v", err) } else { @@ -1739,6 +2044,9 @@ type ConnectClusterOptions struct { TargetUser string // Required when UseJumpHost is true TargetHost string // Required when UseJumpHost is true (IP or hostname) TargetKeyPath string // Optional: defaults to SSHKeyPath if empty + + // KubeconfigOutputDir if set saves kubeconfig to this dir instead of temp// (avoids temp/cluster) + KubeconfigOutputDir string } // ConnectToCluster establishes SSH connection to a cluster (base or test), @@ -1862,7 +2170,7 @@ func ConnectToCluster(ctx context.Context, opts ConnectClusterOptions) (*TestClu } // Step 3: Get kubeconfig from cluster master - _, kubeconfigPath, err := internalcluster.GetKubeconfig(ctx, masterHost, masterUser, opts.SSHKeyPath, sshClient) + _, kubeconfigPath, err 
:= internalcluster.GetKubeconfig(ctx, masterHost, masterUser, opts.SSHKeyPath, sshClient, opts.KubeconfigOutputDir) if err != nil { tunnelInfo.StopFunc() sshClient.Close() @@ -1883,6 +2191,9 @@ func ConnectToCluster(ctx context.Context, opts ConnectClusterOptions) (*TestClu sshClient.Close() return nil, fmt.Errorf("failed to rebuild kubeconfig from file: %w", err) } + if config.KubeInsecureSkipTLSVerify == "true" || config.KubeInsecureSkipTLSVerify == "True" { + kubeconfig.TLSClientConfig.Insecure = true + } // Configure extended timeouts for tunnel-based connections configureExtendedTimeouts(kubeconfig) diff --git a/pkg/cluster/vms.go b/pkg/cluster/vms.go index 4ee1402..72f5cd1 100644 --- a/pkg/cluster/vms.go +++ b/pkg/cluster/vms.go @@ -639,27 +639,25 @@ type vmIPResult struct { hostname string } +// GatherVMInfoOptions optionally customizes GatherVMInfo behaviour. +// Pass nil for default (gather all VMs including setup). +type GatherVMInfoOptions struct { + // SkipSetupVM when true skips the setup/bootstrap VM. Use when resuming after Deckhouse is up: + // the bootstrap node is removed at that point and only master/worker VMs exist. + SkipSetupVM bool +} + // GatherVMInfo gathers IP addresses for all VMs in the cluster definition and fills them into ClusterDefinition. // This should be called once while connected to the base cluster, before switching to test cluster. // It modifies clusterDef in-place by setting IPAddress field for each VM node. -func GatherVMInfo(ctx context.Context, virtClient *virtualization.Client, namespace string, clusterDef *config.ClusterDefinition, vmResources *VMResources) error { +// When opts.SkipSetupVM is true, the setup (bootstrap) VM is not queried and clusterDef.Setup is left unchanged. 
+func GatherVMInfo(ctx context.Context, virtClient *virtualization.Client, namespace string, clusterDef *config.ClusterDefinition, vmResources *VMResources, opts *GatherVMInfoOptions) error { + if opts == nil { + opts = &GatherVMInfoOptions{} + } var wg sync.WaitGroup results := make(chan vmIPResult) - // Count total VMs to gather info for - totalVMs := 0 - for i := range clusterDef.Masters { - if clusterDef.Masters[i].HostType == config.HostTypeVM { - totalVMs++ - } - } - for i := range clusterDef.Workers { - if clusterDef.Workers[i].HostType == config.HostTypeVM { - totalVMs++ - } - } - totalVMs++ // setup node - // Gather info for all masters in parallel for i := range clusterDef.Masters { master := &clusterDef.Masters[i] @@ -686,14 +684,16 @@ func GatherVMInfo(ctx context.Context, virtClient *virtualization.Client, namesp } } - // Gather info for setup node in parallel + // Gather info for setup node unless skipped (e.g. resume: bootstrap VM already removed) setupVMName := vmResources.SetupVMName - wg.Add(1) - go func() { - defer wg.Done() - ip, err := GetVMIPAddress(ctx, virtClient, namespace, setupVMName) - results <- vmIPResult{node: nil, ip: ip, err: err, hostname: setupVMName} - }() + if !opts.SkipSetupVM && setupVMName != "" { + wg.Add(1) + go func() { + defer wg.Done() + ip, err := GetVMIPAddress(ctx, virtClient, namespace, setupVMName) + results <- vmIPResult{node: nil, ip: ip, err: err, hostname: setupVMName} + }() + } // Close results channel when all goroutines complete go func() { @@ -716,15 +716,18 @@ func GatherVMInfo(ctx context.Context, virtClient *virtualization.Client, namesp } } + if opts.SkipSetupVM { + // Do not touch clusterDef.Setup; bootstrap VM is gone + return nil + } + // Create or update clusterDef.Setup with the generated VM info if clusterDef.Setup == nil { - // Create setup node from DefaultSetupVM template setupNode := config.DefaultSetupVM setupNode.Hostname = setupVMName setupNode.IPAddress = setupIP clusterDef.Setup = 
&setupNode } else { - // Update existing setup node clusterDef.Setup.Hostname = setupVMName clusterDef.Setup.IPAddress = setupIP } diff --git a/pkg/kubernetes/nodes.go b/pkg/kubernetes/nodes.go index ab898da..9884693 100644 --- a/pkg/kubernetes/nodes.go +++ b/pkg/kubernetes/nodes.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" @@ -103,3 +104,21 @@ func WaitForNodesLabeled(ctx context.Context, kubeconfig *rest.Config, nodeNames return nil } + +// GetNodeInternalIP returns the InternalIP of the node in the given cluster. Empty string if not found. +func GetNodeInternalIP(ctx context.Context, kubeconfig *rest.Config, nodeName string) (string, error) { + clientset, err := NewClientsetWithRetry(ctx, kubeconfig) + if err != nil { + return "", fmt.Errorf("create clientset: %w", err) + } + node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("get node %s: %w", nodeName, err) + } + for _, addr := range node.Status.Addresses { + if addr.Type == corev1.NodeInternalIP { + return addr.Address, nil + } + } + return "", fmt.Errorf("node %s has no InternalIP", nodeName) +} diff --git a/pkg/kubernetes/virtualdisk.go b/pkg/kubernetes/virtualdisk.go index 1086ac7..9846180 100644 --- a/pkg/kubernetes/virtualdisk.go +++ b/pkg/kubernetes/virtualdisk.go @@ -178,3 +178,63 @@ func WaitForVirtualDiskAttached(ctx context.Context, kubeconfig *rest.Config, na } } } + +// ListVirtualMachineNames returns names of VirtualMachines in the given namespace. +// Used to pick a VM when attaching a VirtualDisk (e.g. in alwaysUseExisting mode). 
+func ListVirtualMachineNames(ctx context.Context, kubeconfig *rest.Config, namespace string) ([]string, error) { + virtClient, err := virtualization.NewClient(ctx, kubeconfig) + if err != nil { + return nil, fmt.Errorf("failed to create virtualization client: %w", err) + } + list, err := virtClient.VirtualMachines().List(ctx, namespace) + if err != nil { + return nil, fmt.Errorf("failed to list VirtualMachines in %s: %w", namespace, err) + } + names := make([]string, 0, len(list)) + for i := range list { + names = append(names, list[i].Name) + } + return names, nil +} + +// GetVMIPFromBaseCluster returns the IP address of a VirtualMachine in the base cluster (namespace). +// Used to SSH to the VM (e.g. cloud@ip) from the jump host to run lsblk on nested nodes. +func GetVMIPFromBaseCluster(ctx context.Context, baseKubeconfig *rest.Config, namespace, vmName string) (string, error) { + virtClient, err := virtualization.NewClient(ctx, baseKubeconfig) + if err != nil { + return "", fmt.Errorf("create virtualization client: %w", err) + } + vm, err := virtClient.VirtualMachines().Get(ctx, namespace, vmName) + if err != nil { + return "", fmt.Errorf("get VM %s/%s: %w", namespace, vmName, err) + } + if vm.Status.IPAddress == "" { + return "", fmt.Errorf("VM %s/%s has no IP in status yet", namespace, vmName) + } + return vm.Status.IPAddress, nil +} + +// DetachAndDeleteVirtualDisk deletes the VirtualMachineBlockDeviceAttachment and then the VirtualDisk. +// Use this for cleanup after a test. Delete errors are logged as warnings and never returned (best-effort, idempotent cleanup). 
+func DetachAndDeleteVirtualDisk(ctx context.Context, kubeconfig *rest.Config, namespace, attachmentName, diskName string) error { + virtClient, err := virtualization.NewClient(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create virtualization client: %w", err) + } + + if attachmentName != "" { + if err := virtClient.VirtualMachineBlockDeviceAttachments().Delete(ctx, namespace, attachmentName); err != nil { + logger.Warn("Failed to delete VirtualMachineBlockDeviceAttachment %s/%s: %v", namespace, attachmentName, err) + } else { + logger.Success("VirtualMachineBlockDeviceAttachment %s/%s deleted", namespace, attachmentName) + } + } + if diskName != "" { + if err := virtClient.VirtualDisks().Delete(ctx, namespace, diskName); err != nil { + logger.Warn("Failed to delete VirtualDisk %s/%s: %v", namespace, diskName, err) + } else { + logger.Success("VirtualDisk %s/%s deleted", namespace, diskName) + } + } + return nil +} diff --git a/pkg/kubernetes/vmpod.go b/pkg/kubernetes/vmpod.go new file mode 100644 index 0000000..764702e --- /dev/null +++ b/pkg/kubernetes/vmpod.go @@ -0,0 +1,91 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kubernetes + +import ( + "context" + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +// GetVMPodNodeAndContainerID returns the base cluster node name and the first container ID +// for the Pod that runs the given VM (e.g. virt-launcher-<vm-name>-*). +// Used to run nsenter into the VM container from the base cluster node. +func GetVMPodNodeAndContainerID(ctx context.Context, baseConfig *rest.Config, namespace, vmName string) (nodeName, containerID string, err error) { + clientset, err := kubernetes.NewForConfig(baseConfig) + if err != nil { + return "", "", fmt.Errorf("create clientset: %w", err) + } + pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return "", "", fmt.Errorf("list pods in %s: %w", namespace, err) + } + for i := range pods.Items { + pod := &pods.Items[i] + if pod.Status.Phase != corev1.PodRunning { + continue + } + // Match Pod that runs this VM: name often contains VM name (e.g. virt-launcher-master-1-xxx) + if !strings.Contains(pod.Name, vmName) { + continue + } + if pod.Spec.NodeName == "" { + continue + } + for _, cs := range pod.Status.ContainerStatuses { + if cs.ContainerID != "" { + return pod.Spec.NodeName, cs.ContainerID, nil + } + } + return "", "", fmt.Errorf("pod %s/%s has no container ID yet", namespace, pod.Name) + } + return "", "", fmt.Errorf("no running pod for VM %s in namespace %s", vmName, namespace) +} + +// GetNodeSSHAddress returns an address (IP or hostname) suitable for SSH from the jump host to the given node. +// It prefers InternalIP, then ExternalIP, so that the jump host can reach the node when the K8s node name does not resolve. 
+func GetNodeSSHAddress(ctx context.Context, baseConfig *rest.Config, nodeName string) (string, error) { + clientset, err := kubernetes.NewForConfig(baseConfig) + if err != nil { + return "", fmt.Errorf("create clientset: %w", err) + } + node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("get node %s: %w", nodeName, err) + } + var internalIP, externalIP string + for _, addr := range node.Status.Addresses { + switch addr.Type { + case corev1.NodeInternalIP: + internalIP = addr.Address + case corev1.NodeExternalIP: + externalIP = addr.Address + } + } + if internalIP != "" { + return internalIP, nil + } + if externalIP != "" { + return externalIP, nil + } + return "", fmt.Errorf("node %s has no InternalIP or ExternalIP", nodeName) +} diff --git a/pkg/setup/setup.go b/pkg/setup/setup.go new file mode 100644 index 0000000..c8c9e47 --- /dev/null +++ b/pkg/setup/setup.go @@ -0,0 +1,37 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package setup + +import ( + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/internal/logger" +) + +// Init validates environment and initializes the logger. +// Must be called from test BeforeSuite when using cluster or other storage-e2e packages. 
+func Init() error { + if err := config.ValidateEnvironment(); err != nil { + return err + } + return logger.Initialize() +} + +// Close closes the logger and any open log files. +// Should be called from test AfterSuite. +func Close() error { + return logger.Close() +} diff --git a/pkg/ssh/ssh.go b/pkg/ssh/ssh.go new file mode 100644 index 0000000..2b55882 --- /dev/null +++ b/pkg/ssh/ssh.go @@ -0,0 +1,40 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package ssh exposes SSH client for use by external modules (e.g. sds-node-configurator e2e tests). +package ssh + +import ( + "context" + + internalssh "github.com/deckhouse/storage-e2e/internal/infrastructure/ssh" +) + +// Client is the minimal SSH client interface needed for e2e helpers (Exec, Close). +type Client interface { + Exec(ctx context.Context, cmd string) (string, error) + Close() error +} + +// NewClient creates a new SSH client (direct connection). +func NewClient(user, host, keyPath string) (Client, error) { + return internalssh.NewClient(user, host, keyPath) +} + +// NewClientWithJumpHost creates a new SSH client that connects through a jump host. +func NewClientWithJumpHost(jumpUser, jumpHost, jumpKeyPath, targetUser, targetHost, targetKeyPath string) (Client, error) { + return internalssh.NewClientWithJumpHost(jumpUser, jumpHost, jumpKeyPath, targetUser, targetHost, targetKeyPath) +}