diff --git a/.golangci.yml b/.golangci.yml index b32a068..4bebb00 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -27,6 +27,7 @@ linters: - varnamelen - wrapcheck - wsl + - noinlineerr settings: cyclop: max-complexity: 15 diff --git a/go.mod b/go.mod index cc827b6..63d8b2b 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,8 @@ require ( github.com/KyberNetwork/cclog v1.1.2 github.com/KyberNetwork/kyber-trace-go v0.1.2 github.com/TheZeroSlave/zapsentry v1.23.0 + github.com/andybalholm/brotli v1.2.0 + github.com/cskr/pubsub v1.0.2 github.com/duoxehyon/mev-share-go v0.3.0 github.com/ethereum/go-ethereum v1.15.10 github.com/flashbots/mev-share-node v0.0.0-20240517155750-67003f8e8700 @@ -15,6 +17,7 @@ require ( github.com/golang-migrate/migrate/v4 v4.18.3 github.com/google/go-cmp v0.7.0 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/holiman/uint256 v1.3.2 github.com/jmoiron/sqlx v1.4.0 github.com/lib/pq v1.10.9 @@ -31,7 +34,6 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/VictoriaMetrics/fastcache v1.12.2 // indirect github.com/VictoriaMetrics/metrics v1.35.4 // indirect - github.com/andybalholm/brotli v1.2.0 // indirect github.com/bits-and-blooms/bitset v1.22.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -43,7 +45,6 @@ require ( github.com/deckarep/golang-set/v2 v2.8.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/dmarkham/enumer v1.6.1 // indirect github.com/ethereum/c-kzg-4844 v1.0.3 // indirect github.com/ethereum/go-verkle v0.2.2 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect @@ -52,7 +53,6 @@ require ( github.com/go-ole/go-ole v1.3.0 // indirect github.com/gofrs/flock v0.12.1 // indirect github.com/golang/snappy v1.0.0 // indirect - github.com/gorilla/websocket v1.5.3 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect @@ -66,7 +66,6 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mmcloughlin/addchain v0.4.0 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect - github.com/pascaldekloe/name v1.0.0 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pierrec/lz4/v3 v3.3.5 // indirect github.com/pion/dtls/v2 v2.2.12 // indirect @@ -101,13 +100,11 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.41.0 // indirect golang.org/x/exp v0.0.0-20250811191247-51f88131bc50 // indirect - golang.org/x/mod v0.27.0 // indirect golang.org/x/net v0.43.0 // indirect golang.org/x/sync v0.16.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/text v0.28.0 // indirect golang.org/x/time v0.11.0 // indirect - golang.org/x/tools v0.36.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250428153025-10db94c68c34 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 // indirect google.golang.org/grpc v1.72.0 // indirect diff --git a/go.sum b/go.sum index 15d7850..cf66af5 100644 --- a/go.sum +++ b/go.sum @@ -79,6 +79,8 @@ github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a h1:W8mUrRp6NOV github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a/go.mod h1:sTwzHBvIzm2RfVCGNEBZgRyjwK40bVoun3ZnGOCafNM= github.com/crate-crypto/go-kzg-4844 v1.1.0 h1:EN/u9k2TF6OWSHrCCDBBU6GLNMq88OspHHlMnHfoyU4= github.com/crate-crypto/go-kzg-4844 v1.1.0/go.mod h1:JolLjpSff1tCCJKaJx4psrlEdlXuJEC996PL3tTAFks= +github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= +github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -100,8 +102,6 @@ github.com/dhui/dktest v0.4.5 h1:uUfYBIVREmj/Rw6MvgmqNAYzTiKOHJak+enB5Di73MM= github.com/dhui/dktest v0.4.5/go.mod h1:tmcyeHDKagvlDrz7gDKq4UAJOLIfVZYkfD5OnHDwcCo= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= -github.com/dmarkham/enumer v1.6.1 h1:aSc9awYtZL07TUueWs40QcHtxTvHTAwG0EqrNsK45w4= -github.com/dmarkham/enumer v1.6.1/go.mod h1:yixql+kDDQRYqcuBM2n9Vlt7NoT9ixgXhaXry8vmRg8= github.com/docker/docker v27.2.0+incompatible h1:Rk9nIVdfH3+Vz4cyI/uhbINhEZ/oLmc+CBXmH6fbNk4= github.com/docker/docker v27.2.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= @@ -351,8 +351,6 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= -github.com/pascaldekloe/name v1.0.0 h1:n7LKFgHixETzxpRv2R77YgPUFo85QHGZKrdaYm7eY5U= -github.com/pascaldekloe/name v1.0.0/go.mod h1:Z//MfYJnH4jVpQ9wkclwu2I2MkHmXTlT9wR5UZScttM= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= @@ -483,6 +481,8 @@ github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0/go.mod h1:/LWChgwKmvncFJFHJ7Gvn9wZArjbV5/FppcK2fKk/tI= github.com/ybbus/jsonrpc/v3 v3.1.6 h1:Yn2dNOZ3xVdOVxq9d4GaH2pxjjei27C+vU6H+E7hfAw= github.com/ybbus/jsonrpc/v3 v3.1.6/go.mod h1:U1QbyNfL5Pvi2roT0OpRbJeyvGxfWYSgKJHjxWdAEeE= @@ -545,8 +545,6 @@ golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKG golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ= -golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -639,8 +637,6 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg= -golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/eventbus/eventbus.go b/pkg/eventbus/eventbus.go new file mode 100644 index 0000000..5707327 --- /dev/null +++ b/pkg/eventbus/eventbus.go @@ -0,0 +1,166 @@ +package eventbus + +import ( + "context" + "sync" + + "github.com/cskr/pubsub" +) + +type Manager struct { + rw sync.RWMutex + subs map[Topic]Subscription +} + +func NewManager() *Manager { + return &Manager{ + rw: sync.RWMutex{}, + subs: make(map[Topic]Subscription), + } +} + +type Subscription struct { + topic string + ps *pubsub.PubSub + subscriber map[SubscriberID]chan any +} + +// StartConsume subscribe with a handler. +func (m *Manager) StartConsume(ctx context.Context, consumerName string, topic Topic, fn Handler) { + id, c := m.Subscribe(consumerName, topic) + + for { + select { + case <-ctx.Done(): + m.Unsubscribe(topic, id) + + return + + case v, ok := <-c: + if !ok { + continue + } + + // Ignore error here, it should be handled in handler. + _ = fn(v) + } + } +} + +// StartConsumeMultiple subscribe with a handler. +func (m *Manager) StartConsumeMultiple(ctx context.Context, consumerName string, topic Topic, fn Handler, worker int) { + id, c := m.Subscribe(consumerName, topic) + + wg := sync.WaitGroup{} + defer wg.Wait() + + for i := 0; i < worker; i++ { //nolint:modernize + wg.Add(1) + + go func() { //nolint:modernize + defer wg.Done() + + for { + select { + case <-ctx.Done(): + m.Unsubscribe(topic, id) + + return + + case v, ok := <-c: + if !ok { + continue + } + + // Ignore error here, it should be handled in handler. + _ = fn(v) + } + } + }() + } +} + +func (m *Manager) Publish(topic Topic, message any) { + sub, exist := m.getSub(topic) + if !exist { + m.rw.Lock() + { + sub = Subscription{ + topic: string(topic), + ps: pubsub.New(defaultBufferLength), + subscriber: make(map[SubscriberID]chan any), + } + + m.subs[topic] = sub + } + + m.rw.Unlock() + } + + sub.ps.Pub(message, sub.topic) +} + +func (m *Manager) Subscribe(consumerName string, topic Topic) (SubscriberID, <-chan any) { + m.rw.Lock() + defer m.rw.Unlock() + + subscription, exist := m.subs[topic] + if !exist { + subscription = Subscription{ + topic: string(topic), + ps: pubsub.New(defaultBufferLength), + subscriber: make(map[SubscriberID]chan any), + } + + m.subs[topic] = subscription + } + + id := newSubscriberID(consumerName) + c := subscription.ps.Sub(subscription.topic) + subscription.subscriber[id] = c + + return id, c +} + +func (m *Manager) Unsubscribe(topic Topic, id SubscriberID) { + m.rw.Lock() + defer m.rw.Unlock() + + sub, exist := m.subs[topic] + if !exist { + return + } + + c, exist := sub.subscriber[id] + if !exist { + return + } + + sub.ps.Unsub(c, sub.topic) + delete(sub.subscriber, id) +} + +func (m *Manager) getSub(topic Topic) (Subscription, bool) { + m.rw.RLock() + defer m.rw.RUnlock() + + s, exist := m.subs[topic] + + return s, exist +} + +func (m *Manager) GetStats() map[string]map[string]int64 { + m.rw.RLock() + defer m.rw.RUnlock() + + stats := make(map[string]map[string]int64) + + for topic, sub := range m.subs { + stats[string(topic)] = make(map[string]int64) + for subID, c := range sub.subscriber { + stats[string(topic)][string(subID)] = int64(len(c)) + } + } + + return stats +} diff --git a/pkg/eventbus/eventbus_test.go b/pkg/eventbus/eventbus_test.go new file mode 100644 index 0000000..5934959 --- /dev/null +++ b/pkg/eventbus/eventbus_test.go @@ -0,0 +1,59 @@ +package eventbus_test + +import ( + "context" + "testing" + "time" + + "github.com/KyberNetwork/tradinglib/pkg/eventbus" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func Test1(t *testing.T) { + l, _ := zap.NewDevelopment() + zap.ReplaceGlobals(l) + + var ( + m = eventbus.NewManager() + topic eventbus.Topic = "hello" + msg = "world" + ) + + id, c := m.Subscribe("", topic) + t.Log("id", id) + m.Publish(topic, msg) + + s := <-c + t.Log("s", s) +} + +func Test2(t *testing.T) { + l, _ := zap.NewDevelopment() + zap.ReplaceGlobals(l) + + var ( + m = eventbus.NewManager() + topic eventbus.Topic = "hello" + msg = "world" + ctx, cancel = context.WithCancel(context.Background()) + ) + + go m.StartConsume( + ctx, + "", + topic, + func(i any) error { + s, ok := i.(string) + assert.True(t, ok, i) + + assert.Equal(t, msg, s) + cancel() + + return nil + }) + + time.Sleep(time.Second) + m.Publish(topic, msg) + <-ctx.Done() +} diff --git a/pkg/eventbus/pkg.go b/pkg/eventbus/pkg.go new file mode 100644 index 0000000..c1bdf69 --- /dev/null +++ b/pkg/eventbus/pkg.go @@ -0,0 +1,20 @@ +package eventbus + +import ( + "math/rand" + "strconv" +) + +const ( + defaultBufferLength = 8048 +) + +type Handler func(any) error + +type Topic string + +type SubscriberID string + +func newSubscriberID(consumerName string) SubscriberID { + return SubscriberID(consumerName + "-" + strconv.Itoa(rand.Int())) //nolint:gosec +}