From ed878e282f53d2b9cf2e2eabe366700b1f7e6861 Mon Sep 17 00:00:00 2001 From: Isaac Date: Thu, 11 Dec 2025 23:42:00 +0000 Subject: [PATCH 1/4] fix: socket watcher and server remove socket Signed-off-by: GitHub --- cns/deviceplugin/server.go | 6 ++ cns/deviceplugin/server_test.go | 69 ++++++++++++++++++ cns/deviceplugin/socketwatcher.go | 7 +- cns/deviceplugin/socketwatcher_test.go | 99 +++++++++++++++++++++++--- 4 files changed, 171 insertions(+), 10 deletions(-) create mode 100644 cns/deviceplugin/server_test.go diff --git a/cns/deviceplugin/server.go b/cns/deviceplugin/server.go index c5243368b9..725f57c543 100644 --- a/cns/deviceplugin/server.go +++ b/cns/deviceplugin/server.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "os" "time" "github.com/pkg/errors" @@ -45,6 +46,11 @@ func (s *Server) Run(ctx context.Context) error { defer cancel() s.shutdownCh = childCtx.Done() + // remove the socket if it already exists + if err := os.Remove(s.address); err != nil && !os.IsNotExist(err) { + return errors.Wrap(err, "error removing socket") + } + l, err := net.Listen("unix", s.address) if err != nil { return errors.Wrap(err, "error listening on socket") diff --git a/cns/deviceplugin/server_test.go b/cns/deviceplugin/server_test.go new file mode 100644 index 0000000000..29f8ab4bef --- /dev/null +++ b/cns/deviceplugin/server_test.go @@ -0,0 +1,69 @@ +package deviceplugin + +import ( + "context" + "net" + "os" + "path/filepath" + "testing" + "time" + + "go.uber.org/zap" +) + +type mockDeviceCounter struct { + count int +} + +func (m *mockDeviceCounter) getDeviceCount() int { + return m.count +} + +func TestServer_Run_CleansUpExistingSocket(t *testing.T) { + // Create a temporary directory for the socket + tmpDir := t.TempDir() + socketPath := filepath.Join(tmpDir, "test.sock") + + // Create a dummy file at the socket path to simulate a stale socket + if err := os.WriteFile(socketPath, []byte("stale socket"), 0o600); err != nil { + t.Fatalf("failed to create dummy socket file: %v", err) + } + + logger := zap.NewNop() + counter := &mockDeviceCounter{count: 1} + server := NewServer(logger, socketPath, counter, time.Second) + + // Create a context that we can cancel to stop the server + ctx, cancel := context.WithCancel(context.Background()) + + // Run the server in a goroutine + errChan := make(chan error) + go func() { + errChan <- server.Run(ctx) + }() + + // Wait for the server to start up, delete the pre-existing file and recreate it as a socket + // We verify this by trying to connect to the socket repeatedly until success or timeout + var conn net.Conn + var err error + // Retry for up to 2 seconds + for start := time.Now(); time.Since(start) < 2*time.Second; time.Sleep(200 * time.Millisecond) { + conn, err = net.Dial("unix", socketPath) + if err == nil { + conn.Close() + break + } + } + + if err != nil { + t.Errorf("failed to connect to socket: %v", err) + } + + // Stop the server + cancel() + + // Wait for Run to return + if err := <-errChan; err != nil { + t.Errorf("server.Run returned error: %v", err) + } +} diff --git a/cns/deviceplugin/socketwatcher.go b/cns/deviceplugin/socketwatcher.go index 05b7df602b..5d1c7d621b 100644 --- a/cns/deviceplugin/socketwatcher.go +++ b/cns/deviceplugin/socketwatcher.go @@ -56,7 +56,12 @@ func (s *SocketWatcher) WatchSocket(ctx context.Context, socket string) <-chan s socketChan := make(chan struct{}) s.socketChans[socket] = socketChan go func() { - defer close(socketChan) + defer func() { + s.mutex.Lock() + delete(s.socketChans, socket) + s.mutex.Unlock() + close(socketChan) + }() ticker := time.NewTicker(s.options.statInterval) defer ticker.Stop() for { diff --git a/cns/deviceplugin/socketwatcher_test.go b/cns/deviceplugin/socketwatcher_test.go index e987358481..cb60fe25c2 100644 --- a/cns/deviceplugin/socketwatcher_test.go +++ b/cns/deviceplugin/socketwatcher_test.go @@ -13,7 +13,10 @@ import ( func TestWatchContextCancelled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - logger, _ := zap.NewDevelopment() + logger, err := zap.NewDevelopment() + if err != nil { + t.Fatalf("failed to create logger: %v", err) + } s := deviceplugin.NewSocketWatcher(logger) done := make(chan struct{}) go func(done chan struct{}) { @@ -44,14 +47,21 @@ func TestWatchSocketDeleted(t *testing.T) { if err != nil { t.Fatalf("error creating temporary directory: %v", err) } - defer os.RemoveAll(tempDir) // Ensure the directory is cleaned up + defer func() { + if removeErr := os.RemoveAll(tempDir); removeErr != nil { + t.Fatalf("failed to remove temp dir: %v", removeErr) + } + }() // Ensure the directory is cleaned up socket := filepath.Join(tempDir, "to-be-deleted.sock") - if _, err := os.Create(socket); err != nil { - t.Fatalf("error creating test file %s: %v", socket, err) + if _, createErr := os.Create(socket); createErr != nil { + t.Fatalf("error creating test file %s: %v", socket, createErr) } - logger, _ := zap.NewDevelopment() + logger, err := zap.NewDevelopment() + if err != nil { + t.Fatalf("failed to create logger: %v", err) + } s := deviceplugin.NewSocketWatcher(logger, deviceplugin.SocketWatcherStatInterval(time.Second)) done := make(chan struct{}) go func(done chan struct{}) { @@ -84,14 +94,21 @@ func TestWatchSocketTwice(t *testing.T) { if err != nil { t.Fatalf("error creating temporary directory: %v", err) } - defer os.RemoveAll(tempDir) // Ensure the directory is cleaned up + defer func() { + if removeErr := os.RemoveAll(tempDir); removeErr != nil { + t.Fatalf("failed to remove temp dir: %v", removeErr) + } + }() // Ensure the directory is cleaned up socket := filepath.Join(tempDir, "to-be-deleted.sock") - if _, err := os.Create(socket); err != nil { - t.Fatalf("error creating test file %s: %v", socket, err) + if _, createErr := os.Create(socket); createErr != nil { + t.Fatalf("error creating test file %s: %v", socket, createErr) } - logger, _ := zap.NewDevelopment() + logger, err := zap.NewDevelopment() + if err != nil { + t.Fatalf("failed to create logger: %v", err) + } s := deviceplugin.NewSocketWatcher(logger, deviceplugin.SocketWatcherStatInterval(time.Second)) done1 := make(chan struct{}) done2 := make(chan struct{}) @@ -134,3 +151,67 @@ func TestWatchSocketTwice(t *testing.T) { t.Fatal("socket watcher is still watching 5 seconds after file is deleted") } } + +func TestWatchSocketCleanup(t *testing.T) { + // Create a temporary directory + tempDir, err := os.MkdirTemp("", "socket-watcher-test-") + if err != nil { + t.Fatalf("error creating temporary directory: %v", err) + } + defer func() { + if removeErr := os.RemoveAll(tempDir); removeErr != nil { + t.Fatalf("failed to remove temp dir: %v", removeErr) + } + }() // Ensure the directory is cleaned up + + socket := filepath.Join(tempDir, "to-be-deleted.sock") + if _, createErr := os.Create(socket); createErr != nil { + t.Fatalf("error creating test file %s: %v", socket, createErr) + } + + logger, err := zap.NewDevelopment() + if err != nil { + t.Fatalf("failed to create logger: %v", err) + } + // Use a short interval for faster test execution + s := deviceplugin.NewSocketWatcher(logger, deviceplugin.SocketWatcherStatInterval(100*time.Millisecond)) + + // 1. Watch the socket + ch1 := s.WatchSocket(context.Background(), socket) + + // Verify it's open + select { + case <-ch1: + t.Fatal("channel should be open initially") + default: + } + + // 2. Delete the socket to trigger watcher exit + if err := os.Remove(socket); err != nil { + t.Fatalf("failed to remove socket: %v", err) + } + + // 3. Wait for ch1 to close + select { + case <-ch1: + // Expected + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for watcher to detect socket deletion") + } + + // 4. Recreate the socket + if _, err := os.Create(socket); err != nil { + t.Fatalf("error recreating test file %s: %v", socket, err) + } + + // 5. Watch the socket again + ch2 := s.WatchSocket(context.Background(), socket) + + // 6. Verify ch2 is open + select { + case <-ch2: + t.Fatal("channel is closed but expected to be open") + case <-time.After(200 * time.Millisecond): + // Wait for at least one tick to ensure the watcher has had a chance to run. + } +} From 39e2aae73bc01766ca9258d6ce0fe07c2ef5c898 Mon Sep 17 00:00:00 2001 From: Isaac Date: Fri, 12 Dec 2025 00:51:36 +0000 Subject: [PATCH 2/4] nit: tempDir() Signed-off-by: GitHub --- cns/deviceplugin/socketwatcher_test.go | 33 +++----------------------- 1 file changed, 3 insertions(+), 30 deletions(-) diff --git a/cns/deviceplugin/socketwatcher_test.go b/cns/deviceplugin/socketwatcher_test.go index cb60fe25c2..4f3bae5d81 100644 --- a/cns/deviceplugin/socketwatcher_test.go +++ b/cns/deviceplugin/socketwatcher_test.go @@ -42,16 +42,7 @@ func TestWatchContextCancelled(t *testing.T) { } func TestWatchSocketDeleted(t *testing.T) { - // Create a temporary directory - tempDir, err := os.MkdirTemp("", "socket-watcher-test-") - if err != nil { - t.Fatalf("error creating temporary directory: %v", err) - } - defer func() { - if removeErr := os.RemoveAll(tempDir); removeErr != nil { - t.Fatalf("failed to remove temp dir: %v", removeErr) - } - }() // Ensure the directory is cleaned up + tempDir := t.TempDir() socket := filepath.Join(tempDir, "to-be-deleted.sock") if _, createErr := os.Create(socket); createErr != nil { @@ -89,16 +80,7 @@ func TestWatchSocketDeleted(t *testing.T) { } func TestWatchSocketTwice(t *testing.T) { - // Create a temporary directory - tempDir, err := os.MkdirTemp("", "socket-watcher-test-") - if err != nil { - t.Fatalf("error creating temporary directory: %v", err) - } - defer func() { - if removeErr := os.RemoveAll(tempDir); removeErr != nil { - t.Fatalf("failed to remove temp dir: %v", removeErr) - } - }() // Ensure the directory is cleaned up + tempDir := t.TempDir() socket := filepath.Join(tempDir, "to-be-deleted.sock") if _, createErr := os.Create(socket); createErr != nil { @@ -153,16 +135,7 @@ func TestWatchSocketTwice(t *testing.T) { } func TestWatchSocketCleanup(t *testing.T) { - // Create a temporary directory - tempDir, err := os.MkdirTemp("", "socket-watcher-test-") - if err != nil { - t.Fatalf("error creating temporary directory: %v", err) - } - defer func() { - if removeErr := os.RemoveAll(tempDir); removeErr != nil { - t.Fatalf("failed to remove temp dir: %v", removeErr) - } - }() // Ensure the directory is cleaned up + tempDir := t.TempDir() socket := filepath.Join(tempDir, "to-be-deleted.sock") if _, createErr := os.Create(socket); createErr != nil { From 6ca2265bdb8f44681d5e5d6c3a6252905b1507b0 Mon Sep 17 00:00:00 2001 From: Isaac Date: Fri, 12 Dec 2025 22:31:35 +0000 Subject: [PATCH 3/4] test: use local testdata dir for socket tests to fix Windows CI issues Replaces usage of `t.TempDir()` with a local `testdata` directory for temporary socket files in `socketwatcher_test.go` and `server_test.go`. This change addresses two issues on Windows: 1. `t.TempDir()` creates files in the system temp directory, which can cause cleanup failures and disk space issues on CI agents if files are left behind. 2. Explicitly closes file handles after `os.Create()` to prevent "Access is denied" errors during cleanup, as Windows does not allow deleting open files. Also adds a `.gitignore` to `cns/deviceplugin/testdata` to ensure temporary socket files are not tracked. Signed-off-by: GitHub --- cns/deviceplugin/server_test.go | 4 +-- cns/deviceplugin/socketwatcher_test.go | 41 +++++++++++++++++--------- cns/deviceplugin/testdata/.gitignore | 1 + cns/deviceplugin/testdata/socket.sock | 0 4 files changed, 30 insertions(+), 16 deletions(-) create mode 100644 cns/deviceplugin/testdata/.gitignore delete mode 100644 cns/deviceplugin/testdata/socket.sock diff --git a/cns/deviceplugin/server_test.go b/cns/deviceplugin/server_test.go index 29f8ab4bef..b3d044baca 100644 --- a/cns/deviceplugin/server_test.go +++ b/cns/deviceplugin/server_test.go @@ -21,8 +21,8 @@ func (m *mockDeviceCounter) getDeviceCount() int { func TestServer_Run_CleansUpExistingSocket(t *testing.T) { // Create a temporary directory for the socket - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "test.sock") + socketPath := filepath.Join("testdata", "test.sock") + defer os.Remove(socketPath) // Create a dummy file at the socket path to simulate a stale socket if err := os.WriteFile(socketPath, []byte("stale socket"), 0o600); err != nil { diff --git a/cns/deviceplugin/socketwatcher_test.go b/cns/deviceplugin/socketwatcher_test.go index 4f3bae5d81..3ea3736ff1 100644 --- a/cns/deviceplugin/socketwatcher_test.go +++ b/cns/deviceplugin/socketwatcher_test.go @@ -12,6 +12,14 @@ import ( ) func TestWatchContextCancelled(t *testing.T) { + socket := filepath.Join("testdata", "socket.sock") + f, createErr := os.Create(socket) + if createErr != nil { + t.Fatalf("error creating test file %s: %v", socket, createErr) + } + f.Close() + defer os.Remove(socket) + ctx, cancel := context.WithCancel(context.Background()) logger, err := zap.NewDevelopment() if err != nil { @@ -20,7 +28,7 @@ func TestWatchContextCancelled(t *testing.T) { s := deviceplugin.NewSocketWatcher(logger) done := make(chan struct{}) go func(done chan struct{}) { - <-s.WatchSocket(ctx, "testdata/socket.sock") + <-s.WatchSocket(ctx, socket) close(done) }(done) @@ -42,12 +50,13 @@ func TestWatchContextCancelled(t *testing.T) { } func TestWatchSocketDeleted(t *testing.T) { - tempDir := t.TempDir() - - socket := filepath.Join(tempDir, "to-be-deleted.sock") - if _, createErr := os.Create(socket); createErr != nil { + socket := filepath.Join("testdata", "to-be-deleted.sock") + f, createErr := os.Create(socket) + if createErr != nil { t.Fatalf("error creating test file %s: %v", socket, createErr) } + f.Close() + defer os.Remove(socket) logger, err := zap.NewDevelopment() if err != nil { @@ -80,12 +89,13 @@ func TestWatchSocketDeleted(t *testing.T) { } func TestWatchSocketTwice(t *testing.T) { - tempDir := t.TempDir() - - socket := filepath.Join(tempDir, "to-be-deleted.sock") - if _, createErr := os.Create(socket); createErr != nil { + socket := filepath.Join("testdata", "to-be-deleted.sock") + f, createErr := os.Create(socket) + if createErr != nil { t.Fatalf("error creating test file %s: %v", socket, createErr) } + f.Close() + defer os.Remove(socket) logger, err := zap.NewDevelopment() if err != nil { @@ -135,12 +145,13 @@ func TestWatchSocketTwice(t *testing.T) { } func TestWatchSocketCleanup(t *testing.T) { - tempDir := t.TempDir() - - socket := filepath.Join(tempDir, "to-be-deleted.sock") - if _, createErr := os.Create(socket); createErr != nil { + socket := filepath.Join("testdata", "to-be-deleted.sock") + f, createErr := os.Create(socket) + if createErr != nil { t.Fatalf("error creating test file %s: %v", socket, createErr) } + f.Close() + defer os.Remove(socket) logger, err := zap.NewDevelopment() if err != nil { @@ -173,9 +184,11 @@ func TestWatchSocketCleanup(t *testing.T) { } // 4. Recreate the socket - if _, err := os.Create(socket); err != nil { + f, err = os.Create(socket) + if err != nil { t.Fatalf("error recreating test file %s: %v", socket, err) } + f.Close() // 5. Watch the socket again ch2 := s.WatchSocket(context.Background(), socket) diff --git a/cns/deviceplugin/testdata/.gitignore b/cns/deviceplugin/testdata/.gitignore new file mode 100644 index 0000000000..c74d682773 --- /dev/null +++ b/cns/deviceplugin/testdata/.gitignore @@ -0,0 +1 @@ +*.sock diff --git a/cns/deviceplugin/testdata/socket.sock b/cns/deviceplugin/testdata/socket.sock deleted file mode 100644 index e69de29bb2..0000000000 From 0ebb4b9fa51d51a2c15f74e2932ec3dbf6fd28a3 Mon Sep 17 00:00:00 2001 From: Isaac Date: Fri, 12 Dec 2025 23:26:28 +0000 Subject: [PATCH 4/4] lint: fix err name Signed-off-by: GitHub --- cns/deviceplugin/socketwatcher_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cns/deviceplugin/socketwatcher_test.go b/cns/deviceplugin/socketwatcher_test.go index 3ea3736ff1..4275734726 100644 --- a/cns/deviceplugin/socketwatcher_test.go +++ b/cns/deviceplugin/socketwatcher_test.go @@ -171,8 +171,8 @@ func TestWatchSocketCleanup(t *testing.T) { } // 2. Delete the socket to trigger watcher exit - if err := os.Remove(socket); err != nil { - t.Fatalf("failed to remove socket: %v", err) + if removeErr := os.Remove(socket); removeErr != nil { + t.Fatalf("failed to remove socket: %v", removeErr) } // 3. Wait for ch1 to close