diff --git a/go.mod b/go.mod index ad58f50d3d..7b42c4e1d3 100644 --- a/go.mod +++ b/go.mod @@ -99,8 +99,8 @@ require ( github.com/microcosm-cc/bluemonday v1.0.27 github.com/microsoft/go-mssqldb v1.8.2 github.com/microsoft/gocosmos v1.1.1 - github.com/nats-io/nats.go v1.37.0 - github.com/nats-io/nkeys v0.4.7 + github.com/nats-io/nats.go v1.45.0 + github.com/nats-io/nkeys v0.4.11 github.com/nats-io/stan.go v0.10.4 github.com/neo4j/neo4j-go-driver/v5 v5.24.0 github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249 diff --git a/go.sum b/go.sum index 2a4ebd9ee6..2db2881f77 100644 --- a/go.sum +++ b/go.sum @@ -1662,11 +1662,11 @@ github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/ github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= -github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= -github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA= +github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= -github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= -github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= +github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/stan.go v0.10.2/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKueapB7ezX0= diff --git a/internal/impl/nats/input_os.go b/internal/impl/nats/input_os.go new file mode 100644 index 0000000000..a6e842597c --- /dev/null +++ b/internal/impl/nats/input_os.go @@ -0,0 +1,253 @@ +package nats + +import ( + "context" + "fmt" + "sync" + + "github.com/Jeffail/shutdown" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/redpanda-data/benthos/v4/public/service" +) + +const ( + osiBucketField = "bucket" + osiIgnoreDeletesField = "ignore_deletes" + osiIncludeHistoryField = "include_history" + osiMetadataOnlyField = "meta_only" + osiTest = "test" +) + +func natsOSInputConfig() *service.ConfigSpec { + return service.NewConfigSpec(). + Categories("Services"). + Version("1.8.0"). + Summary("Watches for updates in a nats object store."). + Description(` +### Metadata + +This input adds the following metadata fields to each message: + +` + "``` text" + ` +- nats_object_store_name +- nats_object_store_description +- nats_object_store_headers +- nats_object_store_metadata +- nats_object_store_bucket +- nats_object_store_nuid +- nats_object_store_size +- nats_object_store_modtime +- nats_object_store_chunks +- nats_object_store_digest +- nats_object_store_deleted + +` + "```" + ` + +` + connectionNameDescription() + authDescription()). + Fields(Docs("object store", []*service.ConfigField{ + service.NewBoolField("create_bucket"). + Description("Whether to automatically create the bucket if it doesn't exist."). + Advanced(). + Default(false), + service.NewAutoRetryNacksToggleField(), + }...)...) +} + +func init() { + err := service.RegisterInput( + "nats_object_store", natsOSInputConfig(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + reader, err := newOSInput(conf, mgr) + if err != nil { + return nil, err + } + return service.AutoRetryNacksToggled(conf, reader) + }, + ) + if err != nil { + panic(err) + } +} + +//------------------------------------------------------------------------------ + +type osInput struct { + connDetails connectionDetails + bucket string + createBucket bool + + log *service.Logger + shutSig *shutdown.Signaller + + connMut sync.Mutex + natsConn *nats.Conn + watcher jetstream.ObjectWatcher + os jetstream.ObjectStore +} + +func newOSInput(conf *service.ParsedConfig, mgr *service.Resources) (*osInput, error) { + osi := osInput{ + log: mgr.Logger(), + shutSig: shutdown.NewSignaller(), + } + + var err error + if osi.connDetails, err = connectionDetailsFromParsed(conf, mgr); err != nil { + return nil, err + } + + if osi.bucket, err = conf.FieldString(osiBucketField); err != nil { + return nil, err + } + + if osi.createBucket, err = conf.FieldBool("create_bucket"); err != nil { + return nil, err + } + + return &osi, nil +} + +//------------------------------------------------------------------------------ + +func (osi *osInput) Connect(ctx context.Context) (err error) { + osi.connMut.Lock() + defer osi.connMut.Unlock() + + if osi.natsConn != nil { + return nil + } + + defer func() { + if err != nil { + if osi.watcher != nil { + _ = osi.watcher.Stop() + } + if osi.natsConn != nil { + osi.natsConn.Close() + } + } + }() + + if osi.natsConn, err = osi.connDetails.get(ctx); err != nil { + return err + } + + js, err := jetstream.New(osi.natsConn) + if err != nil { + return err + } + + // Check if bucket exists first, create only if config allows + osi.os, err = js.ObjectStore(ctx, osi.bucket) + if err != nil { + if osi.createBucket { + osi.os, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{ + Bucket: osi.bucket, + }) + if err != nil { + return fmt.Errorf("failed to create bucket %s: %w", osi.bucket, err) + } + osi.log.Infof("Created bucket %s", osi.bucket) + } else { + return fmt.Errorf("bucket %s does not exist and create_bucket is false", osi.bucket) + } + } + + osi.watcher, err = osi.os.Watch(ctx, nil) + if err != nil { + return err + } + return nil +} + +func (osi *osInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { + osi.connMut.Lock() + watcher := osi.watcher + osi.connMut.Unlock() + + if watcher == nil { + return nil, nil, service.ErrNotConnected + } + + for { + var objectInfo *jetstream.ObjectInfo + var open bool + + select { + case objectInfo, open = <-watcher.Updates(): + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + + if !open { + osi.disconnect() + return nil, nil, service.ErrNotConnected + } + + if objectInfo == nil { + continue + } + + msg, err := osi.newMessageFromObjectInfo(ctx, objectInfo) + if err != nil { + return nil, nil, err + } + + return msg, func(ctx context.Context, res error) error { + return nil + }, nil + } +} + +func (osi *osInput) Close(ctx context.Context) error { + go func() { + osi.disconnect() + osi.shutSig.TriggerHasStopped() + }() + select { + case <-osi.shutSig.HasStoppedChan(): + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + +//------------------------------------------------------------------------------ + +func (osi *osInput) disconnect() { + osi.connMut.Lock() + defer osi.connMut.Unlock() + + if osi.watcher != nil { + _ = osi.watcher.Stop() + osi.watcher = nil + } + if osi.natsConn != nil { + osi.natsConn.Close() + osi.natsConn = nil + } +} + +func (osi *osInput) newMessageFromObjectInfo(ctx context.Context, object *jetstream.ObjectInfo) (*service.Message, error) { + objectBytes, err := osi.os.GetBytes(ctx, object.Name) + if err != nil { + return nil, err + } + + msg := service.NewMessage(objectBytes) + + msg.MetaSetMut("nats_object_store_name", object.Name) + msg.MetaSetMut("nats_object_store_description", object.Description) + msg.MetaSetMut("nats_object_store_headers", object.Headers) + msg.MetaSetMut("nats_object_store_metadata", object.Metadata) + msg.MetaSetMut("nats_object_store_bucket", object.Bucket) + msg.MetaSetMut("nats_object_store_nuid", object.NUID) + msg.MetaSetMut("nats_object_store_size", object.Size) + msg.MetaSetMut("nats_object_store_modtime", object.ModTime) + msg.MetaSetMut("nats_object_store_chunks", object.Chunks) + msg.MetaSetMut("nats_object_store_digest", object.Digest) + msg.MetaSetMut("nats_object_store_deleted", object.Deleted) + + return msg, nil +} diff --git a/internal/impl/nats/integration_os_test.go b/internal/impl/nats/integration_os_test.go new file mode 100644 index 0000000000..86525463c7 --- /dev/null +++ b/internal/impl/nats/integration_os_test.go @@ -0,0 +1,159 @@ +package nats + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/benthos/v4/public/service/integration" +) + +func TestIntegrationNatsObjectStore(t *testing.T) { + integration.CheckSkip(t) + t.Parallel() + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + pool.MaxWait = time.Second * 30 + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "nats", + Tag: "latest", + Cmd: []string{"--js", "--trace"}, + }) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, pool.Purge(resource)) + }) + + var natsConn *nats.Conn + _ = resource.Expire(900) + require.NoError(t, pool.Retry(func() error { + natsConn, err = nats.Connect(fmt.Sprintf("tcp://localhost:%v", resource.GetPort("4222/tcp"))) + return err + })) + t.Cleanup(func() { + natsConn.Close() + }) + + template := ` +output: + label: object_store_output + nats_object_store: + urls: + - tcp://localhost:$PORT + bucket: bucket-$ID + object_name: ${! ksuid() } + +input: + label: object_store_input + nats_object_store: + urls: + - tcp://localhost:$PORT + bucket: bucket-$ID +` + + suite := integration.StreamTests( + integration.StreamTestOpenClose(), + integration.StreamTestSendBatch(10), + integration.StreamTestStreamParallel(1000), + integration.StreamTestStreamSequential(1000), + integration.StreamTestStreamParallelLossy(1000), + integration.StreamTestStreamParallelLossyThroughReconnect(1000), + ) + suite.Run( + t, template, + integration.StreamTestOptPreTest(func(t testing.TB, _ context.Context, vars *integration.StreamTestConfigVars) { + js, err := jetstream.New(natsConn) + require.NoError(t, err) + + bucketName := "bucket-" + vars.ID + + _, err = js.CreateObjectStore(context.Background(), jetstream.ObjectStoreConfig{ + Bucket: bucketName, + }) + require.NoError(t, err) + }), + integration.StreamTestOptSleepAfterInput(100*time.Millisecond), + integration.StreamTestOptSleepAfterOutput(100*time.Millisecond), + integration.StreamTestOptPort(resource.GetPort("4222/tcp")), + ) + + t.Run("processor", func(t *testing.T) { + createBucket := func(t *testing.T) (jetstream.ObjectStore, string) { + + js, err := jetstream.New(natsConn) + require.NoError(t, err) + + bucketName := "object-store-bucket" + + bucket, err := js.CreateObjectStore(context.Background(), jetstream.ObjectStoreConfig{ + Bucket: bucketName, + }) + require.NoError(t, err) + + url := fmt.Sprintf("tcp://localhost:%v", resource.GetPort("4222/tcp")) + + return bucket, url + } + + process := func(yaml string) (service.MessageBatch, error) { + spec := natsOSProcessorConfig() + parsed, err := spec.ParseYAML(yaml, nil) + require.NoError(t, err) + + p, err := newOSProcessor(parsed, service.MockResources()) + require.NoError(t, err) + + m := service.NewMessage([]byte("hello")) + return p.Process(context.Background(), m) + } + + t.Run("get operation", func(t *testing.T) { + bucket, url := createBucket(t) + _, err := bucket.PutString(context.Background(), "blob", "lawblog") + require.NoError(t, err) + + yaml := fmt.Sprintf(` + bucket: object-store-bucket + operation: get + object_name: blob + urls: [%s]`, url) + + result, err := process(yaml) + require.NoError(t, err) + + m := result[0] + bytes, err := m.AsBytes() + require.NoError(t, err) + assert.Equal(t, []byte("lawblog"), bytes) + }) + + t.Run("put operation", func(t *testing.T) { + _, url := createBucket(t) + + yaml := fmt.Sprintf(` + bucket: object-store-bucket + operation: put + object_name: ting + urls: [%s]`, url) + + result, err := process(yaml) + require.NoError(t, err) + + m := result[0] + bytes, err := m.AsBytes() + require.NoError(t, err) + assert.Equal(t, []byte("hello"), bytes) + + }) + + }) +} diff --git a/internal/impl/nats/output_os.go b/internal/impl/nats/output_os.go new file mode 100644 index 0000000000..7624199e0e --- /dev/null +++ b/internal/impl/nats/output_os.go @@ -0,0 +1,198 @@ +package nats + +import ( + "context" + "fmt" + "sync" + + "github.com/Jeffail/shutdown" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/redpanda-data/benthos/v4/public/service" +) + +const ( + osoBucketField = "bucket" + osoNameField = "object_name" +) + +func natsOSOutputConfig() *service.ConfigSpec { + return service.NewConfigSpec(). + Categories("Services"). + Version("1.8.0"). + Summary("Put messages in a NATS object-store bucket."). + Description(` +The field ` + "`object_name`" + ` supports +[interpolation functions](/docs/configuration/interpolation#bloblang-queries), allowing +you to create a unique object name for each message. + +` + connectionNameDescription() + authDescription()). + Fields(Docs("object store", []*service.ConfigField{ + service.NewInterpolatedStringField(osoNameField). + Description("The object name for each message."), + service.NewBoolField("create_bucket"). + Description("Whether to automatically create the bucket if it doesn't exist."). + Advanced(). + Default(false), + service.NewOutputMaxInFlightField().Default(64), + }...)...) +} + +func init() { + err := service.RegisterOutput( + "nats_object_store", natsOSOutputConfig(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.Output, int, error) { + maxInFlight, err := conf.FieldInt("max_in_flight") + if err != nil { + return nil, 0, err + } + o, err := newOSOutput(conf, mgr) + return o, maxInFlight, err + }) + if err != nil { + panic(err) + } +} + +//------------------------------------------------------------------------------ + +type osOutput struct { + connDetails connectionDetails + bucket string + objName *service.InterpolatedString + createBucket bool + + log *service.Logger + + connMut sync.Mutex + natsConn *nats.Conn + objectStore jetstream.ObjectStore + + shutSig *shutdown.Signaller +} + +func newOSOutput(conf *service.ParsedConfig, mgr *service.Resources) (*osOutput, error) { + oso := osOutput{ + log: mgr.Logger(), + shutSig: shutdown.NewSignaller(), + } + + var err error + if oso.connDetails, err = connectionDetailsFromParsed(conf, mgr); err != nil { + return nil, err + } + + if oso.bucket, err = conf.FieldString(osoBucketField); err != nil { + return nil, err + } + + if oso.createBucket, err = conf.FieldBool("create_bucket"); err != nil { + return nil, err + } + + if oso.objName, err = conf.FieldInterpolatedString(osoNameField); err != nil { + return nil, err + } + + return &oso, nil +} + +//------------------------------------------------------------------------------ + +func (oso *osOutput) Connect(ctx context.Context) (err error) { + oso.connMut.Lock() + defer oso.connMut.Unlock() + + if oso.natsConn != nil { + return nil + } + + var natsConn *nats.Conn + + defer func() { + if err != nil && natsConn != nil { + natsConn.Close() + } + }() + + if natsConn, err = oso.connDetails.get(ctx); err != nil { + return err + } + + + js, err := jetstream.New(natsConn) + if err != nil { + return err + } + + // Check if bucket exists first, create only if config allows + oso.objectStore, err = js.ObjectStore(ctx, oso.bucket) + if err != nil { + if oso.createBucket { + oso.objectStore, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{ + Bucket: oso.bucket, + }) + if err != nil { + return fmt.Errorf("failed to create bucket %s: %w", oso.bucket, err) + } + oso.log.Infof("Created bucket %s", oso.bucket) + } else { + return fmt.Errorf("bucket %s does not exist and create_bucket is false", oso.bucket) + } + } + + oso.natsConn = natsConn + return nil +} + +func (oso *osOutput) Write(ctx context.Context, msg *service.Message) error { + oso.connMut.Lock() + objectStore := oso.objectStore + oso.connMut.Unlock() + if objectStore == nil { + return service.ErrNotConnected + } + + msgBytes, err := msg.AsBytes() + if err != nil { + return err + } + + objn, err := oso.objName.TryString(msg) + if err != nil { + return err + } + + _, err = objectStore.PutBytes(ctx, objn, msgBytes) + if err != nil { + return err + } + + return nil +} + +func (oso *osOutput) Close(ctx context.Context) error { + go func() { + oso.disconnect() + oso.shutSig.TriggerHasStopped() + }() + select { + case <-oso.shutSig.HasStoppedChan(): + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + +//------------------------------------------------------------------------------ + +func (oso *osOutput) disconnect() { + oso.connMut.Lock() + defer oso.connMut.Unlock() + + if oso.natsConn != nil { + oso.natsConn.Close() + oso.natsConn = nil + } + oso.objectStore = nil +} diff --git a/internal/impl/nats/processor_os.go b/internal/impl/nats/processor_os.go new file mode 100644 index 0000000000..bf1e83b66e --- /dev/null +++ b/internal/impl/nats/processor_os.go @@ -0,0 +1,220 @@ +package nats + +import ( + "context" + "fmt" + "sync" + + "github.com/Jeffail/shutdown" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/redpanda-data/benthos/v4/public/service" +) + +const ( + ospOperationField = "operation" + ospBucketField = "bucket" + ospObjectNameField = "object_name" +) + +type ospOperationType string + +const ( + ospOperationGet ospOperationType = "get" + ospOperationPut ospOperationType = "put" +) + +var ospOperations = map[string]string{ + string(ospOperationGet): "Returns the latest value for the `object_name`.", + string(ospOperationPut): "Places a new value for the `object_name` in the object store.", +} + +func natsOSProcessorConfig() *service.ConfigSpec { + return service.NewConfigSpec(). + Categories("Services"). + Version("1.8.0"). + Summary("Perform operations on a NATS object store bucket."). + Description(` +### Operations + +The NATS object store processor supports ` + "`get`" + ` and ` + "`put`" + ` operations via the ` + "`operation`" + ` field. + +` + connectionNameDescription() + authDescription()). + Fields(Docs("object store", []*service.ConfigField{ + service.NewStringAnnotatedEnumField(ospOperationField, ospOperations). + Description("The operation to perform on the Object Store bucket."), + service.NewInterpolatedStringField(ospObjectNameField). + Description("The name of the object in the object store to operate on."), + service.NewBoolField("create_bucket"). + Description("Whether to automatically create the bucket if it doesn't exist."). + Advanced(). + Default(false), + }...)...) +} + +func init() { + err := service.RegisterProcessor( + "nats_object_store", natsOSProcessorConfig(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + return newOSProcessor(conf, mgr) + }, + ) + if err != nil { + panic(err) + } +} + +//------------------------------------------------------------------------------ + +type osProcessor struct { + connDetails connectionDetails + bucket string + operation ospOperationType + objectName *service.InterpolatedString + createBucket bool + + log *service.Logger + shutSig *shutdown.Signaller + + connMut sync.Mutex + natsConn *nats.Conn + os jetstream.ObjectStore +} + +func newOSProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*osProcessor, error) { + osp := &osProcessor{ + log: mgr.Logger(), + shutSig: shutdown.NewSignaller(), + } + + var err error + if osp.connDetails, err = connectionDetailsFromParsed(conf, mgr); err != nil { + return nil, err + } + + if osp.bucket, err = conf.FieldString(ospBucketField); err != nil { + return nil, err + } + + if osp.createBucket, err = conf.FieldBool("create_bucket"); err != nil { + return nil, err + } + + if operation, err := conf.FieldString(ospOperationField); err != nil { + return nil, err + } else { + osp.operation = ospOperationType(operation) + } + + if osp.objectName, err = conf.FieldInterpolatedString(ospObjectNameField); err != nil { + return nil, err + } + + err = osp.connect(context.Background()) + return osp, err +} + +func (osp *osProcessor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) { + osp.connMut.Lock() + os := osp.os + osp.connMut.Unlock() + + objectName, err := osp.objectName.TryString(msg) + if err != nil { + return nil, err + } + + switch osp.operation { + case ospOperationGet: + objectBytes, err := os.GetBytes(ctx, objectName) + if err != nil { + return nil, err + } + m := msg.Copy() + m.SetBytes(objectBytes) + return service.MessageBatch{m}, nil + + case ospOperationPut: + msgBytes, err := msg.AsBytes() + if err != nil { + return nil, err + } + + _, err = osp.os.PutBytes(ctx, objectName, msgBytes) + if err != nil { + return nil, err + } + + return service.MessageBatch{msg}, nil + + default: + return nil, fmt.Errorf("invalid nats_object_store operation: %s", osp.operation) + } +} + +func (osp *osProcessor) Close(ctx context.Context) error { + go func() { + osp.disconnect() + osp.shutSig.TriggerHasStopped() + }() + select { + case <-osp.shutSig.HasStoppedChan(): + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + +func (osp *osProcessor) connect(ctx context.Context) (err error) { + osp.connMut.Lock() + defer osp.connMut.Unlock() + + if osp.natsConn != nil { + return nil + } + + defer func() { + if err != nil { + if osp.natsConn != nil { + osp.natsConn.Close() + } + } + }() + + if osp.natsConn, err = osp.connDetails.get(ctx); err != nil { + return err + } + + js, err := jetstream.New(osp.natsConn) + if err != nil { + return err + } + + // Check if bucket exists first, create only if config allows + osp.os, err = js.ObjectStore(ctx, osp.bucket) + if err != nil { + if osp.createBucket { + osp.os, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{ + Bucket: osp.bucket, + }) + if err != nil { + return fmt.Errorf("failed to create bucket %s: %w", osp.bucket, err) + } + osp.log.Infof("Created bucket %s", osp.bucket) + } else { + return fmt.Errorf("bucket %s does not exist and create_bucket is false", osp.bucket) + } + } + return nil +} + +func (osp *osProcessor) disconnect() { + osp.connMut.Lock() + defer osp.connMut.Unlock() + + if osp.natsConn != nil { + osp.natsConn.Close() + osp.natsConn = nil + } + osp.os = nil +}