Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions pkg/flashblock/block_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (

type NodeBlockListenerConfig struct {
WebSocketURL string
AuthHeader string
}

func (c NodeBlockListenerConfig) validate() error {
Expand All @@ -28,21 +29,24 @@ func (c NodeBlockListenerConfig) validate() error {
}

type NodeClient struct {
config NodeBlockListenerConfig
l *zap.SugaredLogger
conn *websocket.Conn
publisher Publisher
config NodeBlockListenerConfig
l *zap.SugaredLogger
conn *websocket.Conn
publisher Publisher
dataSource DataSource
}

func NewNodeListenerClient(config NodeBlockListenerConfig, publisher Publisher) (*NodeClient, error) {
// nolint:lll
func NewNodeListenerClient(config NodeBlockListenerConfig, publisher Publisher, dataSource DataSource) (*NodeClient, error) {
if err := config.validate(); err != nil {
return nil, err
}

return &NodeClient{
config: config,
l: zap.S().Named("block-listener"),
publisher: publisher,
config: config,
l: zap.S().Named("block-listener"),
publisher: publisher,
dataSource: dataSource,
}, nil
}

Expand Down Expand Up @@ -78,10 +82,16 @@ func (c *NodeClient) ListenFlashBlocks(ctx context.Context) error {
}
}

// nolint:cyclop
func (c *NodeClient) connectAndListen(ctx context.Context, resetRetryDelay func()) error {
c.l.Infow("Connecting to flashblock stream", "url", c.config.WebSocketURL)

conn, _, err := websocket.DefaultDialer.Dial(c.config.WebSocketURL, nil)
header := make(map[string][]string)
if c.config.AuthHeader != "" {
header["Authorization"] = []string{c.config.AuthHeader}
}

conn, _, err := websocket.DefaultDialer.Dial(c.config.WebSocketURL, header)
if err != nil {
return fmt.Errorf("failed to connect to WebSocket: %w", err)
}
Expand Down Expand Up @@ -153,7 +163,7 @@ func (c *NodeClient) connectAndListen(ctx context.Context, resetRetryDelay func(
continue
}

if err := c.publisher.Publish(ctx, NodeDataSource, flashBlock); err != nil {
if err := c.publisher.Publish(ctx, c.dataSource, flashBlock); err != nil {
c.l.Errorw("Error publishing flashblock", "error", err)
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/flashblock/block_route_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,15 @@ func (b *Metadata) UnmarshalJSON(data []byte) error {
return nil
}

// nolint: godox
// TODO: update logs later
// Receipt represents a transaction receipt
type Receipt struct {
Eip1559 *Eip1559Receipt `json:"Eip1559,omitempty"`
Legacy *LegacyReceipt `json:"Legacy,omitempty"`
Deposit *DepositReceipt `json:"Deposit,omitempty"`
Status string `json:"status"`
Type string `json:"type"`
}

// Eip1559Receipt represents an EIP-1559 transaction receipt
Expand Down
Loading