Skip to content

Commit d858409

Browse files
authored
Merge pull request #88 from unicitynetwork/sharding-improvements
Sharding improvements
2 parents 7788968 + b271ce2 commit d858409

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+4823
-3218
lines changed

.dockerignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,7 @@ changes.txt
5555

5656
# Local development
5757
.env
58-
.env.local
58+
.env.local
59+
60+
# Data directories (MongoDB, Redis, etc.)
61+
data/

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,18 @@ The service is configured via environment variables:
160160
| `BATCH_LIMIT` | Maximum number of commitments to process per batch | `1000` |
161161
| `ROUND_DURATION` | Duration between block creation rounds | `1s` |
162162

163+
### Storage Configuration
164+
| Variable | Description | Default |
165+
|----------|-------------|---------|
166+
| `USE_REDIS_FOR_COMMITMENTS` | Use Redis for commitment queue (instead of MongoDB) | `false` |
167+
| `REDIS_HOST` | Redis server hostname | `localhost` |
168+
| `REDIS_PORT` | Redis server port | `6379` |
169+
| `REDIS_PASSWORD` | Redis server password | `` |
170+
| `REDIS_DB` | Redis database number | `0` |
171+
| `REDIS_STREAM_NAME` | Redis stream name for commitments (allows multiple shards to share a Redis instance) | `commitments` |
172+
| `REDIS_FLUSH_INTERVAL` | Interval for flushing pending commitments to Redis | `50ms` |
173+
| `REDIS_MAX_BATCH_SIZE` | Maximum batch size before forcing flush | `2000` |
174+
163175
### BFT Configuration
164176

165177
| Variable | Description | Default |

