@@ -69,7 +69,9 @@ func CompareRowNumbers(upToDefinitionLevel int, a, b RowNumber) int {
6969// EqualRowNumber compares the sequences of row numbers in a and b
7070// for partial equality. A little faster than CompareRowNumbers(d,a,b)==0
7171func EqualRowNumber (upToDefinitionLevel int , a , b RowNumber ) bool {
72- for i := 0 ; i <= upToDefinitionLevel ; i ++ {
72+ // Compare in reverse order because most row number activity
73+ // occurs at the deeper definition levels.
74+ for i := upToDefinitionLevel ; i >= 0 ; i -- {
7375 if a [i ] != b [i ] {
7476 return false
7577 }
@@ -117,16 +119,14 @@ func (t *RowNumber) Valid() bool {
117119// from the Dremel whitepaper:
118120// https://storage.googleapis.com/pub-tools-public-publication-data/pdf/36632.pdf
119121// Name.Language.Country
120- // value | r | d | expected RowNumber
121- // -------|---|---|-------------------
122- //
123- // | | | { -1, -1, -1, -1 } <-- starting position
124- //
125- // us | 0 | 3 | { 0, 0, 0, 0 }
126- // null | 2 | 2 | { 0, 0, 1, -1 }
127- // null | 1 | 1 | { 0, 1, -1, -1 }
128- // gb | 1 | 3 | { 0, 2, 0, 0 }
129- // null | 0 | 1 | { 1, 0, -1, -1 }
122+ // | value | r | d | expected RowNumber
123+ // |--------|---|---|-------------------
124+ // | | | | { -1, -1, -1, -1 } <-- starting position
125+ // | us | 0 | 3 | { 0, 0, 0, 0 }
126+ // | null | 2 | 2 | { 0, 0, 1, -1 }
127+ // | null | 1 | 1 | { 0, 1, -1, -1 }
128+ // | gb | 1 | 3 | { 0, 2, 0, 0 }
129+ // | null | 0 | 1 | { 1, 0, -1, -1 }
130130func (t * RowNumber ) Next (repetitionLevel , definitionLevel , maxDefinitionLevel int ) {
131131 t [repetitionLevel ]++
132132
@@ -338,7 +338,6 @@ type SyncIteratorOpt func(i *SyncIterator)
338338// Not recommended with (very) high cardinality columns, such as UUIDs (spanID and traceID).
339339func SyncIteratorOptIntern () SyncIteratorOpt {
340340 return func (i * SyncIterator ) {
341- i .intern = true
342341 i .interner = intern .New ()
343342 }
344343}
@@ -418,8 +417,8 @@ type SyncIterator struct {
418417
419418 maxDefinitionLevel int
420419
421- intern bool
422- interner * intern. Interner
420+ interner * intern. Interner
421+ makeResult func ( t RowNumber , v * pq. Value ) * IteratorResult
423422}
424423
425424var _ Iterator = (* SyncIterator )(nil )
@@ -462,6 +461,25 @@ func NewSyncIterator(ctx context.Context, rgs []pq.RowGroup, column int, opts ..
462461 opt (i )
463462 }
464463
464+ // Default value, always clone results until we have
465+ // checked the column type and determined it isn't needed.
466+ clone := true
467+
468+ // Always disable intern/clone for non-pointer types that don't need it.
469+ // This eliminates unnecessary function calls on the hot path.
470+ if len (rgs ) > 0 {
471+ cc := rgs [0 ].ColumnChunks ()[column ]
472+ switch cc .Type ().Kind () {
473+ case pq .ByteArray , pq .FixedLenByteArray :
474+ default :
475+ clone = false
476+ if i .interner != nil {
477+ i .interner .Close ()
478+ }
479+ i .interner = nil
480+ }
481+ }
482+
465483 if i .selectAs != "" {
466484 // Preallocate 1 entry with the given name.
467485 i .at .Entries = []struct {
@@ -472,6 +490,18 @@ func NewSyncIterator(ctx context.Context, rgs []pq.RowGroup, column int, opts ..
472490 }
473491 }
474492
493+ // Based on all options and checks above, set the exact makeResult function
494+ switch {
495+ case i .selectAs == "" :
496+ i .makeResult = i .makeResultRowOnly
497+ case i .interner != nil :
498+ i .makeResult = i .makeResultIntern
499+ case clone :
500+ i .makeResult = i .makeResultClone
501+ default :
502+ i .makeResult = i .makeResultRaw
503+ }
504+
475505 _ , i .span = tracer .Start (ctx , "syncIterator" , trace .WithAttributes (
476506 attribute .Int ("columnIndex" , column ),
477507 attribute .String ("column" , i .columnName ),
@@ -846,22 +876,35 @@ func (c *SyncIterator) closeCurrRowGroup() {
846876 c .setPage (nil )
847877}
848878
849- func (c * SyncIterator ) makeResult (t RowNumber , v * pq.Value ) * IteratorResult {
850- // Use same static result instead of pooling
879+ // Several variations of optimized makeResult functions:
880+
881+ // makeResultIntern - The intern option was enabled and the column type contains
882+ // byte arrays that can benefit from interning.
883+ func (c * SyncIterator ) makeResultIntern (t RowNumber , v * pq.Value ) * IteratorResult {
884+ c .at .RowNumber = t
885+ c .at .Entries [0 ].Value = c .interner .UnsafeClone (v )
886+ return & c .at
887+ }
888+
889+ // makeResultClone - The column type contains pointers that must be cloned
890+ // to detach the values from the parquet buffers. But the intern option was
891+ // not enabled so we use parquet.Value.Clone() to create the copy.
892+ func (c * SyncIterator ) makeResultClone (t RowNumber , v * pq.Value ) * IteratorResult {
851893 c .at .RowNumber = t
894+ c .at .Entries [0 ].Value = v .Clone ()
895+ return & c .at
896+ }
852897
853- // The length of the Entries slice indicates if we should return the
854- // value or just the row number. This has already been checked during
855- // creation. SyncIterator reads a single column so the slice will
856- // always have length 0 or 1.
857- if len (c .at .Entries ) == 1 {
858- if c .intern {
859- c .at .Entries [0 ].Value = c .interner .UnsafeClone (v )
860- } else {
861- c .at .Entries [0 ].Value = v .Clone ()
862- }
863- }
898+ // makeResultRaw - The column type does not contain pointers, it is save to return the struct as-is.
899+ func (c * SyncIterator ) makeResultRaw (t RowNumber , v * pq.Value ) * IteratorResult {
900+ c .at .RowNumber = t
901+ c .at .Entries [0 ].Value = * v
902+ return & c .at
903+ }
864904
905+ // makeResultRowOnly - Not returning any values just save the row number.
906+ func (c * SyncIterator ) makeResultRowOnly (t RowNumber , _ * pq.Value ) * IteratorResult {
907+ c .at .RowNumber = t
865908 return & c .at
866909}
867910
@@ -870,7 +913,7 @@ func (c *SyncIterator) Close() {
870913
871914 c .span .End ()
872915
873- if c .intern && c . interner != nil {
916+ if c .interner != nil {
874917 c .interner .Close ()
875918 }
876919}
@@ -1047,8 +1090,19 @@ func (j *JoinIterator) collect(rowNumber RowNumber) (*IteratorResult, error) {
10471090 result .Reset ()
10481091 result .RowNumber = rowNumber
10491092
1093+ iters:
10501094 for i := range j .iters {
1051- for j .peeks [i ] != nil && EqualRowNumber (j .definitionLevel , j .peeks [i ].RowNumber , rowNumber ) {
1095+ for j .peeks [i ] != nil {
1096+
1097+ // Interned version of EqualRowNumber
1098+ // Compare in reverse order because most row number activity
1099+ // occurs at the deeper definition levels.
1100+ for k := j .definitionLevel ; k >= 0 ; k -- {
1101+ if j .peeks [i ].RowNumber [k ] != rowNumber [k ] {
1102+ continue iters
1103+ }
1104+ }
1105+
10521106 result .Append (j .peeks [i ])
10531107 j .peeks [i ], err = j .iters [i ].Next ()
10541108 if err != nil {
@@ -1272,39 +1326,54 @@ func (j *LeftJoinIterator) collect(rowNumber RowNumber) (*IteratorResult, error)
12721326 result .Reset ()
12731327 result .RowNumber = rowNumber
12741328
1275- collect := func (iters []Iterator , peeks []* IteratorResult ) {
1276- for i := range iters {
1277- // Collect matches
1278- for peeks [i ] != nil && EqualRowNumber (j .definitionLevel , peeks [i ].RowNumber , rowNumber ) {
1279- result .Append (peeks [i ])
1280- peeks [i ], err = iters [i ].Next ()
1281- if err != nil {
1282- return
1283- }
1284- }
1285- }
1286- }
1287-
12881329 // Collect is only called after we have found a match among all
12891330 // required iterators, therefore we only need to seek the optional ones to same location.
1290- err = j .seekAllOptional (rowNumber , j .definitionLevel )
1291- if err != nil {
1292- return nil , err
1331+ if len (j .optional ) > 0 {
1332+ err = j .seekAllOptional (rowNumber , j .definitionLevel )
1333+ if err != nil {
1334+ return nil , err
1335+ }
12931336 }
12941337
1295- collect ( j .required , j .peeksRequired )
1338+ err = j . collectInternal ( rowNumber , result , j .required , j .peeksRequired )
12961339 if err != nil {
12971340 return nil , err
12981341 }
12991342
1300- collect (j .optional , j .peeksOptional )
1301- if err != nil {
1302- return nil , err
1343+ if len (j .optional ) > 0 {
1344+ err = j .collectInternal (rowNumber , result , j .optional , j .peeksOptional )
1345+ if err != nil {
1346+ return nil , err
1347+ }
13031348 }
13041349
13051350 return result , nil
13061351}
13071352
1353+ func (j * LeftJoinIterator ) collectInternal (rowNumber RowNumber , result * IteratorResult , iters []Iterator , peeks []* IteratorResult ) (err error ) {
1354+ iters:
1355+ for i := range iters {
1356+ // Collect matches
1357+ for peeks [i ] != nil {
1358+ // Interned version of EqualRowNumber
1359+ // Compare in reverse order because most row number activity
1360+ // occurs at the deeper definition levels.
1361+ for k := j .definitionLevel ; k >= 0 ; k -- {
1362+ if peeks [i ].RowNumber [k ] != rowNumber [k ] {
1363+ continue iters
1364+ }
1365+ }
1366+
1367+ result .Append (peeks [i ])
1368+ peeks [i ], err = iters [i ].Next ()
1369+ if err != nil {
1370+ return err
1371+ }
1372+ }
1373+ }
1374+ return nil
1375+ }
1376+
13081377func (j * LeftJoinIterator ) Close () {
13091378 for _ , i := range j .required {
13101379 i .Close ()
0 commit comments