Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/VictoriaMetrics/fastcache v1.12.2 // indirect
github.com/VictoriaMetrics/metrics v1.35.4 // indirect
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/bits-and-blooms/bitset v1.22.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ github.com/VictoriaMetrics/metrics v1.35.4/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsK
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
161 changes: 161 additions & 0 deletions pkg/flashblock/block_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package flashblock

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/gorilla/websocket"
"go.uber.org/zap"
)

const (
maxRetryWait = 30 * time.Second
)

type NodeBlockListenerConfig struct {
WebSocketURL string
}

func (c NodeBlockListenerConfig) validate() error {
if c.WebSocketURL == "" || (!strings.HasPrefix(c.WebSocketURL, "ws://") &&
!strings.HasPrefix(c.WebSocketURL, "wss://")) {
return fmt.Errorf("invalid WebSocketURL: %s", c.WebSocketURL)
}
return nil
}

type NodeClient struct {
config NodeBlockListenerConfig
l *zap.SugaredLogger
conn *websocket.Conn
publisher Publisher
}

func NewNodeListenerClient(config NodeBlockListenerConfig, publisher Publisher) (*NodeClient, error) {
if err := config.validate(); err != nil {
return nil, err
}

return &NodeClient{
config: config,
l: zap.S().Named("block-listener"),
publisher: publisher,
}, nil
}

func (c *NodeClient) ListenFlashBlocks(ctx context.Context) error {
retryWait := time.Second

resetRetryWait := func() {
retryWait = time.Second
}

for {
select {
case <-ctx.Done():
c.l.Info("Context cancelled, stopping flashblock listener")
return ctx.Err()
default:
err := c.connectAndListen(ctx, resetRetryWait)
if err != nil {
c.l.Errorw("Flashblock listener error", "error", err)
}

if err == nil {
return nil
}

c.l.Warnw("Retrying in %s...", "wait", retryWait)
time.Sleep(retryWait)
retryWait *= 2
if retryWait > maxRetryWait {
retryWait = maxRetryWait
}
}
}
}

func (c *NodeClient) connectAndListen(ctx context.Context, resetRetryDelay func()) error {
c.l.Infow("Connecting to flashblock stream", "url", c.config.WebSocketURL)

conn, _, err := websocket.DefaultDialer.Dial(c.config.WebSocketURL, nil)
if err != nil {
return fmt.Errorf("failed to connect to WebSocket: %w", err)
}
c.conn = conn
defer conn.Close()

c.l.Info("Connected to flashblock stream, listening for events...")

// Reset retry delay after successful connection
resetRetryDelay()

// PING PONG
conn.SetPingHandler(func(appData string) error {
c.l.Debugw("Ping received", "data", appData)
// nolint: errcheck
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
return conn.WriteMessage(websocket.PongMessage, []byte{})
})

// Create a context for the ping goroutine that will be cancelled when this function exits
pingCtx, cancelPing := context.WithCancel(ctx)
defer cancelPing()

go func() {
pingTicker := time.NewTicker(5 * time.Second)
defer pingTicker.Stop()
for {
select {
case <-pingCtx.Done():
return
case <-pingTicker.C:
if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
c.l.Errorw("Ping error", "error", err)
_ = conn.Close()
return
}
}
}
}()

// Listen for flashblocks
for {
select {
case <-ctx.Done():
c.l.Info("Context cancelled, stopping flashblock listener")
return nil
default:
// nolint:errcheck
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
messageType, message, err := conn.ReadMessage()
if err != nil {
c.l.Errorw("Error reading node flashblock", "error", err)
return fmt.Errorf("stream error: %w", err)
}

if messageType != websocket.BinaryMessage && messageType != websocket.TextMessage {
c.l.Info("Ignoring non-binary/text message", "type", messageType, "message", string(message))
continue
}

jsonBytes, err := DecompressBrotli(message)
if err != nil {
c.l.Errorw("Error decompressing flashblock", "error", err)
}

var flashBlock Flashblock
if err := json.Unmarshal(jsonBytes, &flashBlock); err != nil {
c.l.Errorw("Error parsing flashblock", "error", err, "data", string(jsonBytes))
continue
}

if err := c.publisher.Publish(ctx, NodeDataSource, flashBlock); err != nil {
c.l.Errorw("Error publishing flashblock", "error", err)
}
}
}
}
201 changes: 201 additions & 0 deletions pkg/flashblock/block_route_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package flashblock

import (
"encoding/json"
"maps"
"strconv"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
)

// This file contains the protobuf message types for bloXroute Base streamer API
// These types are based on the GetParsedBdnFlashBlockStream response structure

// GetParsedBdnFlashBlockStreamRequest is the request message
type GetParsedBdnFlashBlockStreamRequest struct{}

// GetParsedBdnFlashBlockStreamResponse represents a parsed flashblock response
type GetParsedBdnFlashBlockStreamResponse struct {
PayloadId string `json:"payloadId"`
Index string `json:"index"`
Base *Base `json:"base,omitempty"`
Diff *Diff `json:"diff,omitempty"`
Metadata *Metadata `json:"metadata,omitempty"`
}

// Base contains the base block information (only in index 0)
type Base struct {
ParentBeaconBlockRoot string `json:"parentBeaconBlockRoot"`
ParentHash string `json:"parentHash"`
FeeRecipient string `json:"feeRecipient"`
PrevRandao string `json:"prevRandao"`
BlockNumber uint64 `json:"blockNumber"`
GasLimit uint64 `json:"gasLimit"`
Timestamp uint64 `json:"timestamp"`
ExtraData string `json:"extraData"`
BaseFeePerGas *hexutil.Big `json:"baseFeePerGas"`
}

