Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/cloudevents/generic/options/builder/optionsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
grpcv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc"
mqttv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/mqtt"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)

Expand Down Expand Up @@ -56,7 +57,7 @@ func BuildCloudEventsSourceOptions(config any,
clientId, sourceId string, dataType types.CloudEventsDataType) (*options.CloudEventsSourceOptions, error) {
switch config := config.(type) {
case *mqtt.MQTTOptions:
return mqtt.NewSourceOptions(config, clientId, sourceId), nil
return mqttv2.NewSourceOptions(config, clientId, sourceId), nil
case *grpc.GRPCOptions:
return grpcv2.NewSourceOptions(config, sourceId, dataType), nil
default:
Expand All @@ -69,7 +70,7 @@ func BuildCloudEventsAgentOptions(config any,
clusterName, clientId string, dataType types.CloudEventsDataType) (*options.CloudEventsAgentOptions, error) {
switch config := config.(type) {
case *mqtt.MQTTOptions:
return mqtt.NewAgentOptions(config, clusterName, clientId), nil
return mqttv2.NewAgentOptions(config, clusterName, clientId), nil
case *grpc.GRPCOptions:
return grpcv2.NewAgentOptions(config, clusterName, clientId, dataType), nil
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
Timeout: 60 * time.Second,
},
},
expectedTransportType: "*mqtt.mqttSourceTransport",
expectedTransportType: "*mqtt.mqttTransport",
},
{
name: "grpc config",
Expand Down
118 changes: 70 additions & 48 deletions pkg/cloudevents/generic/options/mqtt/agentoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type mqttAgentTransport struct {
agentID string
}

// Deprecated: use v2.mqtt.NewAgentOptions instead
func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *options.CloudEventsAgentOptions {
mqttAgentOptions := &mqttAgentTransport{
MQTTOptions: *mqttOptions,
Expand All @@ -40,8 +41,6 @@ func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *opt
}

func (o *mqttAgentTransport) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) {
logger := klog.FromContext(ctx)

topic, err := getAgentPubTopic(ctx)
if err != nil {
return nil, err
Expand All @@ -51,59 +50,18 @@ func (o *mqttAgentTransport) WithContext(ctx context.Context, evtCtx cloudevents
return cloudeventscontext.WithTopic(ctx, string(*topic)), nil
}

eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
if err != nil {
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
}

originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
if err != nil {
return nil, err
}

// agent request to sync resource spec from all sources
if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll {
if len(o.Topics.AgentBroadcast) == 0 {
logger.Info("the agent broadcast topic not set, fall back to the agent events topic")

// TODO after supporting multiple sources, we should list each source
eventsTopic := replaceLast(o.Topics.AgentEvents, "+", o.clusterName)
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
}

resyncTopic := strings.Replace(o.Topics.AgentBroadcast, "+", o.clusterName, 1)
return cloudeventscontext.WithTopic(ctx, resyncTopic), nil
}

topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
pubTopic, err := AgentPubTopic(ctx, &o.MQTTOptions, o.clusterName, evtCtx)
if err != nil {
return nil, err
}

// agent publishes status events or spec resync events
eventsTopic := replaceLast(o.Topics.AgentEvents, "+", o.clusterName)
eventsTopic = replaceLast(eventsTopic, "+", topicSource)
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
return cloudeventscontext.WithTopic(ctx, pubTopic), nil
}

func (o *mqttAgentTransport) Connect(ctx context.Context) error {
subscribe := &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{
// TODO support multiple sources, currently the client require the source events topic has a sourceID, in
// the future, client may need a source list, it will subscribe to each source
// receiving the sources events
Topic: replaceLast(o.Topics.SourceEvents, "+", o.clusterName), QoS: byte(o.SubQoS),
},
},
}

// receiving status resync events from all sources
if len(o.Topics.SourceBroadcast) != 0 {
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
Topic: o.Topics.SourceBroadcast,
QoS: byte(o.SubQoS),
})
subscribe, err := AgentSubscribe(&o.MQTTOptions, o.clusterName)
if err != nil {
return err
}

protocol, err := o.GetCloudEventsProtocol(
Expand Down Expand Up @@ -157,3 +115,67 @@ func (o *mqttAgentTransport) Close(ctx context.Context) error {
func (o *mqttAgentTransport) ErrorChan() <-chan error {
return o.errorChan
}

func AgentPubTopic(ctx context.Context, o *MQTTOptions, clusterName string, evtCtx cloudevents.EventContext) (string, error) {
logger := klog.FromContext(ctx)

ceType := evtCtx.GetType()
eventType, err := types.ParseCloudEventsType(ceType)
if err != nil {
return "", fmt.Errorf("unsupported event type %q, %v", ceType, err)
}

originalSourceVal, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
if err != nil {
return "", err
}

originalSource, ok := originalSourceVal.(string)
if !ok {
return "", fmt.Errorf("originalsource extension must be a string, got %T", originalSourceVal)
}

// agent request to sync resource spec from all sources
if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll {
if len(o.Topics.AgentBroadcast) == 0 {
logger.Info("the agent broadcast topic not set, fall back to the agent events topic")

// TODO after supporting multiple sources, we should list each source
return replaceLast(o.Topics.AgentEvents, "+", clusterName), nil
}

return strings.Replace(o.Topics.AgentBroadcast, "+", clusterName, 1), nil
}

topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
if err != nil {
return "", err
}

// agent publishes status events or spec resync events
eventsTopic := replaceLast(o.Topics.AgentEvents, "+", clusterName)
return replaceLast(eventsTopic, "+", topicSource), nil
}

func AgentSubscribe(o *MQTTOptions, clusterName string) (*paho.Subscribe, error) {
subscribe := &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{
// TODO support multiple sources, currently the client require the source events topic has a sourceID, in
// the future, client may need a source list, it will subscribe to each source
// receiving the sources events
Topic: replaceLast(o.Topics.SourceEvents, "+", clusterName), QoS: byte(o.SubQoS),
},
},
}

// receiving status resync events from all sources
if len(o.Topics.SourceBroadcast) != 0 {
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
Topic: o.Topics.SourceBroadcast,
QoS: byte(o.SubQoS),
})
}

