Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ linters:
- varnamelen
- wrapcheck
- wsl
- noinlineerr
settings:
cyclop:
max-complexity: 15
Expand Down
9 changes: 3 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ require (
github.com/KyberNetwork/cclog v1.1.2
github.com/KyberNetwork/kyber-trace-go v0.1.2
github.com/TheZeroSlave/zapsentry v1.23.0
github.com/andybalholm/brotli v1.2.0
github.com/cskr/pubsub v1.0.2
github.com/duoxehyon/mev-share-go v0.3.0
github.com/ethereum/go-ethereum v1.15.10
github.com/flashbots/mev-share-node v0.0.0-20240517155750-67003f8e8700
github.com/getsentry/sentry-go v0.32.0
github.com/golang-migrate/migrate/v4 v4.18.3
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/holiman/uint256 v1.3.2
github.com/jmoiron/sqlx v1.4.0
github.com/lib/pq v1.10.9
Expand All @@ -31,7 +34,6 @@ require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/VictoriaMetrics/fastcache v1.12.2 // indirect
github.com/VictoriaMetrics/metrics v1.35.4 // indirect
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/bits-and-blooms/bitset v1.22.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand All @@ -43,7 +45,6 @@ require (
github.com/deckarep/golang-set/v2 v2.8.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dmarkham/enumer v1.6.1 // indirect
github.com/ethereum/c-kzg-4844 v1.0.3 // indirect
github.com/ethereum/go-verkle v0.2.2 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
Expand All @@ -52,7 +53,6 @@ require (
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/gofrs/flock v0.12.1 // indirect
github.com/golang/snappy v1.0.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand All @@ -66,7 +66,6 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pascaldekloe/name v1.0.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pierrec/lz4/v3 v3.3.5 // indirect
github.com/pion/dtls/v2 v2.2.12 // indirect
Expand Down Expand Up @@ -101,13 +100,11 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.41.0 // indirect
golang.org/x/exp v0.0.0-20250811191247-51f88131bc50 // indirect
golang.org/x/mod v0.27.0 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.28.0 // indirect
golang.org/x/time v0.11.0 // indirect
golang.org/x/tools v0.36.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250428153025-10db94c68c34 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 // indirect
google.golang.org/grpc v1.72.0 // indirect
Expand Down
12 changes: 4 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a h1:W8mUrRp6NOV
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a/go.mod h1:sTwzHBvIzm2RfVCGNEBZgRyjwK40bVoun3ZnGOCafNM=
github.com/crate-crypto/go-kzg-4844 v1.1.0 h1:EN/u9k2TF6OWSHrCCDBBU6GLNMq88OspHHlMnHfoyU4=
github.com/crate-crypto/go-kzg-4844 v1.1.0/go.mod h1:JolLjpSff1tCCJKaJx4psrlEdlXuJEC996PL3tTAFks=
github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0=
github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand All @@ -100,8 +102,6 @@ github.com/dhui/dktest v0.4.5 h1:uUfYBIVREmj/Rw6MvgmqNAYzTiKOHJak+enB5Di73MM=
github.com/dhui/dktest v0.4.5/go.mod h1:tmcyeHDKagvlDrz7gDKq4UAJOLIfVZYkfD5OnHDwcCo=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/dmarkham/enumer v1.6.1 h1:aSc9awYtZL07TUueWs40QcHtxTvHTAwG0EqrNsK45w4=
github.com/dmarkham/enumer v1.6.1/go.mod h1:yixql+kDDQRYqcuBM2n9Vlt7NoT9ixgXhaXry8vmRg8=
github.com/docker/docker v27.2.0+incompatible h1:Rk9nIVdfH3+Vz4cyI/uhbINhEZ/oLmc+CBXmH6fbNk4=
github.com/docker/docker v27.2.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
Expand Down Expand Up @@ -351,8 +351,6 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pascaldekloe/name v1.0.0 h1:n7LKFgHixETzxpRv2R77YgPUFo85QHGZKrdaYm7eY5U=
github.com/pascaldekloe/name v1.0.0/go.mod h1:Z//MfYJnH4jVpQ9wkclwu2I2MkHmXTlT9wR5UZScttM=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down Expand Up @@ -483,6 +481,8 @@ github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQ
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0/go.mod h1:/LWChgwKmvncFJFHJ7Gvn9wZArjbV5/FppcK2fKk/tI=
github.com/ybbus/jsonrpc/v3 v3.1.6 h1:Yn2dNOZ3xVdOVxq9d4GaH2pxjjei27C+vU6H+E7hfAw=
github.com/ybbus/jsonrpc/v3 v3.1.6/go.mod h1:U1QbyNfL5Pvi2roT0OpRbJeyvGxfWYSgKJHjxWdAEeE=
Expand Down Expand Up @@ -545,8 +545,6 @@ golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKG
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ=
golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand Down Expand Up @@ -639,8 +637,6 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
166 changes: 166 additions & 0 deletions pkg/eventbus/eventbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package eventbus

import (
"context"
"sync"

"github.com/cskr/pubsub"
)

type Manager struct {
rw sync.RWMutex
subs map[Topic]Subscription
}

func NewManager() *Manager {
return &Manager{
rw: sync.RWMutex{},
subs: make(map[Topic]Subscription),
}
}

type Subscription struct {
topic string
ps *pubsub.PubSub
subscriber map[SubscriberID]chan any
}

// StartConsume subscribe with a handler.
func (m *Manager) StartConsume(ctx context.Context, consumerName string, topic Topic, fn Handler) {
id, c := m.Subscribe(consumerName, topic)

for {
select {
case <-ctx.Done():
m.Unsubscribe(topic, id)

return

case v, ok := <-c:
if !ok {
continue
}

// Ignore error here, it should be handled in handler.
_ = fn(v)
}
}
}

// StartConsumeMultiple subscribe with a handler.
func (m *Manager) StartConsumeMultiple(ctx context.Context, consumerName string, topic Topic, fn Handler, worker int) {
id, c := m.Subscribe(consumerName, topic)

wg := sync.WaitGroup{}
defer wg.Wait()

for i := 0; i < worker; i++ { //nolint:modernize
wg.Add(1)

go func() { //nolint:modernize
defer wg.Done()

for {
select {
case <-ctx.Done():
m.Unsubscribe(topic, id)

return

case v, ok := <-c:
if !ok {
continue
}

// Ignore error here, it should be handled in handler.
_ = fn(v)
}
}
}()
}
}

func (m *Manager) Publish(topic Topic, message any) {
sub, exist := m.getSub(topic)
if !exist {
m.rw.Lock()
{
sub = Subscription{
topic: string(topic),
ps: pubsub.New(defaultBufferLength),
subscriber: make(map[SubscriberID]chan any),
}

m.subs[topic] = sub
}

m.rw.Unlock()
}

sub.ps.Pub(message, sub.topic)
}

func (m *Manager) Subscribe(consumerName string, topic Topic) (SubscriberID, <-chan any) {
m.rw.Lock()
defer m.rw.Unlock()

subscription, exist := m.subs[topic]
if !exist {
subscription = Subscription{
topic: string(topic),
ps: pubsub.New(defaultBufferLength),
subscriber: make(map[SubscriberID]chan any),
}

m.subs[topic] = subscription
}

id := newSubscriberID(consumerName)
c := subscription.ps.Sub(subscription.topic)
subscription.subscriber[id] = c

return id, c
}

func (m *Manager) Unsubscribe(topic Topic, id SubscriberID) {
m.rw.Lock()
defer m.rw.Unlock()

sub, exist := m.subs[topic]
if !exist {
return
}

c, exist := sub.subscriber[id]
if !exist {
return
}

sub.ps.Unsub(c, sub.topic)
delete(sub.subscriber, id)
}

func (m *Manager) getSub(topic Topic) (Subscription, bool) {
m.rw.RLock()
defer m.rw.RUnlock()

s, exist := m.subs[topic]

return s, exist
}

func (m *Manager) GetStats() map[string]map[string]int64 {
m.rw.RLock()
defer m.rw.RUnlock()

stats := make(map[string]map[string]int64)

for topic, sub := range m.subs {
stats[string(topic)] = make(map[string]int64)
for subID, c := range sub.subscriber {
stats[string(topic)][string(subID)] = int64(len(c))
}
}

return stats
}
59 changes: 59 additions & 0 deletions pkg/eventbus/eventbus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package eventbus_test

import (
"context"
"testing"
"time"

"github.com/KyberNetwork/tradinglib/pkg/eventbus"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func Test1(t *testing.T) {
l, _ := zap.NewDevelopment()
zap.ReplaceGlobals(l)

var (
m = eventbus.NewManager()
topic eventbus.Topic = "hello"
msg = "world"
)

id, c := m.Subscribe("", topic)
t.Log("id", id)
m.Publish(topic, msg)

s := <-c
t.Log("s", s)
}

func Test2(t *testing.T) {
l, _ := zap.NewDevelopment()
zap.ReplaceGlobals(l)

var (
m = eventbus.NewManager()
topic eventbus.Topic = "hello"
msg = "world"
ctx, cancel = context.WithCancel(context.Background())
)

go m.StartConsume(
ctx,
"",
topic,
func(i any) error {
s, ok := i.(string)
assert.True(t, ok, i)

assert.Equal(t, msg, s)
cancel()

return nil
})

time.Sleep(time.Second)
m.Publish(topic, msg)
<-ctx.Done()
}
20 changes: 20 additions & 0 deletions pkg/eventbus/pkg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package eventbus

import (
"math/rand"
"strconv"
)

const (
defaultBufferLength = 8048
)

type Handler func(any) error

type Topic string

type SubscriberID string

func newSubscriberID(consumerName string) SubscriberID {
return SubscriberID(consumerName + "-" + strconv.Itoa(rand.Int())) //nolint:gosec
}
Loading