diff --git a/protocol/sqs/v2/go.mod b/protocol/sqs/v2/go.mod new file mode 100644 index 000000000..3aa882597 --- /dev/null +++ b/protocol/sqs/v2/go.mod @@ -0,0 +1,24 @@ +module github.com/cloudevents/sdk-go/protocol/sqs/v2 + +go 1.23.0 + +toolchain go1.23.8 + +replace github.com/cloudevents/sdk-go/v2 => ../../../v2 + +require ( + github.com/aws/aws-sdk-go-v2 v1.38.0 + github.com/aws/aws-sdk-go-v2/service/sqs v1.41.0 + github.com/cloudevents/sdk-go/v2 v2.16.1 +) + +require ( + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 // indirect + github.com/aws/smithy-go v1.22.5 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect +) diff --git a/protocol/sqs/v2/go.sum b/protocol/sqs/v2/go.sum new file mode 100644 index 000000000..d802c3a8d --- /dev/null +++ b/protocol/sqs/v2/go.sum @@ -0,0 +1,37 @@ +github.com/aws/aws-sdk-go-v2 v1.38.0 h1:UCRQ5mlqcFk9HJDIqENSLR3wiG1VTWlyUfLDEvY7RxU= +github.com/aws/aws-sdk-go-v2 v1.38.0/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 h1:o9RnO+YZ4X+kt5Z7Nvcishlz0nksIt2PIzDglLMP0vA= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3/go.mod h1:+6aLJzOG1fvMOyzIySYjOFjcguGvVRL68R+uoRencN4= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 h1:joyyUFhiTQQmVK6ImzNU9TQSNRNeD9kOklqTzyk5v6s= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3/go.mod h1:+vNIyZQP3b3B1tSLI0lxvrU9cfM7gpdRXMFfm67ZcPc= +github.com/aws/aws-sdk-go-v2/service/sqs v1.41.0 h1:xobvQ4NxlXFUNgVwE6cnMI/ww7K7jtQMWKor2Gi61Xg= +github.com/aws/aws-sdk-go-v2/service/sqs v1.41.0/go.mod h1:RExz4LhRKY5iogQ1dz7KVa3JyBY0PBotXovrDj850Sc= +github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= +github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/protocol/sqs/v2/message.go b/protocol/sqs/v2/message.go new file mode 100644 index 000000000..237f3e5d6 --- /dev/null +++ b/protocol/sqs/v2/message.go @@ -0,0 +1,175 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package sqs + +import ( + "bytes" + "context" + "fmt" + "strings" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/cloudevents/sdk-go/v2/protocol" +) + +const ( + prefix = "ce-" + contentType = "Content-Type" +) + +var specs = spec.WithPrefix(prefix) + +type SQSDeleteMessageAPI interface { + DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) +} + +// Message implements binding.Message by wrapping an SQS types.Message. +// This message *can* be read several times safely +type Message struct { + Msg *types.Message + queueURL *string + client SQSDeleteMessageAPI + + version spec.Version + format format.Format +} + +var ( + _ binding.Message = (*Message)(nil) + _ binding.MessageMetadataReader = (*Message)(nil) +) + +// NewMessage wraps an SQS types.Message in a binding.Message. +// The returned message *can* be read several times safely +// The default encoding returned is EncodingStructured unless the SQS message contains a specversion +// header. +func NewMessage(msg *types.Message, sqsClient SQSDeleteMessageAPI, queueURL *string) *Message { + v := getSpecVersion(msg) + fmt := getFormat(msg) + if v == nil && fmt == nil { + fmt = format.JSON + } + return &Message{Msg: msg, version: v, format: fmt, client: sqsClient, queueURL: queueURL} +} + +func getSpecVersion(message *types.Message) spec.Version { + if sv, ok := message.MessageAttributes[specs.PrefixedSpecVersionName()]; ok { + if sv.StringValue != nil { + return specs.Version(aws.ToString(sv.StringValue)) + } + } + return nil +} + +func getFormat(message *types.Message) format.Format { + if sv, ok := message.MessageAttributes[contentType]; ok { + if sv.StringValue != nil && format.IsFormat(aws.ToString(sv.StringValue)) { + return format.Lookup(aws.ToString(sv.StringValue)) + } + } + return nil +} + +// ReadEncoding return the type of the message Encoding. +func (m *Message) ReadEncoding() binding.Encoding { + if m.version != nil { + return binding.EncodingBinary + } + if m.format != nil { + return binding.EncodingStructured + } + return binding.EncodingUnknown +} + +// ReadStructured transfers a structured-mode event to a StructuredWriter. +func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error { + if m.version != nil { + return binding.ErrNotStructured + } + if m.format == nil { + return binding.ErrNotStructured + } + data := []byte(*m.Msg.Body) + return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(data)) +} + +// ReadBinary transfers a binary-mode event to an BinaryWriter. +func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error { + if m.format != nil { + return binding.ErrNotBinary + } + var err error + for k, attr := range m.Msg.MessageAttributes { + v := aws.ToString(attr.StringValue) + if strings.HasPrefix(k, prefix) { + attr := m.version.Attribute(k) + if attr != nil { + err = encoder.SetAttribute(attr, v) + } else { + err = encoder.SetExtension(strings.ToLower(strings.TrimPrefix(k, prefix)), v) + } + } else if k == contentType { + err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), v) + } + if err != nil { + return err + } + } + + if m.Msg.Body != nil { + err = encoder.SetData(bytes.NewBuffer([]byte(*m.Msg.Body))) + } + + return err +} + +// GetAttribute implements binding.MessageMetadataReader +func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) { + attr := m.version.AttributeFromKind(k) + if attr == nil { + return nil, nil + } + key := withPrefix(attr.Name()) + if msgAttr, ok := m.Msg.MessageAttributes[key]; ok && aws.ToString(msgAttr.StringValue) != "" { + return attr, aws.ToString(msgAttr.StringValue) + } + return nil, nil +} + +// GetExtension implements binding.MessageMetadataReader +func (m *Message) GetExtension(name string) interface{} { + key := withPrefix(name) + if attr, ok := m.Msg.MessageAttributes[key]; ok && attr.StringValue != nil { + return aws.ToString(attr.StringValue) + } + return nil +} + +// Finish implements binding.Message +// It deletes the message from SQS when the CloudEvent has been ACKed. +func (m *Message) Finish(err error) error { + if protocol.IsACK(err) { + // If the error is an ACK, we delete the message from SQS. + ctx := context.Background() + _, err = m.client.DeleteMessage(ctx, &sqs.DeleteMessageInput{ + QueueUrl: m.queueURL, + ReceiptHandle: m.Msg.ReceiptHandle, + }) + return err + } + return nil +} + +// withPrefix prepends the prefix to the attribute name +func withPrefix(attributeName string) string { + return fmt.Sprintf("%s%s", prefix, attributeName) +} diff --git a/protocol/sqs/v2/message_test.go b/protocol/sqs/v2/message_test.go new file mode 100644 index 000000000..9cc53db9d --- /dev/null +++ b/protocol/sqs/v2/message_test.go @@ -0,0 +1,100 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package sqs + +import ( + "context" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/cloudevents/sdk-go/v2/event" +) + +type mockGetObjectAPI func(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) + +func (m mockGetObjectAPI) DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) { + return m(ctx, params, optFns...) +} + +func TestReadStructured(t *testing.T) { + tests := []struct { + name string + client func(t *testing.T) SQSDeleteMessageAPI + msg *types.Message + queueName *string + wantErr error + }{ + { + name: "nil format", + msg: &types.Message{ + Body: aws.String(""), + }, + client: func(t *testing.T) SQSDeleteMessageAPI { + return mockGetObjectAPI(func(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) { + return nil, nil + }) + }, + }, + { + name: "json format", + msg: &types.Message{ + Body: aws.String(""), + MessageAttributes: map[string]types.MessageAttributeValue{ + contentType: stringMessageAttribute(event.ApplicationCloudEventsJSON), + }, + }, + client: func(t *testing.T) SQSDeleteMessageAPI { + return mockGetObjectAPI(func(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) { + t.Helper() + return nil, nil + }) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + msg := NewMessage(tc.msg, tc.client(t), tc.queueName) + err := msg.ReadStructured(context.Background(), msgToWriter(tc.msg)) + if err != tc.wantErr { + t.Errorf("Error unexpected. got: %v, want: %v", err, tc.wantErr) + } + }) + } +} + +func TestReadBinary(t *testing.T) { + msg := &types.Message{ + Body: aws.String("{hello:world}"), + MessageAttributes: map[string]types.MessageAttributeValue{ + "ce-specversion": stringMessageAttribute("1.0"), + "ce-type": stringMessageAttribute("binary.test"), + "ce-source": stringMessageAttribute("test-source"), + "ce-id": stringMessageAttribute("ABC-123"), + }, + ReceiptHandle: aws.String("test-receipt-handle"), + } + + client := func(t *testing.T) SQSDeleteMessageAPI { + return mockGetObjectAPI(func(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) { + t.Helper() + return nil, nil + }) + } + message := NewMessage(msg, client(t), aws.String("test-queue")) + err := message.ReadBinary(context.Background(), msgToWriter(msg)) + if err != nil { + t.Errorf("Error unexpected. got: %v", err) + } +} + +func msgToWriter(msg *types.Message) *sqsMessageWriter { + return &sqsMessageWriter{ + MessageBody: msg.Body, + MessageAttributes: msg.MessageAttributes, + } +} diff --git a/protocol/sqs/v2/options.go b/protocol/sqs/v2/options.go new file mode 100644 index 000000000..086186887 --- /dev/null +++ b/protocol/sqs/v2/options.go @@ -0,0 +1,69 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package sqs + +import ( + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" +) + +// Option provides a way to configure the protocol +type Option func(*Protocol) error + +func WithNewClientFromConfig(cfg aws.Config, optFns ...func(*sqs.Options)) Option { + return func(p *Protocol) error { + p.client = sqs.NewFromConfig(cfg, optFns...) + return nil + } +} + +func WithClient(client *sqs.Client) Option { + return func(p *Protocol) error { + if client == nil { + return fmt.Errorf("client cannot be nil") + } + p.client = client + return nil + } +} + +func WithVisibilityTimeout(visibilityTimeout int32) Option { + return func(p *Protocol) error { + if visibilityTimeout <= 0 { + return fmt.Errorf("visibilityTimeout must be greater than 0") + } + p.visibilityTimeout = visibilityTimeout + return nil + } +} + +func WithMaxMessages(maxMessages int32) Option { + return func(p *Protocol) error { + if maxMessages <= 0 { + return fmt.Errorf("maxMessages must be greater than 0") + } + if maxMessages > 10 { + return fmt.Errorf("maxMessages must be less than 10") + } + p.maxMessages = maxMessages + return nil + } +} + +func WithWaitTimeSeconds(waitTimeSeconds int32) Option { + return func(p *Protocol) error { + if waitTimeSeconds <= 0 { + return fmt.Errorf("waitTimeSeconds must be greater than 0 second") + } + if waitTimeSeconds > 20 { + return fmt.Errorf("waitTimeSeconds must be less than 20 seconds") + } + p.waitTimeSeconds = waitTimeSeconds + return nil + } +} diff --git a/protocol/sqs/v2/protocol.go b/protocol/sqs/v2/protocol.go new file mode 100644 index 000000000..b113c3e73 --- /dev/null +++ b/protocol/sqs/v2/protocol.go @@ -0,0 +1,130 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package sqs + +import ( + "context" + "fmt" + "io" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/cloudevents/sdk-go/v2/binding" + cecontext "github.com/cloudevents/sdk-go/v2/context" + "github.com/cloudevents/sdk-go/v2/protocol" +) + +const ( + defaultVisibilityTimeout = 30 + defaultMaxMessages = 10 + defaultWaitTimeSeconds = 20 +) + +type Protocol struct { + client *sqs.Client + incoming chan *Message + queueURL *string + maxMessages int32 + waitTimeSeconds int32 + visibilityTimeout int32 +} + +// New creates a new SQS protocol. +func New(queueName string, opts ...Option) (*Protocol, error) { + p := &Protocol{ + incoming: make(chan *Message), + queueURL: aws.String(queueName), + visibilityTimeout: defaultVisibilityTimeout, + maxMessages: defaultMaxMessages, + waitTimeSeconds: defaultWaitTimeSeconds, + } + if err := p.applyOptions(opts...); err != nil { + return nil, err + } + if p.client == nil { + return nil, fmt.Errorf("sqs client is nil") + } + return p, nil +} + +func (p *Protocol) applyOptions(opts ...Option) error { + for _, fn := range opts { + if err := fn(p); err != nil { + return err + } + } + return nil +} + +// Send sends messages. Send implements Sender.Sender +func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error { + var err error + defer func() { _ = in.Finish(err) }() + msgInput := sqs.SendMessageInput{ + QueueUrl: p.queueURL, + } + err = WriteMessageInput(ctx, in, &msgInput, transformers...) + if err != nil { + return err + } + _, err = p.client.SendMessage(ctx, &msgInput) + return err +} + +// OpenInbound implements Opener.OpenInbound +func (p *Protocol) OpenInbound(ctx context.Context) error { + logger := cecontext.LoggerFrom(ctx) + logger.Infof("Starting SQS Message polling for %s", aws.ToString(p.queueURL)) + for { + select { + case <-ctx.Done(): + return nil + default: + res, err := p.getMessages(ctx) + if err != nil { + continue + } + for _, message := range res.Messages { + p.incoming <- NewMessage(&message, p.client, p.queueURL) + } + } + } +} + +func (p *Protocol) getMessages(ctx context.Context) (*sqs.ReceiveMessageOutput, error) { + return p.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ + QueueUrl: p.queueURL, + MaxNumberOfMessages: p.maxMessages, + WaitTimeSeconds: p.waitTimeSeconds, + VisibilityTimeout: p.visibilityTimeout, + MessageSystemAttributeNames: []types.MessageSystemAttributeName{ + types.MessageSystemAttributeNameAWSTraceHeader, + }, + MessageAttributeNames: []string{ + string(types.QueueAttributeNameAll), + }, + }) +} + +// Receive implements Receiver.Receive. +func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) { + select { + case msg, ok := <-p.incoming: + if !ok { + return nil, io.EOF + } + return msg, nil + case <-ctx.Done(): + return nil, io.EOF + } +} + +var ( + _ protocol.Receiver = (*Protocol)(nil) + _ protocol.Sender = (*Protocol)(nil) + _ protocol.Opener = (*Protocol)(nil) +) diff --git a/protocol/sqs/v2/write_message.go b/protocol/sqs/v2/write_message.go new file mode 100644 index 000000000..c8b43396d --- /dev/null +++ b/protocol/sqs/v2/write_message.go @@ -0,0 +1,117 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package sqs + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" + cetypes "github.com/cloudevents/sdk-go/v2/types" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" +) + +func WriteMessageInput(ctx context.Context, m binding.Message, msgInput *sqs.SendMessageInput, transformers ...binding.Transformer) error { + structuredWriter := (*sqsMessageWriter)(msgInput) + binaryWriter := (*sqsMessageWriter)(msgInput) + + _, err := binding.Write( + ctx, + m, + structuredWriter, + binaryWriter, + transformers..., + ) + return err +} + +type sqsMessageWriter sqs.SendMessageInput + +// StructuredWriter implements StructuredWriter.SetStructuredEvent +func (w *sqsMessageWriter) SetStructuredEvent(_ context.Context, _ format.Format, event io.Reader) error { + val, err := io.ReadAll(event) + if err != nil { + return err + } + w.MessageBody = aws.String(string(val)) + w.MessageAttributes = make(map[string]types.MessageAttributeValue) + return nil +} + +// Start implements BinaryWriter.Start +func (b *sqsMessageWriter) Start(ctx context.Context) error { + b.MessageAttributes = make(map[string]types.MessageAttributeValue) + return nil +} + +// End implements BinaryWriter.End +func (b *sqsMessageWriter) End(ctx context.Context) error { + return nil +} + +// SetData implements BinaryWriter.SetData +func (b *sqsMessageWriter) SetData(reader io.Reader) error { + data, err := io.ReadAll(reader) + if err != nil { + return err + } + b.MessageBody = aws.String(string(data)) + return nil +} + +// SetAttribute implements MessageMetadataWriter.SetAttribute +func (b *sqsMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error { + if attribute.Kind() == spec.DataContentType { + if value == nil { + return nil + } + s, err := cetypes.Format(value) + if err != nil { + return err + } + b.MessageAttributes[contentType] = stringMessageAttribute(s) + } else { + prefixedName := withPrefix(attribute.Name()) + if value == nil { + delete(b.MessageAttributes, prefixedName) + return nil + } + convertedValue := fmt.Sprint(value) + if attribute.Kind().String() == spec.Time.String() { + timeValue := value.(time.Time) + convertedValue = timeValue.Format(time.RFC3339Nano) + } + b.MessageAttributes[prefixedName] = stringMessageAttribute(convertedValue) + } + return nil +} + +// SetExtension implements MessageMetadataWriter.SetExtension +func (b *sqsMessageWriter) SetExtension(name string, value interface{}) error { + prefixedName := withPrefix(name) + convertedValue := fmt.Sprint(value) + b.MessageAttributes[prefixedName] = stringMessageAttribute(convertedValue) + return nil +} + +var ( + _ binding.BinaryWriter = (*sqsMessageWriter)(nil) // Test it conforms to the interface + _ binding.StructuredWriter = (*sqsMessageWriter)(nil) // Test it conforms to the interface +) + +func stringMessageAttribute(val string) types.MessageAttributeValue { + return types.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(val), + } +} diff --git a/samples/sqs/go.mod b/samples/sqs/go.mod new file mode 100644 index 000000000..de29606e4 --- /dev/null +++ b/samples/sqs/go.mod @@ -0,0 +1,38 @@ +module github.com/cloudevents/sdk-go/samples/sqs + +go 1.23.0 + +toolchain go1.23.8 + +require ( + github.com/aws/aws-sdk-go-v2/config v1.31.0 + github.com/cloudevents/sdk-go/protocol/sqs/v2 v2.0.0-00010101000000-000000000000 + github.com/cloudevents/sdk-go/v2 v2.16.1 + github.com/google/uuid v1.6.0 + github.com/kelseyhightower/envconfig v1.4.0 +) + +require ( + github.com/aws/aws-sdk-go-v2 v1.38.0 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.18.4 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sqs v1.41.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.28.0 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.33.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.37.0 // indirect + github.com/aws/smithy-go v1.22.5 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect +) + +replace github.com/cloudevents/sdk-go/v2 => ../../v2 + +replace github.com/cloudevents/sdk-go/protocol/sqs/v2 => ../../protocol/sqs/v2 diff --git a/samples/sqs/go.sum b/samples/sqs/go.sum new file mode 100644 index 000000000..e2eda5b72 --- /dev/null +++ b/samples/sqs/go.sum @@ -0,0 +1,63 @@ +github.com/aws/aws-sdk-go-v2 v1.38.0 h1:UCRQ5mlqcFk9HJDIqENSLR3wiG1VTWlyUfLDEvY7RxU= +github.com/aws/aws-sdk-go-v2 v1.38.0/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg= +github.com/aws/aws-sdk-go-v2/config v1.31.0 h1:9yH0xiY5fUnVNLRWO0AtayqwU1ndriZdN78LlhruJR4= +github.com/aws/aws-sdk-go-v2/config v1.31.0/go.mod h1:VeV3K72nXnhbe4EuxxhzsDc/ByrCSlZwUnWH52Nde/I= +github.com/aws/aws-sdk-go-v2/credentials v1.18.4 h1:IPd0Algf1b+Qy9BcDp0sCUcIWdCQPSzDoMK3a8pcbUM= +github.com/aws/aws-sdk-go-v2/credentials v1.18.4/go.mod h1:nwg78FjH2qvsRM1EVZlX9WuGUJOL5od+0qvm0adEzHk= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.3 h1:GicIdnekoJsjq9wqnvyi2elW6CGMSYKhdozE7/Svh78= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.3/go.mod h1:R7BIi6WNC5mc1kfRM7XM/VHC3uRWkjc396sfabq4iOo= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 h1:o9RnO+YZ4X+kt5Z7Nvcishlz0nksIt2PIzDglLMP0vA= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3/go.mod h1:+6aLJzOG1fvMOyzIySYjOFjcguGvVRL68R+uoRencN4= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 h1:joyyUFhiTQQmVK6ImzNU9TQSNRNeD9kOklqTzyk5v6s= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3/go.mod h1:+vNIyZQP3b3B1tSLI0lxvrU9cfM7gpdRXMFfm67ZcPc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 h1:6+lZi2JeGKtCraAj1rpoZfKqnQ9SptseRZioejfUOLM= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0/go.mod h1:eb3gfbVIxIoGgJsi9pGne19dhCBpK6opTYpQqAmdy44= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.3 h1:ieRzyHXypu5ByllM7Sp4hC5f/1Fy5wqxqY0yB85hC7s= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.3/go.mod h1:O5ROz8jHiOAKAwx179v+7sHMhfobFVi6nZt8DEyiYoM= +github.com/aws/aws-sdk-go-v2/service/sqs v1.41.0 h1:xobvQ4NxlXFUNgVwE6cnMI/ww7K7jtQMWKor2Gi61Xg= +github.com/aws/aws-sdk-go-v2/service/sqs v1.41.0/go.mod h1:RExz4LhRKY5iogQ1dz7KVa3JyBY0PBotXovrDj850Sc= +github.com/aws/aws-sdk-go-v2/service/sso v1.28.0 h1:Mc/MKBf2m4VynyJkABoVEN+QzkfLqGj0aiJuEe7cMeM= +github.com/aws/aws-sdk-go-v2/service/sso v1.28.0/go.mod h1:iS5OmxEcN4QIPXARGhavH7S8kETNL11kym6jhoS7IUQ= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.33.0 h1:6csaS/aJmqZQbKhi1EyEMM7yBW653Wy/B9hnBofW+sw= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.33.0/go.mod h1:59qHWaY5B+Rs7HGTuVGaC32m0rdpQ68N8QCN3khYiqs= +github.com/aws/aws-sdk-go-v2/service/sts v1.37.0 h1:MG9VFW43M4A8BYeAfaJJZWrroinxeTi2r3+SnmLQfSA= +github.com/aws/aws-sdk-go-v2/service/sts v1.37.0/go.mod h1:JdeBDPgpJfuS6rU/hNglmOigKhyEZtBmbraLE4GK1J8= +github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= +github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= +github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= +golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/samples/sqs/receiver/main.go b/samples/sqs/receiver/main.go new file mode 100644 index 000000000..e9b8e44c7 --- /dev/null +++ b/samples/sqs/receiver/main.go @@ -0,0 +1,56 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "fmt" + "log" + + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/cloudevents/sdk-go/protocol/sqs/v2" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/kelseyhightower/envconfig" +) + +type envConfig struct { + QueueURL string `envconfig:"AWS_SQS_QUEUE_URL" required:"true"` +} + +func main() { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Fatalf("[ERROR] Failed to process env var: %s", err) + } + ctx := context.Background() + awsCfg, err := awsconfig.LoadDefaultConfig(ctx) + if err != nil { + log.Fatalf("failed to load AWS SDK configuration: %s", err.Error()) + } + + // set a default topic with test-topic1 + p, err := sqs.New(env.QueueURL, sqs.WithNewClientFromConfig(awsCfg)) + if err != nil { + log.Fatalf("failed to create protocol: %v", err) + } + + c, err := cloudevents.NewClient(p) + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + log.Printf("receiver start consuming messages from test-topic\n") + err = c.StartReceiver(ctx, receive) + if err != nil { + log.Fatalf("failed to start receiver: %s", err) + } else { + log.Printf("receiver stopped\n") + } +} + +func receive(ctx context.Context, event cloudevents.Event) { + fmt.Printf("%s", event) +} diff --git a/samples/sqs/sender/main.go b/samples/sqs/sender/main.go new file mode 100644 index 000000000..b47e82129 --- /dev/null +++ b/samples/sqs/sender/main.go @@ -0,0 +1,75 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "log" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" + + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/cloudevents/sdk-go/protocol/sqs/v2" + "github.com/kelseyhightower/envconfig" +) + +const ( + count = 10 +) + +type envConfig struct { + QueueURL string `envconfig:"AWS_SQS_QUEUE_URL" required:"true"` +} + +func main() { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Fatalf("[ERROR] Failed to process env var: %s", err) + } + ctx := context.Background() + awsCfg, err := awsconfig.LoadDefaultConfig(ctx) + if err != nil { + log.Fatalf("failed to load AWS SDK configuration: %s", err.Error()) + } + + // set a default topic with test-topic1 + p, err := sqs.New(env.QueueURL, sqs.WithNewClientFromConfig(awsCfg)) + if err != nil { + log.Fatalf("failed to create protocol: %v", err) + } + + c, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + for i := 0; i < count; i++ { + e := cloudevents.NewEvent() + e.SetID(uuid.New().String()) + e.SetType("com.cloudevents.sample.sent") + e.SetSource("https://github.com/cloudevents/sdk-go/samples/sqs/sender") + err = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "id": i, + "message": "Hello, World!", + }) + if err != nil { + log.Printf("failed to set data: %v", err) + } + if result := c.Send( + ctx, + // binding.WithForceStructured(ctx), + // binding.WithForceBinary(ctx), + e, + ); cloudevents.IsUndelivered(result) { + log.Printf("failed to send: %v", result) + } else { + log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result)) + } + time.Sleep(1 * time.Second) + } +}