diff --git a/go.mod b/go.mod index 6a8340c5d..c59f124fc 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( kmodules.xyz/custom-resources v0.30.0 kmodules.xyz/offshoot-api v0.30.1 kubevault.dev/apimachinery v0.18.3 - stash.appscode.dev/apimachinery v0.40.0 + stash.appscode.dev/apimachinery v0.40.1-0.20250731053416-f9825ca2e8f5 ) require ( diff --git a/go.sum b/go.sum index 0bc1398e8..38d6b2972 100644 --- a/go.sum +++ b/go.sum @@ -754,5 +754,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= -stash.appscode.dev/apimachinery v0.40.0 h1:U6oNI0Ivx+Wo74GVnMDv9VoI1zMwdIGgd5HK2rs5oKc= -stash.appscode.dev/apimachinery v0.40.0/go.mod h1:y1VgM/7CT990qqHAtE0JGg1N0sFWzmrq/9HzUU5V8dc= +stash.appscode.dev/apimachinery v0.40.1-0.20250731053416-f9825ca2e8f5 h1:ymy/F4PBvEu5RFDWz6T5UwrRWiAjmWnLZ9GwPQyyg7Q= +stash.appscode.dev/apimachinery v0.40.1-0.20250731053416-f9825ca2e8f5/go.mod h1:y1VgM/7CT990qqHAtE0JGg1N0sFWzmrq/9HzUU5V8dc= diff --git a/pkg/backup.go b/pkg/backup.go index a4a96c3aa..09906e634 100644 --- a/pkg/backup.go +++ b/pkg/backup.go @@ -254,6 +254,10 @@ func (opt *vaultOptions) backupVault(targetRef api_v1beta1.TargetRef) (*restic.B if err != nil { return nil, err } + err = resticWrapper.EnsureNoExclusiveLock(opt.kubeClient, opt.namespace) + if err != nil { + return nil, err + } return resticWrapper.RunBackup(opt.backupOptions, targetRef) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 825b528d4..a87bd0637 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1258,7 +1258,7 @@ sigs.k8s.io/structured-merge-diff/v4/value ## explicit; go 1.12 sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 -# stash.appscode.dev/apimachinery v0.40.0 +# stash.appscode.dev/apimachinery v0.40.1-0.20250731053416-f9825ca2e8f5 ## explicit; go 1.23.0 stash.appscode.dev/apimachinery/apis stash.appscode.dev/apimachinery/apis/repositories diff --git a/vendor/stash.appscode.dev/apimachinery/pkg/restic/commands.go b/vendor/stash.appscode.dev/apimachinery/pkg/restic/commands.go index f36100ded..0d7c05d7a 100644 --- a/vendor/stash.appscode.dev/apimachinery/pkg/restic/commands.go +++ b/vendor/stash.appscode.dev/apimachinery/pkg/restic/commands.go @@ -595,6 +595,32 @@ func (w *ResticWrapper) listKey() ([]byte, error) { return w.run(Command{Name: ResticCMD, Args: args}) } +func (w *ResticWrapper) listLocks() ([]byte, error) { + klog.Infoln("Listing restic locks") + + args := []interface{}{"list", "locks", "--no-lock"} + + args = w.appendCacheDirFlag(args) + args = w.appendMaxConnectionsFlag(args) + args = w.appendCaCertFlag(args) + args = w.appendInsecureTLSFlag(args) + + return w.run(Command{Name: ResticCMD, Args: args}) +} + +func (w *ResticWrapper) lockStats(lockID string) ([]byte, error) { + klog.Infoln("Getting stats of restic lock") + + args := []interface{}{"cat", "lock", lockID, "--no-lock"} + + args = w.appendCacheDirFlag(args) + args = w.appendMaxConnectionsFlag(args) + args = w.appendCaCertFlag(args) + args = w.appendInsecureTLSFlag(args) + + return w.run(Command{Name: ResticCMD, Args: args}) +} + func (w *ResticWrapper) updateKey(params keyParams) ([]byte, error) { klog.Infoln("Updating restic key") diff --git a/vendor/stash.appscode.dev/apimachinery/pkg/restic/output.go b/vendor/stash.appscode.dev/apimachinery/pkg/restic/output.go index bc4d5f5ff..e6de5fec8 100644 --- a/vendor/stash.appscode.dev/apimachinery/pkg/restic/output.go +++ b/vendor/stash.appscode.dev/apimachinery/pkg/restic/output.go @@ -20,11 +20,13 @@ import ( "bufio" "bytes" "encoding/json" + "fmt" "io" "os" "path/filepath" "regexp" "strings" + "time" api_v1beta1 "stash.appscode.dev/apimachinery/apis/stash/v1beta1" ) @@ -241,3 +243,34 @@ type ForgetGroup struct { type StatsContainer struct { TotalSize uint64 `json:"total_size"` } + +type LockStats struct { + Time time.Time `json:"time"` + Exclusive bool `json:"exclusive"` // true if the lock is exclusive, false if it is non-exclusive + Hostname string `json:"hostname"` // Hostname of the machine where the lock was created, our case PodName + Username string `json:"username"` + PID int `json:"pid"` + UID int `json:"uid"` + GID int `json:"gid"` +} + +func extractLockStats(raw []byte) (*LockStats, error) { + var stats LockStats + if err := json.Unmarshal(raw, &stats); err != nil { + return nil, fmt.Errorf("cannot decode lock JSON: %w", err) + } + return &stats, nil +} + +func extractLockIDs(r io.Reader) ([]string, error) { + sc := bufio.NewScanner(r) + var ids []string + + for sc.Scan() { + line := strings.TrimSpace(sc.Text()) + if len(line) >= 64 { + ids = append(ids, line[:64]) + } + } + return ids, sc.Err() +} diff --git a/vendor/stash.appscode.dev/apimachinery/pkg/restic/unlock.go b/vendor/stash.appscode.dev/apimachinery/pkg/restic/unlock.go index ae9b74b3e..6914b6634 100644 --- a/vendor/stash.appscode.dev/apimachinery/pkg/restic/unlock.go +++ b/vendor/stash.appscode.dev/apimachinery/pkg/restic/unlock.go @@ -16,7 +16,102 @@ limitations under the License. package restic +import ( + "bytes" + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + kutil "kmodules.xyz/client-go" +) + func (w *ResticWrapper) UnlockRepository() error { _, err := w.unlock() return err } + +// getLockIDs lists every lock ID currently held in the repository. +func (w *ResticWrapper) getLockIDs() ([]string, error) { + w.sh.ShowCMD = true + out, err := w.listLocks() + if err != nil { + return nil, err + } + return extractLockIDs(bytes.NewReader(out)) +} + +// getLockStats returns the decoded JSON for a single lock. +func (w *ResticWrapper) getLockStats(lockID string) (*LockStats, error) { + w.sh.ShowCMD = true + out, err := w.lockStats(lockID) + if err != nil { + return nil, err + } + return extractLockStats(out) +} + +// getPodNameIfAnyExclusiveLock scans every lock and returns the hostname aka (Pod name) of the first exclusive lock it finds, or "" if none exist. +func (w *ResticWrapper) getPodNameIfAnyExclusiveLock() (string, error) { + klog.Infoln("Checking for exclusive locks in the repository...") + ids, err := w.getLockIDs() + if err != nil { + return "", fmt.Errorf("failed to list locks: %w", err) + } + for _, id := range ids { + st, err := w.getLockStats(id) + if err != nil { + return "", fmt.Errorf("failed to inspect lock %s: %w", id, err) + } + if st.Exclusive { // There's no chances to get multiple exclusive locks, so we can return the first one we find. + return st.Hostname, nil + } + } + return "", nil +} + +// EnsureNoExclusiveLock blocks until any exclusive lock is released. +// If a lock is held by a Running Pod, it waits; otherwise it unlocks. +func (w *ResticWrapper) EnsureNoExclusiveLock(k8sClient kubernetes.Interface, namespace string) error { + klog.Infoln("Ensuring no exclusive lock is held in the repository...") + podName, err := w.getPodNameIfAnyExclusiveLock() + if err != nil { + return fmt.Errorf("failed to query exclusive lock: %w", err) + } + if podName == "" { + klog.Infoln("No exclusive lock found, nothing to do.") + return nil // nothing to do + } + + return wait.PollUntilContextTimeout( + context.Background(), + 5*time.Second, + kutil.ReadinessTimeout, + true, + func(ctx context.Context) (bool, error) { + klog.Infoln("Getting Pod:", podName, "to check if it's finished...") + pod, err := k8sClient.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) + switch { + case errors.IsNotFound(err): // Pod gone → unlock + klog.Infoln("Pod:", podName, "not found, unlocking repository...") + _, err := w.unlock() + return true, err + case err != nil: // API error → stop + return false, err + case pod.Status.Phase == corev1.PodSucceeded || + pod.Status.Phase == corev1.PodFailed: // Pod finished → unlock + klog.Infoln("Pod:", podName, "finished with phase", pod.Status.Phase, ", unlocking repository...") + _, err := w.unlock() + return true, err + default: // Not finished yet → keep waiting + klog.Infoln("Pod:", podName, "is in phase", pod.Status.Phase, ", waiting for it to finish...") + return false, nil + } + }, + ) +}