return subscribe, nil
}
10 changes: 8 additions & 2 deletions pkg/cloudevents/generic/options/mqtt/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mqtt

import (
"fmt"

"github.com/eclipse/paho.golang/paho/log"
"k8s.io/klog/v2"
)
Expand All @@ -14,8 +15,13 @@ type PahoDebugLogger struct {
logger klog.Logger
}

var _ log.Logger = &PahoErrorLogger{}
var _ log.Logger = &PahoDebugLogger{}
func NewPahoErrorLogger(logger klog.Logger) log.Logger {
return &PahoErrorLogger{logger: logger}
}

func NewPahoDebugLogger(logger klog.Logger) log.Logger {
return &PahoDebugLogger{logger: logger}
}

func (l *PahoErrorLogger) Println(v ...interface{}) {
l.logger.Error(fmt.Errorf("get err %s", fmt.Sprint(v...)), "MQTT error message")
Expand Down
7 changes: 4 additions & 3 deletions pkg/cloudevents/generic/options/mqtt/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"crypto/tls"
"fmt"
"k8s.io/klog/v2"
"net"
"os"
"regexp"
"strings"
"time"

"k8s.io/klog/v2"

cloudeventsmqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
"github.com/eclipse/paho.golang/packets"
"github.com/eclipse/paho.golang/paho"
Expand Down Expand Up @@ -235,8 +236,8 @@ func (o *MQTTOptions) GetCloudEventsProtocol(

opts := []cloudeventsmqtt.Option{
cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID)),
cloudeventsmqtt.WithDebugLogger(&PahoDebugLogger{logger: logger}),
cloudeventsmqtt.WithErrorLogger(&PahoErrorLogger{logger: logger}),
cloudeventsmqtt.WithDebugLogger(NewPahoDebugLogger(logger)),
cloudeventsmqtt.WithErrorLogger(NewPahoErrorLogger(logger)),
}
opts = append(opts, clientOpts...)
return cloudeventsmqtt.New(ctx, config, opts...)
Expand Down
107 changes: 66 additions & 41 deletions pkg/cloudevents/generic/options/mqtt/sourceoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type mqttSourceTransport struct {
clientID string
}

