11package cache
22
33import (
4+ "bufio"
5+ "bytes"
46 "context"
57 "errors"
68 "fmt"
9+ "io"
10+ "regexp"
711 "sort"
812 "strings"
913 "sync"
@@ -29,6 +33,7 @@ import (
2933 "k8s.io/client-go/kubernetes/scheme"
3034 "k8s.io/client-go/rest"
3135 testcore "k8s.io/client-go/testing"
36+ "k8s.io/klog/v2/textlogger"
3237 "sigs.k8s.io/yaml"
3338
3439 "github.com/argoproj/gitops-engine/pkg/utils/kube"
@@ -1393,3 +1398,219 @@ func BenchmarkIterateHierarchyV2(b *testing.B) {
13931398// })
13941399// }
13951400//}
1401+
1402+ type syncedBuffer struct {
1403+ mutex sync.Mutex
1404+ buf bytes.Buffer
1405+ }
1406+
1407+ func (lb * syncedBuffer ) Read (p []byte ) (n int , err error ) {
1408+ lb .mutex .Lock ()
1409+ defer lb .mutex .Unlock ()
1410+ return lb .buf .Read (p )
1411+ }
1412+
1413+ func (lb * syncedBuffer ) Write (p []byte ) (n int , err error ) {
1414+ lb .mutex .Lock ()
1415+ defer lb .mutex .Unlock ()
1416+ return lb .buf .Write (p )
1417+ }
1418+
1419+ func Test_watchEvents_Missing_resourceVersion (t * testing.T ) {
1420+ objExample := & unstructured.Unstructured {Object : map [string ]any {
1421+ "apiVersion" : "apiservice.example.com/v1" ,
1422+ "kind" : "Example" ,
1423+ "metadata" : map [string ]any {
1424+ "name" : "example" ,
1425+ },
1426+ }}
1427+
1428+ testCases := []struct {
1429+ name string
1430+ objs []runtime.Object
1431+ funAssert func (t * testing.T , logLines []string )
1432+ waitForLogLines []string
1433+ waitForLogExtra time.Duration
1434+ watchResyncTimeout time.Duration
1435+ }{
1436+ {
1437+ name : "Should_ignore_resource_without_resourceVersion" ,
1438+ objs : []runtime.Object {objExample },
1439+ waitForLogLines : []string {"Ignoring watch for Example.apiservice.example.com on https://test due to missing resourceVersion" },
1440+ funAssert : func (t * testing.T , logLines []string ) {
1441+ t .Helper ()
1442+ require .NotContains (t , logLines , "Resyncing Example.apiservice.example.com on https://test due to timeout" )
1443+ },
1444+ watchResyncTimeout : defaultWatchResyncTimeout ,
1445+ waitForLogExtra : 0 * time .Millisecond ,
1446+ },
1447+ {
1448+ name : "Should_not_ignore_resource_with_resourceVersion" ,
1449+ objs : []runtime.Object {testDeploy ()},
1450+ waitForLogLines : []string {"Start watch Deployment.apps on https://test" },
1451+ funAssert : func (t * testing.T , logLines []string ) {
1452+ t .Helper ()
1453+ require .NotContains (t , logLines , "Ignoring watch for Deployment.apps on https://test due to missing resourceVersion" )
1454+ },
1455+ watchResyncTimeout : defaultWatchResyncTimeout ,
1456+ waitForLogExtra : 100 * time .Millisecond ,
1457+ },
1458+ {
1459+ name : "Should_retry_ignored_resource_on_next_resync" ,
1460+ objs : []runtime.Object {objExample },
1461+ waitForLogLines : []string {"Failed to watch Example.apiservice.example.com on https://test: Resyncing Example.apiservice.example.com on https://test due to timeout, retrying in 1s" },
1462+ funAssert : func (t * testing.T , logLines []string ) {
1463+ t .Helper ()
1464+ require .Contains (t , logLines , "Ignoring watch for Example.apiservice.example.com on https://test due to missing resourceVersion" )
1465+ },
1466+ watchResyncTimeout : 10 * time .Millisecond ,
1467+ waitForLogExtra : 100 * time .Millisecond ,
1468+ },
1469+ }
1470+
1471+ readLinesUntil := func (ctx context.Context , buf io.Reader , wantedLines []string , readExtra time.Duration ) ([]string , error ) {
1472+ wantedStatuses := map [string ]bool {}
1473+ for _ , wantedLine := range wantedLines {
1474+ wantedStatuses [strings .TrimSuffix (wantedLine , "\r \n " )] = false
1475+ }
1476+
1477+ var logLines []string
1478+ readChan := make (chan any )
1479+ go func () {
1480+ lineRgx := regexp .MustCompile (`^.+?\s+\d+\s+.+?\.go:(?:\d+?|\d+?)\]\s+"(?P<msg>.+)"$` )
1481+
1482+ for {
1483+ scanner := bufio .NewScanner (buf )
1484+ for scanner .Scan () {
1485+ match := lineRgx .FindStringSubmatch (scanner .Text ())
1486+ readChan <- match [1 ]
1487+ }
1488+
1489+ if scanner .Err () != nil {
1490+ readChan <- scanner .Err ()
1491+ return
1492+ }
1493+
1494+ // EOF. Waiting for data.
1495+ time .Sleep (50 * time .Millisecond )
1496+ }
1497+ }()
1498+
1499+ var readExtraTimer * time.Timer
1500+ var readExtraTimeoutChan <- chan time.Time
1501+
1502+ for {
1503+ select {
1504+ case <- readExtraTimeoutChan :
1505+ return logLines , ctx .Err ()
1506+ case <- ctx .Done ():
1507+ return logLines , ctx .Err ()
1508+ case read := <- readChan :
1509+ if err , ok := read .(error ); ok {
1510+ return logLines , err
1511+ }
1512+
1513+ // EOF
1514+ if read == nil {
1515+ return logLines , nil
1516+ }
1517+
1518+ logLines = append (logLines , read .(string ))
1519+ if readExtraTimer != nil {
1520+ continue
1521+ }
1522+
1523+ line := read .(string )
1524+ if _ , ok := wantedStatuses [line ]; ok {
1525+ wantedStatuses [line ] = true
1526+
1527+ done := true
1528+ for _ , ok := range wantedStatuses {
1529+ if ! ok {
1530+ done = false
1531+ }
1532+ }
1533+
1534+ if done {
1535+ readExtraTimer = time .NewTimer (readExtra )
1536+ readExtraTimeoutChan = readExtraTimer .C
1537+ }
1538+ }
1539+ }
1540+ }
1541+ }
1542+
1543+ createCluster := func (opts []UpdateSettingsFunc , objs ... runtime.Object ) * clusterCache {
1544+ client := fake .NewSimpleDynamicClientWithCustomListKinds (scheme .Scheme ,
1545+ map [schema.GroupVersionResource ]string {
1546+ {Group : "apiservice.example.com" , Version : "v1" , Resource : "examples" }: "ExampleList" ,
1547+ },
1548+ objs ... )
1549+ reactor := client .ReactionChain [0 ]
1550+ client .PrependReactor ("list" , "*" , func (action testcore.Action ) (handled bool , ret runtime.Object , err error ) {
1551+ handled , ret , err = reactor .React (action )
1552+ if err != nil || ! handled {
1553+ return
1554+ }
1555+
1556+ // The apiservice.example.com group is for testing missing resourceVersion, so we omit setting it for those responses.
1557+ retList , ok := ret .(* unstructured.UnstructuredList )
1558+ if ok && len (retList .Items ) > 0 && retList .Items [0 ].GetObjectKind ().GroupVersionKind ().Group == "apiservice.example.com" {
1559+ return
1560+ }
1561+
1562+ // make sure retList response have resource version
1563+ ret .(metav1.ListInterface ).SetResourceVersion ("123" )
1564+ return
1565+ })
1566+
1567+ apiResources := []kube.APIResourceInfo {{
1568+ GroupKind : schema.GroupKind {Group : "apps" , Kind : "Deployment" },
1569+ GroupVersionResource : schema.GroupVersionResource {Group : "apps" , Version : "v1" , Resource : "deployments" },
1570+ Meta : metav1.APIResource {Namespaced : true },
1571+ }, {
1572+ GroupKind : schema.GroupKind {Group : "apiservice.example.com" , Kind : "Example" },
1573+ GroupVersionResource : schema.GroupVersionResource {Group : "apiservice.example.com" , Version : "v1" , Resource : "examples" },
1574+ Meta : metav1.APIResource {Namespaced : false },
1575+ }}
1576+
1577+ opts = append ([]UpdateSettingsFunc {
1578+ SetKubectl (& kubetest.MockKubectlCmd {APIResources : apiResources , DynamicClient : client }),
1579+ }, opts ... )
1580+
1581+ cache := NewClusterCache (
1582+ & rest.Config {Host : "https://test" },
1583+ opts ... ,
1584+ )
1585+ return cache
1586+ }
1587+
1588+ for _ , testCase := range testCases {
1589+ t .Run (testCase .name , func (t * testing.T ) {
1590+ ctx , ctxCancel := context .WithTimeout (context .Background (), 1 * time .Second )
1591+ defer ctxCancel ()
1592+
1593+ var logBuffer syncedBuffer
1594+ logger := textlogger .NewLogger (textlogger .NewConfig (textlogger .Output (& logBuffer ), textlogger .Verbosity (1 ), textlogger .FixedTime (time .Unix (0 , 0 ))))
1595+
1596+ cluster := createCluster ([]UpdateSettingsFunc {
1597+ SetLogr (logger ),
1598+ SetWatchResyncTimeout (testCase .watchResyncTimeout ),
1599+ }, testCase .objs ... )
1600+
1601+ defer func () {
1602+ cluster .Invalidate ()
1603+ }()
1604+
1605+ err := cluster .EnsureSynced ()
1606+ require .NoError (t , err )
1607+
1608+ logLines , err := readLinesUntil (ctx , & logBuffer , testCase .waitForLogLines , testCase .waitForLogExtra )
1609+ require .NoError (t , err )
1610+ testCase .funAssert (t , logLines )
1611+ for _ , wantedLogLine := range testCase .waitForLogLines {
1612+ require .Contains (t , logLines , wantedLogLine )
1613+ }
1614+ })
1615+ }
1616+ }
0 commit comments