Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ on:
branches:
- main

permissions:
contents: read

jobs:
golangci:
name: lint
Expand All @@ -14,7 +17,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v3
with:
go-version: 1.25.x
go-version: 1.26.x
- name: golangci-lint
uses: golangci/golangci-lint-action@v9
with:
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ on:
push:
branches:
- main
permissions:
contents: write
jobs:
semantic-release:
runs-on: ubuntu-latest
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ on:
pull_request:
types: [opened, synchronize, reopened]
name: Test
permissions:
contents: read
jobs:
test:
runs-on: ubuntu-latest
Expand All @@ -19,7 +21,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.25.x
go-version: 1.26.x

- name: test
run: make test
43 changes: 42 additions & 1 deletion clients/sftp/sftp.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
package sftp

import (
"fmt"
"os"
"path/filepath"

"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
"golang.org/x/crypto/ssh/knownhosts"
)

func SSHConnect(host, user, password string) (*sftp.Client, error) {
hostKeyCallback, err := defaultHostKeyCallback()
if err != nil {
return nil, err
}

return SSHConnectWithHostKeyCallback(host, user, password, hostKeyCallback)
}

func SSHConnectWithHostKeyCallback(host, user, password string, hostKeyCallback ssh.HostKeyCallback) (*sftp.Client, error) {
config := &ssh.ClientConfig{
User: user,
Auth: []ssh.AuthMethod{
ssh.Password(password),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
HostKeyCallback: hostKeyCallback,
}

conn, err := ssh.Dial("tcp", host, config)
Expand All @@ -26,3 +40,30 @@ func SSHConnect(host, user, password string) (*sftp.Client, error) {

return client, nil
}

func defaultHostKeyCallback() (ssh.HostKeyCallback, error) {
knownHostsFiles := []string{"/etc/ssh/ssh_known_hosts"}
if homeDir, err := os.UserHomeDir(); err == nil {
knownHostsFiles = append(knownHostsFiles, filepath.Join(homeDir, ".ssh", "known_hosts"))
}

existingFiles := make([]string, 0, len(knownHostsFiles))
for _, file := range knownHostsFiles {
if _, err := os.Stat(file); err == nil {
existingFiles = append(existingFiles, file)
} else if !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to stat known hosts file %s: %w", file, err)
}
}

if len(existingFiles) == 0 {
return nil, fmt.Errorf("no SSH known_hosts files found; add the SFTP host key to ~/.ssh/known_hosts or /etc/ssh/ssh_known_hosts")
}

callback, err := knownhosts.New(existingFiles...)
if err != nil {
return nil, fmt.Errorf("failed to load SSH known_hosts: %w", err)
}

return callback, nil
}
142 changes: 103 additions & 39 deletions fs/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import (
"errors"
"flag"
"io"
"net"
"net/url"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -153,56 +157,116 @@ type testData struct {
func getTestClients(t *testing.T, ctx context.Context) []testData {
t.Helper()

gcsFS, err := NewGCSFS(ctx, "test", connection.GCSConnection{
GCPConnection: connection.GCPConnection{
SkipTLSVerify: true,
Endpoint: "https://localhost:4443/storage/v1/",
},
})
if err != nil {
t.Fatalf("failed to create GCS filesystem: %v", err)
testClients := []testData{{"local", NewLocalFS(t.TempDir())}}

if isTCPAvailable("localhost:4443") {
gcsFS, err := NewGCSFS(ctx, "test", connection.GCSConnection{
GCPConnection: connection.GCPConnection{
SkipTLSVerify: true,
Endpoint: "https://localhost:4443/storage/v1/",
},
})
if err != nil {
t.Fatalf("failed to create GCS filesystem: %v", err)
}
createGSCBucket(t, ctx, gcsFS.Client, "fkae-project", "test")
testClients = append(testClients, testData{"gcsFS", gcsFS})
} else {
t.Log("skipping gcsFS: localhost:4443 is unavailable")
}
createGSCBucket(t, ctx, gcsFS.Client, "fkae-project", "test")

sshfs, err := NewSSHFS("localhost:2222", "foo", "pass")
if err != nil {
t.Fatalf("%v", err)
if isTCPAvailable("localhost:2222") {
if configureSFTPTestKnownHosts(t, "localhost:2222") {
sshfs, err := NewSSHFS("localhost:2222", "foo", "pass")
if err != nil {
t.Fatalf("%v", err)
}
testClients = append(testClients, testData{"sshfs", sshfs})
}
} else {
t.Log("skipping sshfs: localhost:2222 is unavailable")
}

smbFS, err := NewSMBFS("localhost", "445", "users", types.Authentication{
Username: types.EnvVar{ValueStatic: "foo"},
Password: types.EnvVar{ValueStatic: "pass"}})
if err != nil {
t.Fatalf("%v", err)
}

s3FS, err := NewS3FS(ctx, *bucket, connection.S3Connection{
Bucket: *bucket,
UsePathStyle: true,
AWSConnection: connection.AWSConnection{
AccessKey: types.EnvVar{ValueStatic: accessKeyID},
SecretKey: types.EnvVar{ValueStatic: secretKey},
Region: region,
Endpoint: *endpoint,
SkipTLSVerify: *skipVerify,
},
})
if err != nil {
t.Fatal(err)
if isTCPAvailable("localhost:445") {
smbFS, err := NewSMBFS("localhost", "445", "users", types.Authentication{
Username: types.EnvVar{ValueStatic: "foo"},
Password: types.EnvVar{ValueStatic: "pass"}})
if err != nil {
t.Fatalf("%v", err)
}
testClients = append(testClients, testData{"smbfs", smbFS})
} else {
t.Log("skipping smbfs: localhost:445 is unavailable")
}
createBucket(t, s3FS.Client, *bucket)

testClients := []testData{
{"gcsFS", gcsFS},
{"sshfs", sshfs},
{"smbfs", smbFS},
{"s3FS", s3FS},
{"local", NewLocalFS(t.TempDir())},
if s3Address, ok := endpointAddress(*endpoint); ok && isTCPAvailable(s3Address) {
s3FS, err := NewS3FS(ctx, *bucket, connection.S3Connection{
Bucket: *bucket,
UsePathStyle: true,
AWSConnection: connection.AWSConnection{
AccessKey: types.EnvVar{ValueStatic: accessKeyID},
SecretKey: types.EnvVar{ValueStatic: secretKey},
Region: region,
Endpoint: *endpoint,
SkipTLSVerify: *skipVerify,
},
})
if err != nil {
t.Fatal(err)
}
createBucket(t, s3FS.Client, *bucket)
testClients = append(testClients, testData{"s3FS", s3FS})
} else {
t.Logf("skipping s3FS: %s is unavailable", *endpoint)
}

return testClients
}

func configureSFTPTestKnownHosts(t *testing.T, address string) bool {
t.Helper()

host, port, err := net.SplitHostPort(address)
if err != nil {
t.Fatalf("invalid SFTP test address %q: %v", address, err)
}

knownHosts, err := exec.Command("ssh-keyscan", "-p", port, host).Output()
if err != nil || len(knownHosts) == 0 {
t.Logf("skipping sshfs: failed to scan host key for %s: %v", address, err)
return false
}

homeDir := t.TempDir()
sshDir := filepath.Join(homeDir, ".ssh")
if err := os.MkdirAll(sshDir, 0700); err != nil {
t.Fatalf("failed to create test .ssh directory: %v", err)
}
if err := os.WriteFile(filepath.Join(sshDir, "known_hosts"), knownHosts, 0600); err != nil {
t.Fatalf("failed to write test known_hosts: %v", err)
}
t.Setenv("HOME", homeDir)

return true
}

func isTCPAvailable(address string) bool {
conn, err := net.DialTimeout("tcp", address, time.Second)
if err != nil {
return false
}
_ = conn.Close()
return true
}

func endpointAddress(endpoint string) (string, bool) {
u, err := url.Parse(endpoint)
if err != nil || u.Host == "" {
return "", false
}
return u.Host, true
}

func createBucket(t *testing.T, cl *s3.Client, bucket string) {
t.Helper()

Expand Down
8 changes: 4 additions & 4 deletions fs/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/bmatcuk/doublestar/v4"
Expand Down Expand Up @@ -152,13 +152,13 @@ func (t *s3FS) Read(ctx gocontext.Context, key string) (io.ReadCloser, error) {
}

func (t *s3FS) Write(ctx gocontext.Context, path string, data io.Reader) (os.FileInfo, error) {
// manager.NewUploader handles unseekable streaming readers (e.g. TeeReader
// transfermanager handles unseekable streaming readers (e.g. TeeReader
// chains) by buffering parts internally, computing the required checksums,
// and setting Content-Length automatically — all without the caller needing
// to buffer or pre-compute checksums manually.
// See: https://docs.aws.amazon.com/sdk-for-go/v2/developer-guide/sdk-utilities-s3.html#unseekable-streaming-input
uploader := manager.NewUploader(t.Client)
_, err := uploader.Upload(ctx, &s3.PutObjectInput{
uploader := transfermanager.New(t.Client)
_, err := uploader.UploadObject(ctx, &transfermanager.UploadObjectInput{
Bucket: aws.String(t.Bucket),
Key: aws.String(path),
Body: data,
Expand Down
21 changes: 16 additions & 5 deletions fs/s3_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func TestS3E2E_ContentLength(t *testing.T) {
func testS3ContentLength(t *testing.T, ctx context.Context, endpoint, bucket, region, accessKey, secretKey string) {
t.Helper()

if address, ok := endpointAddress(endpoint); !ok || !isTCPAvailable(address) {
t.Skipf("skipping S3 E2E test: %s is unavailable", endpoint)
}

// Create S3 filesystem client
s3FS, err := NewS3FS(ctx, bucket, connection.S3Connection{
Bucket: bucket,
Expand All @@ -56,7 +60,7 @@ func testS3ContentLength(t *testing.T, ctx context.Context, endpoint, bucket, re
if err != nil {
t.Fatalf("failed to create S3 filesystem: %v", err)
}
defer s3FS.Close()
defer closeWithError(t, s3FS)

// Create bucket if it doesn't exist
createBucket(t, s3FS.Client, bucket)
Expand All @@ -80,7 +84,7 @@ func testS3ContentLength(t *testing.T, ctx context.Context, endpoint, bucket, re
if err != nil {
t.Fatalf("failed to read from S3: %v", err)
}
defer reader.Close()
defer closeWithError(t, reader)

readContent, err := io.ReadAll(reader)
if err != nil {
Expand Down Expand Up @@ -125,7 +129,7 @@ func testS3ContentLength(t *testing.T, ctx context.Context, endpoint, bucket, re
if err != nil {
t.Fatalf("failed to read from S3: %v", err)
}
defer reader.Close()
defer closeWithError(t, reader)

readContent, err := io.ReadAll(reader)
if err != nil {
Expand Down Expand Up @@ -174,7 +178,7 @@ func testS3ContentLength(t *testing.T, ctx context.Context, endpoint, bucket, re
if err != nil {
t.Fatalf("failed to read from S3: %v", err)
}
defer reader.Close()
defer closeWithError(t, reader)

readContent, err := io.ReadAll(reader)
if err != nil {
Expand Down Expand Up @@ -206,7 +210,7 @@ func testS3ContentLength(t *testing.T, ctx context.Context, endpoint, bucket, re
if err != nil {
t.Fatalf("failed to read from S3: %v", err)
}
defer reader.Close()
defer closeWithError(t, reader)

readContent, err := io.ReadAll(reader)
if err != nil {
Expand All @@ -219,6 +223,13 @@ func testS3ContentLength(t *testing.T, ctx context.Context, endpoint, bucket, re
})
}

func closeWithError(t *testing.T, closer io.Closer) {
t.Helper()
if err := closer.Close(); err != nil {
t.Errorf("failed to close: %v", err)
}
}

// testWriter is a simple writer that buffers up to max bytes for testing
type testWriter struct {
buffer *[]byte
Expand Down
Loading
Loading