Skip to content

Commit 2bf81bc

Browse files
authored
Merge branch 'main' into gh-1026-expose-publishSettings
2 parents 1b98e55 + 8efefb0 commit 2bf81bc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+2616
-226
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
* @cloudevents/sdk-go-maintainers

.github/workflows/go-lint.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ jobs:
3030

3131
- name: Go Lint on ./v2
3232
if: steps.golangci_configuration.outputs.files_exists == 'true'
33-
uses: golangci/golangci-lint-action@v4
33+
uses: golangci/golangci-lint-action@v6
3434
with:
3535
version: v1.54
3636
working-directory: v2

.github/workflows/integration.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,15 @@ jobs:
2727
- 9091:9091
2828
- 9092:9092
2929

30+
kafka_confluent:
31+
image: confluentinc/confluent-local:7.6.0
32+
ports:
33+
- "9192:9192"
34+
env:
35+
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:29192,PLAINTEXT_HOST://localhost:9192'
36+
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@localhost:29193'
37+
KAFKA_LISTENERS: 'PLAINTEXT://localhost:29192,CONTROLLER://localhost:29193,PLAINTEXT_HOST://0.0.0.0:9192'
38+
3039
natss:
3140
image: nats-streaming:0.22.1
3241
ports:

docs/Gemfile.lock

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,14 +201,14 @@ GEM
201201
rb-fsevent (~> 0.10, >= 0.10.3)
202202
rb-inotify (~> 0.9, >= 0.9.10)
203203
mercenary (0.3.6)
204-
mini_portile2 (2.8.5)
204+
mini_portile2 (2.8.6)
205205
minima (2.5.1)
206206
jekyll (>= 3.5, < 5.0)
207207
jekyll-feed (~> 0.9)
208208
jekyll-seo-tag (~> 2.1)
209209
minitest (5.17.0)
210210
multipart-post (2.1.1)
211-
nokogiri (1.16.2)
211+
nokogiri (1.16.5)
212212
mini_portile2 (~> 2.8.2)
213213
racc (~> 1.4)
214214
octokit (4.18.0)

docs/index.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ err := json.Unmarshal(bytes, &event)
124124
| AVRO Event Format | :x: | :x: |
125125
| [HTTP Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/http) | :heavy_check_mark: | :heavy_check_mark: |
126126
| [JSON Event Format](event_data_structure.md#marshalunmarshal-event-to-json) | :heavy_check_mark: | :heavy_check_mark: |
127-
| [Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka) | :heavy_check_mark: | :heavy_check_mark: |
127+
| [Sarama Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka) | :heavy_check_mark: | :heavy_check_mark: |
128+
| [Confluent Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka_confluent) | :heavy_check_mark: | :heavy_check_mark: |
128129
| MQTT Protocol Binding | :x: | :x: |
129130
| [NATS Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/nats) | :heavy_check_mark: | :heavy_check_mark: |
130131
| [STAN Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/stan) | :heavy_check_mark: | :heavy_check_mark: |

observability/opencensus/v2/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ require (
2222
go.uber.org/atomic v1.4.0 // indirect
2323
go.uber.org/multierr v1.1.0 // indirect
2424
go.uber.org/zap v1.10.0 // indirect
25-
golang.org/x/net v0.17.0 // indirect
25+
golang.org/x/net v0.23.0 // indirect
2626
gopkg.in/yaml.v3 v3.0.1 // indirect
2727
)
2828

observability/opencensus/v2/go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
8080
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
8181
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
8282
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
83-
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
84-
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
83+
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
84+
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
8585
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
8686
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
8787
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -97,11 +97,11 @@ golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w
9797
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
9898
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
9999
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
100-
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
100+
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
101101
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
102102
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
103103
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
104-
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
104+
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
105105
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
106106
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
107107
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

protocol/amqp/v2/message.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,10 @@ func NewMessage(message *amqp.Message, receiver *amqp.Receiver) *Message {
5050
return &Message{AMQP: message, AMQPrcv: receiver, format: fmt, version: vn}
5151
}
5252

53-
var _ binding.Message = (*Message)(nil)
54-
var _ binding.MessageMetadataReader = (*Message)(nil)
53+
var (
54+
_ binding.Message = (*Message)(nil)
55+
_ binding.MessageMetadataReader = (*Message)(nil)
56+
)
5557

5658
func getSpecVersion(message *amqp.Message) spec.Version {
5759
if sv, ok := message.ApplicationProperties[specs.PrefixedSpecVersionName()]; ok {
@@ -74,7 +76,8 @@ func (m *Message) ReadEncoding() binding.Encoding {
7476

7577
func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
7678
if m.format != nil {
77-
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.AMQP.GetData()))
79+
data := m.getAmqpData()
80+
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(data))
7881
}
7982
return binding.ErrNotStructured
8083
}
@@ -106,7 +109,7 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter)
106109
}
107110
}
108111

