@@ -16,7 +16,102 @@ limitations under the License.
1616
1717package restic
1818
19+ import (
20+ "bytes"
21+ "context"
22+ "fmt"
23+ "time"
24+
25+ corev1 "k8s.io/api/core/v1"
26+ "k8s.io/apimachinery/pkg/api/errors"
27+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+ "k8s.io/apimachinery/pkg/util/wait"
29+ "k8s.io/client-go/kubernetes"
30+ "k8s.io/klog/v2"
31+ kutil "kmodules.xyz/client-go"
32+ )
33+
1934func (w * ResticWrapper ) UnlockRepository () error {
2035 _ , err := w .unlock ()
2136 return err
2237}
38+
39+ // getLockIDs lists every lock ID currently held in the repository.
40+ func (w * ResticWrapper ) getLockIDs () ([]string , error ) {
41+ w .sh .ShowCMD = true
42+ out , err := w .listLocks ()
43+ if err != nil {
44+ return nil , err
45+ }
46+ return extractLockIDs (bytes .NewReader (out ))
47+ }
48+
49+ // getLockStats returns the decoded JSON for a single lock.
50+ func (w * ResticWrapper ) getLockStats (lockID string ) (* LockStats , error ) {
51+ w .sh .ShowCMD = true
52+ out , err := w .lockStats (lockID )
53+ if err != nil {
54+ return nil , err
55+ }
56+ return extractLockStats (out )
57+ }
58+
59+ // getPodNameIfAnyExclusiveLock scans every lock and returns the hostname aka (Pod name) of the first exclusive lock it finds, or "" if none exist.
60+ func (w * ResticWrapper ) getPodNameIfAnyExclusiveLock () (string , error ) {
61+ klog .Infoln ("Checking for exclusive locks in the repository..." )
62+ ids , err := w .getLockIDs ()
63+ if err != nil {
64+ return "" , fmt .Errorf ("failed to list locks: %w" , err )
65+ }
66+ for _ , id := range ids {
67+ st , err := w .getLockStats (id )
68+ if err != nil {
69+ return "" , fmt .Errorf ("failed to inspect lock %s: %w" , id , err )
70+ }
71+ if st .Exclusive { // There's no chances to get multiple exclusive locks, so we can return the first one we find.
72+ return st .Hostname , nil
73+ }
74+ }
75+ return "" , nil
76+ }
77+
78+ // EnsureNoExclusiveLock blocks until any exclusive lock is released.
79+ // If a lock is held by a Running Pod, it waits; otherwise it unlocks.
80+ func (w * ResticWrapper ) EnsureNoExclusiveLock (k8sClient kubernetes.Interface , namespace string ) error {
81+ klog .Infoln ("Ensuring no exclusive lock is held in the repository..." )
82+ podName , err := w .getPodNameIfAnyExclusiveLock ()
83+ if err != nil {
84+ return fmt .Errorf ("failed to query exclusive lock: %w" , err )
85+ }
86+ if podName == "" {
87+ klog .Infoln ("No exclusive lock found, nothing to do." )
88+ return nil // nothing to do
89+ }
90+
91+ return wait .PollUntilContextTimeout (
92+ context .Background (),
93+ 5 * time .Second ,
94+ kutil .ReadinessTimeout ,
95+ true ,
96+ func (ctx context.Context ) (bool , error ) {
97+ klog .Infoln ("Getting Pod:" , podName , "to check if it's finished..." )
98+ pod , err := k8sClient .CoreV1 ().Pods (namespace ).Get (ctx , podName , metav1.GetOptions {})
99+ switch {
100+ case errors .IsNotFound (err ): // Pod gone → unlock
101+ klog .Infoln ("Pod:" , podName , "not found, unlocking repository..." )
102+ _ , err := w .unlock ()
103+ return true , err
104+ case err != nil : // API error → stop
105+ return false , err
106+ case pod .Status .Phase == corev1 .PodSucceeded ||
107+ pod .Status .Phase == corev1 .PodFailed : // Pod finished → unlock
108+ klog .Infoln ("Pod:" , podName , "finished with phase" , pod .Status .Phase , ", unlocking repository..." )
109+ _ , err := w .unlock ()
110+ return true , err
111+ default : // Not finished yet → keep waiting
112+ klog .Infoln ("Pod:" , podName , "is in phase" , pod .Status .Phase , ", waiting for it to finish..." )
113+ return false , nil
114+ }
115+ },
116+ )
117+ }
0 commit comments