Skip to content

Commit e2fe66c

Browse files
committed
gossip: bug endianness fd_crds_mask_iter fix
1 parent 7000f9e commit e2fe66c

File tree

4 files changed

+228
-40
lines changed

4 files changed

+228
-40
lines changed

src/flamenco/gossip/crds/fd_crds.c

Lines changed: 52 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,16 @@ struct fd_crds_entry_private {
137137
ulong next;
138138
} expire;
139139

140-
/* Finally, a core operation on the CRDS is to to query for values by
141-
hash, to respond to pull requests. This is done with a treap
142-
sorted by hash, which is just the first 8 bytes value_hash. */
140+
/* In order to load balance pull request messages across peers, each
141+
message has a mask value that is mask_bits long. The pull request
142+
is only concerned with CRDS entires with a hash where the first
143+
mask_bits of the hash match the mask value.
144+
145+
We need to be able to quickly iterate over all CRDS table entries
146+
matching a given mask. To do this, we store the first 8 bytes of
147+
the value_hash in a sorted treap. */
143148
struct {
144-
ulong hash;
149+
ulong hash_prefix;
145150
ulong parent;
146151
ulong left;
147152
ulong right;
@@ -204,18 +209,16 @@ struct fd_crds_entry_private {
204209
#define TREAP_NAME hash_treap
205210
#define TREAP_T fd_crds_entry_t
206211
#define TREAP_QUERY_T ulong
207-
#define TREAP_CMP(q,e) ((q>e->hash.hash)-(q<e->hash.hash))
212+
#define TREAP_CMP(q,e) ((q>e->hash.hash_prefix)-(q<e->hash.hash_prefix))
208213
#define TREAP_IDX_T ulong
209214
#define TREAP_OPTIMIZE_ITERATION 1
210215
#define TREAP_NEXT hash.next
211216
#define TREAP_PREV hash.prev
212-
#define TREAP_LT(e0,e1) ((e0)->hash.hash<(e1)->hash.hash)
213-
217+
#define TREAP_LT(e0,e1) ((e0)->hash.hash_prefix<(e1)->hash.hash_prefix)
214218
#define TREAP_PARENT hash.parent
215219
#define TREAP_LEFT hash.left
216220
#define TREAP_RIGHT hash.right
217221
#define TREAP_PRIO hash.prio
218-
219222
#include "../../../util/tmpl/fd_treap.c"
220223

221224
static inline ulong
@@ -280,6 +283,7 @@ struct fd_crds_purged {
280283
/* Similar to fd_crds_entry, we want the ability to query and iterate
281284
through value by hash[:8] to generate pull requests. */
282285
struct {
286+
ulong hash_prefix;
283287
ulong parent;
284288
ulong left;
285289
ulong right;
@@ -311,17 +315,16 @@ typedef struct fd_crds_purged fd_crds_purged_t;
311315
#define TREAP_NAME purged_treap
312316
#define TREAP_T fd_crds_purged_t
313317
#define TREAP_QUERY_T ulong
314-
#define TREAP_CMP(q,e) ((q>*(ulong *)(e->hash))-(q<*(ulong *)(e->hash)))
318+
#define TREAP_CMP(q,e) ((q>e->treap.hash_prefix)-(q<e->treap.hash_prefix))
319+
#define TREAP_IDX_T ulong
315320
#define TREAP_OPTIMIZE_ITERATION 1
316321
#define TREAP_NEXT treap.next
317322
#define TREAP_PREV treap.prev
318-
#define TREAP_LT(e0,e1) (*(ulong *)((e0)->hash)<*(ulong *)((e1)->hash))
319-
320-
#define TREAP_PARENT treap.parent
321-
#define TREAP_LEFT treap.left
322-
#define TREAP_RIGHT treap.right
323-
#define TREAP_PRIO treap.prio
324-
323+
#define TREAP_LT(e0,e1) ((e0)->treap.hash_prefix<(e1)->treap.hash_prefix)
324+
#define TREAP_PARENT treap.parent
325+
#define TREAP_LEFT treap.left
326+
#define TREAP_RIGHT treap.right
327+
#define TREAP_PRIO treap.prio
325328
#include "../../../util/tmpl/fd_treap.c"
326329

327330
#define DLIST_NAME failed_inserts_dlist
@@ -756,7 +759,7 @@ crds_entry_init( fd_gossip_view_crds_value_t const * view,
756759
out_value->stake = stake;
757760

758761
fd_crds_generate_hash( sha, payload+view->value_off, view->length, out_value->value_hash );
759-
out_value->hash.hash = fd_ulong_load_8( out_value->value_hash );
762+
out_value->hash.hash_prefix = fd_ulong_bswap( fd_ulong_load_8( out_value->value_hash ) );
760763

761764
if( FD_UNLIKELY( view->tag==FD_GOSSIP_VALUE_NODE_INSTANCE ) ) {
762765
out_value->node_instance.token = view->node_instance->token;
@@ -770,16 +773,19 @@ crds_entry_init( fd_gossip_view_crds_value_t const * view,
770773
static inline void
771774
purged_init( fd_crds_purged_t * purged,
772775
uchar const * hash,
776+
ulong hash_prefix,
773777
long now ) {
774778
fd_memcpy( purged->hash, hash, 32UL );
779+
purged->treap.hash_prefix = hash_prefix;
775780
purged->expire.wallclock_nanos = now;
776781
}
777782

778783
void
779784
insert_purged( fd_crds_t * crds,
780785
uchar const * hash,
781786
long now ) {
782-
if( purged_treap_ele_query( crds->purged.treap, *(ulong *)hash, crds->purged.pool ) ) {
787+
ulong hash_prefix = fd_ulong_bswap( fd_ulong_load_8( hash ) );
788+
if( purged_treap_ele_query( crds->purged.treap, hash_prefix, crds->purged.pool ) ) {
783789
return;
784790
}
785791
fd_crds_purged_t * purged;
@@ -795,7 +801,7 @@ insert_purged( fd_crds_t * crds,
795801
crds->metrics->purged_cnt++;
796802
}
797803
}
798-
purged_init( purged, hash, now );
804+
purged_init( purged, hash, hash_prefix, now );
799805
purged_treap_ele_insert( crds->purged.treap, purged, crds->purged.pool );
800806
purged_dlist_ele_push_tail( crds->purged.purged_dlist, purged, crds->purged.pool );
801807
}
@@ -841,7 +847,8 @@ void
841847
fd_crds_insert_failed_insert( fd_crds_t * crds,
842848
uchar const * hash,
843849
long now ) {
844-
if( purged_treap_ele_query( crds->purged.treap, *(ulong *)hash, crds->purged.pool ) ) {
850+
ulong hash_prefix = fd_ulong_bswap( fd_ulong_load_8( hash ) );
851+
if( purged_treap_ele_query( crds->purged.treap, hash_prefix, crds->purged.pool ) ) {
845852
return;
846853
}
847854
fd_crds_purged_t * failed;
@@ -857,7 +864,7 @@ fd_crds_insert_failed_insert( fd_crds_t * crds,
857864
crds->metrics->purged_cnt++;
858865
}
859866
}
860-
purged_init( failed, hash, now );
867+
purged_init( failed, hash, hash_prefix, now );
861868
purged_treap_ele_insert( crds->purged.treap, failed, crds->purged.pool );
862869
failed_inserts_dlist_ele_push_tail( crds->purged.failed_inserts_dlist, failed, crds->purged.pool );
863870
}
@@ -873,7 +880,7 @@ fd_crds_checks_fast( fd_crds_t * crds,
873880

874881
if( FD_UNLIKELY( !incumbent ) ) return FD_CRDS_UPSERT_CHECK_UPSERTS;
875882

876-
if( FD_UNLIKELY( *(ulong *)incumbent->value_bytes==(*(ulong *)(payload+candidate->value_off)) ) ) {
883+
if( FD_UNLIKELY( fd_ulong_load_8( incumbent->value_bytes )==fd_ulong_load_8( payload+candidate->value_off ) ) ) {
877884
/* We have a duplicate, so we return the number of duplicates */
878885
return (int)(++incumbent->num_duplicates);
879886
}
@@ -1245,6 +1252,19 @@ fd_crds_bucket_add( fd_crds_t * crds,
12451252
crds->samplers->ele_cnt );
12461253
}
12471254

1255+
static void
1256+
generate_masks( ulong mask,
1257+
uint mask_bits,
1258+
ulong * start_mask,
1259+
ulong * end_mask ) {
1260+
/* agave defines the mask as a ulong with the top mask_bits bits
1261+
set to the desired prefix and all other bits set to 1. */
1262+
FD_TEST( mask_bits<64U );
1263+
ulong range = fd_ulong_mask( 0U, (int)(63U-mask_bits) );
1264+
*start_mask = mask & ~range;
1265+
*end_mask = mask | range;
1266+
}
1267+
12481268
struct fd_crds_mask_iter_private {
12491269
ulong idx;
12501270
ulong end_hash;
@@ -1255,12 +1275,11 @@ fd_crds_mask_iter_init( fd_crds_t const * crds,
12551275
ulong mask,
12561276
uint mask_bits,
12571277
uchar iter_mem[ static 16UL ] ) {
1258-
fd_crds_mask_iter_t * it = (fd_crds_mask_iter_t *)iter_mem;
1259-
ulong start_hash = 0 ;
1260-
if( FD_LIKELY( mask_bits > 0) ) start_hash = (mask&(~0UL<<(64UL-mask_bits)));
1261-
1262-
it->end_hash = mask;
1278+
ulong start_hash, end_hash;
1279+
generate_masks( mask, mask_bits, &start_hash, &end_hash );
12631280

1281+
fd_crds_mask_iter_t * it = (fd_crds_mask_iter_t *)iter_mem;
1282+
it->end_hash = end_hash;
12641283
it->idx = hash_treap_idx_ge( crds->hash_treap, start_hash, crds->pool );
12651284
return it;
12661285
}
@@ -1276,7 +1295,7 @@ int
12761295
fd_crds_mask_iter_done( fd_crds_mask_iter_t * it, fd_crds_t const * crds ) {
12771296
fd_crds_entry_t const * val = hash_treap_ele_fast_const( it->idx, crds->pool );
12781297
return hash_treap_idx_is_null( it->idx ) ||
1279-
(it->end_hash < val->hash.hash);
1298+
(it->end_hash < val->hash.hash_prefix);
12801299
}
12811300

12821301
fd_crds_entry_t const *
@@ -1289,11 +1308,11 @@ fd_crds_purged_mask_iter_init( fd_crds_t const * crds,
12891308
ulong mask,
12901309
uint mask_bits,
12911310
uchar iter_mem[ static 16UL ] ){
1292-
fd_crds_mask_iter_t * it = (fd_crds_mask_iter_t *)iter_mem;
1293-
ulong start_hash = 0;
1294-
if( FD_LIKELY( mask_bits > 0 ) ) start_hash = (mask&(~0UL<<(64UL-mask_bits)));
1295-
it->end_hash = mask;
1311+
ulong start_hash, end_hash;
1312+
generate_masks( mask, mask_bits, &start_hash, &end_hash );
12961313

1314+
fd_crds_mask_iter_t * it = (fd_crds_mask_iter_t *)iter_mem;
1315+
it->end_hash = end_hash;
12971316
it->idx = purged_treap_idx_ge( crds->purged.treap, start_hash, crds->purged.pool );
12981317
return it;
12991318
}
@@ -1311,7 +1330,7 @@ fd_crds_purged_mask_iter_done( fd_crds_mask_iter_t * it,
13111330
fd_crds_t const * crds ){
13121331
fd_crds_purged_t const * val = purged_treap_ele_fast_const( it->idx, crds->purged.pool );
13131332
return purged_treap_idx_is_null( it->idx ) ||
1314-
(it->end_hash < fd_ulong_load_8( val->hash ));
1333+
(it->end_hash < val->treap.hash_prefix);
13151334
}
13161335

13171336
/* fd_crds_purged_mask_iter_hash returns the hash of the current

src/flamenco/gossip/crds/test_crds.c

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,183 @@ test_crds_new_basic( void ) {
2626
free( mem );
2727
}
2828

29+
static ulong
30+
generate_mask( ulong prefix,
31+
uint mask_bits ) {
32+
/* agave defines the mask as a ulong with the top mask_bits bits
33+
set to the desired prefix and all other bits set to 1. */
34+
FD_TEST( mask_bits<64U );
35+
if( FD_UNLIKELY( mask_bits==0U ) ) return ~0UL;
36+
prefix &= fd_ulong_mask( 0U, (int)(mask_bits-1U) );
37+
prefix <<= 64U-mask_bits;
38+
return prefix | fd_ulong_mask( 0U, (int)(63U-mask_bits) );
39+
}
40+
41+
static void
42+
test_generate_mask( void ) {
43+
FD_TEST( generate_mask( 0UL, 0U ) == 0xFFFFFFFFFFFFFFFF );
44+
FD_TEST( generate_mask( 1UL, 0U ) == 0xFFFFFFFFFFFFFFFF );
45+
FD_TEST( generate_mask( ~0UL, 0U ) == 0xFFFFFFFFFFFFFFFF );
46+
47+
FD_TEST( generate_mask( 0UL, 1U ) == 0x7FFFFFFFFFFFFFFF );
48+
FD_TEST( generate_mask( 1UL, 1U ) == 0xFFFFFFFFFFFFFFFF );
49+
FD_TEST( generate_mask( 2UL, 1U ) == 0x7FFFFFFFFFFFFFFF );
50+
FD_TEST( generate_mask( 3UL, 1U ) == 0xFFFFFFFFFFFFFFFF );
51+
52+
FD_TEST( generate_mask( 0UL, 4U ) == 0x0FFFFFFFFFFFFFFF );
53+
FD_TEST( generate_mask( 255UL, 4U ) == 0xFFFFFFFFFFFFFFFF );
54+
55+
FD_TEST( generate_mask( 0UL, 8U ) == 0x00FFFFFFFFFFFFFF );
56+
FD_TEST( generate_mask( 68UL, 8U ) == 0x44FFFFFFFFFFFFFF );
57+
FD_TEST( generate_mask( 70UL, 8U ) == 0x46FFFFFFFFFFFFFF );
58+
FD_TEST( generate_mask( 255UL, 8U ) == 0xFFFFFFFFFFFFFFFF );
59+
60+
FD_TEST( generate_mask( 0UL, 63U ) == 0x0000000000000001 );
61+
FD_TEST( generate_mask( 1UL, 63U ) == 0x0000000000000003 );
62+
FD_TEST( generate_mask( ~0UL, 63U ) == 0xFFFFFFFFFFFFFFFF );
63+
}
64+
65+
static void
66+
do_mask_test( ulong prefix,
67+
uint mask_bits,
68+
ulong expected_matches,
69+
fd_crds_t * crds ) {
70+
uchar iter_mem[ 16UL ];
71+
ulong mask = generate_mask( prefix, mask_bits );
72+
ulong matches = 0UL;
73+
for( fd_crds_mask_iter_t * it=fd_crds_mask_iter_init( crds, mask, mask_bits, iter_mem );
74+
!fd_crds_mask_iter_done( it, crds );
75+
it=fd_crds_mask_iter_next( it, crds ) ) {
76+
matches++;
77+
fd_crds_entry_t const * candidate = fd_crds_mask_iter_entry( it, crds );
78+
uchar const * hash = fd_crds_entry_hash( candidate );
79+
ulong prefix = fd_ulong_bswap( fd_ulong_load_8( hash ) );
80+
FD_TEST( (prefix & mask)==prefix );
81+
}
82+
if( FD_UNLIKELY( matches!=expected_matches ) ) FD_LOG_ERR(( "incorrect match count, expected %lu got %lu", expected_matches, matches ));
83+
}
84+
85+
static void
86+
do_purged_mask_test( ulong prefix,
87+
uint mask_bits,
88+
ulong expected_matches,
89+
fd_crds_t * crds ) {
90+
uchar iter_mem[ 16UL ];
91+
ulong mask = generate_mask( prefix, mask_bits );
92+
ulong matches = 0UL;
93+
for( fd_crds_mask_iter_t * it=fd_crds_purged_mask_iter_init( crds, mask, mask_bits, iter_mem );
94+
!fd_crds_purged_mask_iter_done( it, crds );
95+
it=fd_crds_purged_mask_iter_next( it, crds ) ) {
96+
matches++;
97+
uchar const * hash = fd_crds_purged_mask_iter_hash( it, crds );
98+
ulong prefix = fd_ulong_bswap( fd_ulong_load_8( hash ) );
99+
FD_TEST( (prefix & mask)==prefix );
100+
}
101+
if( FD_UNLIKELY( matches!=expected_matches ) ) FD_LOG_ERR(( "incorrect match count, expected %lu got %lu", expected_matches, matches ));
102+
}
103+
104+
static void
105+
test_crds_mask_iter( void ) {
106+
fd_rng_t _rng[1];
107+
fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, 0U, 0UL ) );
108+
FD_TEST( rng );
109+
110+
ulong ele_max = 1024UL;
111+
ulong purged_max = 1024UL;
112+
void * crds_mem = aligned_alloc( fd_crds_align(), fd_crds_footprint( ele_max, purged_max ) );
113+
FD_TEST( crds_mem );
114+
static fd_gossip_out_ctx_t test_gossip_out_ctx = {0};
115+
fd_crds_t * crds = fd_crds_join( fd_crds_new( crds_mem, rng, ele_max, purged_max, &test_gossip_out_ctx ) );
116+
FD_TEST( crds );
117+
118+
long now = fd_log_wallclock();
119+
120+
#define ENTRY_CNT 768UL
121+
uchar payloads[ ENTRY_CNT ][ FD_GOSSIP_CRDS_MAX_SZ ];
122+
for( ulong i=0UL; i<ENTRY_CNT; i++ ) {
123+
for( ulong j=0UL; j<FD_GOSSIP_CRDS_MAX_SZ; j++ ) payloads[ i ][ j ] = fd_rng_uchar( rng );
124+
FD_STORE( uint, payloads[ i ]+64UL, FD_GOSSIP_VALUE_LOWEST_SLOT );
125+
}
126+
127+
for( ulong i=0UL; i<ENTRY_CNT; i++ ) {
128+
fd_gossip_view_crds_value_t view[1];
129+
view->value_off = 0UL;
130+
view->pubkey_off = 64UL+4UL;
131+
view->wallclock_nanos = now;
132+
view->length = FD_GOSSIP_CRDS_MAX_SZ;
133+
view->tag = FD_GOSSIP_VALUE_LOWEST_SLOT;
134+
view->lowest_slot = i;
135+
136+
int crds_res = fd_crds_checks_fast( crds, view, payloads[ i ], 0 );
137+
FD_TEST( crds_res==FD_CRDS_UPSERT_CHECK_UPSERTS );
138+
fd_crds_entry_t const * entry = fd_crds_insert( crds, view, payloads[ i ], 0UL, 0, now, NULL );
139+
FD_TEST( entry!=NULL );
140+
}
141+
142+
/* Zero bits should yeild all crds entries */
143+
do_mask_test( 0x0UL, 0U, ENTRY_CNT, crds );
144+
145+
/* 63 bits should yeild one crds entry */
146+
do_mask_test( (0x356e7c8ca538cdcaUL>>1UL), 63U, 1UL, crds );
147+
148+
/* All the hashes starting with byte 0x11 */
149+
do_mask_test( 0x11UL, 8U, 4UL, crds );
150+
151+
/* Hashes starting with 0b11111 */
152+
do_mask_test( 0x1fUL, 5U, 28UL, crds );
153+
154+
/* Invalid prefix */
155+
do_mask_test( 0xbeef, 12U, 0UL, crds );
156+
157+
/* No purged yet */
158+
uchar iter_mem[ 16UL ];
159+
fd_crds_mask_iter_t * it = fd_crds_purged_mask_iter_init( crds, generate_mask( 0x0UL, 0U ), 0U, iter_mem );
160+
FD_TEST( fd_crds_purged_mask_iter_done( it, crds ) );
161+
162+
/* Purge everything */
163+
now += 60L*60L*1000L*1000L*1000L;
164+
for( ulong i=0UL; i<ENTRY_CNT; i++ ) {
165+
for( ulong j=0UL; j<64UL; j++ ) payloads[ i ][ j ] = fd_rng_uchar( rng );
166+
167+
fd_gossip_view_crds_value_t view[1];
168+
view->value_off = 0UL;
169+
view->pubkey_off = 64UL+4UL;
170+
view->wallclock_nanos = now;
171+
view->length = FD_GOSSIP_CRDS_MAX_SZ;
172+
view->tag = FD_GOSSIP_VALUE_LOWEST_SLOT;
173+
view->lowest_slot = i;
174+
175+
int crds_res = fd_crds_checks_fast( crds, view, payloads[ i ], 0 );
176+
FD_TEST( crds_res==FD_CRDS_UPSERT_CHECK_UPSERTS );
177+
fd_crds_entry_t const * entry = fd_crds_insert( crds, view, payloads[ i ], 0UL, 0, now, NULL );
178+
FD_TEST( entry!=NULL );
179+
}
180+
181+
/* Zero bits should yeild all crds entries */
182+
do_purged_mask_test( 0x0UL, 0U, ENTRY_CNT, crds );
183+
184+
/* 63 bits should yeild one crds entry */
185+
do_purged_mask_test( (0x356e7c8ca538cdcaUL>>1UL), 63U, 1UL, crds );
186+
187+
/* All the hashes starting with byte 0x11 */
188+
do_purged_mask_test( 0x11UL, 8U, 4UL, crds );
189+
190+
/* Hashes starting with 0b11111 */
191+
do_purged_mask_test( 0x1fUL, 5U, 28UL, crds );
192+
193+
/* Invalid prefix */
194+
do_purged_mask_test( 0xbeef, 12U, 0UL, crds );
195+
196+
free( crds_mem );
197+
}
198+
29199
int
30200
main( int argc,
31201
char ** argv ) {
32202
fd_boot( &argc, &argv );
33203
test_crds_new_basic();
204+
test_generate_mask();
205+
test_crds_mask_iter();
34206
FD_LOG_NOTICE(( "pass" ));
35207
fd_halt();
36208
return 0;

0 commit comments

Comments
 (0)