From c0f294ddbcd5e994a027c03a864bf13e5bee0906 Mon Sep 17 00:00:00 2001 From: Jannik Lindemann Date: Thu, 6 Nov 2025 15:43:50 +0100 Subject: [PATCH 1/5] Error handling improvements and better integration with OOCEvictionManager --- .../instructions/OOCInstructionParser.java | 3 + .../ooc/IndexingOOCInstruction.java | 360 ++++++++++++++++++ .../ooc/MatrixIndexingOOCInstruction.java | 248 ++++++++++++ .../instructions/ooc/UnaryOOCInstruction.java | 7 + .../apache/sysds/runtime/util/IndexRange.java | 12 + .../ooc/RightIndexingCoordsTest.java | 133 +++++++ .../test/functions/ooc/RightIndexingTest.java | 215 +++++++++++ .../scripts/functions/ooc/RightIndexing.dml | 31 ++ .../functions/ooc/RightIndexingCoords.dml | 29 ++ 9 files changed, 1038 insertions(+) create mode 100644 src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java create mode 100644 src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java create mode 100644 src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingCoordsTest.java create mode 100644 src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingTest.java create mode 100644 src/test/scripts/functions/ooc/RightIndexing.dml create mode 100644 src/test/scripts/functions/ooc/RightIndexingCoords.dml diff --git a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java index 4e9a92ecb78..d81c6a40b61 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java @@ -27,6 +27,7 @@ import org.apache.sysds.runtime.instructions.ooc.BinaryOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.CentralMomentOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.CtableOOCInstruction; +import org.apache.sysds.runtime.instructions.ooc.IndexingOOCInstruction; import 
org.apache.sysds.runtime.instructions.ooc.OOCInstruction; import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.TSMMOOCInstruction; @@ -75,6 +76,8 @@ public static OOCInstruction parseSingleInstruction(InstructionType ooctype, Str return CentralMomentOOCInstruction.parseInstruction(str); case Ctable: return CtableOOCInstruction.parseInstruction(str); + case MatrixIndexing: + return IndexingOOCInstruction.parseInstruction(str); default: throw new DMLRuntimeException("Invalid OOC Instruction Type: " + ooctype); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java new file mode 100644 index 00000000000..354abebef74 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.sysds.runtime.instructions.ooc;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.IndexingCPInstruction;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.util.IndexRange;
+
+import java.util.function.BiConsumer;
+
+/**
+ * Base class of the out-of-core (OOC) indexing instructions. It keeps the four scalar
+ * operands that bound the indexed region and provides {@link BlockAligner}, a helper that
+ * groups the input blocks overlapping each output block of an indexing result.
+ */
+public abstract class IndexingOOCInstruction extends UnaryOOCInstruction {
+	protected final CPOperand rowLower, rowUpper, colLower, colUpper;
+
+	/**
+	 * Parses an instruction string by delegating to the CP indexing parser and converting
+	 * the result into its OOC counterpart.
+	 *
+	 * @param str the instruction string
+	 * @return the OOC indexing instruction
+	 */
+	public static IndexingOOCInstruction parseInstruction(String str) {
+		IndexingCPInstruction cpInst = IndexingCPInstruction.parseInstruction(str);
+		return parseInstruction(cpInst);
+	}
+
+	/**
+	 * Converts an already parsed CP indexing instruction into an OOC indexing instruction.
+	 * Only right indexing on matrix inputs is supported.
+	 *
+	 * @param cpInst the parsed CP indexing instruction
+	 * @return the OOC indexing instruction
+	 * @throws NotImplementedException for any other opcode or input data type
+	 */
+	public static IndexingOOCInstruction parseInstruction(IndexingCPInstruction cpInst) {
+		String opcode = cpInst.getOpcode();
+
+		if(opcode.equalsIgnoreCase(Opcodes.RIGHT_INDEX.toString())) {
+			if(cpInst.input1.getDataType().isMatrix()) {
+				return new MatrixIndexingOOCInstruction(cpInst.input1, cpInst.getRowLower(), cpInst.getRowUpper(),
+					cpInst.getColLower(), cpInst.getColUpper(), cpInst.output, cpInst.getOpcode(),
+					cpInst.getInstructionString());
+			}
+			else {
+				throw new NotImplementedException(
+					"OOC right indexing is only implemented for matrix inputs, got: " + cpInst.input1.getDataType());
+			}
+		}
+
+		throw new NotImplementedException("OOC indexing is not implemented for opcode: " + opcode);
+	}
+
+	/**
+	 * Creates an indexing instruction with a single input.
+	 *
+	 * @param in     the indexed input
+	 * @param rl     operand holding the lower row bound
+	 * @param ru     operand holding the upper row bound
+	 * @param cl     operand holding the lower column bound
+	 * @param cu     operand holding the upper column bound
+	 * @param out    the output operand
+	 * @param opcode the instruction opcode
+	 * @param istr   the instruction string
+	 */
+	protected IndexingOOCInstruction(CPOperand in, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu,
+		CPOperand out, String opcode, String istr) {
+		super(OOCInstruction.OOCType.MatrixIndexing, null, in, out, opcode, istr);
+		rowLower = rl;
+		rowUpper = ru;
+		colLower = cl;
+		colUpper = cu;
+	}
+
+	/**
+	 * Creates an indexing instruction with two inputs.
+	 *
+	 * @param lhsInput the first input
+	 * @param rhsInput the second input
+	 * @param rl       operand holding the lower row bound
+	 * @param ru       operand holding the upper row bound
+	 * @param cl       operand holding the lower column bound
+	 * @param cu       operand holding the upper column bound
+	 * @param out      the output operand
+	 * @param opcode   the instruction opcode
+	 * @param istr     the instruction string
+	 */
+	protected IndexingOOCInstruction(CPOperand lhsInput, CPOperand rhsInput, CPOperand rl, CPOperand ru, CPOperand cl,
+		CPOperand cu, CPOperand out, String opcode, String istr) {
+		super(OOCInstruction.OOCType.MatrixIndexing, null, lhsInput, rhsInput, out, opcode, istr);
+		rowLower = rl;
+		rowUpper = ru;
+		colLower = cl;
+		colUpper = cu;
+	}
+
+	/**
+	 * Resolves the four bound operands into an index range. Every bound is decremented by
+	 * one and narrowed to int before it is stored.
+	 *
+	 * @param ec the execution context used to read the scalar operands
+	 * @return the index range described by the operands
+	 */
+	protected IndexRange getIndexRange(ExecutionContext ec) {
+		return new IndexRange( // rl, ru, cl, cu
+			(int) (ec.getScalarInput(rowLower).getLongValue() - 1),
+			(int) (ec.getScalarInput(rowUpper).getLongValue() - 1),
+			(int) (ec.getScalarInput(colLower).getLongValue() - 1),
+			(int) (ec.getScalarInput(colUpper).getLongValue() - 1));
+	}
+
+	/**
+	 * Collects, for every output block of an indexing result, the input blocks that overlap
+	 * it. An output block is backed by a {@link Sector} with one, two or four slots; once all
+	 * slots of a sector are filled the sector is handed to the emitter.
+	 *
+	 * @param <T> the payload stored per input block
+	 */
+	public static class BlockAligner<T> {
+		private static final boolean DEBUG = false;
+
+		private final int _blocksize;
+		private final IndexRange indexRange;
+		private final IndexRange blockRange;
+		private final int _outRows;
+		private final int _outCols;
+		private final Sector<T>[] _blocks;
+		private int _emitCtr;
+
+		/**
+		 * @param range     the indexed region; its end bounds are treated as inclusive
+		 * @param blocksize the edge length of a block
+		 */
+		@SuppressWarnings("unchecked")
+		public BlockAligner(IndexRange range, int blocksize) {
+			indexRange = range;
+			_blocksize = blocksize;
+
+			long firstBlockRow = range.rowStart / blocksize;
+			long lastBlockRow = range.rowEnd / blocksize;
+			long firstBlockCol = range.colStart / blocksize;
+			long lastBlockCol = range.colEnd / blocksize;
+			// the end bounds are passed as last + 1 because isWithin is used for the lookup
+			blockRange = new IndexRange(firstBlockRow, lastBlockRow + 1, firstBlockCol, lastBlockCol + 1);
+
+			long totalRows = range.rowSpan() + 1;
+			long totalCols = range.colSpan() + 1;
+			_outRows = (int) ((totalRows + blocksize - 1) / blocksize);
+			_outCols = (int) ((totalCols + blocksize - 1) / blocksize);
+
+			// generic array creation is not possible, the raw array only ever holds Sector<T>
+			_blocks = (Sector<T>[]) new Sector[_outRows * _outCols];
+			_emitCtr = 0;
+
+			if(DEBUG) {
+				System.out.println("BlockAligner: range=" + range + ", blocksize=" + blocksize);
+				System.out.println("BlockRange: " + blockRange);
+				System.out.println("Output grid: " + _outRows + "x" + _outCols);
+			}
+		}
+
+		/**
+		 * @return true if the indexed region starts on a block boundary in both dimensions
+		 */
+		public boolean isAligned() {
+			return (indexRange.rowStart % _blocksize) == 0 && (indexRange.colStart % _blocksize) == 0;
+		}
+
+		/**
+		 * Registers an input block with every output sector it overlaps and emits each sector
+		 * that becomes complete.
+		 *
+		 * @param index   the one-based block index of the input block
+		 * @param data    the payload to store for the input block
+		 * @param emitter receives the one-based output block index and the completed sector
+		 * @return true once every output sector has been emitted; false if the block lies
+		 *         outside the indexed region or sectors are still pending
+		 */
+		public boolean putNext(MatrixIndexes index, T data, BiConsumer<MatrixIndexes, Sector<T>> emitter) {
+			long blockRow = index.getRowIndex() - 1;
+			long blockCol = index.getColumnIndex() - 1;
+
+			if(!blockRange.isWithin(blockRow, blockCol))
+				return false;
+
+			long blockRowStart = blockRow * _blocksize;
+			long blockRowEnd = blockRowStart + _blocksize - 1;
+			long blockColStart = blockCol * _blocksize;
+			long blockColEnd = blockColStart + _blocksize - 1;
+
+			long overlapRowStart = Math.max(indexRange.rowStart, blockRowStart);
+			long overlapRowEnd = Math.min(indexRange.rowEnd, blockRowEnd);
+			if(overlapRowStart > overlapRowEnd)
+				return false;
+
+			long overlapColStart = Math.max(indexRange.colStart, blockColStart);
+			long overlapColEnd = Math.min(indexRange.colEnd, blockColEnd);
+			if(overlapColStart > overlapColEnd)
+				return false;
+
+			// range of output blocks (zero-based) this input block contributes to
+			int outRowStart = (int) ((overlapRowStart - indexRange.rowStart) / _blocksize);
+			int outRowEnd = (int) ((overlapRowEnd - indexRange.rowStart) / _blocksize);
+			int outColStart = (int) ((overlapColStart - indexRange.colStart) / _blocksize);
+			int outColEnd = (int) ((overlapColEnd - indexRange.colStart) / _blocksize);
+
+			for(int outRow = outRowStart; outRow <= outRowEnd; outRow++) {
+				long targetRowStartGlobal = indexRange.rowStart + (long) outRow * _blocksize;
+				long targetRowEndGlobal = Math.min(indexRange.rowEnd, targetRowStartGlobal + _blocksize - 1);
+				long targetStartBlockRow = targetRowStartGlobal / _blocksize;
+				long targetEndBlockRow = targetRowEndGlobal / _blocksize;
+				int rowSegments = (int) (targetEndBlockRow - targetStartBlockRow + 1);
+
+				for(int outCol = outColStart; outCol <= outColEnd; outCol++) {
+					long targetColStartGlobal = indexRange.colStart + (long) outCol * _blocksize;
+					long targetColEndGlobal = Math.min(indexRange.colEnd, targetColStartGlobal + _blocksize - 1);
+					long targetStartBlockCol = targetColStartGlobal / _blocksize;
+					long targetEndBlockCol = targetColEndGlobal / _blocksize;
+					int colSegments = (int) (targetEndBlockCol - targetStartBlockCol + 1);
+
+					// slot of this input block inside the sector: 0 = first, 1 = second segment
+					int rowOffset = (rowSegments == 1) ? 0 : (blockRow == targetStartBlockRow ? 0 : 1);
+					int colOffset = (colSegments == 1) ? 0 : (blockCol == targetStartBlockCol ? 0 : 1);
+
+					Sector<T> sector = getOrCreate(outRow, outCol, rowSegments, colSegments);
+					if(sector == null)
+						continue;
+
+					if(sector.place(rowOffset, colOffset, blockRow, blockCol, data)) {
+						releaseSlot(outRow, outCol);
+						// the emitter is foreign code and therefore runs outside the monitor
+						emitter.accept(new MatrixIndexes(outRow + 1, outCol + 1), sector);
+						// count only after the emitter returned, so that "complete" implies
+						// every sector has been handed over
+						countEmitted();
+					}
+				}
+			}
+
+			return isComplete();
+		}
+
+		/** Maps an output block position to its array slot, or -1 if it is out of bounds. */
+		private int resolveIndex(int row, int col) {
+			if(row < 0 || row >= _outRows || col < 0 || col >= _outCols)
+				return -1;
+			return row * _outCols + col;
+		}
+
+		/** Returns the pending sector of an output block, creating it on first access. */
+		private synchronized Sector<T> getOrCreate(int outRow, int outCol, int rowSegments, int colSegments) {
+			int idx = resolveIndex(outRow, outCol);
+			if(idx == -1)
+				return null;
+
+			Sector<T> s = _blocks[idx];
+			if(s == null) {
+				if(rowSegments == 1 && colSegments == 1)
+					s = new Sector1<>();
+				else if(rowSegments == 1 && colSegments == 2)
+					s = new Sector2Col<>();
+				else if(rowSegments == 2 && colSegments == 1)
+					s = new Sector2Row<>();
+				else
+					s = new Sector4<>();
+				_blocks[idx] = s;
+			}
+
+			return s;
+		}
+
+		/** Drops the reference to a completed sector; guarded by the same monitor as getOrCreate. */
+		private synchronized void releaseSlot(int outRow, int outCol) {
+			_blocks[resolveIndex(outRow, outCol)] = null;
+		}
+
+		/** Records one emitted sector; guarded so that concurrent increments are not lost. */
+		private synchronized void countEmitted() {
+			_emitCtr++;
+		}
+
+		/** @return true once as many sectors were emitted as there are output blocks */
+		private synchronized boolean isComplete() {
+			return _emitCtr >= _blocks.length;
+		}
+
+		/**
+		 * Verifies that every output sector has been emitted.
+		 *
+		 * @throws DMLRuntimeException if sectors are still pending
+		 */
+		public synchronized void close() {
+			if(_emitCtr != _blocks.length)
+				throw new DMLRuntimeException("BlockAligner still has some unfinished sectors");
+		}
+	}
+
+	/**
+	 * An input block registered in a sector: its zero-based block position and payload.
+	 *
+	 * @param <T> the payload type
+	 */
+	public static final class BlockComponent<T> {
+		public final long blockRow;
+		public final long blockCol;
+		public final T data;
+
+		public BlockComponent(long blockRow, long blockCol, T data) {
+			this.blockRow = blockRow;
+			this.blockCol = blockCol;
+			this.data = data;
+		}
+	}
+
+	/**
+	 * The set of input blocks that make up one output block.
+	 *
+	 * @param <T> the payload type
+	 */
+	public static abstract class Sector<T> {
+		/**
+		 * Stores an input block in the given slot.
+		 *
+		 * @return true if the sector is complete after this call
+		 */
+		public abstract boolean place(int rowOffset, int colOffset, long blockRow, long blockCol, T data);
+
+		/** @return the component stored in the given slot, or null */
+		public abstract BlockComponent<T> get(int rowOffset, int colOffset);
+
+		/** @return the number of filled slots */
+		public abstract int count();
+	}
+
+	/** Sector backed by a single input block. */
+	public static class Sector1<T> extends Sector<T> {
+		private BlockComponent<T> _data;
+
+		@Override
+		public synchronized boolean place(int rowOffset, int colOffset, long blockRow, long blockCol, T data) {
+			if(rowOffset != 0 || colOffset != 0)
+				return false;
+
+			_data = new BlockComponent<>(blockRow, blockCol, data);
+			return true;
+		}
+
+		@Override
+		public synchronized BlockComponent<T> get(int rowOffset, int colOffset) {
+			return (rowOffset == 0 && colOffset == 0) ? _data : null;
+		}
+
+		@Override
+		public synchronized int count() {
+			return _data == null ? 0 : 1;
+		}
+	}
+
+	/** Sector backed by a 2x2 group of input blocks, stored row-major. */
+	public static class Sector4<T> extends Sector<T> {
+		private int _count;
+		private final BlockComponent<T>[] _data;
+
+		@SuppressWarnings("unchecked")
+		public Sector4() {
+			_count = 0;
+			_data = (BlockComponent<T>[]) new BlockComponent[4];
+		}
+
+		@Override
+		public synchronized boolean place(int rowOffset, int colOffset, long blockRow, long blockCol, T data) {
+			// reject invalid offsets like the two-slot sectors do; without the guard an
+			// offset of 2 would silently alias a different slot
+			if(rowOffset < 0 || rowOffset > 1 || colOffset < 0 || colOffset > 1)
+				return false;
+
+			int pos = rowOffset * 2 + colOffset;
+			if(_data[pos] == null) {
+				_data[pos] = new BlockComponent<>(blockRow, blockCol, data);
+				_count++;
+			}
+			return _count == 4;
+		}
+
+		@Override
+		public synchronized BlockComponent<T> get(int rowOffset, int colOffset) {
+			if(rowOffset < 0 || rowOffset > 1 || colOffset < 0 || colOffset > 1)
+				return null;
+			return _data[rowOffset * 2 + colOffset];
+		}
+
+		@Override
+		public synchronized int count() {
+			return _count;
+		}
+	}
+
+	/** Sector backed by two horizontally adjacent input blocks. */
+	public static class Sector2Col<T> extends Sector<T> {
+		private int _count;
+		private final BlockComponent<T>[] _data;
+
+		@SuppressWarnings("unchecked")
+		public Sector2Col() {
+			_count = 0;
+			_data = (BlockComponent<T>[]) new BlockComponent[2];
+		}
+
+		@Override
+		public synchronized boolean place(int rowOffset, int colOffset, long blockRow, long blockCol, T data) {
+			if(rowOffset != 0 || colOffset < 0 || colOffset > 1)
+				return false;
+
+			if(_data[colOffset] == null) {
+				_data[colOffset] = new BlockComponent<>(blockRow, blockCol, data);
+				_count++;
+			}
+
+			return _count == 2;
+		}
+
+		@Override
+		public synchronized BlockComponent<T> get(int rowOffset, int colOffset) {
+			return (rowOffset == 0 && colOffset >= 0 && colOffset < 2) ? _data[colOffset] : null;
+		}
+
+		@Override
+		public synchronized int count() {
+			return _count;
+		}
+	}
+
+	/** Sector backed by two vertically adjacent input blocks. */
+	public static class Sector2Row<T> extends Sector<T> {
+		private int _count;
+		private final BlockComponent<T>[] _data;
+
+		@SuppressWarnings("unchecked")
+		public Sector2Row() {
+			_count = 0;
+			_data = (BlockComponent<T>[]) new BlockComponent[2];
+		}
+
+		@Override
+		public synchronized boolean place(int rowOffset, int colOffset, long blockRow, long blockCol, T data) {
+			if(colOffset != 0 || rowOffset < 0 || rowOffset > 1)
+				return false;
+
+			if(_data[rowOffset] == null) {
+				_data[rowOffset] = new BlockComponent<>(blockRow, blockCol, data);
+				_count++;
+			}
+
+			return _count == 2;
+		}
+
+		@Override
+		public synchronized BlockComponent<T> get(int rowOffset, int colOffset) {
+			return (colOffset == 0 && rowOffset >= 0 && rowOffset < 2) ? _data[rowOffset] : null;
+		}
+
+		@Override
+		public synchronized int count() {
+			return _count;
+		}
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java
new file mode 100644
index 00000000000..ba5bebc73a7
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.ooc;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.DoubleObject;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.util.IndexRange;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Out-of-core right indexing on a matrix. Input blocks are read from the stream of the
+ * input matrix; the blocks of the indexed region are written to a new output stream.
+ * Left indexing is not implemented.
+ */
+public class MatrixIndexingOOCInstruction extends IndexingOOCInstruction {
+
+	public MatrixIndexingOOCInstruction(CPOperand in, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu,
+		CPOperand out, String opcode, String istr) {
+		super(in, rl, ru, cl, cu, out, opcode, istr);
+	}
+
+	protected MatrixIndexingOOCInstruction(CPOperand lhsInput, CPOperand rhsInput, CPOperand rl,
+		CPOperand ru, CPOperand cl, CPOperand cu, CPOperand out, String opcode, String istr) {
+		super(lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, istr);
+	}
+
+	/**
+	 * Executes the indexing operation. Three paths exist for right indexing:
+	 * a scalar output reads a single cell; a region that starts on a block boundary is
+	 * served by forwarding or slicing individual input blocks; any other region is
+	 * assembled from up to four input blocks per output block via a {@link BlockAligner}.
+	 *
+	 * @param ec the execution context
+	 * @throws DMLRuntimeException     if the opcode is unknown or the block holding a
+	 *                                 requested scalar never arrives
+	 * @throws NotImplementedException for left indexing
+	 */
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		String opcode = getOpcode();
+		IndexRange ix = getIndexRange(ec);
+
+		MatrixObject mo = ec.getMatrixObject(input1.getName());
+		int blocksize = mo.getBlocksize();
+		// zero-based block coordinates of the corners of the indexed region
+		long firstBlockRow = ix.rowStart / blocksize;
+		long lastBlockRow = ix.rowEnd / blocksize;
+		long firstBlockCol = ix.colStart / blocksize;
+		long lastBlockCol = ix.colEnd / blocksize;
+
+		boolean inRange = ix.rowStart < mo.getNumRows() && ix.colStart < mo.getNumColumns();
+
+		OOCStream<IndexedMatrixValue> qIn = mo.getStreamHandle();
+		OOCStream<IndexedMatrixValue> qOut = createWritableStream();
+
+		addInStream(qIn);
+		addOutStream(qOut);
+
+		// NOTE(review): the output is looked up as a matrix object before the scalar
+		// branch below is evaluated -- confirm this lookup tolerates a scalar output
+		ec.getMatrixObject(output).setStreamHandle(qOut);
+
+		//right indexing
+		if( opcode.equalsIgnoreCase(Opcodes.RIGHT_INDEX.toString()) )
+		{
+			if (output.isScalar() && inRange) {
+				IndexedMatrixValue tmp;
+
+				while ((tmp = qIn.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) {
+					// block indexes are one-based, firstBlockRow/firstBlockCol are zero-based
+					long blockRow = tmp.getIndexes().getRowIndex() - 1;
+					long blockCol = tmp.getIndexes().getColumnIndex() - 1;
+
+					if (blockRow == firstBlockRow && blockCol == firstBlockCol) {
+						// cell position inside the block: row offset and column offset
+						ec.setScalarOutput(output.getName(), new DoubleObject(
+							tmp.getValue().get((int)(ix.rowStart % blocksize), (int)(ix.colStart % blocksize))));
+						return;
+					}
+				}
+
+				throw new DMLRuntimeException("Desired block not found");
+			}
+
+			// handle to the running filter task, used to stop it once all output is produced
+			final AtomicReference<CompletableFuture<?>> futureRef = new AtomicReference<>();
+
+			if (ix.rowStart % blocksize == 0 && ix.colStart % blocksize == 0) {
+				// Aligned case: interior blocks can be forwarded directly, borders may require slicing
+				final int outBlockRows = (int)Math.ceil((double)(ix.rowSpan() + 1) / blocksize);
+				final int outBlockCols = (int)Math.ceil((double)(ix.colSpan() + 1) / blocksize);
+				final int totalBlocks = outBlockRows * outBlockCols;
+				final AtomicInteger producedBlocks = new AtomicInteger(0);
+
+				CompletableFuture<?> future = filterOOC(qIn, tmp -> {
+					MatrixIndexes inIdx = tmp.getIndexes();
+					long blockRow = inIdx.getRowIndex() - 1;
+					long blockCol = inIdx.getColumnIndex() - 1;
+
+					if (blockRow < firstBlockRow || blockRow > lastBlockRow || blockCol < firstBlockCol || blockCol > lastBlockCol)
+						return;
+
+					MatrixBlock block = (MatrixBlock) tmp.getValue();
+
+					// inclusive cell bounds of the requested region inside this block
+					int rowStartLocal = (blockRow == firstBlockRow) ? (int)(ix.rowStart % blocksize) : 0;
+					int rowEndLocal = (blockRow == lastBlockRow)
+						? Math.min(block.getNumRows() - 1, (int)(ix.rowEnd % blocksize))
+						: block.getNumRows() - 1;
+					int colStartLocal = (blockCol == firstBlockCol) ? (int)(ix.colStart % blocksize) : 0;
+					int colEndLocal = (blockCol == lastBlockCol)
+						? Math.min(block.getNumColumns() - 1, (int)(ix.colEnd % blocksize))
+						: block.getNumColumns() - 1;
+
+					MatrixBlock outBlock;
+					if (rowStartLocal == 0 && rowEndLocal == block.getNumRows() - 1 &&
+						colStartLocal == 0 && colEndLocal == block.getNumColumns() - 1) {
+						// the whole block is requested, forward it without copying
+						outBlock = block;
+					}
+					else {
+						outBlock = block.slice(rowStartLocal, rowEndLocal, colStartLocal, colEndLocal);
+					}
+
+					long outBlockRow = blockRow - firstBlockRow + 1;
+					long outBlockCol = blockCol - firstBlockCol + 1;
+					qOut.enqueue(new IndexedMatrixValue(new MatrixIndexes(outBlockRow, outBlockCol), outBlock));
+
+					if (producedBlocks.incrementAndGet() >= totalBlocks) {
+						CompletableFuture<?> f = futureRef.get();
+						if (f != null)
+							f.cancel(true);
+					}
+				}, tmp -> {
+					long blockRow = tmp.getIndexes().getRowIndex() - 1;
+					long blockCol = tmp.getIndexes().getColumnIndex() - 1;
+					return blockRow >= firstBlockRow && blockRow <= lastBlockRow
+						&& blockCol >= firstBlockCol && blockCol <= lastBlockCol;
+				}, qOut::closeInput);
+				futureRef.set(future);
+				return;
+			}
+
+			final BlockAligner<IndexedBlockMeta> aligner = new BlockAligner<>(ix, blocksize);
+
+			// We need to construct our own stream to properly manage the cached items
+			final CachingStream cachedStream = qIn.hasStreamCache() ? qIn.getStreamCache() : new CachingStream(qIn);
+			cachedStream.activateIndexing();
+
+			CompletableFuture<?> future = filterOOC(cachedStream.getReadStream(), tmp -> {
+				boolean completed = aligner.putNext(tmp.getIndexes(), new IndexedBlockMeta(tmp), (idx, sector) -> {
+					int targetBlockRow = (int)(idx.getRowIndex() - 1);
+					int targetBlockCol = (int)(idx.getColumnIndex() - 1);
+
+					// inclusive cell bounds of the output block in input coordinates
+					long targetRowStartGlobal = ix.rowStart + (long)targetBlockRow * blocksize;
+					long targetRowEndGlobal = Math.min(ix.rowEnd, targetRowStartGlobal + blocksize - 1);
+					long targetColStartGlobal = ix.colStart + (long)targetBlockCol * blocksize;
+					long targetColEndGlobal = Math.min(ix.colEnd, targetColStartGlobal + blocksize - 1);
+
+					int nRows = (int)(targetRowEndGlobal - targetRowStartGlobal + 1);
+					int nCols = (int)(targetColEndGlobal - targetColStartGlobal + 1);
+
+					long firstSrcBlockRow = targetRowStartGlobal / blocksize;
+					long lastSrcBlockRow = targetRowEndGlobal / blocksize;
+					int rowSegments = (int)(lastSrcBlockRow - firstSrcBlockRow + 1);
+
+					long firstSrcBlockCol = targetColStartGlobal / blocksize;
+					long lastSrcBlockCol = targetColEndGlobal / blocksize;
+					int colSegments = (int)(lastSrcBlockCol - firstSrcBlockCol + 1);
+
+					MatrixBlock target = null;
+
+					for (int r = 0; r < rowSegments; r++) {
+						for (int c = 0; c < colSegments; c++) {
+							BlockComponent<IndexedBlockMeta> component = sector.get(r, c);
+							if (component == null)
+								continue;
+
+							IndexedMatrixValue mv = cachedStream.findCached(component.data.idx);
+							MatrixBlock srcBlock = (MatrixBlock)mv.getValue();
+
+							// the first source block decides the format of the output block
+							if (target == null)
+								target = new MatrixBlock(nRows, nCols, srcBlock.isInSparseFormat());
+
+							long srcBlockRowStart = component.blockRow * blocksize;
+							long srcBlockColStart = component.blockCol * blocksize;
+							long sliceRowStartGlobal = Math.max(targetRowStartGlobal, srcBlockRowStart);
+							long sliceRowEndGlobal = Math.min(targetRowEndGlobal, srcBlockRowStart + srcBlock.getNumRows() - 1);
+							long sliceColStartGlobal = Math.max(targetColStartGlobal, srcBlockColStart);
+							long sliceColEndGlobal = Math.min(targetColEndGlobal, srcBlockColStart + srcBlock.getNumColumns() - 1);
+
+							int sliceRowStart = (int)(sliceRowStartGlobal - srcBlockRowStart);
+							int sliceRowEnd = (int)(sliceRowEndGlobal - srcBlockRowStart);
+							int sliceColStart = (int)(sliceColStartGlobal - srcBlockColStart);
+							int sliceColEnd = (int)(sliceColEndGlobal - srcBlockColStart);
+
+							int targetRowOffset = (int)(sliceRowStartGlobal - targetRowStartGlobal);
+							int targetColOffset = (int)(sliceColStartGlobal - targetColStartGlobal);
+
+							MatrixBlock sliced = srcBlock.slice(sliceRowStart, sliceRowEnd, sliceColStart, sliceColEnd);
+							sliced.putInto(target, targetRowOffset, targetColOffset, true);
+						}
+					}
+
+					if (target == null)
+						target = new MatrixBlock(nRows, nCols, false);
+
+					qOut.enqueue(new IndexedMatrixValue(idx, target));
+				});
+				if (completed) {
+					CompletableFuture<?> f = futureRef.get();
+					if (f != null)
+						f.cancel(true);
+				}
+			}, tmp -> {
+				long blockRow = tmp.getIndexes().getRowIndex() - 1;
+				long blockCol = tmp.getIndexes().getColumnIndex() - 1;
+				return blockRow >= firstBlockRow && blockRow <= lastBlockRow
+					&& blockCol >= firstBlockCol && blockCol <= lastBlockCol;
+			}, () -> {
+				aligner.close();
+				qOut.closeInput();
+			});
+			futureRef.set(future);
+		}
+		//left indexing
+		else if ( opcode.equalsIgnoreCase(Opcodes.LEFT_INDEX.toString()))
+		{
+			throw new NotImplementedException();
+		}
+		else
+			throw new DMLRuntimeException("Invalid opcode (" + opcode +") encountered in MatrixIndexingOOCInstruction.");
+	}
+
+	/**
+	 * Payload kept per input block while it waits in a sector: the block index and the
+	 * block dimensions captured when the block was read.
+	 */
+	private static class IndexedBlockMeta {
+		public final MatrixIndexes idx;
+		public final long nrows;
+		public final long ncols;
+
+		public IndexedBlockMeta(IndexedMatrixValue mv) {
+			this.idx = mv.getIndexes();
+			this.nrows = mv.getValue().getNumRows();
+			this.ncols = mv.getValue().getNumColumns();
+		}
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index 08f00f86d2f..59fdfa8079f 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -19,6 +19,7 @@ package org.apache.sysds.runtime.instructions.ooc; +import org.apache.sysds.hops.UnaryOp; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.instructions.InstructionUtils; @@ -36,6 +37,12 @@ protected UnaryOOCInstruction(OOCType type, UnaryOperator op, CPOperand in1, CPO _uop = op; } + protected UnaryOOCInstruction(OOCType type, UnaryOperator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr) { + super(type, op, in1, in2, out, opcode, istr); + + _uop = op; + } + public static UnaryOOCInstruction parseInstruction(String str) { String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); InstructionUtils.checkNumFields(parts, 2); diff --git a/src/main/java/org/apache/sysds/runtime/util/IndexRange.java b/src/main/java/org/apache/sysds/runtime/util/IndexRange.java index 4a8d9991478..44fa8320e3a 100644 --- a/src/main/java/org/apache/sysds/runtime/util/IndexRange.java +++ b/src/main/java/org/apache/sysds/runtime/util/IndexRange.java @@ -30,6 +30,10 @@ public class IndexRange implements Serializable public long rowEnd = 0; public long colStart = 0; public long colEnd = 0; + + public static IndexRange intersect(IndexRange a, IndexRange b) { + return new IndexRange(Math.max(a.rowStart, b.rowStart), Math.min(a.rowEnd, b.rowEnd), Math.max(a.colStart, b.colStart), Math.min(a.colEnd, b.colEnd)); + } public IndexRange(long rs, long re, long cs, long ce) { set(rs, re, cs, ce); @@ -52,6 +56,10 @@ public IndexRange add(int delta) { colStart + delta, colEnd + delta); } + public IndexRange add(long rowDelta, long 
colDelta) { + return new IndexRange(rowStart + rowDelta, rowEnd + rowDelta, colStart + colDelta, colEnd + colDelta); + } + public boolean inColRange(long col) { return col >= colStart && col < colEnd; } @@ -68,6 +76,10 @@ public long rowSpan() { return rowEnd - rowStart; } + public boolean isWithin(long row, long col) { + return inColRange(col) && inRowRange(row); + } + @Override public String toString() { return "["+rowStart+":"+rowEnd+","+colStart+":"+colEnd+"]"; diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingCoordsTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingCoordsTest.java new file mode 100644 index 00000000000..0dab096e0c2 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingCoordsTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.sysds.test.functions.ooc;
+
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+/** Compares single-cell right indexing X[rs, cs] under -ooc against a rerun without the -ooc flag. */
+public class RightIndexingCoordsTest extends AutomatedTestBase {
+	private final static String TEST_NAME1 = "RightIndexingCoords";
+	private final static String TEST_DIR = "functions/ooc/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + RightIndexingCoordsTest.class.getSimpleName() + "/";
+	private final static double eps = 1e-8;
+	private static final String INPUT_NAME = "X";
+	private static final String OUTPUT_NAME = "res";
+
+	private final static int nrows = 2300;
+	private final static int ncols = 1200;
+	private final static int maxVal = 7;
+	private final static double sparsity1 = 1;
+	private final static double sparsity2 = 0.05;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1);
+		addTestConfiguration(TEST_NAME1, config);
+	}
+
+	@Test
+	public void testRightIndexingDense1() {
+		runRightIndexingTest(1, 1, false);
+	}
+
+	@Test
+	public void testRightIndexingSparse1() {
+		runRightIndexingTest(1, 1, true);
+	}
+
+	@Test
+	public void testRightIndexingDense2() {
+		runRightIndexingTest(1000, 1, false);
+	}
+
+	@Test
+	public void testRightIndexingSparse2() {
+		runRightIndexingTest(1000, 1, true);
+	}
+	/** Writes a random nrows x ncols input, runs the script with and without -ooc, and compares both outputs. */
+	private void runRightIndexingTest(int rs, int cs, boolean sparse) {
+		Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE);
+
+		try {
+			getAndLoadTestConfiguration(TEST_NAME1);
+
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+			programArgs = new String[] {"-explain", "-stats", "-ooc", "-args", input(INPUT_NAME), "" + rs, "" + cs, output(OUTPUT_NAME)};
+
+			// 1. Generate the input data in-memory as a double array
+			double[][] X_data = getRandomMatrix(nrows, ncols, 1, maxVal, sparse ? sparsity2 : sparsity1, 7);
+
+			// 2. Convert the double array to a MatrixBlock
+			MatrixBlock X_mb = DataConverter.convertToMatrixBlock(X_data);
+
+			// 3. Create a binary matrix writer
+			MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY);
+
+			// 4. Write matrix X (blocksize 1000) and its metadata file in binary format
+			writer.writeMatrixToHDFS(X_mb, input(INPUT_NAME), nrows, ncols, 1000, X_mb.getNonZeros());
+			HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64,
+				new MatrixCharacteristics(nrows, ncols, 1000, X_mb.getNonZeros()), Types.FileFormat.BINARY);
+
+			runTest(true, false, null, -1);
+
+			//check that the OOC right-indexing instruction was executed
+			Assert.assertTrue("OOC wasn't used for right indexing",
+				heavyHittersContainsString(Instruction.OOC_INST_PREFIX + Opcodes.RIGHT_INDEX));
+
+			//compare results
+
+			// rerun without ooc flag to produce the reference output
+			programArgs = new String[] {"-explain", "-stats", "-args", input(INPUT_NAME), "" + rs, "" + cs, output(OUTPUT_NAME + "_target")};
+			runTest(true, false, null, -1);
+
+			// compare matrices
+
+			MatrixBlock ret1 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME),
+				Types.FileFormat.BINARY, 1, 1, 1000);
+			MatrixBlock ret2 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME + "_target"),
+				Types.FileFormat.BINARY, 1, 1, 1000);
+
+			// ret2 is the reference (no -ooc), ret1 the OOC result; both are read as 1x1 matrices
+			TestUtils.compareMatrices(ret2, ret1, eps);
+		}
+		catch(IOException e) {
+			throw new RuntimeException(e);
+		}
+		finally {
+			resetExecMode(platformOld);
+		}
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingTest.java
new file mode 100644
index 00000000000..5bd061c6465
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.ooc;
+
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+/** Compares range right indexing X[rs:re, cs:ce] under -ooc against a rerun without the -ooc flag. */
+public class RightIndexingTest extends AutomatedTestBase {
+	private final static String TEST_NAME1 = "RightIndexing";
+	private final static String TEST_DIR = "functions/ooc/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + RightIndexingTest.class.getSimpleName() + "/";
+	private final static double eps = 1e-8;
+	private static final String INPUT_NAME = "X";
+	private static final String OUTPUT_NAME = "res";
+
+	private final static int maxVal = 7;
+	private final static double sparsity1 = 1;
+	private final static double sparsity2 = 0.05;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1);
+		addTestConfiguration(TEST_NAME1, config);
+	}
+
+	@Test
+	public void testRightIndexingDense1() {
+		runRightIndexingTest(2, 2002, 100, 1150, 2100, 1200, false);
+	}
+
+	@Test
+	public void testRightIndexingSparse1() {
+		runRightIndexingTest(2, 2002, 100, 1150, 2100, 1200, true);
+	}
+
+	@Test
+	public void testRightIndexingAlignedDense() {
+		runRightIndexingTest(1, 2002, 1, 1150, 2100, 1200, false);
+	}
+
+	@Test
+	public void testRightIndexingAlignedSparse() {
+		runRightIndexingTest(1, 2002, 1, 1150, 2100, 1200, true);
+	}
+
+	@Test
+	public void testRightIndexingRowAlignedDense() {
+		runRightIndexingTest(1, 2002, 100, 1150, 2100, 1200, false);
+	}
+
+	@Test
+	public void testRightIndexingRowAlignedSparse() {
+		runRightIndexingTest(1, 2002, 100, 1150, 2100, 1200, true);
+	}
+
+	@Test
+	public void testRightIndexingSmallDense1() {
+		runRightIndexingTest(1, 700, 150, 1020, 3000, 3000, false);
+	}
+
+	@Test
+	public void testRightIndexingSmallSparse1() {
+		runRightIndexingTest(1, 700, 150, 1020, 3000, 3000, true);
+	}
+
+	@Test
+	public void testRightIndexingSmallDense2() {
+		runRightIndexingTest(150, 1020, 1, 700, 3000, 3000, false);
+	}
+
+	@Test
+	public void testRightIndexingSmallSparse2() {
+		runRightIndexingTest(150, 1020, 1, 700, 3000, 3000, true);
+	}
+
+	@Test
+	public void testRightIndexingSingleElementDense() {
+		runRightIndexingTest(1111, 1111, 2222, 2222, 3000, 3000, false);
+	}
+
+	@Test
+	public void testRightIndexingSingleElementSparse() {
+		runRightIndexingTest(1111, 1111, 2222, 2222, 3000, 3000, true);
+	}
+
+	@Test
+	public void testRightIndexingCrossBlockBothDense() {
+		runRightIndexingTest(950, 1050, 995, 1005, 3000, 3000, false);
+	}
+
+	@Test
+	public void testRightIndexingCrossBlockBothSparse() {
+		runRightIndexingTest(950, 1050, 995, 1005, 3000, 3000, true);
+	}
+
+	@Test
+	public void testRightIndexingSingleRowMultiBlockDense() {
+		runRightIndexingTest(1001, 1001, 800, 1205, 3000, 3000, false);
+	}
+
+	@Test
+	public void testRightIndexingSingleRowMultiBlockSparse() {
+		runRightIndexingTest(1001, 1001, 800, 1205, 3000, 3000, true);
+	}
+
+	@Test
+	public void testRightIndexingSingleColumnMultiBlockDense() {
+		runRightIndexingTest(800, 1205, 1001, 1001, 3000, 3000, false);
+	}
+
+	@Test
+	public void testRightIndexingSingleColumnMultiBlockSparse() {
+		runRightIndexingTest(800, 1205, 1001, 1001, 3000, 3000, true);
+	}
+
+	@Test
+	public void testRightIndexingTrailingBlocksDense() {
+		runRightIndexingTest(2501, 3000, 1500, 2100, 3000, 3000, false);
+	}
+
+	@Test
+	public void testRightIndexingTrailingBlocksSparse() {
+		runRightIndexingTest(2501, 3000, 1500, 2100, 3000, 3000, true);
+	}
+	/** Writes a random nrows x ncols input, runs the script with and without -ooc, and compares both outputs. */
+	private void runRightIndexingTest(int rs, int re, int cs, int ce, int nrows, int ncols, boolean sparse) {
+		Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE);
+
+		try {
+			getAndLoadTestConfiguration(TEST_NAME1);
+
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+			programArgs = new String[] {"-explain", "-stats", "-ooc", "-args", input(INPUT_NAME), "" + rs, "" + re, "" + cs, "" + ce, output(OUTPUT_NAME)};
+
+			// 1. Generate the input data in-memory as a double array
+			double[][] X_data = getRandomMatrix(nrows, ncols, 1, maxVal, sparse ? sparsity2 : sparsity1, 7);
+
+			// 2. Convert the double array to a MatrixBlock
+			MatrixBlock X_mb = DataConverter.convertToMatrixBlock(X_data);
+
+			// 3. Create a binary matrix writer
+			MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY);
+
+			// 4. Write matrix X (blocksize 1000) and its metadata file in binary format
+			writer.writeMatrixToHDFS(X_mb, input(INPUT_NAME), nrows, ncols, 1000, X_mb.getNonZeros());
+			HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64,
+				new MatrixCharacteristics(nrows, ncols, 1000, X_mb.getNonZeros()), Types.FileFormat.BINARY);
+
+			runTest(true, false, null, -1);
+
+			//check that the OOC right-indexing instruction was executed
+			Assert.assertTrue("OOC wasn't used for right indexing",
+				heavyHittersContainsString(Instruction.OOC_INST_PREFIX + Opcodes.RIGHT_INDEX));
+
+			//compare results
+
+			// rerun without ooc flag to produce the reference output
+			programArgs = new String[] {"-explain", "-stats", "-args", input(INPUT_NAME), "" + rs, "" + re, "" + cs, "" + ce, output(OUTPUT_NAME + "_target")};
+			runTest(true, false, null, -1);
+
+			// compare matrices
+			int outNRows = re-rs+1;
+			int outNCols = ce-cs+1;
+
+			MatrixBlock ret1 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME),
+				Types.FileFormat.BINARY, outNRows, outNCols, 1000);
+			MatrixBlock ret2 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME + "_target"),
+				Types.FileFormat.BINARY, outNRows, outNCols, 1000);
+
+			// ret2 is the reference (no -ooc), ret1 the OOC result;
+			// both were read with the expected output dimensions
+			// (re-rs+1) x (ce-cs+1) and must agree up to eps
+			TestUtils.compareMatrices(ret2, ret1, eps);
+		}
+		catch(IOException e) {
+			throw new RuntimeException(e);
+		}
+		finally {
+			resetExecMode(platformOld);
+		}
+	}
+}
diff --git a/src/test/scripts/functions/ooc/RightIndexing.dml b/src/test/scripts/functions/ooc/RightIndexing.dml
new file mode 100644
index 00000000000..9abb5c6d93c
--- /dev/null
+++ b/src/test/scripts/functions/ooc/RightIndexing.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.
See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Read the input matrix as a stream; $1 is the input path, $2..$5 the index bounds, $6 the output path
+X = read($1);
+rl = $2;  # lower row bound of the slice
+ru = $3;  # upper row bound of the slice
+cl = $4;  # lower column bound of the slice
+cu = $5;  # upper column bound of the slice
+
+res = X[rl:ru, cl:cu];
+
+write(res, $6, format="binary");
diff --git a/src/test/scripts/functions/ooc/RightIndexingCoords.dml b/src/test/scripts/functions/ooc/RightIndexingCoords.dml
new file mode 100644
index 00000000000..61a2ddc747b
--- /dev/null
+++ b/src/test/scripts/functions/ooc/RightIndexingCoords.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.
See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Read the input matrix as a stream +X = read($1); +rs = $2; +cs = $3; + +res = X[rs, cs]; + +write(res, $4, format="binary"); From 007f60d1fbb0d97a74b089ba0b941a22e0a5140c Mon Sep 17 00:00:00 2001 From: Jannik Lindemann Date: Sun, 9 Nov 2025 16:28:17 +0100 Subject: [PATCH 2/5] Cleanup and improved caching --- .../ooc/IndexingOOCInstruction.java | 82 ++++------ .../ooc/MatrixIndexingOOCInstruction.java | 142 ++++++++++-------- 2 files changed, 106 insertions(+), 118 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java index 354abebef74..3736b5a703c 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java @@ -28,6 +28,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.util.IndexRange; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; public abstract class IndexingOOCInstruction extends UnaryOOCInstruction { @@ -82,15 +83,13 @@ protected IndexRange getIndexRange(ExecutionContext ec) { } public static class BlockAligner { - private static final boolean DEBUG = false; - private final int _blocksize; private final IndexRange indexRange; private final IndexRange blockRange; private final int _outRows; private final int _outCols; private final Sector[] _blocks; - private int _emitCtr; + private final AtomicInteger _emitCtr; @SuppressWarnings("unchecked") public BlockAligner(IndexRange range, int blocksize) { @@ -109,13 +108,7 @@ public BlockAligner(IndexRange range, int blocksize) { _outCols = (int) ((totalCols + blocksize - 1) / blocksize); _blocks 
= (Sector[]) new Sector[_outRows * _outCols]; - _emitCtr = 0; - - if(DEBUG) { - System.out.println("BlockAligner: range=" + range + ", blocksize=" + blocksize); - System.out.println("BlockRange: " + blockRange); - System.out.println("Output grid: " + _outRows + "x" + _outCols); - } + _emitCtr = new AtomicInteger(0); } public boolean isAligned() { @@ -136,19 +129,16 @@ public boolean putNext(MatrixIndexes index, T data, BiConsumer overlapRowEnd) - return false; - long overlapColStart = Math.max(indexRange.colStart, blockColStart); long overlapColEnd = Math.min(indexRange.colEnd, blockColEnd); - if(overlapColStart > overlapColEnd) - return false; int outRowStart = (int) ((overlapRowStart - indexRange.rowStart) / _blocksize); int outRowEnd = (int) ((overlapRowEnd - indexRange.rowStart) / _blocksize); int outColStart = (int) ((overlapColStart - indexRange.colStart) / _blocksize); int outColEnd = (int) ((overlapColEnd - indexRange.colStart) / _blocksize); + int emitCtr = -1; + for(int outRow = outRowStart; outRow <= outRowEnd; outRow++) { long targetRowStartGlobal = indexRange.rowStart + (long) outRow * _blocksize; long targetRowEndGlobal = Math.min(indexRange.rowEnd, targetRowStartGlobal + _blocksize - 1); @@ -170,17 +160,17 @@ public boolean putNext(MatrixIndexes index, T data, BiConsumer= _blocks.length; + return emitCtr >= _blocks.length; } private int resolveIndex(int row, int col) { @@ -211,45 +201,33 @@ else if(rowSegments == 2 && colSegments == 1) } public synchronized void close() { - if(_emitCtr != _blocks.length) + if(_emitCtr.get() != _blocks.length) throw new DMLRuntimeException("BlockAligner still has some unfinished sectors"); } } - public static final class BlockComponent { - public final long blockRow; - public final long blockCol; - public final T data; - - public BlockComponent(long blockRow, long blockCol, T data) { - this.blockRow = blockRow; - this.blockCol = blockCol; - this.data = data; - } - } - public static abstract class Sector { - public 
abstract boolean place(int rowOffset, int colOffset, long blockRow, long blockCol, T data); + public abstract boolean place(int rowOffset, int colOffset, T data); - public abstract BlockComponent get(int rowOffset, int colOffset); + public abstract T get(int rowOffset, int colOffset); public abstract int count(); } public static class Sector1 extends Sector { - private BlockComponent _data; + private T _data; @Override - public synchronized boolean place(int rowOffset, int colOffset, long blockRow, long blockCol, T data) { + public synchronized boolean place(int rowOffset, int colOffset, T data) { if(rowOffset != 0 || colOffset != 0) return false; - _data = new BlockComponent<>(blockRow, blockCol, data); + _data = data; return true; } @Override - public synchronized BlockComponent get(int rowOffset, int colOffset) { + public synchronized T get(int rowOffset, int colOffset) { return (rowOffset == 0 && colOffset == 0) ? _data : null; } @@ -261,26 +239,26 @@ public synchronized int count() { public static class Sector4 extends Sector { private int _count; - private final BlockComponent[] _data; + private final T[] _data; @SuppressWarnings("unchecked") public Sector4() { _count = 0; - _data = (BlockComponent[]) new BlockComponent[4]; + _data = (T[]) new Object[4]; } @Override - public synchronized boolean place(int rowOffset, int colOffset, long blockRow, long blockCol, T data) { + public synchronized boolean place(int rowOffset, int colOffset, T data) { int pos = rowOffset * 2 + colOffset; if(_data[pos] == null) { - _data[pos] = new BlockComponent<>(blockRow, blockCol, data); + _data[pos] = data; _count++; } return _count == 4; } @Override - public synchronized BlockComponent get(int rowOffset, int colOffset) { + public synchronized T get(int rowOffset, int colOffset) { return _data[rowOffset * 2 + colOffset]; } @@ -292,21 +270,21 @@ public synchronized int count() { public static class Sector2Col extends Sector { private int _count; - private final BlockComponent[] 
_data; + private final T[] _data; @SuppressWarnings("unchecked") public Sector2Col() { _count = 0; - _data = (BlockComponent[]) new BlockComponent[2]; + _data = (T[]) new Object[2]; } @Override - public synchronized boolean place(int rowOffset, int colOffset, long blockRow, long blockCol, T data) { + public synchronized boolean place(int rowOffset, int colOffset, T data) { if(rowOffset != 0 || colOffset < 0 || colOffset > 1) return false; if(_data[colOffset] == null) { - _data[colOffset] = new BlockComponent<>(blockRow, blockCol, data); + _data[colOffset] = data; _count++; } @@ -314,7 +292,7 @@ public synchronized boolean place(int rowOffset, int colOffset, long blockRow, l } @Override - public synchronized BlockComponent get(int rowOffset, int colOffset) { + public synchronized T get(int rowOffset, int colOffset) { return (rowOffset == 0 && colOffset >= 0 && colOffset < 2) ? _data[colOffset] : null; } @@ -326,21 +304,21 @@ public synchronized int count() { public static class Sector2Row extends Sector { private int _count; - private final BlockComponent[] _data; + private final T[] _data; @SuppressWarnings("unchecked") public Sector2Row() { _count = 0; - _data = (BlockComponent[]) new BlockComponent[2]; + _data = (T[]) new Object[2]; } @Override - public synchronized boolean place(int rowOffset, int colOffset, long blockRow, long blockCol, T data) { + public synchronized boolean place(int rowOffset, int colOffset, T data) { if(colOffset != 0 || rowOffset < 0 || rowOffset > 1) return false; if(_data[rowOffset] == null) { - _data[rowOffset] = new BlockComponent<>(blockRow, blockCol, data); + _data[rowOffset] = data; _count++; } @@ -348,7 +326,7 @@ public synchronized boolean place(int rowOffset, int colOffset, long blockRow, l } @Override - public synchronized BlockComponent get(int rowOffset, int colOffset) { + public synchronized T get(int rowOffset, int colOffset) { return (colOffset == 0 && rowOffset >= 0 && rowOffset < 2) ? 
_data[rowOffset] : null; } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java index ba5bebc73a7..598c0cc697e 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java @@ -43,8 +43,8 @@ public MatrixIndexingOOCInstruction(CPOperand in, CPOperand rl, CPOperand ru, CP super(in, rl, ru, cl, cu, out, opcode, istr); } - protected MatrixIndexingOOCInstruction(CPOperand lhsInput, CPOperand rhsInput, CPOperand rl, - CPOperand ru, CPOperand cl, CPOperand cu, CPOperand out, String opcode, String istr) { + protected MatrixIndexingOOCInstruction(CPOperand lhsInput, CPOperand rhsInput, CPOperand rl, CPOperand ru, + CPOperand cl, CPOperand cu, CPOperand out, String opcode, String istr) { super(lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, istr); } @@ -68,17 +68,19 @@ public void processInstruction(ExecutionContext ec) { addInStream(qIn); addOutStream(qOut); - ec.getMatrixObject(output).setStreamHandle(qOut); + MatrixObject mOut = ec.getMatrixObject(output); + mOut.setStreamHandle(qOut); //right indexing - if( opcode.equalsIgnoreCase(Opcodes.RIGHT_INDEX.toString()) ) - { - if (output.isScalar() && inRange) { + if(opcode.equalsIgnoreCase(Opcodes.RIGHT_INDEX.toString())) { + if(output.isScalar() && inRange) { IndexedMatrixValue tmp; - while ((tmp = qIn.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) { - if (tmp.getIndexes().getRowIndex() == firstBlockRow && tmp.getIndexes().getColumnIndex() == firstBlockCol) { - ec.setScalarOutput(output.getName(), new DoubleObject(tmp.getValue().get((int)ix.rowStart % blocksize, (int)ix.rowEnd % blocksize))); + while((tmp = qIn.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) { + if(tmp.getIndexes().getRowIndex() == firstBlockRow && + tmp.getIndexes().getColumnIndex() == 
firstBlockCol) { + ec.setScalarOutput(output.getName(), new DoubleObject( + tmp.getValue().get((int) ix.rowStart % blocksize, (int) ix.rowEnd % blocksize))); return; } } @@ -86,13 +88,12 @@ public void processInstruction(ExecutionContext ec) { throw new DMLRuntimeException("Desired block not found"); } - final AtomicReference> futureRef = new AtomicReference<>(); - if (ix.rowStart % blocksize == 0 && ix.colStart % blocksize == 0) { + if(ix.rowStart % blocksize == 0 && ix.colStart % blocksize == 0) { // Aligned case: interior blocks can be forwarded directly, borders may require slicing - final int outBlockRows = (int)Math.ceil((double)(ix.rowSpan() + 1) / blocksize); - final int outBlockCols = (int)Math.ceil((double)(ix.colSpan() + 1) / blocksize); + final int outBlockRows = (int) Math.ceil((double) (ix.rowSpan() + 1) / blocksize); + final int outBlockCols = (int) Math.ceil((double) (ix.colSpan() + 1) / blocksize); final int totalBlocks = outBlockRows * outBlockCols; final AtomicInteger producedBlocks = new AtomicInteger(0); @@ -101,23 +102,22 @@ public void processInstruction(ExecutionContext ec) { long blockRow = inIdx.getRowIndex() - 1; long blockCol = inIdx.getColumnIndex() - 1; - if (blockRow < firstBlockRow || blockRow > lastBlockRow || blockCol < firstBlockCol || blockCol > lastBlockCol) + if(blockRow < firstBlockRow || blockRow > lastBlockRow || blockCol < firstBlockCol || + blockCol > lastBlockCol) return; MatrixBlock block = (MatrixBlock) tmp.getValue(); - int rowStartLocal = (blockRow == firstBlockRow) ? (int)(ix.rowStart % blocksize) : 0; - int rowEndLocal = (blockRow == lastBlockRow) - ? Math.min(block.getNumRows() - 1, (int)(ix.rowEnd % blocksize)) - : block.getNumRows() - 1; - int colStartLocal = (blockCol == firstBlockCol) ? (int)(ix.colStart % blocksize) : 0; - int colEndLocal = (blockCol == lastBlockCol) - ? 
Math.min(block.getNumColumns() - 1, (int)(ix.colEnd % blocksize)) - : block.getNumColumns() - 1; + int rowStartLocal = (blockRow == firstBlockRow) ? (int) (ix.rowStart % blocksize) : 0; + int rowEndLocal = (blockRow == lastBlockRow) ? Math.min(block.getNumRows() - 1, + (int) (ix.rowEnd % blocksize)) : block.getNumRows() - 1; + int colStartLocal = (blockCol == firstBlockCol) ? (int) (ix.colStart % blocksize) : 0; + int colEndLocal = (blockCol == lastBlockCol) ? Math.min(block.getNumColumns() - 1, + (int) (ix.colEnd % blocksize)) : block.getNumColumns() - 1; MatrixBlock outBlock; - if (rowStartLocal == 0 && rowEndLocal == block.getNumRows() - 1 && - colStartLocal == 0 && colEndLocal == block.getNumColumns() - 1) { + if(rowStartLocal == 0 && rowEndLocal == block.getNumRows() - 1 && colStartLocal == 0 && + colEndLocal == block.getNumColumns() - 1) { outBlock = block; } else { @@ -128,16 +128,16 @@ public void processInstruction(ExecutionContext ec) { long outBlockCol = blockCol - firstBlockCol + 1; qOut.enqueue(new IndexedMatrixValue(new MatrixIndexes(outBlockRow, outBlockCol), outBlock)); - if (producedBlocks.incrementAndGet() >= totalBlocks) { + if(producedBlocks.incrementAndGet() >= totalBlocks) { CompletableFuture f = futureRef.get(); - if (f != null) + if(f != null) f.cancel(true); } }, tmp -> { long blockRow = tmp.getIndexes().getRowIndex() - 1; long blockCol = tmp.getIndexes().getColumnIndex() - 1; - return blockRow >= firstBlockRow && blockRow <= lastBlockRow - && blockCol >= firstBlockCol && blockCol <= lastBlockCol; + return blockRow >= firstBlockRow && blockRow <= lastBlockRow && blockCol >= firstBlockCol && + blockCol <= lastBlockCol; }, qOut::closeInput); futureRef.set(future); return; @@ -145,80 +145,90 @@ public void processInstruction(ExecutionContext ec) { final BlockAligner aligner = new BlockAligner<>(ix, blocksize); - // We need to construct our own stream to properly manage the cached items - final CachingStream cachedStream = qIn.hasStreamCache() 
? qIn.getStreamCache() : new CachingStream(qIn); + // We may need to construct our own intermediate stream to properly manage the cached items + boolean hasIntermediateStream = !qIn.hasStreamCache(); + final CachingStream cachedStream = hasIntermediateStream ? new CachingStream(new SubscribableTaskQueue<>()) : qOut.getStreamCache(); cachedStream.activateIndexing(); - CompletableFuture future = filterOOC(cachedStream.getReadStream(), tmp -> { + CompletableFuture future = filterOOC(qIn.getReadStream(), tmp -> { + if (hasIntermediateStream) { + // We write to an intermediate stream to ensure that these matrix blocks are properly cached + cachedStream.getWriteStream().enqueue(tmp); + } + boolean completed = aligner.putNext(tmp.getIndexes(), new IndexedBlockMeta(tmp), (idx, sector) -> { - int targetBlockRow = (int)(idx.getRowIndex() - 1); - int targetBlockCol = (int)(idx.getColumnIndex() - 1); + int targetBlockRow = (int) (idx.getRowIndex() - 1); + int targetBlockCol = (int) (idx.getColumnIndex() - 1); - long targetRowStartGlobal = ix.rowStart + (long)targetBlockRow * blocksize; + long targetRowStartGlobal = ix.rowStart + (long) targetBlockRow * blocksize; long targetRowEndGlobal = Math.min(ix.rowEnd, targetRowStartGlobal + blocksize - 1); - long targetColStartGlobal = ix.colStart + (long)targetBlockCol * blocksize; + long targetColStartGlobal = ix.colStart + (long) targetBlockCol * blocksize; long targetColEndGlobal = Math.min(ix.colEnd, targetColStartGlobal + blocksize - 1); - int nRows = (int)(targetRowEndGlobal - targetRowStartGlobal + 1); - int nCols = (int)(targetColEndGlobal - targetColStartGlobal + 1); + int nRows = (int) (targetRowEndGlobal - targetRowStartGlobal + 1); + int nCols = (int) (targetColEndGlobal - targetColStartGlobal + 1); long firstSrcBlockRow = targetRowStartGlobal / blocksize; long lastSrcBlockRow = targetRowEndGlobal / blocksize; - int rowSegments = (int)(lastSrcBlockRow - firstSrcBlockRow + 1); + int rowSegments = (int) (lastSrcBlockRow - 
firstSrcBlockRow + 1); long firstSrcBlockCol = targetColStartGlobal / blocksize; long lastSrcBlockCol = targetColEndGlobal / blocksize; - int colSegments = (int)(lastSrcBlockCol - firstSrcBlockCol + 1); + int colSegments = (int) (lastSrcBlockCol - firstSrcBlockCol + 1); MatrixBlock target = null; - for (int r = 0; r < rowSegments; r++) { - for (int c = 0; c < colSegments; c++) { - BlockComponent component = sector.get(r, c); - if (component == null) + for(int r = 0; r < rowSegments; r++) { + for(int c = 0; c < colSegments; c++) { + IndexedBlockMeta ibm = sector.get(r, c); + if(ibm == null) continue; - IndexedMatrixValue mv = cachedStream.findCached(component.data.idx); - MatrixBlock srcBlock = (MatrixBlock)mv.getValue(); + IndexedMatrixValue mv = cachedStream.findCached(ibm.idx); + MatrixBlock srcBlock = (MatrixBlock) mv.getValue(); - if (target == null) + if(target == null) target = new MatrixBlock(nRows, nCols, srcBlock.isInSparseFormat()); - long srcBlockRowStart = component.blockRow * blocksize; - long srcBlockColStart = component.blockCol * blocksize; + long srcBlockRowStart = (ibm.idx.getRowIndex() - 1) * blocksize; + long srcBlockColStart = (ibm.idx.getColumnIndex() - 1) * blocksize; long sliceRowStartGlobal = Math.max(targetRowStartGlobal, srcBlockRowStart); - long sliceRowEndGlobal = Math.min(targetRowEndGlobal, srcBlockRowStart + srcBlock.getNumRows() - 1); + long sliceRowEndGlobal = Math.min(targetRowEndGlobal, + srcBlockRowStart + srcBlock.getNumRows() - 1); long sliceColStartGlobal = Math.max(targetColStartGlobal, srcBlockColStart); - long sliceColEndGlobal = Math.min(targetColEndGlobal, srcBlockColStart + srcBlock.getNumColumns() - 1); + long sliceColEndGlobal = Math.min(targetColEndGlobal, + srcBlockColStart + srcBlock.getNumColumns() - 1); - int sliceRowStart = (int)(sliceRowStartGlobal - srcBlockRowStart); - int sliceRowEnd = (int)(sliceRowEndGlobal - srcBlockRowStart); - int sliceColStart = (int)(sliceColStartGlobal - srcBlockColStart); - int 
sliceColEnd = (int)(sliceColEndGlobal - srcBlockColStart); + int sliceRowStart = (int) (sliceRowStartGlobal - srcBlockRowStart); + int sliceRowEnd = (int) (sliceRowEndGlobal - srcBlockRowStart); + int sliceColStart = (int) (sliceColStartGlobal - srcBlockColStart); + int sliceColEnd = (int) (sliceColEndGlobal - srcBlockColStart); - int targetRowOffset = (int)(sliceRowStartGlobal - targetRowStartGlobal); - int targetColOffset = (int)(sliceColStartGlobal - targetColStartGlobal); + int targetRowOffset = (int) (sliceRowStartGlobal - targetRowStartGlobal); + int targetColOffset = (int) (sliceColStartGlobal - targetColStartGlobal); MatrixBlock sliced = srcBlock.slice(sliceRowStart, sliceRowEnd, sliceColStart, sliceColEnd); sliced.putInto(target, targetRowOffset, targetColOffset, true); } } - if (target == null) - target = new MatrixBlock(nRows, nCols, false); - qOut.enqueue(new IndexedMatrixValue(idx, target)); }); - if (completed) { + + if(completed) { + // All blocks have been processed; we can cancel the future + // Currently, this does not affect processing (predicates prevent task submission anyway). + // However, a cancelled future may allow early file read aborts once implemented. 
CompletableFuture f = futureRef.get(); - if (f != null) + if(f != null) f.cancel(true); } }, tmp -> { + // Pre-filter incoming blocks to avoid unnecessary task submission long blockRow = tmp.getIndexes().getRowIndex() - 1; long blockCol = tmp.getIndexes().getColumnIndex() - 1; - return blockRow >= firstBlockRow && blockRow <= lastBlockRow - && blockCol >= firstBlockCol && blockCol <= lastBlockCol; + return blockRow >= firstBlockRow && blockRow <= lastBlockRow && blockCol >= firstBlockCol && + blockCol <= lastBlockCol; }, () -> { aligner.close(); qOut.closeInput(); @@ -226,12 +236,12 @@ public void processInstruction(ExecutionContext ec) { futureRef.set(future); } //left indexing - else if ( opcode.equalsIgnoreCase(Opcodes.LEFT_INDEX.toString())) - { + else if(opcode.equalsIgnoreCase(Opcodes.LEFT_INDEX.toString())) { throw new NotImplementedException(); } else - throw new DMLRuntimeException("Invalid opcode (" + opcode +") encountered in MatrixIndexingOOCInstruction."); + throw new DMLRuntimeException( + "Invalid opcode (" + opcode + ") encountered in MatrixIndexingOOCInstruction."); } private static class IndexedBlockMeta { From 270dec13eea0bf8de035e1e46109ad8ae1cc1233 Mon Sep 17 00:00:00 2001 From: Jannik Lindemann Date: Mon, 10 Nov 2025 09:29:12 +0100 Subject: [PATCH 3/5] Bugfix for stream indexing --- .../instructions/ooc/CachingStream.java | 45 ++++++++++--------- .../ooc/SubscribableTaskQueue.java | 2 +- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java index 1a540302806..b74f7ed5e13 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java @@ -80,36 +80,37 @@ public CachingStream(OOCStream source, long streamId) { }); } - private boolean fetchFromStream() throws InterruptedException { - 
synchronized (this) { - if(!_cacheInProgress) - throw new DMLRuntimeException("Stream is closed"); - } + private synchronized boolean fetchFromStream() throws InterruptedException { + if(!_cacheInProgress) + throw new DMLRuntimeException("Stream is closed"); IndexedMatrixValue task = _source.dequeue(); - synchronized (this) { - if(task != LocalTaskQueue.NO_MORE_TASKS) { - OOCEvictionManager.put(_streamId, _numBlocks, task); - if (_index != null) - _index.put(task.getIndexes(), _numBlocks); - _numBlocks++; - notifyAll(); - return false; - } - else { - _cacheInProgress = false; // caching is complete - notifyAll(); - return true; - } + if(task != LocalTaskQueue.NO_MORE_TASKS) { + OOCEvictionManager.put(_streamId, _numBlocks, task); + if (_index != null) + _index.put(task.getIndexes(), _numBlocks); + _numBlocks++; + notifyAll(); + return false; + } + else { + _cacheInProgress = false; // caching is complete + notifyAll(); + return true; } } public synchronized IndexedMatrixValue get(int idx) throws InterruptedException { while (true) { - if (idx < _numBlocks) - return OOCEvictionManager.get(_streamId, idx); - else if (!_cacheInProgress) + if (idx < _numBlocks) { + IndexedMatrixValue out = OOCEvictionManager.get(_streamId, idx); + + if (_index != null) // Ensure index is up to date + _index.putIfAbsent(out.getIndexes(), idx); + + return out; + } else if (!_cacheInProgress) return (IndexedMatrixValue)LocalTaskQueue.NO_MORE_TASKS; wait(); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/SubscribableTaskQueue.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/SubscribableTaskQueue.java index 5f97bd99e9e..f136ffc2bb6 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/SubscribableTaskQueue.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/SubscribableTaskQueue.java @@ -26,7 +26,7 @@ public class SubscribableTaskQueue extends LocalTaskQueue implements OOCSt private Runnable _subscriber; @Override - public 
void enqueue(T t) { + public synchronized void enqueue(T t) { try { super.enqueueTask(t); } From 4405bba512706547499c2d3db4f95307082c7e85 Mon Sep 17 00:00:00 2001 From: Jannik Lindemann Date: Mon, 10 Nov 2025 10:27:28 +0100 Subject: [PATCH 4/5] Variable Naming Consistency --- .../ooc/IndexingOOCInstruction.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java index 3736b5a703c..1d555da8d6c 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java @@ -84,8 +84,8 @@ protected IndexRange getIndexRange(ExecutionContext ec) { public static class BlockAligner { private final int _blocksize; - private final IndexRange indexRange; - private final IndexRange blockRange; + private final IndexRange _indexRange; + private final IndexRange _blockRange; private final int _outRows; private final int _outCols; private final Sector[] _blocks; @@ -93,14 +93,14 @@ public static class BlockAligner { @SuppressWarnings("unchecked") public BlockAligner(IndexRange range, int blocksize) { - indexRange = range; + _indexRange = range; _blocksize = blocksize; long firstBlockRow = range.rowStart / blocksize; long lastBlockRow = range.rowEnd / blocksize; long firstBlockCol = range.colStart / blocksize; long lastBlockCol = range.colEnd / blocksize; - blockRange = new IndexRange(firstBlockRow, lastBlockRow + 1, firstBlockCol, lastBlockCol + 1); + _blockRange = new IndexRange(firstBlockRow, lastBlockRow + 1, firstBlockCol, lastBlockCol + 1); long totalRows = range.rowSpan() + 1; long totalCols = range.colSpan() + 1; @@ -112,14 +112,14 @@ public BlockAligner(IndexRange range, int blocksize) { } public boolean isAligned() { - return (indexRange.rowStart % _blocksize) 
== 0 && (indexRange.colStart % _blocksize) == 0; + return (_indexRange.rowStart % _blocksize) == 0 && (_indexRange.colStart % _blocksize) == 0; } public boolean putNext(MatrixIndexes index, T data, BiConsumer> emitter) { long blockRow = index.getRowIndex() - 1; long blockCol = index.getColumnIndex() - 1; - if(!blockRange.isWithin(blockRow, blockCol)) + if(!_blockRange.isWithin(blockRow, blockCol)) return false; long blockRowStart = blockRow * _blocksize; @@ -127,28 +127,28 @@ public boolean putNext(MatrixIndexes index, T data, BiConsumer Date: Mon, 10 Nov 2025 10:36:43 +0100 Subject: [PATCH 5/5] Remove Unnecessary Check --- .../instructions/ooc/MatrixIndexingOOCInstruction.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java index 598c0cc697e..28f10dd8317 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java @@ -102,10 +102,6 @@ public void processInstruction(ExecutionContext ec) { long blockRow = inIdx.getRowIndex() - 1; long blockCol = inIdx.getColumnIndex() - 1; - if(blockRow < firstBlockRow || blockRow > lastBlockRow || blockCol < firstBlockCol || - blockCol > lastBlockCol) - return; - MatrixBlock block = (MatrixBlock) tmp.getValue(); int rowStartLocal = (blockRow == firstBlockRow) ? (int) (ix.rowStart % blocksize) : 0;