109-
data := m.AMQP.GetData()
112+
data := m.getAmqpData()
110113
if len(data) != 0 { // Some data
111114
err = encoder.SetData(bytes.NewBuffer(data))
112115
if err != nil {
@@ -137,3 +140,15 @@ func (m *Message) Finish(err error) error {
137140
}
138141
return m.AMQPrcv.AcceptMessage(context.Background(), m.AMQP)
139142
}
143+
144+
// fixes: github.com/cloudevents/spec/issues/1275
145+
func (m *Message) getAmqpData() []byte {
146+
var data []byte
147+
amqpData := m.AMQP.Data
148+
149+
// TODO: replace with slices.Concat once go mod bumped to 1.22
150+
for idx := range amqpData {
151+
data = append(data, amqpData[idx]...)
152+
}
153+
return data
154+
}

protocol/amqp/v2/message_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,58 @@ func TestNewMessage_message_unknown(t *testing.T) {
6262
got := NewMessage(message, &rcv)
6363
require.Equal(t, binding.EncodingUnknown, got.ReadEncoding())
6464
}
65+
66+
func TestMessage_getAmqpData(t *testing.T) {
67+
tests := []struct {
68+
name string
69+
message *amqp.Message
70+
want []byte
71+
}{
72+
{
73+
name: "nil data",
74+
message: amqp.NewMessage(nil),
75+
want: nil,
76+
},
77+
{
78+
name: "empty string",
79+
message: amqp.NewMessage([]byte(`""`)),
80+
want: []byte(`""`),
81+
},
82+
{
83+
name: "simple string",
84+
message: amqp.NewMessage([]byte("hello world")),
85+
want: []byte("hello world"),
86+
},
87+
{
88+
name: "multiple data with simple strings",
89+
message: &amqp.Message{Data: [][]byte{
90+
[]byte("hello"),
91+
[]byte(" "),
92+
[]byte("world"),
93+
}},
94+
want: []byte("hello world"),
95+
},
96+
{
97+
name: "multiple data to build JSON array",
98+
message: &amqp.Message{Data: [][]byte{
99+
[]byte("["),
100+
[]byte("Foo"),
101+
[]byte(","),
102+
[]byte("Bar"),
103+
[]byte(","),
104+
[]byte("Baz"),
105+
[]byte("]"),
106+
}},
107+
want: []byte("[Foo,Bar,Baz]"),
108+
},
109+
}
110+
for _, tt := range tests {
111+
t.Run(tt.name, func(t *testing.T) {
112+
m := &Message{
113+
AMQP: tt.message,
114+
}
115+
got := m.getAmqpData()
116+
require.Equal(t, tt.want, got)
117+
})
118+
}
119+
}

protocol/kafka_confluent/v2/go.mod

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
module github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2
2+
3+
go 1.18
4+
5+
replace github.com/cloudevents/sdk-go/v2 => ../../../v2
6+
7+
require (
8+
github.com/cloudevents/sdk-go/v2 v2.15.2
9+
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
10+
github.com/stretchr/testify v1.8.4
11+
)
12+
13+
require (
14+
github.com/davecgh/go-spew v1.1.1 // indirect
15+
github.com/google/go-cmp v0.5.9 // indirect
16+
github.com/google/uuid v1.3.0 // indirect
17+
github.com/json-iterator/go v1.1.11 // indirect
18+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
19+
github.com/modern-go/reflect2 v1.0.1 // indirect
20+
github.com/pmezard/go-difflib v1.0.0 // indirect
21+
go.uber.org/atomic v1.4.0 // indirect
22+
go.uber.org/multierr v1.1.0 // indirect
23+
go.uber.org/zap v1.10.0 // indirect
24+
gopkg.in/yaml.v3 v3.0.1 // indirect
25+
)

0 commit comments

Comments
 (0)