diff --git a/.golangci.yaml b/.golangci.yaml index b186ca1e..4bf9cd9d 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -30,6 +30,7 @@ linters: - exhaustruct - tenv - gocognit + - dupl # # Disabled because of generics: diff --git a/common/constants.go b/common/constants.go new file mode 100644 index 00000000..8db21729 --- /dev/null +++ b/common/constants.go @@ -0,0 +1,8 @@ +package common + +const ( + EthConsensusVersionBellatrix = "bellatrix" + EthConsensusVersionCapella = "capella" + EthConsensusVersionDeneb = "deneb" + EthConsensusVersionElectra = "electra" +) diff --git a/common/types_spec.go b/common/types_spec.go index 95e65a61..2e4c903b 100644 --- a/common/types_spec.go +++ b/common/types_spec.go @@ -1,6 +1,7 @@ package common import ( + "bytes" "encoding/json" "fmt" @@ -11,6 +12,7 @@ import ( builderApiV1 "github.com/attestantio/go-builder-client/api/v1" builderSpec "github.com/attestantio/go-builder-client/spec" eth2Api "github.com/attestantio/go-eth2-client/api" + eth2ApiV1Bellatrix "github.com/attestantio/go-eth2-client/api/v1/bellatrix" eth2ApiV1Capella "github.com/attestantio/go-eth2-client/api/v1/capella" eth2ApiV1Deneb "github.com/attestantio/go-eth2-client/api/v1/deneb" eth2ApiV1Electra "github.com/attestantio/go-eth2-client/api/v1/electra" @@ -589,3 +591,36 @@ func (r *VersionedSignedBlindedBeaconBlock) UnmarshalJSON(input []byte) error { } return errors.Wrap(err, "failed to unmarshal SignedBlindedBeaconBlock") } + +func (r *VersionedSignedBlindedBeaconBlock) Unmarshal(input []byte, contentType, ethConsensusVersion string) error { + switch contentType { + case "application/octet-stream": + if ethConsensusVersion != "" { + switch ethConsensusVersion { + case EthConsensusVersionBellatrix: + r.Version = spec.DataVersionBellatrix + r.Bellatrix = new(eth2ApiV1Bellatrix.SignedBlindedBeaconBlock) + return r.Bellatrix.UnmarshalSSZ(input) + case EthConsensusVersionCapella: + r.Version = spec.DataVersionCapella + r.Capella = new(eth2ApiV1Capella.SignedBlindedBeaconBlock) + return r.Capella.UnmarshalSSZ(input) + case EthConsensusVersionDeneb: + r.Version = spec.DataVersionDeneb + r.Deneb = new(eth2ApiV1Deneb.SignedBlindedBeaconBlock) + return r.Deneb.UnmarshalSSZ(input) + case EthConsensusVersionElectra: + r.Version = spec.DataVersionElectra + r.Electra = new(eth2ApiV1Electra.SignedBlindedBeaconBlock) + return r.Electra.UnmarshalSSZ(input) + default: + return ErrInvalidForkVersion + } + } else { + return ErrMissingEthConsensusVersion + } + case "application/json": + return json.NewDecoder(bytes.NewReader(input)).Decode(r) + } + return ErrInvalidContentType +} diff --git a/common/utils.go b/common/utils.go index 9ab66c21..0d0ef0a3 100644 --- a/common/utils.go +++ b/common/utils.go @@ -26,9 +26,11 @@ import ( ) var ( - ErrInvalidForkVersion = errors.New("invalid fork version") - ErrHTTPErrorResponse = errors.New("got an HTTP error response") - ErrIncorrectLength = errors.New("incorrect length") + ErrInvalidForkVersion = errors.New("invalid fork version") + ErrHTTPErrorResponse = errors.New("got an HTTP error response") + ErrIncorrectLength = errors.New("incorrect length") + ErrMissingEthConsensusVersion = errors.New("missing eth-consensus-version") + ErrInvalidContentType = errors.New("invalid content type") ) // SlotPos returns the slot's position in the epoch (1-based, i.e. 1..32) diff --git a/datastore/redis.go b/datastore/redis.go index 79739415..e5c8302b 100644 --- a/datastore/redis.go +++ b/datastore/redis.go @@ -847,6 +847,6 @@ func (r *RedisCache) NewPipeline() redis.Pipeliner { //nolint:ireturn,nolintlint return r.client.Pipeline() } -func (r *RedisCache) NewTxPipeline() redis.Pipeliner { //nolint:ireturn +func (r *RedisCache) NewTxPipeline() redis.Pipeliner { //nolint:ireturn,nolintlint return r.client.TxPipeline() } diff --git a/services/api/service.go b/services/api/service.go index b28fae06..a5c3a31e 100644 --- a/services/api/service.go +++ b/services/api/service.go @@ -2,7 +2,6 @@ package api import ( - "bytes" "compress/gzip" "context" "database/sql" @@ -10,6 +9,7 @@ import ( "fmt" "io" "math/big" + "mime" "net/http" _ "net/http/pprof" "os" @@ -24,6 +24,7 @@ import ( "github.com/aohorodnyk/mimeheader" builderApi "github.com/attestantio/go-builder-client/api" builderApiV1 "github.com/attestantio/go-builder-client/api/v1" + builderSpec "github.com/attestantio/go-builder-client/spec" "github.com/attestantio/go-eth2-client/spec" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/buger/jsonparser" @@ -52,6 +53,13 @@ const ( ErrBlockAlreadyKnown = "simulation failed: block already known" ErrBlockRequiresReorg = "simulation failed: block requires a reorg" ErrMissingTrieNode = "missing trie node" + + ApplicationJSON = "application/json" + ApplicationOctetStream = "application/octet-stream" + + HeaderAccept = "Accept" + HeaderContentType = "Content-Type" + HeaderEthConsensusVersion = "Eth-Consensus-Version" ) var ( @@ -62,6 +70,7 @@ var ( ErrServerAlreadyStarted = errors.New("server was already started") ErrBuilderAPIWithoutSecretKey = errors.New("cannot start builder API without secret key") ErrNegativeTimestamp = errors.New("timestamp cannot be negative") + ErrInvalidForkVersion = errors.New("invalid fork version") ) var ( @@ -929,7 +938,7 @@ func (api *RelayAPI) RespondMsg(w http.ResponseWriter, code int, msg string) { } func (api *RelayAPI) Respond(w http.ResponseWriter, code int, response any) { - w.Header().Set("Content-Type", "application/json") + w.Header().Set(HeaderContentType, ApplicationJSON) w.WriteHeader(code) if response == nil { return @@ -946,32 +955,12 @@ func (api *RelayAPI) handleStatus(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) } -const ( - ApplicationJSON = "application/json" - ApplicationOctetStream = "application/octet-stream" -) - -// RequestAcceptsJSON returns true if the Accept header is empty (defaults to JSON) -// or application/json can be negotiated. -func RequestAcceptsJSON(req *http.Request) bool { - ah := req.Header.Get("Accept") - if ah == "" { - return true - } - mh := mimeheader.ParseAcceptHeader(ah) - _, _, matched := mh.Negotiate( - []string{ApplicationJSON}, - ApplicationJSON, - ) - return matched -} - // NegotiateRequestResponseType returns whether the request accepts // JSON (application/json) or SSZ (application/octet-stream) responses. // If accepted is false, no mime type could be negotiated and the server // should respond with http.StatusNotAcceptable. func NegotiateRequestResponseType(req *http.Request) (mimeType string, err error) { - ah := req.Header.Get("Accept") + ah := req.Header.Get(HeaderAccept) if ah == "" { return ApplicationJSON, nil } @@ -1005,19 +994,6 @@ func (api *RelayAPI) handleRegisterValidator(w http.ResponseWriter, req *http.Re "contentLength": req.ContentLength, }) - // If the Content-Type header is included, for now only allow JSON. - // TODO: support Content-Type: application/octet-stream and allow SSZ - // request bodies. - if ct := req.Header.Get("Content-Type"); ct != "" { - switch ct { - case ApplicationJSON: - break - default: - api.RespondError(w, http.StatusUnsupportedMediaType, "only Content-Type: application/json is currently supported") - return - } - } - start := time.Now().UTC() registrationTimestampUpperBound := start.Unix() + 10 // 10 seconds from now @@ -1041,8 +1017,13 @@ func (api *RelayAPI) handleRegisterValidator(w http.ResponseWriter, req *http.Re return } + // Get the request content type + proposerContentType := req.Header.Get(HeaderContentType) + log = log.WithField("proposerContentType", proposerContentType) + + // Read the encoded validator registrations limitReader := io.LimitReader(req.Body, int64(apiMaxPayloadBytes)) - body, err := io.ReadAll(limitReader) + regBytes, err := io.ReadAll(limitReader) if err != nil { log.WithError(err).Warn("failed to read request body") api.RespondError(w, http.StatusBadRequest, "failed to read request body") @@ -1050,98 +1031,26 @@ func (api *RelayAPI) handleRegisterValidator(w http.ResponseWriter, req *http.Re } req.Body.Close() - parseRegistration := func(value []byte) (reg *builderApiV1.SignedValidatorRegistration, err error) { - // Pubkey - _pubkey, err := jsonparser.GetUnsafeString(value, "message", "pubkey") - if err != nil { - return nil, fmt.Errorf("registration message error (pubkey): %w", err) - } - - pubkey, err := utils.HexToPubkey(_pubkey) - if err != nil { - return nil, fmt.Errorf("registration message error (pubkey): %w", err) - } - - // Timestamp - _timestamp, err := jsonparser.GetUnsafeString(value, "message", "timestamp") - if err != nil { - return nil, fmt.Errorf("registration message error (timestamp): %w", err) - } - - timestamp, err := strconv.ParseInt(_timestamp, 10, 64) - if err != nil { - return nil, fmt.Errorf("invalid timestamp: %w", err) - } - if timestamp < 0 { - return nil, ErrNegativeTimestamp - } - - // GasLimit - _gasLimit, err := jsonparser.GetUnsafeString(value, "message", "gas_limit") - if err != nil { - return nil, fmt.Errorf("registration message error (gasLimit): %w", err) - } - - gasLimit, err := strconv.ParseUint(_gasLimit, 10, 64) - if err != nil { - return nil, fmt.Errorf("invalid gasLimit: %w", err) - } - - // FeeRecipient - _feeRecipient, err := jsonparser.GetUnsafeString(value, "message", "fee_recipient") - if err != nil { - return nil, fmt.Errorf("registration message error (fee_recipient): %w", err) - } - - feeRecipient, err := utils.HexToAddress(_feeRecipient) - if err != nil { - return nil, fmt.Errorf("registration message error (fee_recipient): %w", err) - } - - // Signature - _signature, err := jsonparser.GetUnsafeString(value, "signature") - if err != nil { - return nil, fmt.Errorf("registration message error (signature): %w", err) - } - - signature, err := utils.HexToSignature(_signature) - if err != nil { - return nil, fmt.Errorf("registration message error (signature): %w", err) - } - - // Construct and return full registration object - reg = &builderApiV1.SignedValidatorRegistration{ - Message: &builderApiV1.ValidatorRegistration{ - FeeRecipient: feeRecipient, - GasLimit: gasLimit, - Timestamp: time.Unix(timestamp, 0), - Pubkey: pubkey, - }, - Signature: signature, - } - - return reg, nil + // Parse the registrations + signedValidatorRegistrations, err := api.fastRegistrationParsing(regBytes, proposerContentType) + if err != nil { + handleError(log, http.StatusBadRequest, err.Error()) + return } // Iterate over the registrations - _, err = jsonparser.ArrayEach(body, func(value []byte, dataType jsonparser.ValueType, offset int, _err error) { + for regIndex, signedValidatorRegistration := range signedValidatorRegistrations.Registrations { numRegTotal += 1 if processingStoppedByError { return } numRegProcessed += 1 regLog := log.WithFields(logrus.Fields{ + "regIndex": regIndex, "numRegistrationsSoFar": numRegTotal, "numRegistrationsProcessed": numRegProcessed, }) - // Extract immediately necessary registration fields - signedValidatorRegistration, err := parseRegistration(value) - if err != nil { - handleError(regLog, http.StatusBadRequest, err.Error()) - return - } - // Add validator pubkey to logs pkHex := common.PubkeyHex(signedValidatorRegistration.Message.Pubkey.String()) regLog = regLog.WithFields(logrus.Fields{ @@ -1202,7 +1111,7 @@ func (api *RelayAPI) handleRegisterValidator(w http.ResponseWriter, req *http.Re default: regLog.Error("validator registration channel full") } - }) + } log = log.WithFields(logrus.Fields{ "timeNeededSec": time.Since(start).Seconds(), @@ -1229,6 +1138,117 @@ func (api *RelayAPI) handleRegisterValidator(w http.ResponseWriter, req *http.Re w.WriteHeader(http.StatusOK) } +func (api *RelayAPI) fastRegistrationParsing(regBytes []byte, contentType string) (*builderApiV1.SignedValidatorRegistrations, error) { + signedValidatorRegistrations := new(builderApiV1.SignedValidatorRegistrations) + + // Parse registrations as SSZ + if contentType == ApplicationOctetStream { + api.log.Debug("Parsing registrations as SSZ") + err := signedValidatorRegistrations.UnmarshalSSZ(regBytes) + if err != nil { + return nil, err + } + return signedValidatorRegistrations, nil + } + + // Parse registrations as JSON + parseRegistration := func(value []byte) (reg *builderApiV1.SignedValidatorRegistration, err error) { + // Pubkey + _pubkey, err := jsonparser.GetUnsafeString(value, "message", "pubkey") + if err != nil { + return nil, fmt.Errorf("registration message error (pubkey): %w", err) + } + + pubkey, err := utils.HexToPubkey(_pubkey) + if err != nil { + return nil, fmt.Errorf("registration message error (pubkey): %w", err) + } + + // Timestamp + _timestamp, err := jsonparser.GetUnsafeString(value, "message", "timestamp") + if err != nil { + return nil, fmt.Errorf("registration message error (timestamp): %w", err) + } + + timestamp, err := strconv.ParseInt(_timestamp, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid timestamp: %w", err) + } + if timestamp < 0 { + return nil, ErrNegativeTimestamp + } + + // GasLimit + _gasLimit, err := jsonparser.GetUnsafeString(value, "message", "gas_limit") + if err != nil { + return nil, fmt.Errorf("registration message error (gasLimit): %w", err) + } + + gasLimit, err := strconv.ParseUint(_gasLimit, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid gasLimit: %w", err) + } + + // FeeRecipient + _feeRecipient, err := jsonparser.GetUnsafeString(value, "message", "fee_recipient") + if err != nil { + return nil, fmt.Errorf("registration message error (fee_recipient): %w", err) + } + + feeRecipient, err := utils.HexToAddress(_feeRecipient) + if err != nil { + return nil, fmt.Errorf("registration message error (fee_recipient): %w", err) + } + + // Signature + _signature, err := jsonparser.GetUnsafeString(value, "signature") + if err != nil { + return nil, fmt.Errorf("registration message error (signature): %w", err) + } + + signature, err := utils.HexToSignature(_signature) + if err != nil { + return nil, fmt.Errorf("registration message error (signature): %w", err) + } + + // Construct and return full registration object + reg = &builderApiV1.SignedValidatorRegistration{ + Message: &builderApiV1.ValidatorRegistration{ + FeeRecipient: feeRecipient, + GasLimit: gasLimit, + Timestamp: time.Unix(timestamp, 0), + Pubkey: pubkey, + }, + Signature: signature, + } + + return reg, nil + } + + var parseErr error + api.log.Debug("Parsing registrations as JSON") + _, forEachErr := jsonparser.ArrayEach(regBytes, func(value []byte, dataType jsonparser.ValueType, offset int, err error) { + if err != nil { + parseErr = err + return + } + signedValidatorRegistration, err := parseRegistration(value) + if err != nil { + parseErr = err + return + } + signedValidatorRegistrations.Registrations = append(signedValidatorRegistrations.Registrations, signedValidatorRegistration) + }) + if forEachErr != nil { + return nil, forEachErr + } + if parseErr != nil { + return nil, parseErr + } + + return signedValidatorRegistrations, nil +} + func (api *RelayAPI) handleGetHeader(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) slotStr := vars["slot"] @@ -1237,6 +1257,14 @@ func (api *RelayAPI) handleGetHeader(w http.ResponseWriter, req *http.Request) { ua := req.UserAgent() headSlot := api.headSlot.Load() + // Negotiate the response media type + negotiatedResponseMediaType, err := NegotiateRequestResponseType(req) + if err != nil { + api.log.WithError(err).Error("failed to negotiate response type") + api.RespondError(w, http.StatusNotAcceptable, err.Error()) + return + } + slot, err := strconv.ParseUint(slotStr, 10, 64) if err != nil { api.RespondError(w, http.StatusBadRequest, common.ErrInvalidSlot.Error()) @@ -1248,16 +1276,17 @@ func (api *RelayAPI) handleGetHeader(w http.ResponseWriter, req *http.Request) { msIntoSlot := requestTime.UnixMilli() - int64(slotStartTimestamp*1000) //nolint:gosec log := api.log.WithFields(logrus.Fields{ - "method": "getHeader", - "headSlot": headSlot, - "slot": slotStr, - "parentHash": parentHashHex, - "pubkey": proposerPubkeyHex, - "ua": ua, - "mevBoostV": common.GetMevBoostVersionFromUserAgent(ua), - "requestTimestamp": requestTime.Unix(), - "slotStartSec": slotStartTimestamp, - "msIntoSlot": msIntoSlot, + "method": "getHeader", + "headSlot": headSlot, + "slot": slotStr, + "parentHash": parentHashHex, + "pubkey": proposerPubkeyHex, + "ua": ua, + "mevBoostV": common.GetMevBoostVersionFromUserAgent(ua), + "requestTimestamp": requestTime.Unix(), + "slotStartSec": slotStartTimestamp, + "msIntoSlot": msIntoSlot, + "negotiatedResponseMediaType": negotiatedResponseMediaType, }) if len(proposerPubkeyHex) != 98 { @@ -1275,12 +1304,6 @@ func (api *RelayAPI) handleGetHeader(w http.ResponseWriter, req *http.Request) { return } - // TODO: Use NegotiateRequestResponseType, for now we only accept JSON - if !RequestAcceptsJSON(req) { - api.RespondError(w, http.StatusNotAcceptable, "only Accept: application/json is currently supported") - return - } - log.Debug("getHeader request received") defer func() { metrics.GetHeaderLatencyHistogram.Record( @@ -1342,7 +1365,52 @@ func (api *RelayAPI) handleGetHeader(w http.ResponseWriter, req *http.Request) { "blockHash": blockHash.String(), }).Info("bid delivered") - api.RespondOK(w, bid) + switch negotiatedResponseMediaType { + case ApplicationOctetStream: + log.Debug("responding with SSZ") + api.respondGetHeaderSSZ(w, bid) + default: + log.Debug("responding with JSON") + api.RespondOK(w, bid) + } +} + +// respondGetHeaderSSZ responds to the proposer in SSZ +func (api *RelayAPI) respondGetHeaderSSZ(w http.ResponseWriter, bid *builderSpec.VersionedSignedBuilderBid) { + // Serialize the response + var err error + var sszData []byte + switch bid.Version { + case spec.DataVersionBellatrix: + w.Header().Set(HeaderEthConsensusVersion, common.EthConsensusVersionBellatrix) + sszData, err = bid.Bellatrix.MarshalSSZ() + case spec.DataVersionCapella: + w.Header().Set(HeaderEthConsensusVersion, common.EthConsensusVersionCapella) + sszData, err = bid.Capella.MarshalSSZ() + case spec.DataVersionDeneb: + w.Header().Set(HeaderEthConsensusVersion, common.EthConsensusVersionDeneb) + sszData, err = bid.Deneb.MarshalSSZ() + case spec.DataVersionElectra: + w.Header().Set(HeaderEthConsensusVersion, common.EthConsensusVersionElectra) + sszData, err = bid.Electra.MarshalSSZ() + case spec.DataVersionUnknown, spec.DataVersionPhase0, spec.DataVersionAltair: + err = ErrInvalidForkVersion + } + if err != nil { + api.log.WithError(err).Error("error serializing response as SSZ") + http.Error(w, "failed to serialize response", http.StatusInternalServerError) + return + } + + // Write the header + w.Header().Set(HeaderContentType, ApplicationOctetStream) + w.WriteHeader(http.StatusOK) + + // Write SSZ data + if _, err := w.Write(sszData); err != nil { + api.log.WithError(err).Error("error writing SSZ response") + http.Error(w, "failed to write response", http.StatusInternalServerError) + } } func (api *RelayAPI) checkProposerSignature(block *common.VersionedSignedBlindedBeaconBlock, pubKey []byte) (bool, error) { @@ -1362,18 +1430,41 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request) api.getPayloadCallsInFlight.Add(1) defer api.getPayloadCallsInFlight.Done() + // Negotiate the response media type + negotiatedResponseMediaType, err := NegotiateRequestResponseType(req) + if err != nil { + api.log.WithError(err).Error("failed to negotiate response type") + api.RespondError(w, http.StatusNotAcceptable, err.Error()) + return + } + + // Determine what encoding the proposer sent + proposerContentType := req.Header.Get(HeaderContentType) + parsedProposerContentType, _, err := mime.ParseMediaType(proposerContentType) + if err != nil { + api.log.WithError(err).Error("failed to parse proposer content type") + api.RespondError(w, http.StatusUnsupportedMediaType, err.Error()) + return + } + + // Get the optional consensus version + proposerEthConsensusVersion := req.Header.Get("Eth-Consensus-Version") + ua := req.UserAgent() headSlot := api.headSlot.Load() receivedAt := time.Now().UTC() log := api.log.WithFields(logrus.Fields{ - "method": "getPayload", - "ua": ua, - "mevBoostV": common.GetMevBoostVersionFromUserAgent(ua), - "contentLength": req.ContentLength, - "headSlot": headSlot, - "headSlotEpochPos": (headSlot % common.SlotsPerEpoch) + 1, - "idArg": req.URL.Query().Get("id"), - "timestampRequestStart": receivedAt.UnixMilli(), + "method": "getPayload", + "ua": ua, + "mevBoostV": common.GetMevBoostVersionFromUserAgent(ua), + "contentLength": req.ContentLength, + "headSlot": headSlot, + "headSlotEpochPos": (headSlot % common.SlotsPerEpoch) + 1, + "idArg": req.URL.Query().Get("id"), + "timestampRequestStart": receivedAt.UnixMilli(), + "negotiatedResponseMediaType": negotiatedResponseMediaType, + "parsedProposerContentType": parsedProposerContentType, + "proposerEthConsensusVersion": proposerEthConsensusVersion, }) // Log at start and end of request @@ -1390,25 +1481,6 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request) ) }() - // TODO: Use NegotiateRequestResponseType, for now we only accept JSON - if !RequestAcceptsJSON(req) { - api.RespondError(w, http.StatusNotAcceptable, "only Accept: application/json is currently supported") - return - } - - // If the Content-Type header is included, for now only allow JSON. - // TODO: support Content-Type: application/octet-stream and allow SSZ - // request bodies. - if ct := req.Header.Get("Content-Type"); ct != "" { - switch ct { - case ApplicationJSON: - break - default: - api.RespondError(w, http.StatusUnsupportedMediaType, "only Content-Type: application/json is currently supported") - return - } - } - // Read the body first, so we can decode it later limitReader := io.LimitReader(req.Body, int64(apiMaxPayloadBytes)) body, err := io.ReadAll(limitReader) @@ -1426,7 +1498,8 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request) // Decode payload payload := new(common.VersionedSignedBlindedBeaconBlock) - if err := json.NewDecoder(bytes.NewReader(body)).Decode(payload); err != nil { + err = payload.Unmarshal(body, parsedProposerContentType, proposerEthConsensusVersion) + if err != nil { log.WithError(err).Warn("failed to decode getPayload request") api.RespondError(w, http.StatusBadRequest, "failed to decode payload") return @@ -1713,8 +1786,15 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request) // give the beacon network some time to propagate the block time.Sleep(time.Duration(getPayloadResponseDelayMs) * time.Millisecond) - // respond to the HTTP request - api.RespondOK(w, getPayloadResp) + // Respond appropriately + switch negotiatedResponseMediaType { + case ApplicationOctetStream: + log.Debug("responding with SSZ") + api.respondGetPayloadSSZ(w, getPayloadResp) + default: + log.Debug("responding with JSON") + api.RespondOK(w, getPayloadResp) + } blockNumber, err := payload.ExecutionBlockNumber() if err != nil { log.WithError(err).Info("failed to get block number") @@ -1749,6 +1829,44 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request) log.Info("execution payload delivered") } +// respondGetPayloadSSZ responds to the proposer in SSZ +func (api *RelayAPI) respondGetPayloadSSZ(w http.ResponseWriter, result *builderApi.VersionedSubmitBlindedBlockResponse) { + // Serialize the response + var err error + var sszData []byte + switch result.Version { + case spec.DataVersionBellatrix: + w.Header().Set(HeaderEthConsensusVersion, common.EthConsensusVersionBellatrix) + sszData, err = result.Bellatrix.MarshalSSZ() + case spec.DataVersionCapella: + w.Header().Set(HeaderEthConsensusVersion, common.EthConsensusVersionCapella) + sszData, err = result.Capella.MarshalSSZ() + case spec.DataVersionDeneb: + w.Header().Set(HeaderEthConsensusVersion, common.EthConsensusVersionDeneb) + sszData, err = result.Deneb.MarshalSSZ() + case spec.DataVersionElectra: + w.Header().Set(HeaderEthConsensusVersion, common.EthConsensusVersionElectra) + sszData, err = result.Electra.MarshalSSZ() + case spec.DataVersionUnknown, spec.DataVersionPhase0, spec.DataVersionAltair: + err = ErrInvalidForkVersion + } + if err != nil { + api.log.WithError(err).Error("error serializing response as SSZ") + http.Error(w, "failed to serialize response", http.StatusInternalServerError) + return + } + + // Write the header + w.Header().Set(HeaderContentType, ApplicationOctetStream) + w.WriteHeader(http.StatusOK) + + // Write SSZ data + if _, err := w.Write(sszData); err != nil { + api.log.WithError(err).Error("error writing SSZ response") + http.Error(w, "failed to write response", http.StatusInternalServerError) + } +} + // -------------------- // // BLOCK BUILDER APIS @@ -2062,8 +2180,8 @@ func (api *RelayAPI) handleSubmitNewBlock(w http.ResponseWriter, req *http.Reque payload := new(common.VersionedSubmitBlockRequest) // Check for SSZ encoding - contentType := req.Header.Get("Content-Type") - if contentType == "application/octet-stream" { + contentType := req.Header.Get(HeaderContentType) + if contentType == ApplicationOctetStream { log = log.WithField("reqContentType", "ssz") pf.ContentType = "ssz" if err = payload.UnmarshalSSZ(requestPayloadBytes); err != nil { diff --git a/services/api/service_test.go b/services/api/service_test.go index 3acca2cd..0c891fde 100644 --- a/services/api/service_test.go +++ b/services/api/service_test.go @@ -104,17 +104,15 @@ func newTestBackend(t require.TestingT, numBeaconNodes int) *testBackend { return &backend } -func (be *testBackend) requestBytes(method, path string, payload []byte, headers map[string]string) *httptest.ResponseRecorder { +func (be *testBackend) requestBytes(method, path string, payload []byte, header *http.Header) *httptest.ResponseRecorder { var req *http.Request var err error req, err = http.NewRequest(method, path, bytes.NewReader(payload)) require.NoError(be.t, err) - // Set headers - for k, v := range headers { - req.Header.Set(k, v) - } + // Set header + req.Header = *header // lfg rr := httptest.NewRecorder() @@ -193,20 +191,42 @@ func TestLivez(t *testing.T) { func TestRegisterValidator(t *testing.T) { path := "/eth/v1/builder/validators" - t.Run("not a known validator", func(t *testing.T) { + t.Run("accept validator -- json", func(t *testing.T) { backend := newTestBackend(t, 1) + msg := common.ValidPayloadRegisterValidator + backend.datastore.SetKnownValidator(common.PubkeyHex(msg.Message.Pubkey.String()), 1) + rr := backend.request(http.MethodPost, path, []builderApiV1.SignedValidatorRegistration{common.ValidPayloadRegisterValidator}) - require.Equal(t, http.StatusBadRequest, rr.Code) + require.Equal(t, http.StatusOK, rr.Code) + + // wait for the both channel notifications + select { + case val := <-backend.relay.validatorRegC: + require.Equal(t, val.Message.Pubkey, msg.Message.Pubkey) + default: + } + + select { + case <-backend.relay.validatorUpdateCh: + default: + } }) - t.Run("known validator", func(t *testing.T) { + t.Run("accept validator -- ssz", func(t *testing.T) { backend := newTestBackend(t, 1) msg := common.ValidPayloadRegisterValidator backend.datastore.SetKnownValidator(common.PubkeyHex(msg.Message.Pubkey.String()), 1) - rr := backend.request(http.MethodPost, path, []builderApiV1.SignedValidatorRegistration{common.ValidPayloadRegisterValidator}) + regs := builderApiV1.SignedValidatorRegistrations{} + regs.Registrations = append(regs.Registrations, &msg) + regBytes, err := regs.MarshalSSZ() + require.NoError(t, err) + + rr := backend.requestBytes(http.MethodPost, path, regBytes, &http.Header{ + "Content-Type": []string{"application/octet-stream"}, + }) require.Equal(t, http.StatusOK, rr.Code) // wait for the both channel notifications @@ -221,6 +241,81 @@ func TestRegisterValidator(t *testing.T) { default: } }) + + t.Run("reject validator -- timestamp too early", func(t *testing.T) { + backend := newTestBackend(t, 1) + + msg := common.ValidPayloadRegisterValidator + newMessage := *msg.Message + // The minimum timestamp is genesis time, so anything less than that should always fail + newMessage.Timestamp = time.Unix(int64(backend.relay.genesisInfo.Data.GenesisTime-1), 0) //nolint:gosec + msg.Message = &newMessage + + backend.datastore.SetKnownValidator(common.PubkeyHex(msg.Message.Pubkey.String()), 1) + + rr := backend.request(http.MethodPost, path, []builderApiV1.SignedValidatorRegistration{msg}) + require.Equal(t, http.StatusBadRequest, rr.Code) + require.Contains(t, rr.Body.String(), "timestamp too early") + }) + + t.Run("reject validator -- timestamp too far in the future", func(t *testing.T) { + backend := newTestBackend(t, 1) + + msg := common.ValidPayloadRegisterValidator + newMessage := *msg.Message + // The time difference limit is 10 seconds, so 11 seconds past should always fail + newMessage.Timestamp = time.Now().Add(11 * time.Second) + msg.Message = &newMessage + + backend.datastore.SetKnownValidator(common.PubkeyHex(msg.Message.Pubkey.String()), 1) + + rr := backend.request(http.MethodPost, path, []builderApiV1.SignedValidatorRegistration{msg}) + require.Equal(t, http.StatusBadRequest, rr.Code) + require.Contains(t, rr.Body.String(), "timestamp too far in the future") + }) + + t.Run("reject validator -- not a known validator", func(t *testing.T) { + backend := newTestBackend(t, 1) + + msg := common.ValidPayloadRegisterValidator + + // We do not call backend.datastore.SetKnownValidator() + + rr := backend.request(http.MethodPost, path, []builderApiV1.SignedValidatorRegistration{msg}) + require.Equal(t, http.StatusBadRequest, rr.Code) + require.Contains(t, rr.Body.String(), "not a known validator") + }) + + t.Run("reject validator -- failed to verify validator signature", func(t *testing.T) { + backend := newTestBackend(t, 1) + + msg := common.ValidPayloadRegisterValidator + newMessage := *msg.Message + // Change the message without changing the signature + newMessage.Timestamp = msg.Message.Timestamp.Add(1 * time.Second) + msg.Message = &newMessage + + backend.datastore.SetKnownValidator(common.PubkeyHex(msg.Message.Pubkey.String()), 1) + + rr := backend.request(http.MethodPost, path, []builderApiV1.SignedValidatorRegistration{msg}) + require.Equal(t, http.StatusBadRequest, rr.Code) + require.Contains(t, rr.Body.String(), "failed to verify validator signature") + }) + + t.Run("accept validator -- milliseconds dont matter", func(t *testing.T) { + backend := newTestBackend(t, 1) + + msg := common.ValidPayloadRegisterValidator + newMessage := *msg.Message + // Anything less than a second is dropped from the encoded timestamp + newMessage.Timestamp = newMessage.Timestamp.Add(999 * time.Millisecond) + msg.Message = &newMessage + + backend.datastore.SetKnownValidator(common.PubkeyHex(msg.Message.Pubkey.String()), 1) + + rr := backend.request(http.MethodPost, path, []builderApiV1.SignedValidatorRegistration{msg}) + require.Equal(t, http.StatusOK, rr.Code) + }) } func TestGetHeader(t *testing.T) { @@ -259,7 +354,7 @@ func TestGetHeader(t *testing.T) { _, err := backend.redis.SaveBidAndUpdateTopBid(t.Context(), backend.redis.NewPipeline(), trace, payload, getPayloadResp, getHeaderResp, time.Now(), false, nil) require.NoError(t, err) - // Check 1: regular capella request works and returns a bid + // Check: JSON capella request works and returns a bid rr := backend.request(http.MethodGet, path, nil) require.Equal(t, http.StatusOK, rr.Code) resp := builderSpec.VersionedSignedBuilderBid{} @@ -270,6 +365,22 @@ func TestGetHeader(t *testing.T) { require.Equal(t, spec.DataVersionCapella, resp.Version) require.Equal(t, bidValue.String(), value.String()) + // Check: SSZ capella request works and returns a bid + rr = backend.requestBytes(http.MethodGet, path, nil, &http.Header{ + "Accept": []string{"application/octet-stream"}, + }) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, ApplicationOctetStream, rr.Header().Get("Content-Type")) + require.Equal(t, common.EthConsensusVersionCapella, rr.Header().Get("Eth-Consensus-Version")) + resp = builderSpec.VersionedSignedBuilderBid{} + resp.Version = spec.DataVersionCapella + resp.Capella = new(builderApiCapella.SignedBuilderBid) + err = resp.Capella.UnmarshalSSZ(rr.Body.Bytes()) + require.NoError(t, err) + value, err = resp.Value() + require.NoError(t, err) + require.Equal(t, bidValue.String(), value.String()) + // Create a deneb bid path = fmt.Sprintf("/eth/v1/builder/header/%d/%s/%s", slot+1, parentHash, proposerPubkey) opts = common.CreateTestBlockSubmissionOpts{ @@ -282,7 +393,7 @@ func TestGetHeader(t *testing.T) { _, err = backend.redis.SaveBidAndUpdateTopBid(t.Context(), backend.redis.NewPipeline(), trace, payload, getPayloadResp, getHeaderResp, time.Now(), false, nil) require.NoError(t, err) - // Check 2: regular deneb request works and returns a bid + // Check: JSON deneb request works and returns a bid rr = backend.request(http.MethodGet, path, nil) require.Equal(t, http.StatusOK, rr.Code) resp = builderSpec.VersionedSignedBuilderBid{} @@ -293,7 +404,23 @@ func TestGetHeader(t *testing.T) { require.Equal(t, spec.DataVersionDeneb, resp.Version) require.Equal(t, bidValue.String(), value.String()) - // Check 3: Request returns 204 if sending a filtered user agent + // Check: SSZ deneb request works and returns a bid + rr = backend.requestBytes(http.MethodGet, path, nil, &http.Header{ + "Accept": []string{"application/octet-stream"}, + }) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, ApplicationOctetStream, rr.Header().Get("Content-Type")) + require.Equal(t, common.EthConsensusVersionDeneb, rr.Header().Get("Eth-Consensus-Version")) + resp = builderSpec.VersionedSignedBuilderBid{} + resp.Version = spec.DataVersionDeneb + resp.Deneb = new(builderApiDeneb.SignedBuilderBid) + err = resp.Deneb.UnmarshalSSZ(rr.Body.Bytes()) + require.NoError(t, err) + value, err = resp.Value() + require.NoError(t, err) + require.Equal(t, bidValue.String(), value.String()) + + // Check: Request returns 204 if sending a filtered user agent rr = backend.requestWithUA(http.MethodGet, path, "mev-boost/v1.5.0 Go-http-client/1.1", nil) require.Equal(t, http.StatusNoContent, rr.Code) } @@ -513,7 +640,7 @@ func TestBuilderSubmitBlock(t *testing.T) { reqJSONBytes2, err := json.Marshal(req) require.NoError(t, err) require.JSONEq(t, string(reqJSONBytes), string(reqJSONBytes2)) - rr := backend.requestBytes(http.MethodPost, path, reqJSONBytes, nil) + rr := backend.requestBytes(http.MethodPost, path, reqJSONBytes, new(http.Header)) require.Contains(t, rr.Body.String(), "invalid signature") require.Equal(t, http.StatusBadRequest, rr.Code) @@ -521,31 +648,29 @@ func TestBuilderSubmitBlock(t *testing.T) { reqSSZBytes, err := req.MarshalSSZ() require.NoError(t, err) require.Len(t, reqSSZBytes, testCase.data.sszReqSize) - rr = backend.requestBytes(http.MethodPost, path, reqSSZBytes, map[string]string{ - "Content-Type": "application/octet-stream", + rr = backend.requestBytes(http.MethodPost, path, reqSSZBytes, &http.Header{ + "Content-Type": []string{"application/octet-stream"}, }) require.Contains(t, rr.Body.String(), "invalid signature") require.Equal(t, http.StatusBadRequest, rr.Code) // Send JSON+GZIP encoded request - headers := map[string]string{ - "Content-Encoding": "gzip", - } jsonGzip := gzipBytes(t, reqJSONBytes) require.Len(t, jsonGzip, testCase.data.jsonGzipReqSize) - rr = backend.requestBytes(http.MethodPost, path, jsonGzip, headers) + rr = backend.requestBytes(http.MethodPost, path, jsonGzip, &http.Header{ + "Content-Type": []string{"application/json"}, + "Content-Encoding": []string{"gzip"}, + }) require.Contains(t, rr.Body.String(), "invalid signature") require.Equal(t, http.StatusBadRequest, rr.Code) // Send SSZ+GZIP encoded request - headers = map[string]string{ - "Content-Type": "application/octet-stream", - "Content-Encoding": "gzip", - } - sszGzip := gzipBytes(t, reqSSZBytes) require.Len(t, sszGzip, testCase.data.sszGzipReqSize) - rr = backend.requestBytes(http.MethodPost, path, sszGzip, headers) + rr = backend.requestBytes(http.MethodPost, path, sszGzip, &http.Header{ + "Content-Type": []string{"application/octet-stream"}, + "Content-Encoding": []string{"gzip"}, + }) require.Contains(t, rr.Body.String(), "invalid signature") require.Equal(t, http.StatusBadRequest, rr.Code) }) @@ -1384,31 +1509,6 @@ func gzipBytes(t *testing.T, b []byte) []byte { return buf.Bytes() } -func TestRequestAcceptsJSON(t *testing.T) { - for _, tc := range []struct { - Header string - Expected bool - }{ - {Header: "", Expected: true}, - {Header: "application/json", Expected: true}, - {Header: "application/octet-stream", Expected: false}, - {Header: "application/octet-stream;q=1.0,application/json;q=0.9", Expected: true}, - {Header: "application/octet-stream;q=1.0,application/something-else;q=0.9", Expected: false}, - {Header: "application/octet-stream;q=1.0,application/*;q=0.9", Expected: true}, - {Header: "application/octet-stream;q=1.0,*/*;q=0.9", Expected: true}, - {Header: "application/*;q=0.9", Expected: true}, - {Header: "application/*", Expected: true}, - } { - t.Run(tc.Header, func(t *testing.T) { - req, err := http.NewRequest(http.MethodGet, "/eth/v1/builder/header/1/0x00/0xaa", nil) - require.NoError(t, err) - req.Header.Set("Accept", tc.Header) - actual := RequestAcceptsJSON(req) - require.Equal(t, tc.Expected, actual) - }) - } -} - func TestNegotiateRequestResponseType(t *testing.T) { for _, tc := range []struct { Header string @@ -1418,6 +1518,7 @@ func TestNegotiateRequestResponseType(t *testing.T) { {Header: "", Expected: ApplicationJSON}, {Header: "application/json", Expected: ApplicationJSON}, {Header: "application/octet-stream", Expected: ApplicationOctetStream}, + {Header: "application/octet-stream;q=1,application/json;q=0.9", Expected: ApplicationOctetStream}, {Header: "application/octet-stream;q=1.0,application/json;q=0.9", Expected: ApplicationOctetStream}, {Header: "application/octet-stream;q=1.0,application/something-else;q=0.9", Expected: ApplicationOctetStream}, {Header: "application/octet-stream;q=1.0,application/*;q=0.9", Expected: ApplicationOctetStream},