diff --git a/go.mod b/go.mod index 39cd31e..cc827b6 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/VictoriaMetrics/fastcache v1.12.2 // indirect github.com/VictoriaMetrics/metrics v1.35.4 // indirect + github.com/andybalholm/brotli v1.2.0 // indirect github.com/bits-and-blooms/bitset v1.22.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/go.sum b/go.sum index 512944f..15d7850 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,8 @@ github.com/VictoriaMetrics/metrics v1.35.4/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsK github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= +github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= +github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/pkg/flashblock/block_listener.go b/pkg/flashblock/block_listener.go new file mode 100644 index 0000000..6d22652 --- /dev/null +++ b/pkg/flashblock/block_listener.go @@ -0,0 +1,161 @@ +package flashblock + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/gorilla/websocket" + "go.uber.org/zap" +) + +const ( + maxRetryWait = 30 * time.Second +) + +type NodeBlockListenerConfig struct { + WebSocketURL string +} + +func (c NodeBlockListenerConfig) validate() error { + if c.WebSocketURL == "" || (!strings.HasPrefix(c.WebSocketURL, "ws://") && + !strings.HasPrefix(c.WebSocketURL, "wss://")) { + return fmt.Errorf("invalid WebSocketURL: %s", c.WebSocketURL) + } + return nil +} + +type NodeClient struct { + config NodeBlockListenerConfig + l *zap.SugaredLogger + conn *websocket.Conn + publisher Publisher +} + +func NewNodeListenerClient(config NodeBlockListenerConfig, publisher Publisher) (*NodeClient, error) { + if err := config.validate(); err != nil { + return nil, err + } + + return &NodeClient{ + config: config, + l: zap.S().Named("block-listener"), + publisher: publisher, + }, nil +} + +func (c *NodeClient) ListenFlashBlocks(ctx context.Context) error { + retryWait := time.Second + + resetRetryWait := func() { + retryWait = time.Second + } + + for { + select { + case <-ctx.Done(): + c.l.Info("Context cancelled, stopping flashblock listener") + return ctx.Err() + default: + err := c.connectAndListen(ctx, resetRetryWait) + if err != nil { + c.l.Errorw("Flashblock listener error", "error", err) + } + + if err == nil { + return nil + } + + c.l.Warnw("Retrying in %s...", "wait", retryWait) + time.Sleep(retryWait) + retryWait *= 2 + if retryWait > maxRetryWait { + retryWait = maxRetryWait + } + } + } +} + +func (c *NodeClient) connectAndListen(ctx context.Context, resetRetryDelay func()) error { + c.l.Infow("Connecting to flashblock stream", "url", c.config.WebSocketURL) + + conn, _, err := websocket.DefaultDialer.Dial(c.config.WebSocketURL, nil) + if err != nil { + return fmt.Errorf("failed to connect to WebSocket: %w", err) + } + c.conn = conn + defer conn.Close() + + c.l.Info("Connected to flashblock stream, listening for events...") + + // Reset retry delay after successful connection + resetRetryDelay() + + // PING PONG + conn.SetPingHandler(func(appData string) error { + c.l.Debugw("Ping received", "data", appData) + // nolint: errcheck + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + return conn.WriteMessage(websocket.PongMessage, []byte{}) + }) + + // Create a context for the ping goroutine that will be cancelled when this function exits + pingCtx, cancelPing := context.WithCancel(ctx) + defer cancelPing() + + go func() { + pingTicker := time.NewTicker(5 * time.Second) + defer pingTicker.Stop() + for { + select { + case <-pingCtx.Done(): + return + case <-pingTicker.C: + if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + c.l.Errorw("Ping error", "error", err) + _ = conn.Close() + return + } + } + } + }() + + // Listen for flashblocks + for { + select { + case <-ctx.Done(): + c.l.Info("Context cancelled, stopping flashblock listener") + return nil + default: + // nolint:errcheck + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + messageType, message, err := conn.ReadMessage() + if err != nil { + c.l.Errorw("Error reading node flashblock", "error", err) + return fmt.Errorf("stream error: %w", err) + } + + if messageType != websocket.BinaryMessage && messageType != websocket.TextMessage { + c.l.Info("Ignoring non-binary/text message", "type", messageType, "message", string(message)) + continue + } + + jsonBytes, err := DecompressBrotli(message) + if err != nil { + c.l.Errorw("Error decompressing flashblock", "error", err) + } + + var flashBlock Flashblock + if err := json.Unmarshal(jsonBytes, &flashBlock); err != nil { + c.l.Errorw("Error parsing flashblock", "error", err, "data", string(jsonBytes)) + continue + } + + if err := c.publisher.Publish(ctx, NodeDataSource, flashBlock); err != nil { + c.l.Errorw("Error publishing flashblock", "error", err) + } + } + } +} diff --git a/pkg/flashblock/block_route_types.go b/pkg/flashblock/block_route_types.go new file mode 100644 index 0000000..64b2577 --- /dev/null +++ b/pkg/flashblock/block_route_types.go @@ -0,0 +1,201 @@ +package flashblock + +import ( + "encoding/json" + "maps" + "strconv" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" +) + +// This file contains the protobuf message types for bloXroute Base streamer API +// These types are based on the GetParsedBdnFlashBlockStream response structure + +// GetParsedBdnFlashBlockStreamRequest is the request message +type GetParsedBdnFlashBlockStreamRequest struct{} + +// GetParsedBdnFlashBlockStreamResponse represents a parsed flashblock response +type GetParsedBdnFlashBlockStreamResponse struct { + PayloadId string `json:"payloadId"` + Index string `json:"index"` + Base *Base `json:"base,omitempty"` + Diff *Diff `json:"diff,omitempty"` + Metadata *Metadata `json:"metadata,omitempty"` +} + +// Base contains the base block information (only in index 0) +type Base struct { + ParentBeaconBlockRoot string `json:"parentBeaconBlockRoot"` + ParentHash string `json:"parentHash"` + FeeRecipient string `json:"feeRecipient"` + PrevRandao string `json:"prevRandao"` + BlockNumber uint64 `json:"blockNumber"` + GasLimit uint64 `json:"gasLimit"` + Timestamp uint64 `json:"timestamp"` + ExtraData string `json:"extraData"` + BaseFeePerGas *hexutil.Big `json:"baseFeePerGas"` +} + +func (b *Base) UnmarshalJSON(data []byte) error { + type Alias Base + aux := &struct { + BlockNumber hexutil.Uint64 `json:"blockNumber"` + GasLimit hexutil.Uint64 `json:"gasLimit"` + Timestamp hexutil.Uint64 `json:"timestamp"` + BaseFeePerGas *hexutil.Big `json:"base_fee_per_gas"` + *Alias + }{ + Alias: (*Alias)(b), + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + b.BlockNumber = uint64(aux.BlockNumber) + b.GasLimit = uint64(aux.GasLimit) + b.Timestamp = uint64(aux.Timestamp) + if aux.BaseFeePerGas != nil { + b.BaseFeePerGas = aux.BaseFeePerGas + } + return nil +} + +// Diff contains the differential block information +type Diff struct { + StateRoot string `json:"stateRoot"` + ReceiptsRoot string `json:"receiptsRoot"` + LogsBloom string `json:"logsBloom"` + GasUsed uint64 `json:"gasUsed"` + BlockHash string `json:"blockHash"` + Transactions []string `json:"transactions"` + Withdrawals []string `json:"withdrawals"` + WithdrawalsRoot string `json:"withdrawalsRoot"` +} + +func (d *Diff) UnmarshalJSON(data []byte) error { + type Alias Diff + aux := &struct { + GasUsed hexutil.Uint64 `json:"gasUsed"` + *Alias + }{ + Alias: (*Alias)(d), + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + d.GasUsed = uint64(aux.GasUsed) + return nil +} + +// Metadata contains block metadata including receipts +type Metadata struct { + BlockNumber uint64 `json:"blockNumber"` // "0x2483f21" + NewAccountBalances map[common.Address]*hexutil.Big `json:"newAccountBalances"` + Receipts map[common.Hash]*Receipt `json:"receipts"` +} + +func (b *Metadata) UnmarshalJSON(data []byte) error { + type Alias Metadata + aux := &struct { + BlockNumber string `json:"blockNumber"` + *Alias + }{ + Alias: (*Alias)(b), + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + + blockNo, err := strconv.ParseInt(aux.BlockNumber, 10, 64) + if err != nil { + return err + } + b.BlockNumber = uint64(blockNo) + return nil +} + +// Receipt represents a transaction receipt +type Receipt struct { + Eip1559 *Eip1559Receipt `json:"Eip1559,omitempty"` + Legacy *LegacyReceipt `json:"Legacy,omitempty"` + Deposit *DepositReceipt `json:"Deposit,omitempty"` +} + +// Eip1559Receipt represents an EIP-1559 transaction receipt +type Eip1559Receipt struct { + CumulativeGasUsed string `json:"cumulativeGasUsed"` + Logs []*Log `json:"logs"` + Status string `json:"status"` +} + +// LegacyReceipt represents a legacy transaction receipt +type LegacyReceipt struct { + CumulativeGasUsed string `json:"cumulativeGasUsed"` + Logs []*Log `json:"logs"` + Status string `json:"status"` +} + +// DepositReceipt represents a deposit transaction receipt +type DepositReceipt struct { + CumulativeGasUsed string `json:"cumulativeGasUsed"` + Logs []*Log `json:"logs"` + Status string `json:"status"` + DepositNonce string `json:"depositNonce"` + DepositReceiptVersion string `json:"depositReceiptVersion"` +} + +// Log represents an event log +type Log struct { + Address string `json:"address"` + Topics []string `json:"topics"` + Data string `json:"data"` +} + +// StreamerApiClient is the gRPC client interface +// nolint:lll +type StreamerApiClient interface { + GetParsedBdnFlashBlockStream(ctx interface{}, in *GetParsedBdnFlashBlockStreamRequest, opts ...interface{}) (StreamerApi_GetParsedBdnFlashBlockStreamClient, error) +} + +// StreamerApi_GetParsedBdnFlashBlockStreamClient is the stream client interface +type StreamerApi_GetParsedBdnFlashBlockStreamClient interface { + Recv() (*GetParsedBdnFlashBlockStreamResponse, error) +} + +func convertBloxRouteBaseToFlashblockBase(b *Base) *FlashblockBase { + if b == nil { + return nil + } + return &FlashblockBase{ + ParentHash: common.HexToHash(b.ParentHash), + FeeRecipient: common.HexToAddress(b.FeeRecipient), + BlockNumber: b.BlockNumber, + GasLimit: b.GasLimit, + Timestamp: b.Timestamp, + BaseFeePerGas: b.BaseFeePerGas, + } +} + +func convertBloxRouteDiffToFlashblockDiff(d *Diff) *FlashblockDiff { + if d == nil { + return nil + } + return &FlashblockDiff{ + StateRoot: common.HexToHash(d.StateRoot), + BlockHash: common.HexToHash(d.BlockHash), + GasUsed: d.GasUsed, + Transactions: d.Transactions, + Withdrawals: d.Withdrawals, + } +} + +func convertBloxRouteMetadataToFlashblockMeta(m *Metadata) *FlashblockMeta { + if m == nil { + return nil + } + return &FlashblockMeta{ + BlockNumber: m.BlockNumber, + NewAccountBalances: maps.Clone(m.NewAccountBalances), + Receipts: maps.Clone(m.Receipts), + } +} diff --git a/pkg/flashblock/blox_route_client.go b/pkg/flashblock/blox_route_client.go new file mode 100644 index 0000000..892cbaf --- /dev/null +++ b/pkg/flashblock/blox_route_client.go @@ -0,0 +1,288 @@ +package flashblock + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "syscall" + "time" + + "github.com/gorilla/websocket" + "go.uber.org/zap" +) + +// Config holds the configuration for the flashblock client +type Config struct { + WebSocketURL string + AuthHeader string +} + +// validate validates the configuration +func (c *Config) validate() error { + if c.WebSocketURL == "" { + return fmt.Errorf("WebSocketURL is required") + } + if c.AuthHeader == "" { + return fmt.Errorf("AuthHeader is required") + } + + return nil +} + +// Client represents a WebSocket client for bloXroute Base streamer +type Client struct { + config Config + l *zap.SugaredLogger + conn *websocket.Conn + publisher Publisher +} + +// WebSocketMessage represents the WebSocket subscription message +type WebSocketMessage struct { + JSONRPC string `json:"jsonrpc"` + ID int `json:"id"` + Method string `json:"method"` + Params []interface{} `json:"params"` +} + +// WebSocketResponse represents the WebSocket response +type WebSocketResponse struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params struct { + Subscription string `json:"subscription"` + Result *GetParsedBdnFlashBlockStreamResponse `json:"result"` + } `json:"params"` +} + +// NewNodeListenerClient creates a new bloxroute client +func NewBloxRouteClient(config Config, publisher Publisher) (*Client, error) { + if err := config.validate(); err != nil { + return nil, fmt.Errorf("invalid config: %w", err) + } + + return &Client{ + config: config, + l: zap.S().Named("blox-route-client"), + publisher: publisher, + }, nil +} + +// Start starts the bloxroute stream listener +func (c *Client) Start(ctx context.Context) error { + if err := c.Listen(ctx); err != nil { + c.l.Errorw("Failed to listen for bloxroute", "error", err) + } + + return nil +} + +// Listen connects to the WebSocket stream and listens for bloxroute with automatic retry +func (c *Client) Listen(ctx context.Context) error { + c.l.Infow("Starting bloxroute block listener with retry", "url", c.config.WebSocketURL) + + retryWait := 3 * time.Second + retryCount := 0 + + resetRetryWait := func() { + retryWait = time.Second + } + + for { + select { + case <-ctx.Done(): + c.l.Info("Context cancelled, stopping flashblock listener") + return ctx.Err() + default: + err := c.connectAndListen(ctx, resetRetryWait) + if err == nil { + return nil // Normal exit + } + + // Check if error is retryable + if !c.isRetryableError(err) { + c.l.Errorw("Non-retryable error, stopping flashblock listener", "error", err) + return err + } + + // Log retry attempt + retryCount++ + c.l.Warnw("Flashblock connection error, retrying", + "error", err, + "retryCount", retryCount, + "retryWait", retryWait) + + // Wait before retry + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(retryWait): + // Exponential backoff with max delay + retryWait *= 2 + if retryWait > maxRetryWait { + retryWait = maxRetryWait + } + } + } + } +} + +// connectAndListen establishes connection and listens for bloxroute +func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func()) error { + c.l.Infow("Connecting to flashblock stream", "url", c.config.WebSocketURL) + + // Create WebSocket connection with authorization header + header := make(map[string][]string) + header["Authorization"] = []string{c.config.AuthHeader} + + conn, _, err := websocket.DefaultDialer.Dial(c.config.WebSocketURL, header) + if err != nil { + return fmt.Errorf("failed to connect to WebSocket: %w", err) + } + c.conn = conn + defer conn.Close() + + // Subscribe to GetParsedBdnFlashBlockStream + subscribeMsg := WebSocketMessage{ + JSONRPC: "2.0", + ID: 1, + Method: "subscribe", + Params: []interface{}{"GetParsedBdnFlashBlockStream", map[string]interface{}{}}, + } + + if err := conn.WriteJSON(subscribeMsg); err != nil { + return fmt.Errorf("failed to send subscription message: %w", err) + } + + c.l.Info("Connected to flashblock stream, listening for events...") + + // Reset retry delay after successful connection + resetRetryDelay() + + // PING PONG + conn.SetPingHandler(func(appData string) error { + c.l.Debugw("Ping received", "data", appData) + return conn.WriteMessage(websocket.PongMessage, []byte{}) + }) + + // Create a context for the ping goroutine that will be cancelled when this function exits + pingCtx, cancelPing := context.WithCancel(ctx) + defer cancelPing() + + go func() { + pingTicker := time.NewTicker(5 * time.Second) + defer pingTicker.Stop() + for { + select { + case <-pingCtx.Done(): + return + case <-pingTicker.C: + if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + c.l.Errorw("Ping error", "error", err) + _ = conn.Close() + return + } + } + } + }() + + // Listen for bloxroute + for { + select { + case <-ctx.Done(): + c.l.Info("Context cancelled, stopping flashblock listener") + return ctx.Err() + default: + var response WebSocketResponse + _, data, err := conn.ReadMessage() + if err != nil { + c.l.Errorw("Error reading bloxroute flashblock", "error", err, "data", data) + continue + } + + if err := json.Unmarshal(data, &response); err != nil { + c.l.Errorw("Error unmarshaling bloxroute flashblock", "error", err, "data", string(data)) + } + + if response.Params.Result == nil { + // First message is subscription confirmation + c.l.Debugw("Received subscription confirmation") + continue + } + + c.processFlashblock(ctx, *response.Params.Result) + } + } +} + +// isRetryableError checks if an error should trigger a retry +func (c *Client) isRetryableError(err error) bool { + if err == nil { + return false + } + + // Don't retry on context cancellation + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return false + } + + // Retry on WebSocket close errors + if websocket.IsCloseError(err, + websocket.CloseAbnormalClosure, + websocket.CloseNormalClosure, + websocket.CloseServiceRestart, + websocket.CloseGoingAway) { + return true + } + + // Retry on connection reset + if errors.Is(err, syscall.ECONNRESET) { + return true + } + + // Retry on connection refused + if errors.Is(err, syscall.ECONNREFUSED) { + return true + } + + // Default to retry for other errors + return true +} + +// processFlashblock processes a received flashblock and publishes to subscribers +func (c *Client) processFlashblock(ctx context.Context, response GetParsedBdnFlashBlockStreamResponse) { + if response.Metadata == nil { + return + } + + blockNumber := response.Metadata.BlockNumber + c.l.Debugw("Processing flashblock", "blockNumber", blockNumber, "index", response.Index) + + flashBlock, err := convertBlockRouteFlashBlock(response) + if err != nil { + c.l.Errorw("Error converting bloxroute flashblock", "error", err, "blockNumber", blockNumber, "index", response.Index) + return + } + // Publish the flashblock data to subscribers + if c.publisher != nil { + if err := c.publisher.Publish(ctx, BloxRouteDataSource, flashBlock); err != nil { + c.l.Errorw("Failed to publish flashblock", "error", err, "blockNumber", blockNumber) + } + } +} + +func convertBlockRouteFlashBlock(bloxrouteFlashblock GetParsedBdnFlashBlockStreamResponse) (Flashblock, error) { + index, err := strconv.ParseInt(bloxrouteFlashblock.Index, 10, 64) + if err != nil { + return Flashblock{}, fmt.Errorf("invalid index: %w value %v", err, index) + } + return Flashblock{ + PayloadID: bloxrouteFlashblock.PayloadId, + Index: index, + Base: convertBloxRouteBaseToFlashblockBase(bloxrouteFlashblock.Base), + Diff: convertBloxRouteDiffToFlashblockDiff(bloxrouteFlashblock.Diff), + Metadata: convertBloxRouteMetadataToFlashblockMeta(bloxrouteFlashblock.Metadata), + }, nil +} diff --git a/pkg/flashblock/flash_bock.go b/pkg/flashblock/flash_bock.go new file mode 100644 index 0000000..cbe81bf --- /dev/null +++ b/pkg/flashblock/flash_bock.go @@ -0,0 +1,110 @@ +package flashblock + +import ( + "encoding/json" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" +) + +type Flashblock struct { + PayloadID string `json:"payload_id"` + Index int64 `json:"index"` + Base *FlashblockBase `json:"base"` + Diff *FlashblockDiff `json:"diff"` + Metadata *FlashblockMeta `json:"metadata"` +} + +type FlashblockBase struct { + ParentHash common.Hash `json:"parent_hash"` + FeeRecipient common.Address `json:"fee_recipient"` + BlockNumber uint64 `json:"block_number"` + GasLimit uint64 `json:"gas_limit"` + Timestamp uint64 `json:"timestamp"` + BaseFeePerGas *hexutil.Big `json:"base_fee_per_gas"` +} + +type FlashblockDiff struct { + StateRoot common.Hash `json:"state_root"` + BlockHash common.Hash `json:"block_hash"` + GasUsed uint64 `json:"gas_used"` + Transactions []string `json:"transactions"` + Withdrawals []string `json:"withdrawals"` +} + +type FlashblockMeta struct { + BlockNumber uint64 `json:"block_number"` + NewAccountBalances map[common.Address]*hexutil.Big `json:"new_account_balances"` + Receipts map[common.Hash]*Receipt `json:"receipts"` +} + +// --- Unmarshal Implementations --- +func (b *FlashblockBase) UnmarshalJSON(data []byte) error { + type Alias FlashblockBase + aux := &struct { + BlockNumber hexutil.Uint64 `json:"block_number"` + GasLimit hexutil.Uint64 `json:"gas_limit"` + Timestamp hexutil.Uint64 `json:"timestamp"` + *Alias + }{ + Alias: (*Alias)(b), + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + b.BlockNumber = uint64(aux.BlockNumber) + b.GasLimit = uint64(aux.GasLimit) + b.Timestamp = uint64(aux.Timestamp) + return nil +} + +func (d *FlashblockDiff) UnmarshalJSON(data []byte) error { + type Alias FlashblockDiff + aux := &struct { + GasUsed hexutil.Uint64 `json:"gas_used"` + *Alias + }{ + Alias: (*Alias)(d), + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + d.GasUsed = uint64(aux.GasUsed) + return nil +} + +// --- Marshal Implementations --- + +func (b FlashblockBase) MarshalJSON() ([]byte, error) { + return json.Marshal(&struct { + ParentHash common.Hash `json:"parent_hash"` + FeeRecipient common.Address `json:"fee_recipient"` + BlockNumber hexutil.Uint64 `json:"block_number"` + GasLimit hexutil.Uint64 `json:"gas_limit"` + Timestamp hexutil.Uint64 `json:"timestamp"` + BaseFeePerGas *hexutil.Big `json:"base_fee_per_gas"` + }{ + ParentHash: b.ParentHash, + FeeRecipient: b.FeeRecipient, + BlockNumber: hexutil.Uint64(b.BlockNumber), + GasLimit: hexutil.Uint64(b.GasLimit), + Timestamp: hexutil.Uint64(b.Timestamp), + BaseFeePerGas: b.BaseFeePerGas, + }) +} + +func (d FlashblockDiff) MarshalJSON() ([]byte, error) { + return json.Marshal(&struct { + StateRoot common.Hash `json:"state_root"` + BlockHash common.Hash `json:"block_hash"` + GasUsed hexutil.Uint64 `json:"gas_used"` + Transactions []string `json:"transactions"` + Withdrawals []string `json:"withdrawals"` + }{ + StateRoot: d.StateRoot, + BlockHash: d.BlockHash, + GasUsed: hexutil.Uint64(d.GasUsed), + Transactions: d.Transactions, + Withdrawals: d.Withdrawals, + }) +} diff --git a/pkg/flashblock/interface.go b/pkg/flashblock/interface.go new file mode 100644 index 0000000..749afcd --- /dev/null +++ b/pkg/flashblock/interface.go @@ -0,0 +1,16 @@ +package flashblock + +import ( + "context" +) + +type DataSource string + +const ( + NodeDataSource DataSource = "node" + BloxRouteDataSource DataSource = "bloxroute" +) + +type Publisher interface { + Publish(ctx context.Context, source DataSource, data Flashblock) error +} diff --git a/pkg/flashblock/util.go b/pkg/flashblock/util.go new file mode 100644 index 0000000..1eab0a3 --- /dev/null +++ b/pkg/flashblock/util.go @@ -0,0 +1,13 @@ +package flashblock + +import ( + "bytes" + "io" + + "github.com/andybalholm/brotli" +) + +func DecompressBrotli(input []byte) ([]byte, error) { + reader := brotli.NewReader(bytes.NewReader(input)) + return io.ReadAll(reader) +}