func (b *Base) UnmarshalJSON(data []byte) error {
type Alias Base
aux := &struct {
BlockNumber hexutil.Uint64 `json:"blockNumber"`
GasLimit hexutil.Uint64 `json:"gasLimit"`
Timestamp hexutil.Uint64 `json:"timestamp"`
BaseFeePerGas *hexutil.Big `json:"base_fee_per_gas"`
*Alias
}{
Alias: (*Alias)(b),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
b.BlockNumber = uint64(aux.BlockNumber)
b.GasLimit = uint64(aux.GasLimit)
b.Timestamp = uint64(aux.Timestamp)
if aux.BaseFeePerGas != nil {
b.BaseFeePerGas = aux.BaseFeePerGas
}
return nil
}

// Diff contains the differential block information
type Diff struct {
StateRoot string `json:"stateRoot"`
ReceiptsRoot string `json:"receiptsRoot"`
LogsBloom string `json:"logsBloom"`
GasUsed uint64 `json:"gasUsed"`
BlockHash string `json:"blockHash"`
Transactions []string `json:"transactions"`
Withdrawals []string `json:"withdrawals"`
WithdrawalsRoot string `json:"withdrawalsRoot"`
}

func (d *Diff) UnmarshalJSON(data []byte) error {
type Alias Diff
aux := &struct {
GasUsed hexutil.Uint64 `json:"gasUsed"`
*Alias
}{
Alias: (*Alias)(d),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
d.GasUsed = uint64(aux.GasUsed)
return nil
}

// Metadata contains block metadata including receipts
type Metadata struct {
BlockNumber uint64 `json:"blockNumber"` // "0x2483f21"
NewAccountBalances map[common.Address]*hexutil.Big `json:"newAccountBalances"`
Receipts map[common.Hash]*Receipt `json:"receipts"`
}

func (b *Metadata) UnmarshalJSON(data []byte) error {
type Alias Metadata
aux := &struct {
BlockNumber string `json:"blockNumber"`
*Alias
}{
Alias: (*Alias)(b),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}

blockNo, err := strconv.ParseInt(aux.BlockNumber, 10, 64)
if err != nil {
return err
}
b.BlockNumber = uint64(blockNo)
return nil
}

// Receipt represents a transaction receipt
type Receipt struct {
Eip1559 *Eip1559Receipt `json:"Eip1559,omitempty"`
Legacy *LegacyReceipt `json:"Legacy,omitempty"`
Deposit *DepositReceipt `json:"Deposit,omitempty"`
}

// Eip1559Receipt represents an EIP-1559 transaction receipt
type Eip1559Receipt struct {
CumulativeGasUsed string `json:"cumulativeGasUsed"`
Logs []*Log `json:"logs"`
Status string `json:"status"`
}

// LegacyReceipt represents a legacy transaction receipt
type LegacyReceipt struct {
CumulativeGasUsed string `json:"cumulativeGasUsed"`
Logs []*Log `json:"logs"`
Status string `json:"status"`
}

// DepositReceipt represents a deposit transaction receipt
type DepositReceipt struct {
CumulativeGasUsed string `json:"cumulativeGasUsed"`
Logs []*Log `json:"logs"`
Status string `json:"status"`
DepositNonce string `json:"depositNonce"`
DepositReceiptVersion string `json:"depositReceiptVersion"`
}

// Log represents an event log
type Log struct {
Address string `json:"address"`
Topics []string `json:"topics"`
Data string `json:"data"`
}

// StreamerApiClient is the gRPC client interface
// nolint:lll
type StreamerApiClient interface {
GetParsedBdnFlashBlockStream(ctx interface{}, in *GetParsedBdnFlashBlockStreamRequest, opts ...interface{}) (StreamerApi_GetParsedBdnFlashBlockStreamClient, error)
}

// StreamerApi_GetParsedBdnFlashBlockStreamClient is the stream client interface
type StreamerApi_GetParsedBdnFlashBlockStreamClient interface {
Recv() (*GetParsedBdnFlashBlockStreamResponse, error)
}

func convertBloxRouteBaseToFlashblockBase(b *Base) *FlashblockBase {
if b == nil {
return nil
}
return &FlashblockBase{
ParentHash: common.HexToHash(b.ParentHash),
FeeRecipient: common.HexToAddress(b.FeeRecipient),
BlockNumber: b.BlockNumber,
GasLimit: b.GasLimit,
Timestamp: b.Timestamp,
BaseFeePerGas: b.BaseFeePerGas,
}
}

func convertBloxRouteDiffToFlashblockDiff(d *Diff) *FlashblockDiff {
if d == nil {
return nil
}
return &FlashblockDiff{
StateRoot: common.HexToHash(d.StateRoot),
BlockHash: common.HexToHash(d.BlockHash),
GasUsed: d.GasUsed,
Transactions: d.Transactions,
Withdrawals: d.Withdrawals,
}
}

func convertBloxRouteMetadataToFlashblockMeta(m *Metadata) *FlashblockMeta {
if m == nil {
return nil
}
return &FlashblockMeta{
BlockNumber: m.BlockNumber,
NewAccountBalances: maps.Clone(m.NewAccountBalances),
Receipts: maps.Clone(m.Receipts),
}
}
Loading
Loading