cmd/commitment/main.go

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"crypto/rand"
7+
"encoding/json"
8+
"flag"
9+
"fmt"
10+
"log"
11+
"math/big"
12+
"net/http"
13+
"os"
14+
"time"
15+
16+
"github.com/btcsuite/btcd/btcec/v2"
17+
18+
"github.com/unicitynetwork/aggregator-go/internal/signing"
19+
"github.com/unicitynetwork/aggregator-go/pkg/api"
20+
)
21+
22+
type (
23+
jsonRPCRequest struct {
24+
JSONRPC string `json:"jsonrpc"`
25+
Method string `json:"method"`
26+
Params interface{} `json:"params"`
27+
ID int `json:"id"`
28+
}
29+
30+
jsonRPCResponse struct {
31+
JSONRPC string `json:"jsonrpc"`
32+
Result json.RawMessage `json:"result,omitempty"`
33+
Error *jsonRPCError `json:"error,omitempty"`
34+
ID int `json:"id"`
35+
}
36+
37+
jsonRPCError struct {
38+
Code int `json:"code"`
39+
Message string `json:"message"`
40+
Data interface{} `json:"data,omitempty"`
41+
}
42+
)
43+
44+
var (
45+
flagURL = flag.String("url", "https://goggregator-test.unicity.network/", "Aggregator JSON-RPC endpoint")
46+
flagAuth = flag.String("auth", "", "Optional Authorization header value")
47+
flagTimeout = flag.Duration("timeout", 45*time.Second, "Maximum time to wait for inclusion proof")
48+
flagPollInterval = flag.Duration("poll-interval", time.Second, "Polling interval for inclusion proof checks")
49+
flagVerbose = flag.Bool("v", true, "Log request and response payloads")
50+
)
51+
52+
func main() {
53+
flag.Parse()
54+
55+
logger := log.New(os.Stdout, "", log.LstdFlags|log.Lmicroseconds)
56+
57+
ctx, cancel := context.WithTimeout(context.Background(), *flagTimeout)
58+
defer cancel()
59+
60+
client := &http.Client{
61+
Timeout: 15 * time.Second,
62+
}
63+
64+
commitReq := generateCommitmentRequest()
65+
if *flagVerbose {
66+
if payload, err := json.MarshalIndent(commitReq, "", " "); err == nil {
67+
logger.Printf("submit_commitment request:\n%s", payload)
68+
}
69+
}
70+
71+
logger.Printf("Submitting commitment to URL: %s", *flagURL)
72+
73+
submitResp, err := callJSONRPC(ctx, client, *flagURL, *flagAuth, "submit_commitment", commitReq)
74+
if err != nil {
75+
logger.Fatalf("submit_commitment call failed: %v", err)
76+
}
77+
78+
if *flagVerbose {
79+
if payload, err := json.MarshalIndent(submitResp, "", " "); err == nil {
80+
logger.Printf("submit_commitment response:\n%s", payload)
81+
}
82+
}
83+
84+
var submitResult api.SubmitCommitmentResponse
85+
if submitResp.Error != nil {
86+
logger.Fatalf("submit_commitment returned error: %s (code %d)", submitResp.Error.Message, submitResp.Error.Code)
87+
}
88+
if err := json.Unmarshal(submitResp.Result, &submitResult); err != nil {
89+
logger.Fatalf("failed to decode submit_commitment result: %v", err)
90+
}
91+
if submitResult.Status != "SUCCESS" {
92+
logger.Fatalf("submit_commitment status was %q", submitResult.Status)
93+
}
94+
95+
logger.Printf("Commitment %s accepted. Polling for inclusion proof...", commitReq.RequestID)
96+
97+
path, err := commitReq.RequestID.GetPath()
98+
if err != nil {
99+
logger.Fatalf("failed to derive SMT path: %v", err)
100+
}
101+
102+
submittedAt := time.Now()
103+
inclusionProof, verification, attempts, err := waitForInclusionProof(ctx, client, commitReq.RequestID, path, logger)
104+
if err != nil {
105+
logger.Fatalf("failed to retrieve inclusion proof after %d attempt(s): %v", attempts, err)
106+
}
107+
108+
if *flagVerbose {
109+
if payload, err := json.MarshalIndent(inclusionProof, "", " "); err == nil {
110+
logger.Printf("get_inclusion_proof response:\n%s", payload)
111+
}
112+
}
113+
114+
logger.Printf("Proof verification result: pathValid=%t pathIncluded=%t overall=%t",
115+
verification.PathValid, verification.PathIncluded, verification.Result)
116+
117+
elapsed := time.Since(submittedAt)
118+
logger.Printf("Valid inclusion proof received in %s after %d attempt(s).", elapsed.Round(time.Millisecond), attempts)
119+
120+
logger.Printf("Commitment %s successfully submitted and verified.", commitReq.RequestID)
121+
}
122+
123+
func generateCommitmentRequest() *api.SubmitCommitmentRequest {
124+
privateKey, err := btcec.NewPrivateKey()
125+
if err != nil {
126+
panic(fmt.Sprintf("failed to generate private key: %v", err))
127+
}
128+
129+
publicKeyBytes := privateKey.PubKey().SerializeCompressed()
130+
131+
stateData := make([]byte, 32)
132+
if _, err := rand.Read(stateData); err != nil {
133+
panic(fmt.Sprintf("failed to read random state bytes: %v", err))
134+
}
135+
136+
stateHashImprint := signing.CreateDataHashImprint(stateData)
137+
138+
requestID, err := api.CreateRequestID(publicKeyBytes, stateHashImprint)
139+
if err != nil {
140+
panic(fmt.Sprintf("failed to create request ID: %v", err))
141+
}
142+
143+
transactionData := make([]byte, 32)
144+
if _, err := rand.Read(transactionData); err != nil {
145+
panic(fmt.Sprintf("failed to read random transaction bytes: %v", err))
146+
}
147+
148+
transactionHashImprint := signing.CreateDataHashImprint(transactionData)
149+
transactionHashBytes, err := transactionHashImprint.DataBytes()
150+
if err != nil {
151+
panic(fmt.Sprintf("failed to extract transaction hash bytes: %v", err))
152+
}
153+
154+
signature, err := signing.NewSigningService().SignHash(transactionHashBytes, privateKey.Serialize())
155+
if err != nil {
156+
panic(fmt.Sprintf("failed to sign transaction hash: %v", err))
157+
}
158+
159+
receipt := false
160+
return &api.SubmitCommitmentRequest{
161+
RequestID: api.RequestID(requestID),
162+
TransactionHash: api.TransactionHash(transactionHashImprint),
163+
Authenticator: api.Authenticator{
164+
Algorithm: "secp256k1",
165+
PublicKey: api.HexBytes(publicKeyBytes),
166+
Signature: api.HexBytes(signature),
167+
StateHash: api.StateHash(stateHashImprint),
168+
},
169+
Receipt: &receipt,
170+
}
171+
}
172+
173+
func callJSONRPC(ctx context.Context, client *http.Client, url, authHeader, method string, params interface{}) (*jsonRPCResponse, error) {
174+
body, err := json.Marshal(jsonRPCRequest{
175+
JSONRPC: "2.0",
176+
Method: method,
177+
Params: params,
178+
ID: 1,
179+
})
180+
if err != nil {
181+
return nil, fmt.Errorf("failed to marshal request: %w", err)
182+
}
183+
184+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
185+
if err != nil {
186+
return nil, fmt.Errorf("failed to create request: %w", err)
187+
}
188+
189+
req.Header.Set("Content-Type", "application/json")
190+
/*if authHeader != "" {
191+
req.Header.Set("Authorization", "supersecret")
192+
}*/
193+
req.Header.Set("Authorization", "Bearer supersecret")
194+
195+
resp, err := client.Do(req)
196+
if err != nil {
197+
return nil, fmt.Errorf("request failed: %w", err)
198+
}
199+
defer resp.Body.Close()
200+
201+
var rpcResp jsonRPCResponse
202+
if err := json.NewDecoder(resp.Body).Decode(&rpcResp); err != nil {
203+
return nil, fmt.Errorf("failed to decode response: %w", err)
204+
}
205+
206+
return &rpcResp, nil
207+
}
208+
209+
func waitForInclusionProof(ctx context.Context, client *http.Client, requestID api.RequestID, requestPath *big.Int, logger *log.Logger) (*api.GetInclusionProofResponse, *api.PathVerificationResult, int, error) {
210+
deadline, ok := ctx.Deadline()
211+
if !ok {
212+
deadline = time.Now().Add(45 * time.Second)
213+
}
214+
215+
attempts := 0
216+
for time.Now().Before(deadline) {
217+
attempts++
218+
select {
219+
case <-ctx.Done():
220+
return nil, nil, attempts, ctx.Err()
221+
default:
222+
}
223+
224+
proofResp, err := callJSONRPC(ctx, client, *flagURL, *flagAuth, "get_inclusion_proof", api.GetInclusionProofRequest{
225+
RequestID: requestID,
226+
})
227+
if err != nil {
228+
logger.Printf("get_inclusion_proof attempt %d failed: %v", attempts, err)
229+
time.Sleep(*flagPollInterval)
230+
continue
231+
}
232+
233+
if proofResp.Error != nil {
234+
logger.Printf("get_inclusion_proof attempt %d returned error: %s (code %d)", attempts, proofResp.Error.Message, proofResp.Error.Code)
235+
time.Sleep(*flagPollInterval)
236+
continue
237+
}
238+
239+
var payload api.GetInclusionProofResponse
240+
if err := json.Unmarshal(proofResp.Result, &payload); err != nil {
241+
logger.Printf("get_inclusion_proof attempt %d decode error: %v", attempts, err)
242+
time.Sleep(*flagPollInterval)
243+
continue
244+
}
245+
246+
if payload.InclusionProof == nil || payload.InclusionProof.MerkleTreePath == nil {
247+
logger.Printf("get_inclusion_proof attempt %d: proof payload incomplete, retrying...", attempts)
248+
time.Sleep(*flagPollInterval)
249+
continue
250+
}
251+
252+
result, err := verifyProof(&payload, requestPath)
253+
if err != nil {
254+
logger.Printf("get_inclusion_proof attempt %d verification error: %v", attempts, err)
255+
time.Sleep(*flagPollInterval)
256+
continue
257+
}
258+
259+
if result.PathIncluded {
260+
return &payload, result, attempts, nil
261+
}
262+
263+
logger.Printf("get_inclusion_proof attempt %d: proof returned but path not included yet (pathValid=%t). Waiting...",
264+
attempts, result.PathValid)
265+
time.Sleep(*flagPollInterval)
266+
}
267+
268+
return nil, nil, attempts, fmt.Errorf("timed out waiting for inclusion proof for request %s", requestID)
269+
}
270+
271+
func verifyProof(resp *api.GetInclusionProofResponse, path *big.Int) (*api.PathVerificationResult, error) {
272+
if resp == nil || resp.InclusionProof == nil {
273+
return nil, fmt.Errorf("inclusion proof payload was empty")
274+
}
275+
276+
if resp.InclusionProof.MerkleTreePath == nil {
277+
return nil, fmt.Errorf("merkle tree path missing from inclusion proof")
278+
}
279+
280+
return resp.InclusionProof.MerkleTreePath.Verify(path)
281+
}

0 commit comments

Comments
 (0)