// Deprecated: use v2.mqtt.NewSourceOptions instead
func NewSourceOptions(mqttOptions *MQTTOptions, clientID, sourceID string) *options.CloudEventsSourceOptions {
mqttSourceOptions := &mqttSourceTransport{
MQTTOptions: *mqttOptions,
Expand All @@ -47,58 +48,20 @@ func (o *mqttSourceTransport) WithContext(ctx context.Context, evtCtx cloudevent
return cloudeventscontext.WithTopic(ctx, string(*topic)), nil
}

eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
if err != nil {
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
}

clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName)
pubTopic, err := SourcePubTopic(ctx, &o.MQTTOptions, o.sourceID, evtCtx)
if err != nil {
return nil, err
}

if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll {
// source request to get resources status from all agents
if len(o.Topics.SourceBroadcast) == 0 {
return nil, fmt.Errorf("the source broadcast topic not set")
}

resyncTopic := strings.Replace(o.Topics.SourceBroadcast, "+", o.sourceID, 1)
return cloudeventscontext.WithTopic(ctx, resyncTopic), nil
}

// source publishes spec events or status resync events
eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", fmt.Sprintf("%s", clusterName), 1)
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
return cloudeventscontext.WithTopic(ctx, pubTopic), nil
}

func (o *mqttSourceTransport) Connect(ctx context.Context) error {
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
subscribe, err := SourceSubscribe(&o.MQTTOptions, o.sourceID)
if err != nil {
return err
}

if topicSource != o.sourceID {
return fmt.Errorf("the topic source %q does not match with the client sourceID %q",
o.Topics.AgentEvents, o.sourceID)
}

subscribe := &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{
Topic: o.Topics.AgentEvents, QoS: byte(o.SubQoS),
},
},
}

if len(o.Topics.AgentBroadcast) != 0 {
// receiving spec resync events from all agents
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
Topic: o.Topics.AgentBroadcast,
QoS: byte(o.SubQoS),
})
}

protocol, err := o.GetCloudEventsProtocol(
ctx,
o.clientID,
Expand Down Expand Up @@ -151,3 +114,65 @@ func (o *mqttSourceTransport) Close(ctx context.Context) error {
func (o *mqttSourceTransport) ErrorChan() <-chan error {
return o.errorChan
}

func SourcePubTopic(ctx context.Context, o *MQTTOptions, sourceID string, evtCtx cloudevents.EventContext) (string, error) {
ceType := evtCtx.GetType()
eventType, err := types.ParseCloudEventsType(ceType)
if err != nil {
return "", fmt.Errorf("unsupported event type %q, %v", ceType, err)
}

clusterNameVal, err := evtCtx.GetExtension(types.ExtensionClusterName)
if err != nil {
return "", err
}

clusterName, ok := clusterNameVal.(string)
if !ok {
return "", fmt.Errorf("clustername extension must be a string, got %T", clusterNameVal)
}

if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll {
// source request to get resources status from all agents
if len(o.Topics.SourceBroadcast) == 0 {
return "", fmt.Errorf("the source broadcast topic not set")
}

resyncTopic := strings.Replace(o.Topics.SourceBroadcast, "+", sourceID, 1)
return resyncTopic, nil
}

// source publishes spec events or status resync events
eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", clusterName, 1)
return eventsTopic, nil
}

func SourceSubscribe(o *MQTTOptions, sourceID string) (*paho.Subscribe, error) {
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
if err != nil {
return nil, err
}

if topicSource != sourceID {
return nil, fmt.Errorf("the topic source %q does not match the client sourceID %q",
topicSource, sourceID)
}

subscribe := &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{
Topic: o.Topics.AgentEvents, QoS: byte(o.SubQoS),
},
},
}

if len(o.Topics.AgentBroadcast) != 0 {
// receiving spec resync events from all agents
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
Topic: o.Topics.AgentBroadcast,
QoS: byte(o.SubQoS),
})
}

return subscribe, nil
}
Loading