diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java
new file mode 100644
index 00000000000..2707b9e6eb0
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.caching;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.util.LocalFileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Eviction manager for the out-of-core (OOC) stream cache.
+ * This is the base implementation for the FIFO and LRU policies.
+ *
+ * Design choice 1 (implemented here): pure JVM-memory cache
+ *   What: Store MatrixBlock objects in a synchronized in-memory cache
+ *     (Map + Deque for LRU/FIFO). Spill to disk by serializing a MatrixBlock
+ *     only when it is evicted.
+ *   Pros: Simple to implement; no off-heap management; easy to debug;
+ *     no serialization race since blocks are serialized only on eviction;
+ *     fast cache hits (direct object access).
+ *   Cons: Heap usage is only approximated via the serialized-size estimate,
+ *     so actual JVM object overhead is not accounted for; risk of GC pressure
+ *     and OOM if estimates are off or if many small objects cause
+ *     fragmentation; eviction may be more expensive (serialize on eviction).
+ *
+ * Design choice 2 (alternative, not implemented here): off-heap cache
+ *   What: Manage runtime memory by caching serialized ByteBuffers and
+ *     spilling them to disk when needed.
+ *     - Core function: caches ByteBuffers (off-heap/direct) and spills
+ *       them to disk
+ *     - Eviction: evicts a ByteBuffer by writing its contents to a file
+ *     - Granularity: evicts one IndexedMatrixValue block at a time
+ *     - Data replay: get() always returns the data, either from memory or
+ *       by falling back to the disk
+ *     - Memory: since the data blocks are off-heap (in ByteBuffers) or on
+ *       disk, there won't be a heap OOM
+ *   Pros: Avoids heap OOM by keeping large data off-heap; predictable
+ *     memory usage; good for very large blocks.
+ *   Cons: More complex synchronization; needs a robust off-heap
+ *     allocator/free; must ensure serialization finishes before adding to
+ *     the queue or make evict wait on serialization; careful with native
+ *     memory leaks.
+ *
+ * Thread-safety: all cache state is static and is only touched from the
+ * synchronized put/get entry points, i.e., under the class monitor.
+ */
+public class OOCEvictionManager {
+
+	// Configuration: OOC buffer limit as a fraction of the maximum heap
+	private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap
+
+	// Cache budget and current estimated usage, both in serialized bytes
+	private static long _limit;
+	private static long _size;
+
+	// Cache structures: map key -> cached block, and eviction deque (head = oldest block)
+	private static final Map<String, IndexedMatrixValue> _cache = new HashMap<>();
+	private static final Deque<String> _evictDeque = new ArrayDeque<>();
+
+	// Indexes of spilled blocks, because a spill file stores only the MatrixBlock
+	private static final Map<String, MatrixIndexes> _spilledIndexes = new HashMap<>();
+
+	// Spill directory for evicted blocks
+	private static String _spillDir;
+
+	public enum RPolicy {
+		FIFO, LRU
+	}
+	private static RPolicy _policy = RPolicy.FIFO;
+
+	private OOCEvictionManager() {}
+
+	static {
+		// OOC_BUFFER_PERCENTAGE is already a fraction (0.15 = 15%), so it is applied unscaled
+		_limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE);
+		_size = 0;
+		_spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream");
+		LocalFileUtils.createLocalFileIfNotExist(_spillDir);
+	}
+
+	/**
+	 * Stores a block in the OOC cache, replacing any cached block with the same key.
+	 * Older blocks are spilled to disk first if the new block would exceed the limit.
+	 *
+	 * @param streamId identifier of the owning stream
+	 * @param blockId  identifier of the block within the stream
+	 * @param value    block to cache; its value is cast to MatrixBlock
+	 * @throws DMLRuntimeException if spilling an evicted block to disk fails
+	 */
+	public static synchronized void put(long streamId, int blockId, IndexedMatrixValue value) {
+		MatrixBlock mb = (MatrixBlock) value.getValue();
+		long size = estimateSerializedSize(mb);
+		String key = cacheKey(streamId, blockId);
+
+		// drop a previously cached block with the same key before accounting the new one
+		IndexedMatrixValue old = _cache.remove(key);
+		if (old != null) {
+			_evictDeque.remove(key);
+			_size -= estimateSerializedSize((MatrixBlock) old.getValue());
+		}
+
+		try {
+			evict(size);
+		} catch (IOException e) {
+			throw new DMLRuntimeException(e);
+		}
+
+		_cache.put(key, value);
+		_evictDeque.addLast(key); // newest entries live at the tail
+		_size += size;
+	}
+
+	/**
+	 * Returns a block from the OOC cache, falling back to its spill file if the
+	 * block is not held in memory. A block loaded from disk is not re-inserted
+	 * into the cache.
+	 *
+	 * @param streamId identifier of the owning stream
+	 * @param blockId  identifier of the block within the stream
+	 * @return the cached block, or a new block read from the spill file
+	 * @throws DMLRuntimeException if the block is not cached and reading its spill file fails
+	 */
+	public static synchronized IndexedMatrixValue get(long streamId, int blockId) {
+		String key = cacheKey(streamId, blockId);
+		IndexedMatrixValue imv = _cache.get(key);
+
+		if (imv != null) {
+			if (_policy == RPolicy.LRU) {
+				// refresh recency: move the key to the tail of the eviction order
+				_evictDeque.remove(key);
+				_evictDeque.addLast(key);
+			}
+			return imv;
+		}
+
+		try {
+			return loadFromDisk(streamId, blockId);
+		} catch (IOException e) {
+			throw new DMLRuntimeException(e);
+		}
+	}
+
+	/**
+	 * Spills the oldest cached blocks to disk until the requested size fits into
+	 * the limit or nothing is left to evict. Called from put(), which holds the
+	 * class monitor.
+	 *
+	 * @param requiredSize estimated size in bytes of the block about to be cached
+	 * @throws IOException if the spill directory or a spill file cannot be written
+	 */
+	private static void evict(long requiredSize) throws IOException {
+		while (_size + requiredSize > _limit && !_evictDeque.isEmpty()) {
+			// head = oldest (FIFO) or least recently used (LRU) block
+			String oldKey = _evictDeque.peekFirst();
+			IndexedMatrixValue toEvict = _cache.get(oldKey);
+			if (toEvict == null) {
+				_evictDeque.removeFirst(); // stale key without a cached block
+				continue;
+			}
+			MatrixBlock mbToEvict = (MatrixBlock) toEvict.getValue();
+
+			// re-create the spill directory if it no longer exists
+			File spillDirFile = new File(_spillDir);
+			if (!spillDirFile.exists() && !spillDirFile.mkdirs()) {
+				throw new IOException("Failed to create spill directory " + _spillDir);
+			}
+
+			// write first, so a failed spill leaves the cache bookkeeping untouched
+			LocalFileUtils.writeMatrixBlockToLocal(spillPath(oldKey), mbToEvict);
+
+			_evictDeque.removeFirst();
+			_cache.remove(oldKey);
+			// remember the original indexes so that a reload can restore them
+			_spilledIndexes.put(oldKey, toEvict.getIndexes());
+			_size -= estimateSerializedSize(mbToEvict);
+		}
+	}
+
+	/**
+	 * Loads a previously spilled block from its spill file.
+	 *
+	 * @param streamId identifier of the owning stream
+	 * @param blockId  identifier of the block within the stream
+	 * @return a new IndexedMatrixValue wrapping the block read from disk
+	 * @throws IOException if the spill file does not exist or cannot be read
+	 */
+	private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) throws IOException {
+		String key = cacheKey(streamId, blockId);
+		String filename = spillPath(key);
+
+		if (!LocalFileUtils.isExisting(filename)) {
+			throw new IOException("File " + filename + " does not exist");
+		}
+
+		MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename);
+
+		// restore the indexes recorded at eviction; otherwise assume block (blockId+1, 1)
+		MatrixIndexes ix = _spilledIndexes.get(key);
+		if (ix == null) {
+			ix = new MatrixIndexes(blockId + 1, 1);
+		}
+
+		// not re-inserted into the cache: get() should not modify the cache contents
+		return new IndexedMatrixValue(ix, mb);
+	}
+
+	/** Estimates the in-cache footprint of a block by its exact serialized size. */
+	private static long estimateSerializedSize(MatrixBlock mb) {
+		return mb.getExactSerializedSize();
+	}
+
+	/** Builds the cache key, which is also used as the spill file name. */
+	private static String cacheKey(long streamId, int blockId) {
+		return streamId + "_" + blockId;
+	}
+
+	/** Returns the spill file path for a cache key. */
+	private static String spillPath(String key) {
+		return _spillDir + "/" + key;
+	}
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java
index 038e1a8b983..ebf2a94be5d 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java
@@ -19,14 +19,17 @@
package org.apache.sysds.runtime.instructions.ooc;
+import org.apache.sysds.runtime.controlprogram.caching.OOCEvictionManager;
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
+import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
-import java.util.ArrayList;
/**
* A wrapper around LocalTaskQueue to consume the source stream and reset to
* consume again for other operators.
+ *
+ * Uses OOCEvictionManager for out-of-core caching.
*
*/
public class ResettableStream extends LocalTaskQueue {
@@ -34,16 +37,24 @@ public class ResettableStream extends LocalTaskQueue {
// original live stream
private final LocalTaskQueue _source;
- // in-memory cache to store stream for re-play
- private final ArrayList _cache;
+ private static final IDSequence _streamSeq = new IDSequence();
+ // stream identifier
+ private final long _streamId;
+
+ // block counter
+ private int _numBlocks = 0;
+
// state flags
private boolean _cacheInProgress = true; // caching in progress, in the first pass.
private int _replayPosition = 0; // slider position in the stream
public ResettableStream(LocalTaskQueue source) {
+ this(source, _streamSeq.getNextID());
+ }
+ public ResettableStream(LocalTaskQueue source, long streamId) {
_source = source;
- _cache = new ArrayList<>();
+ _streamId = streamId;
}
/**
@@ -51,7 +62,6 @@ public ResettableStream(LocalTaskQueue source) {
* For subsequent passes it reads from the memory.
*
* @return The next matrix value in the stream, or NO_MORE_TASKS
- * @throws InterruptedException
*/
@Override
public synchronized IndexedMatrixValue dequeueTask()
@@ -60,18 +70,23 @@ public synchronized IndexedMatrixValue dequeueTask()
// First pass: Read value from the source and cache it, and return.
IndexedMatrixValue task = _source.dequeueTask();
if (task != NO_MORE_TASKS) {
- _cache.add(new IndexedMatrixValue(task));
+
+ OOCEvictionManager.put(_streamId, _numBlocks, task);
+ _numBlocks++;
+
+ return task;
} else {
_cacheInProgress = false; // caching is complete
_source.closeInput(); // close source stream
+
+ // Notify all consumers that are waiting for the cache of this stream to be filled
+ notifyAll();
+ return (IndexedMatrixValue) NO_MORE_TASKS;
}
- notifyAll(); // Notify all the waiting consumers waiting for cache to fill with this stream
- return task;
} else {
- // Replay pass: read directly from in-memory cache
- if (_replayPosition < _cache.size()) {
- // Return a copy to ensure consumer won't modify the cache
- return new IndexedMatrixValue(_cache.get(_replayPosition++));
+ // Replay pass: read from the buffer
+ if (_replayPosition < _numBlocks) {
+ return OOCEvictionManager.get(_streamId, _replayPosition++);
} else {
return (IndexedMatrixValue) NO_MORE_TASKS;
}