Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,6 @@ func CloneBytes(b []byte) []byte {
return clone
}

func SafeCloseDoneChan(c chan<- struct{}) (ok bool) {
defer func() {
ok = recover() == nil
}()
close(c)
return
}

func ToMilliseconds(duration time.Duration) int64 {
return int64(duration) / 1e6
}
37 changes: 18 additions & 19 deletions rx/mono/block_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (
"github.com/jjeffcaii/reactor-go"
"github.com/rsocket/rsocket-go/internal/common"
"github.com/rsocket/rsocket-go/payload"
"go.uber.org/atomic"
)

type blockSubscriber struct {
done chan struct{}
vchan chan<- payload.Payload
echan chan<- error
// Atomic bool to ensure that 'done' is closed only once.
isDone *atomic.Bool
done chan struct{}
vchan chan<- payload.Payload
echan chan<- error
}

func newBlockSubscriber(
Expand All @@ -20,34 +23,30 @@ func newBlockSubscriber(
echan chan<- error,
) reactor.Subscriber {
return blockSubscriber{
done: done,
vchan: vchan,
echan: echan,
isDone: atomic.NewBool(false),
done: done,
vchan: vchan,
echan: echan,
}
}

func (b blockSubscriber) OnComplete() {
select {
case <-b.done:
default:
_ = common.SafeCloseDoneChan(b.done)
swapped := b.isDone.CAS(false, true)
if swapped {
close(b.done)
}
}

func (b blockSubscriber) OnError(err error) {
select {
case <-b.done:
default:
if common.SafeCloseDoneChan(b.done) {
b.echan <- err
}
swapped := b.isDone.CAS(false, true)
if swapped {
b.echan <- err
close(b.done)
}
}

func (b blockSubscriber) OnNext(any reactor.Any) {
select {
case <-b.done:
default:
if !b.isDone.Load() {
if r, ok := any.(common.Releasable); ok {
r.IncRef()
}
Expand Down
4 changes: 1 addition & 3 deletions rx/mono/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,11 @@ func toBlock(ctx context.Context, m mono.Mono) (payload.Payload, error) {
done := make(chan struct{})
vchan := make(chan payload.Payload, 1)
echan := make(chan error, 1)
// 'blockSubscriber' takes ownership of the above channels (w.r.t. closing them)
b := newBlockSubscriber(done, vchan, echan)
m.SubscribeWith(ctx, b)
<-done

defer close(vchan)
defer close(echan)

select {
case value := <-vchan:
return value, nil
Expand Down
Loading