Skip to content

Commit 07653b5

Browse files
committed
move r2-pump from GCP function to binary
1 parent 5d42894 commit 07653b5

File tree

10 files changed

+329
-199
lines changed

10 files changed

+329
-199
lines changed

.github/workflows/main.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,16 @@ jobs:
113113
asset_name: git-sync
114114
asset_content_type: application/octet-stream
115115

116+
- uses: actions/upload-release-asset@v1
117+
name: release r2-pump
118+
env:
119+
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
120+
with:
121+
upload_url: ${{ steps.create_release.outputs.upload_url }}
122+
asset_path: ./bin/r2-pump
123+
asset_name: r2-pump
124+
asset_content_type: application/octet-stream
125+
116126
test:
117127
name: Test
118128
runs-on: ubuntu-latest

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ define generate-func-make
77
endef
88

99
.PHONY: all
10-
all: bin/process-version-host bin/git-sync bin/checker \
10+
all: bin/process-version-host bin/git-sync bin/checker bin/r2-pump \
1111
;$(foreach n,${CLOUD_FUNCTIONS},$(call generate-func-make,$n))
1212

1313
bin/checker:
@@ -22,6 +22,9 @@ bin/process-version-host:
2222
bin/process-version:
2323
go build $(GO_BUILD_ARGS) -o bin/process-version ./cmd/process-version
2424

25+
bin/r2-pump:
26+
go build $(GO_BUILD_ARGS) -o bin/r2-pump ./cmd/r2-pump
27+
2528
.PHONY: schema
2629
schema:
2730
./bin/packages human > schema_human.json

audit/audit.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,14 +187,14 @@ func WroteAlgolia(ctx context.Context, pkgName string, currVersion string, lastV
187187
return nil
188188
}
189189

