Commit 51211df

fix: Split large sels into multiple batches in CompactBatchs.Union - hotfix (#22841)
### **User description**

## What type of PR is this?

- [ ] API-change
- [x] BUG
- [ ] Improvement
- [ ] Documentation
- [ ] Feature
- [ ] Test and CI
- [ ] Code Refactoring

## Which issue(s) this PR fixes:

issue #22825

## What this PR does / why we need it:

This PR fixes a bug in the `CompactBatchs.Union` method where large selection arrays (`sels`) exceeding `batchMaxRow` were not split into multiple batches when the batch collection was empty (`bats.Length() == 0`).

**Problem:** In the `Union` method, when `bats.Length() == 0` and the length of `sels` exceeds `batchMaxRow`, the original implementation created a single batch containing all selected rows without checking the batch size limit. This violates the `batchMaxRow` constraint that each batch should respect, potentially causing:

- Batches exceeding the maximum row limit
- Inconsistent behavior compared to the case when `bats.Length() != 0` (which already handles large `sels` correctly by splitting them)

**Solution:** Modified the `Union` method to handle large `sels` arrays when `bats.Length() == 0` by splitting them into multiple batches, so that each batch respects the `batchMaxRow` limit. The fix iteratively processes `sels` in chunks of at most `batchMaxRow` rows, creating as many batches as needed.

**Changes:**

- Updated `pkg/container/batch/compact_batchs.go`: added logic in the `bats.Length() == 0` branch to split large `sels` into multiple batches, making it consistent with the existing logic for the `bats.Length() != 0` case
- Added test cases in `pkg/container/batch/compact_batchs_test.go`: `TestCompactBatchsUnionLargeSels` covers these scenarios:
  - Union with `selsLen > batchMaxRow` when `bats.Length() == 0`
  - Union with `selsLen == batchMaxRow` when `bats.Length() == 0`
  - Union with large `sels` when the last batch is already full
  - Union with large `sels` when the last batch has some rows

This fix ensures consistent batch size handling across all code paths in the `Union` method, preventing oversized batches.

___

### **PR Type**

Bug fix, Tests

___

### **Description**

- Split large selection arrays into multiple batches when the batch collection is empty
- Ensures each batch respects the `batchMaxRow` limit
- Added test coverage for large selections
- Fixes inconsistent behavior between empty and non-empty batch collections

___

### Diagram Walkthrough

```mermaid
flowchart LR
  A["Union method called<br/>with large sels"] --> B{"bats.Length() == 0?"}
  B -->|Yes| C["Split sels into<br/>batchMaxRow chunks"]
  C --> D["Create multiple<br/>batches iteratively"]
  D --> E["Each batch respects<br/>batchMaxRow limit"]
  B -->|No| F["Existing logic<br/>handles splitting"]
  E --> G["Consistent behavior<br/>across all paths"]
  F --> G
```

### File Walkthrough

**Bug fix**

**compact_batchs.go**: Implement batch splitting for large selections ([+18/-8](https://github.com/matrixorigin/matrixone/pull/22841/files#diff-13a01ffd2c00c5e8faff10e905d8067f400bafe3d940f6da252774850e0bfc85))

pkg/container/batch/compact_batchs.go

- Wrapped large selection handling in a loop to process selections in chunks
- Each chunk is limited to `batchMaxRow` size
- Creates multiple batches iteratively instead of a single oversized batch
- Maintains consistency with existing logic for non-empty batch collections

**Tests**

**compact_batchs_test.go**: Add comprehensive tests for large selection splitting ([+116/-0](https://github.com/matrixorigin/matrixone/pull/22841/files#diff-b568feda7c727d7a72c52ec6ebd3a44ed0676127e33093f4d66ddc1c784b2471))

pkg/container/batch/compact_batchs_test.go

- Added the `TestCompactBatchsUnionLargeSels` test function with four test cases
- Test case 1: Validates splitting when `selsLen > batchMaxRow` on an empty collection
- Test case 2: Validates single batch creation when `selsLen == batchMaxRow`
- Test case 3: Validates new batch creation when the last batch is full
- Test case 4: Validates batch filling and overflow when the last batch has partial rows

___
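The chunking step described under **Solution** can be sketched in isolation. This is a minimal illustration, not the matrixone code: `splitSels` is a hypothetical helper introduced here, and the limit of 8192 is taken from the value quoted in the test comments rather than from the library itself.

```go
package main

import "fmt"

// splitSels is a hypothetical helper (not part of matrixone). It cuts sels
// into consecutive chunks of at most maxRows entries each, mirroring the
// "take a prefix, then advance" loop described in the PR.
func splitSels(sels []int32, maxRows int) [][]int32 {
	if maxRows <= 0 {
		// Guard: a non-positive limit would otherwise loop forever.
		return nil
	}
	var chunks [][]int32
	remaining := sels
	for len(remaining) > 0 {
		size := len(remaining)
		if size > maxRows {
			size = maxRows
		}
		// Each chunk shares the backing array of sels; no copy is made.
		chunks = append(chunks, remaining[:size])
		remaining = remaining[size:]
	}
	return chunks
}

func main() {
	// 8192 is assumed here to match the batchMaxRow quoted in the tests.
	sels := make([]int32, 15000)
	for i := range sels {
		sels[i] = int32(i)
	}
	for i, c := range splitSels(sels, 8192) {
		fmt.Printf("chunk %d: %d rows\n", i, len(c))
	}
}
```

With 15000 selections and a limit of 8192, the loop yields two chunks of 8192 and 6808 rows, which matches the ceiling division used in the first test case.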
1 parent a8f97af commit 51211df

2 files changed: +134 -8 lines changed
pkg/container/batch/compact_batchs.go

Lines changed: 18 additions & 8 deletions
@@ -166,16 +166,26 @@ func (bats *CompactBatchs) Union(mpool *mpool.MPool, inBatch *Batch, sels []int3
 	}

 	if bats.Length() == 0 {
-		tmpBat := NewWithSize(len(inBatch.Vecs))
-		for i := range tmpBat.Vecs {
-			tmpBat.Vecs[i] = vector.NewVec(*inBatch.Vecs[i].GetType())
-			err := tmpBat.Vecs[i].UnionInt32(inBatch.Vecs[i], sels, mpool)
-			if err != nil {
-				return err
+		// Handle large sels: split into multiple batches if selsLen > batchMaxRow
+		remainingSels := sels
+		for len(remainingSels) > 0 {
+			tmpSize := len(remainingSels)
+			if tmpSize > bats.batchMaxRow {
+				tmpSize = bats.batchMaxRow
 			}
+			tmpSels := remainingSels[:tmpSize]
+			tmpBat := NewWithSize(len(inBatch.Vecs))
+			for i := range tmpBat.Vecs {
+				tmpBat.Vecs[i] = vector.NewVec(*inBatch.Vecs[i].GetType())
+				err := tmpBat.Vecs[i].UnionInt32(inBatch.Vecs[i], tmpSels, mpool)
+				if err != nil {
+					return err
+				}
+			}
+			tmpBat.rowCount = tmpBat.Vecs[0].Length()
+			bats.batchs = append(bats.batchs, tmpBat)
+			remainingSels = remainingSels[tmpSize:]
 		}
-		tmpBat.rowCount = tmpBat.Vecs[0].Length()
-		bats.batchs = append(bats.batchs, tmpBat)
 		return nil
 	}


pkg/container/batch/compact_batchs_test.go

Lines changed: 116 additions & 0 deletions
@@ -232,6 +232,122 @@ func TestCompactBatchsUnion(t *testing.T) {
 	bats.Clean(mp)
 }

+func TestCompactBatchsUnionLargeSels(t *testing.T) {
+	mp := mpool.MustNewZero()
+	bats := NewCompactBatchs(TestBatchMaxRow)
+
+	// Test case 1: Union with selsLen > batchMaxRow when bats.Length() == 0
+	bat1 := makeTestBatch(20000, mp) // Create a large batch
+	var sels []int32
+	for i := 0; i < 15000; i++ {
+		sels = append(sels, int32(i))
+	}
+	err := bats.Union(mp, bat1, sels)
+	require.NoError(t, err)
+
+	// Should create multiple batches since selsLen (15000) > batchMaxRow (8192)
+	expectedBatches1 := (15000 + TestBatchMaxRow - 1) / TestBatchMaxRow // Ceiling division
+	require.Equal(t, expectedBatches1, bats.Length())
+	require.Equal(t, 15000, bats.RowCount())
+
+	// Verify each batch doesn't exceed batchMaxRow
+	for i := 0; i < bats.Length(); i++ {
+		bat := bats.Get(i)
+		require.NotNil(t, bat)
+		require.LessOrEqual(t, bat.rowCount, TestBatchMaxRow)
+		if i < bats.Length()-1 {
+			// All batches except the last should be exactly batchMaxRow
+			require.Equal(t, TestBatchMaxRow, bat.rowCount)
+		}
+	}
+
+	bat1.Clean(mp)
+	bats.Clean(mp)
+	require.Equal(t, int64(0), mp.CurrNB())
+
+	// Test case 2: Union with selsLen == batchMaxRow when bats.Length() == 0
+	bats = NewCompactBatchs(TestBatchMaxRow)
+	bat2 := makeTestBatch(TestBatchMaxRow, mp)
+	sels = nil
+	for i := 0; i < TestBatchMaxRow; i++ {
+		sels = append(sels, int32(i))
+	}
+	err = bats.Union(mp, bat2, sels)
+	require.NoError(t, err)
+	require.Equal(t, 1, bats.Length())
+	require.Equal(t, TestBatchMaxRow, bats.RowCount())
+	require.Equal(t, TestBatchMaxRow, bats.Get(0).rowCount)
+
+	bat2.Clean(mp)
+	bats.Clean(mp)
+	require.Equal(t, int64(0), mp.CurrNB())
+
+	// Test case 3: Union with selsLen > batchMaxRow when lastBat is already full
+	bats = NewCompactBatchs(TestBatchMaxRow)
+	bat3 := makeTestBatch(TestBatchMaxRow, mp)
+	sels = nil
+	for i := 0; i < TestBatchMaxRow; i++ {
+		sels = append(sels, int32(i))
+	}
+	// Fill first batch to full
+	err = bats.Union(mp, bat3, sels)
+	require.NoError(t, err)
+	require.Equal(t, 1, bats.Length())
+	require.Equal(t, TestBatchMaxRow, bats.Get(0).rowCount)
+
+	// Now Union with large sels again
+	bat4 := makeTestBatch(20000, mp)
+	sels = nil
+	for i := 0; i < 10000; i++ {
+		sels = append(sels, int32(i))
+	}
+	err = bats.Union(mp, bat4, sels)
+	require.NoError(t, err)
+	// Should create new batches since lastBat is full
+	// First batch is full (8192), remaining 10000 sels should create 2 batches: 8192 + 1808
+	expectedBatches3 := 1 + (10000+TestBatchMaxRow-1)/TestBatchMaxRow
+	require.Equal(t, expectedBatches3, bats.Length())
+	require.Equal(t, TestBatchMaxRow+10000, bats.RowCount())
+	require.Equal(t, TestBatchMaxRow, bats.Get(0).rowCount)
+	require.Equal(t, TestBatchMaxRow, bats.Get(1).rowCount)
+	require.Equal(t, 10000-TestBatchMaxRow, bats.Get(2).rowCount) // 10000 - 8192 = 1808
+
+	bat3.Clean(mp)
+	bat4.Clean(mp)
+	bats.Clean(mp)
+	require.Equal(t, int64(0), mp.CurrNB())
+
+	// Test case 4: Union with selsLen > batchMaxRow when lastBat has some rows
+	bats = NewCompactBatchs(TestBatchMaxRow)
+	bat5 := makeTestBatch(10000, mp)
+	sels = nil
+	for i := 0; i < 5000; i++ {
+		sels = append(sels, int32(i))
+	}
+	// First Union: creates batch with 5000 rows
+	err = bats.Union(mp, bat5, sels)
+	require.NoError(t, err)
+	require.Equal(t, 1, bats.Length())
+	require.Equal(t, 5000, bats.Get(0).rowCount)
+
+	// Second Union: with 10000 sels, should fill first batch to 8192, then create new batch
+	sels = nil
+	for i := 0; i < 10000; i++ {
+		sels = append(sels, int32(i))
+	}
+	err = bats.Union(mp, bat5, sels)
+	require.NoError(t, err)
+	// First batch should be filled to 8192 (5000 + 3192), second batch has remaining 6808
+	require.Equal(t, 2, bats.Length())
+	require.Equal(t, 5000+10000, bats.RowCount())
+	require.Equal(t, TestBatchMaxRow, bats.Get(0).rowCount)
+	require.Equal(t, 10000-3192, bats.Get(1).rowCount) // 10000 - (8192-5000) = 6808
+
+	bat5.Clean(mp)
+	bats.Clean(mp)
+	require.Equal(t, int64(0), mp.CurrNB())
+}
+
 func makeTestBatch(max int, mp *mpool.MPool) *Batch {
 	bat := NewWithSize(2)
 	bat.Vecs[0] = makeTestVec(max, mp)
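
The row counts asserted in the four test cases follow one piece of arithmetic: top up the last batch if it has room, then cut the rest into full-size chunks. The sketch below models only that arithmetic. `expectedLayout` is a hypothetical function written for this illustration, not matrixone code, and it assumes the non-empty path behaves the way the assertions in test cases 3 and 4 describe.

```go
package main

import "fmt"

// expectedLayout is a hypothetical model (not part of matrixone). Given the
// row counts of the existing batches, it returns the row counts expected
// after adding selsLen rows under a per-batch limit of maxRows.
func expectedLayout(existing []int, selsLen, maxRows int) []int {
	layout := append([]int(nil), existing...) // copy so the input is untouched
	if maxRows <= 0 {
		return layout
	}
	// Step 1: fill the last batch up to maxRows if it still has room.
	if n := len(layout); n > 0 && layout[n-1] < maxRows {
		fill := maxRows - layout[n-1]
		if fill > selsLen {
			fill = selsLen
		}
		layout[n-1] += fill
		selsLen -= fill
	}
	// Step 2: the remainder goes into new batches of at most maxRows rows.
	for selsLen > 0 {
		size := selsLen
		if size > maxRows {
			size = maxRows
		}
		layout = append(layout, size)
		selsLen -= size
	}
	return layout
}

func main() {
	const maxRows = 8192 // assumed value, as quoted in the test comments
	fmt.Println(expectedLayout(nil, 15000, maxRows))         // test case 1
	fmt.Println(expectedLayout(nil, maxRows, maxRows))       // test case 2
	fmt.Println(expectedLayout([]int{8192}, 10000, maxRows)) // test case 3
	fmt.Println(expectedLayout([]int{5000}, 10000, maxRows)) // test case 4
}
```

For test case 4 the model gives 8192 and 6808 rows: the first batch absorbs 3192 of the 10000 new rows, and the remaining 6808 form a second batch.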
