diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java
index 8c8a64b0225..c87b3c99cf2 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java
@@ -36,10 +36,8 @@
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
-import org.apache.sysds.runtime.util.CommonThreadPool;
import java.util.HashMap;
-import java.util.concurrent.ExecutorService;
public class AggregateUnaryOOCInstruction extends ComputationOOCInstruction {
private AggregateOperator _aop = null;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java
index 82ad12ae554..1dfc99be811 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java
@@ -19,8 +19,6 @@
package org.apache.sysds.runtime.instructions.ooc;
-import java.util.concurrent.ExecutorService;
-
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
@@ -33,7 +31,6 @@
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
-import org.apache.sysds.runtime.util.CommonThreadPool;
public class BinaryOOCInstruction extends ComputationOOCInstruction {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java
index c1d1ed6ace7..aa215e83e90 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java
@@ -20,7 +20,6 @@
package org.apache.sysds.runtime.instructions.ooc;
import java.util.HashMap;
-import java.util.concurrent.ExecutorService;
import org.apache.sysds.common.Opcodes;
import org.apache.sysds.conf.ConfigurationManager;
@@ -39,7 +38,6 @@
import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
import org.apache.sysds.runtime.matrix.operators.Operator;
-import org.apache.sysds.runtime.util.CommonThreadPool;
public class MatrixVectorBinaryOOCInstruction extends ComputationOOCInstruction {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
new file mode 100644
index 00000000000..747167d5102
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.ooc;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.LocalFileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Eviction Manager for the Out-Of-Core stream cache
+ * This is the base implementation for LRU, FIFO
+ *
+ * Design choice 1: Pure JVM-memory cache
+ * What: Store MatrixBlock objects in a synchronized in-memory cache
+ * (Map + Deque for LRU/FIFO). Spill to disk by serializing MatrixBlock
+ * only when evicting.
+ * Pros: Simple to implement; no off-heap management; easy to debug;
+ * no serialization race since you serialize only when evicting;
+ * fast cache hits (direct object access).
+ * Cons: Heap usage counted roughly via serialized-size estimate — actual
+ * JVM object overhead not accounted; risk of GC pressure and OOM if
+ * estimates are off or if many small objects cause fragmentation;
+ * eviction may be more expensive (serialize on eviction).
+ *
+ * Design choice 2:
+ *
+ * This manager handles runtime memory management by caching serialized
+ * ByteBuffers and spilling them to disk when needed.
+ *
+ * * core function: Caches ByteBuffers (off-heap/direct) and
+ * spills them to disk
+ * * Eviction: Evicts a ByteBuffer by writing its contents to a file
+ * * Granularity: Evicts one IndexedMatrixValue block at a time
+ * * Data replay: get() will always return the data either from memory or
+ * by falling back to the disk
+ * * Memory: Since the datablocks are off-heap (in ByteBuffer) or disk,
+ * there won't be OOM.
+ *
+ * Pros: Avoids heap OOM by keeping large data off-heap; predictable
+ * memory usage; good for very large blocks.
+ * Cons: More complex synchronization; need robust off-heap allocator/free;
+ * must ensure serialization finishes before adding to queue or make evict
+ * wait on serialization; careful with native memory leaks.
+ */
+public class OOCEvictionManager {
+
+	// Configuration: OOC buffer limit as a fraction of the max heap size
+	private static final double OOC_BUFFER_PERCENTAGE = 0.00015; // fraction, i.e., 0.015% of heap
+
+	// Limit of the in-memory buffer, and estimated size of the blocks currently held in memory
+	private static long _limit;
+	private static long _size;
+
+	// Cache: key "streamId_blockId" -> block, in insertion order (first entry = next eviction candidate)
+	private static LinkedHashMap<String, IndexedMatrixValue> _cache = new LinkedHashMap<>();
+
+	// Spill directory for evicted blocks
+	private static String _spillDir;
+
+	public enum RPolicy {
+		FIFO, LRU
+	}
+	private static RPolicy _policy = RPolicy.FIFO;
+
+	static {
+		_limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE); // max heap * fraction
+		_size = 0;
+		_spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream");
+		LocalFileUtils.createLocalFileIfNotExist(_spillDir);
+	}
+
+	/**
+	 * Store a block in the OOC cache; resident blocks are spilled to disk first if the limit would be exceeded.
+	 * The cache keeps its own wrapper because evict() nulls the wrapper's value, which must not hit the caller's object.
+	 */
+	public static synchronized void put(long streamId, int blockId, IndexedMatrixValue value) {
+		MatrixBlock mb = (MatrixBlock) value.getValue();
+		long size = estimateSerializedSize(mb);
+		String key = streamId + "_" + blockId;
+
+		// replace a previous entry; an already evicted one (null value) no longer counts towards _size
+		IndexedMatrixValue old = _cache.remove(key);
+		if (old != null && old.getValue() != null)
+			_size -= estimateSerializedSize((MatrixBlock) old.getValue());
+
+		//make room if needed
+		evict(size);
+		_cache.put(key, new IndexedMatrixValue(value)); // separate wrapper, put new value last
+		_size += size;
+	}
+
+	/**
+	 * Get a block from the OOC cache, restored from its spill file if it was evicted.
+	 * Fails with a DMLRuntimeException if nothing was stored under (streamId, blockId).
+	 */
+	public static synchronized IndexedMatrixValue get(long streamId, int blockId) {
+		String key = streamId + "_" + blockId;
+		IndexedMatrixValue imv = _cache.get(key);
+		if (imv == null) // unknown key: fail with a clear message instead of a NullPointerException
+			throw new DMLRuntimeException("OOC cache contains no block for key " + key);
+		if (_policy == RPolicy.LRU) {
+			_cache.remove(key);
+			_cache.put(key, imv); //add last semantic
+		}
+		//restore if needed; hand out a separate wrapper because evict() nulls the value of the cached one
+		return new IndexedMatrixValue((imv.getValue() != null) ? imv : loadFromDisk(streamId, blockId));
+	}
+
+	/**
+	 * Spill resident blocks to disk, first cache entry first, until requiredSize more fits into the
+	 * limit or every entry was visited once; spilled entries remain cached with a null value.
+	 */
+	private static void evict(long requiredSize) {
+		try {
+			File spillDirFile = new File(_spillDir);
+			int pos = 0;
+			while(_size + requiredSize > _limit && pos++ < _cache.size()) {
+				Map.Entry<String, IndexedMatrixValue> tmp = removeFirstFromCache();
+				if( tmp == null )
+					break; // empty cache, nothing left to evict
+				IndexedMatrixValue imv = tmp.getValue();
+				if( imv.getValue() == null ) {
+					// already spilled: keep the placeholder, re-appended at the end
+					_cache.put(tmp.getKey(), imv);
+					continue;
+				}
+
+				// Spill to disk (re-create the spill directory if it does not exist anymore)
+				if (!spillDirFile.exists())
+					spillDirFile.mkdirs();
+				MatrixBlock mb = (MatrixBlock) imv.getValue();
+				LocalFileUtils.writeMatrixBlockToLocal(_spillDir + "/" + tmp.getKey(), mb);
+				// Evict from memory, the entry itself stays in the cache as placeholder
+				imv.setValue(null);
+				_cache.put(tmp.getKey(), imv); // add last semantic
+				_size -= estimateSerializedSize(mb);
+			}
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+	}
+
+	/**
+	 * Load an evicted block from its spill file back into its cache entry and
+	 * account for its memory again, so that a later evict() can spill it anew.
+	 */
+	private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) {
+		String key = streamId + "_" + blockId;
+		String filename = _spillDir + "/" + key;
+		try {
+			if (!LocalFileUtils.isExisting(filename))
+				throw new IOException("File " + filename + " does not exist");
+
+			// Read from disk and put into original indexed matrix value
+			MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename);
+			IndexedMatrixValue imv = _cache.get(key);
+			imv.setValue(mb);
+			// evict() subtracted this block from _size; it is resident again now
+			_size += estimateSerializedSize(mb);
+			return imv;
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+	}
+ /** Returns mb.getExactSerializedSize(), the amount put() and evict() charge a block against _limit (presumably bytes - confirm). */
+ private static long estimateSerializedSize(MatrixBlock mb) {
+ return mb.getExactSerializedSize();
+ }
+
+	/** Removes the first entry of the cache in iteration order and returns a snapshot of it, or null if the cache is empty. */
+	private static Map.Entry<String, IndexedMatrixValue> removeFirstFromCache() {
+		Iterator<Map.Entry<String, IndexedMatrixValue>> iter = _cache.entrySet().iterator();
+		if( !iter.hasNext() )
+			return null; // empty cache; evict() checks for null
+		//snapshot key/value first: a map entry must not be relied on after removal from the backing map
+		Map.Entry<String, IndexedMatrixValue> entry = new java.util.AbstractMap.SimpleImmutableEntry<>(iter.next());
+		iter.remove();
+		return entry;
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java
index 06386c5d66c..3c78879b45d 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java
@@ -19,8 +19,6 @@
package org.apache.sysds.runtime.instructions.ooc;
-import java.util.concurrent.ExecutorService;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
@@ -40,7 +38,6 @@
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
-import org.apache.sysds.runtime.util.CommonThreadPool;
public class ReblockOOCInstruction extends ComputationOOCInstruction {
private int blen;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java
index 038e1a8b983..6179811f7a7 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java
@@ -20,13 +20,15 @@
package org.apache.sysds.runtime.instructions.ooc;
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
+import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
-import java.util.ArrayList;
/**
* A wrapper around LocalTaskQueue to consume the source stream and reset to
* consume again for other operators.
+ *
+ * Uses OOCEvictionManager for out-of-core caching.
*
*/
public class ResettableStream extends LocalTaskQueue {
@@ -34,16 +36,24 @@ public class ResettableStream extends LocalTaskQueue {
// original live stream
private final LocalTaskQueue _source;
- // in-memory cache to store stream for re-play
- private final ArrayList _cache;
+ private static final IDSequence _streamSeq = new IDSequence();
+ // stream identifier
+ private final long _streamId;
+
+ // block counter
+ private int _numBlocks = 0;
+
// state flags
private boolean _cacheInProgress = true; // caching in progress, in the first pass.
private int _replayPosition = 0; // slider position in the stream
public ResettableStream(LocalTaskQueue source) {
+ this(source, _streamSeq.getNextID());
+ }
+ public ResettableStream(LocalTaskQueue source, long streamId) {
_source = source;
- _cache = new ArrayList<>();
+ _streamId = streamId;
}
/**
@@ -51,7 +61,6 @@ public ResettableStream(LocalTaskQueue source) {
* For subsequent passes it reads from the memory.
*
* @return The next matrix value in the stream, or NO_MORE_TASKS
- * @throws InterruptedException
*/
@Override
public synchronized IndexedMatrixValue dequeueTask()
@@ -60,18 +69,23 @@ public synchronized IndexedMatrixValue dequeueTask()
// First pass: Read value from the source and cache it, and return.
IndexedMatrixValue task = _source.dequeueTask();
if (task != NO_MORE_TASKS) {
- _cache.add(new IndexedMatrixValue(task));
+
+ OOCEvictionManager.put(_streamId, _numBlocks, task);
+ _numBlocks++;
+
+ return task;
} else {
_cacheInProgress = false; // caching is complete
_source.closeInput(); // close source stream
+
+ // Notify all the waiting consumers waiting for cache to fill with this stream
+ notifyAll();
+ return (IndexedMatrixValue) NO_MORE_TASKS;
}
- notifyAll(); // Notify all the waiting consumers waiting for cache to fill with this stream
- return task;
} else {
- // Replay pass: read directly from in-memory cache
- if (_replayPosition < _cache.size()) {
- // Return a copy to ensure consumer won't modify the cache
- return new IndexedMatrixValue(_cache.get(_replayPosition++));
+ // Replay pass: read from the buffer
+ if (_replayPosition < _numBlocks) {
+ return OOCEvictionManager.get(_streamId, _replayPosition++);
} else {
return (IndexedMatrixValue) NO_MORE_TASKS;
}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
index fce5408960e..05e31830a56 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
@@ -30,9 +30,6 @@
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
-import org.apache.sysds.runtime.util.CommonThreadPool;
-
-import java.util.concurrent.ExecutorService;
public class TransposeOOCInstruction extends ComputationOOCInstruction {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
index 63f42f5bf15..173486844a6 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
@@ -28,9 +28,6 @@
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
-import org.apache.sysds.runtime.util.CommonThreadPool;
-
-import java.util.concurrent.ExecutorService;
public class UnaryOOCInstruction extends ComputationOOCInstruction {
private UnaryOperator _uop = null;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java
index 8f82a99abff..7b20fe2f9e5 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java
@@ -66,6 +66,10 @@ public MatrixIndexes getIndexes() {
public MatrixValue getValue() {
return _value;
}
+ /** Replaces the value held by this object; the indexes are left unchanged. */
+ public void setValue(MatrixValue value) {
+ _value = value;
+ }
public void set(MatrixIndexes indexes2, MatrixValue block2) {
_indexes.setIndexes(indexes2);
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java
index 5a3d0a64306..7f0f3b7a658 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java
@@ -108,6 +108,11 @@ public int hashCode() {
public String toString() {
return "("+_row+", "+_col+")";
}
+	/** Parses the "(row, col)" format produced by toString() into a new MatrixIndexes; this object is not modified. */
+	public MatrixIndexes fromString(String ix) {
+		String[] parts = ix.substring(1, ix.length()-1).split(",");
+		return new MatrixIndexes(Long.parseLong(parts[0].trim()), Long.parseLong(parts[1].trim()));
+	}
////////////////////////////////////////////////////
// implementation of Writable read/write