190-
func WroteR2(ctx context.Context, pkgName string, version string, keys []string, ext string) error {
190+
func WroteR2(ctx context.Context, pkgName string, version string, keys []string) error {
191191
content := bytes.NewBufferString("")
192192
fmt.Fprint(content, "Files:\n")
193193
for _, key := range keys {
194194
fmt.Fprintf(content, "- %s\n", key)
195195
}
196196

197-
if err := create(ctx, pkgName, version, "r2-publish/"+ext, content); err != nil {
197+
if err := create(ctx, pkgName, version, "r2-publish", content); err != nil {
198198
return errors.Wrap(err, "could not create audit log file")
199199
}
200200
return nil

cmd/r2-pump/main.go

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
b64 "encoding/base64"
7+
"encoding/json"
8+
"fmt"
9+
"io"
10+
"io/ioutil"
11+
"log"
12+
"net/http"
13+
"os"
14+
"runtime"
15+
"time"
16+
17+
"github.com/pkg/errors"
18+
19+
"github.com/cdnjs/tools/audit"
20+
"github.com/cdnjs/tools/gcp"
21+
"github.com/cdnjs/tools/metrics"
22+
"github.com/cdnjs/tools/packages"
23+
"github.com/cdnjs/tools/sentry"
24+
25+
"github.com/aws/aws-sdk-go-v2/aws"
26+
"github.com/aws/aws-sdk-go-v2/config"
27+
"github.com/aws/aws-sdk-go-v2/credentials"
28+
"github.com/aws/aws-sdk-go-v2/service/s3"
29+
30+
"cloud.google.com/go/pubsub"
31+
)
32+
33+
var (
34+
PROJECT = os.Getenv("PROJECT")
35+
SUBSCRIPTION = os.Getenv("SUBSCRIPTION")
36+
37+
R2_BUCKET = os.Getenv("R2_BUCKET")
38+
R2_KEY_ID = os.Getenv("R2_KEY_ID")
39+
R2_KEY_SECRET = os.Getenv("R2_KEY_SECRET")
40+
R2_ENDPOINT = os.Getenv("R2_ENDPOINT")
41+
)
42+
43+
func init() {
44+
sentry.Init()
45+
}
46+
47+
func main() {
48+
ctx := context.Background()
49+
client, err := pubsub.NewClient(ctx, PROJECT)
50+
if err != nil {
51+
log.Fatalf("could not create pubsub Client: %v", err)
52+
}
53+
sub := client.Subscription(SUBSCRIPTION)
54+
sub.ReceiveSettings.Synchronous = true
55+
sub.ReceiveSettings.MaxOutstandingMessages = 5
56+
sub.ReceiveSettings.NumGoroutines = runtime.NumCPU()
57+
58+
for {
59+
log.Printf("started consuming messages\n")
60+
if err := consume(ctx, client, sub); err != nil {
61+
log.Fatalf("could not pull messages: %s", err)
62+
}
63+
}
64+
}
65+
66+
type Message struct {
67+
GCSEvent gcp.GCSEvent `json:"gcsEvent"`
68+
}
69+
70+
func consume(ctx context.Context, client *pubsub.Client, sub *pubsub.Subscription) error {
71+
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
72+
log.Printf("received message: %s\n", msg.Data)
73+
74+
msg.Ack()
75+
if err := processMessage(ctx, msg.Data); err != nil {
76+
log.Printf("failed to process message: %s\n", err)
77+
}
78+
})
79+
if err != nil {
80+
return errors.Wrap(err, "could not receive from subscription")
81+
}
82+
return nil
83+
}
84+
85+
func processMessage(ctx context.Context, data []byte) error {
86+
var message Message
87+
if err := json.Unmarshal(data, &message); err != nil {
88+
return errors.Wrap(err, "failed to parse")
89+
}
90+
91+
return invoke(ctx, message.GCSEvent)
92+
}
93+
94+
func invoke(ctx context.Context, e gcp.GCSEvent) error {
95+
sentry.Init()
96+
defer sentry.PanicHandler()
97+
98+
pkgName := e.Metadata["package"].(string)
99+
version := e.Metadata["version"].(string)
100+
log.Printf("Invoke %s %s\n", pkgName, version)
101+
102+
configStr, err := b64.StdEncoding.DecodeString(e.Metadata["config"].(string))
103+
if err != nil {
104+
return fmt.Errorf("could not decode config: %v", err)
105+
}
106+
107+
archive, err := gcp.ReadObject(ctx, e.Bucket, e.Name)
108+
if err != nil {
109+
return fmt.Errorf("could not read object: %v", err)
110+
}
111+
112+
bucket := aws.String(R2_BUCKET)
113+
114+
r2Resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
115+
return aws.Endpoint{
116+
URL: R2_ENDPOINT,
117+
}, nil
118+
})
119+
120+
cfg, err := config.LoadDefaultConfig(ctx,
121+
config.WithEndpointResolverWithOptions(r2Resolver),
122+
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(R2_KEY_ID, R2_KEY_SECRET, "")),
123+
)
124+
if err != nil {
125+
return fmt.Errorf("could not load config: %s", err)
126+
}
127+
128+
s3Client := s3.NewFromConfig(cfg)
129+
130+
keys := make([]string, 0)
131+
132+
onFile := func(name string, r io.Reader) error {
133+
// remove leading slash
134+
name = name[1:]
135+
key := fmt.Sprintf("%s/%s/%s", pkgName, version, name)
136+
137+
content, err := ioutil.ReadAll(r)
138+
if err != nil {
139+
return errors.Wrap(err, "could not read file")
140+
}
141+
142+
keys = append(keys, key)
143+
144+
meta := newMetadata(len(content))
145+
146+
s3Object := s3.PutObjectInput{
147+
Body: bytes.NewReader(content),
148+
Bucket: bucket,
149+
Key: aws.String(key),
150+
Metadata: meta,
151+
}
152+
if err := uploadFile(ctx, s3Client, &s3Object); err != nil {
153+
return errors.Wrap(err, "failed to upload file")
154+
}
155+
return nil
156+
}
157+
if err := gcp.Inflate(bytes.NewReader(archive), onFile); err != nil {
158+
return fmt.Errorf("could not inflate archive: %s", err)
159+
}
160+
161+
if len(keys) == 0 {
162+
log.Printf("%s: no files to publish\n", pkgName)
163+
}
164+
165+
pkg := new(packages.Package)
166+
if err := json.Unmarshal([]byte(configStr), &pkg); err != nil {
167+
return fmt.Errorf("failed to parse config: %s", err)
168+
}
169+
170+
if err := audit.WroteR2(ctx, pkgName, version, keys); err != nil {
171+
log.Printf("failed to audit: %s\n", err)
172+
}
173+
if err := metrics.NewUpdatePublishedR2(); err != nil {
174+
return errors.Wrap(err, "could not report metrics")
175+
}
176+
177+
return nil
178+
}
179+
180+
func newMetadata(size int) map[string]string {
181+
lastModifiedTime := time.Now()
182+
lastModifiedSeconds := lastModifiedTime.UnixNano() / int64(time.Second)
183+
lastModifiedStr := lastModifiedTime.Format(http.TimeFormat)
184+
etag := fmt.Sprintf("%x-%x", lastModifiedSeconds, size)
185+
186+
meta := make(map[string]string)
187+
188+
// https://github.com/cdnjs/origin-worker/blob/ff91d30586c9e924ff919407401dff6f52826b4d/src/index.js#L212-L213
189+
meta["etag"] = etag
190+
meta["last_modified"] = lastModifiedStr
191+
192+
return meta
193+
}
194+
195+
func uploadFile(ctx context.Context, s3Client *s3.Client, obj *s3.PutObjectInput) error {
196+
if _, err := s3Client.PutObject(ctx, obj); err != nil {
197+
return errors.Wrapf(err, "failed to put Object %s", *obj.Key)
198+
}
199+
200+
return nil
201+
}

functions/r2-pump/go.mod

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,46 +5,22 @@ go 1.20
55
replace github.com/cdnjs/tools => ../../
66

77
require (
8-
github.com/aws/aws-sdk-go v1.44.282
9-
github.com/aws/aws-sdk-go-v2 v1.18.1
10-
github.com/aws/aws-sdk-go-v2/config v1.18.27
11-
github.com/aws/aws-sdk-go-v2/credentials v1.13.26
12-
github.com/aws/aws-sdk-go-v2/service/s3 v1.35.0
8+
cloud.google.com/go/pubsub v1.10.3
139
github.com/cdnjs/tools v0.0.0-00010101000000-000000000000
14-
github.com/cloudflare/cloudflare-go v0.69.0
1510
github.com/pkg/errors v0.9.1
1611
)
1712

1813
require (
1914
cloud.google.com/go v0.81.0 // indirect
2015
cloud.google.com/go/storage v1.15.0 // indirect
2116
github.com/agnivade/levenshtein v1.1.1 // indirect
22-
github.com/algolia/algoliasearch-client-go/v3 v3.4.0 // indirect
23-
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
24-
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4 // indirect
25-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 // indirect
26-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect
27-
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.35 // indirect
28-
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.26 // indirect
29-
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
30-
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.29 // indirect
31-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28 // indirect
32-
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.3 // indirect
33-
github.com/aws/aws-sdk-go-v2/service/sso v1.12.12 // indirect
34-
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 // indirect
35-
github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 // indirect
36-
github.com/aws/smithy-go v1.13.5 // indirect
3717
github.com/blang/semver v3.5.1+incompatible // indirect
3818
github.com/getsentry/sentry-go v0.6.1 // indirect
3919
github.com/gobwas/glob v0.2.3 // indirect
4020
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
4121
github.com/golang/protobuf v1.5.2 // indirect
42-
github.com/google/go-github v17.0.0+incompatible // indirect
43-
github.com/google/go-querystring v1.1.0 // indirect
22+
github.com/google/go-cmp v0.5.8 // indirect
4423
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
45-
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
46-
github.com/hashicorp/go-retryablehttp v0.7.3 // indirect
47-
github.com/jmespath/go-jmespath v0.4.0 // indirect
4824
github.com/jstemmer/go-junit-report v0.9.1 // indirect
4925
github.com/karrick/godirwalk v1.15.6 // indirect
5026
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
@@ -55,9 +31,9 @@ require (
5531
golang.org/x/mod v0.14.0 // indirect
5632
golang.org/x/net v0.20.0 // indirect
5733
golang.org/x/oauth2 v0.0.0-20210413134643-5e61552d6c78 // indirect
34+
golang.org/x/sync v0.6.0 // indirect
5835
golang.org/x/sys v0.16.0 // indirect
5936
golang.org/x/text v0.14.0 // indirect
60-
golang.org/x/time v0.3.0 // indirect
6137
golang.org/x/tools v0.17.0 // indirect
6238
google.golang.org/api v0.45.0 // indirect
6339
google.golang.org/appengine v1.6.7 // indirect

0 commit comments

Comments
 (0)