diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java new file mode 100644 index 00000000000..2707b9e6eb0 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.controlprogram.caching; + +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.MatrixIndexes; +import org.apache.sysds.runtime.util.LocalFileUtils; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + +/** + * Eviction Manager for the Out-Of-Core stream cache + * This is the base implementation for LRU, FIFO + * + * Design choice 1: Pure JVM-memory cache + * What: Store MatrixBlock objects in a synchronized in-memory cache + * (Map + Deque for LRU/FIFO). 
Spill to disk by serializing MatrixBlock + * only when evicting. + * Pros: Simple to implement; no off-heap management; easy to debug; + * no serialization race since you serialize only when evicting; + * fast cache hits (direct object access). + * Cons: Heap usage counted roughly via serialized-size estimate — actual + * JVM object overhead not accounted; risk of GC pressure and OOM if + * estimates are off or if many small objects cause fragmentation; + * eviction may be more expensive (serialize on eviction). + *

+ * Design choice 2: Off-heap cache of serialized ByteBuffers + *

+ * This manager handles runtime memory management by caching serialized + * ByteBuffers and spilling them to disk when needed. + *

+ * * core function: Caches ByteBuffers (off-heap/direct) and
+ *   spills them to disk
+ * * Eviction: Evicts a ByteBuffer by writing its contents to a file
+ * * Granularity: Evicts one IndexedMatrixValue block at a time
+ * * Data replay: get() will always return the data either from memory or
+ *   by falling back to the disk
+ * * Memory: Since the datablocks are off-heap (in ByteBuffer) or disk,
+ *   there won't be OOM.
+ *
+ * Pros: Avoids heap OOM by keeping large data off-heap; predictable
+ *   memory usage; good for very large blocks.
+ * Cons: More complex synchronization; need robust off-heap allocator/free;
+ *   must ensure serialization finishes before adding to queue or make evict
+ *   wait on serialization; careful with native memory leaks.
+ *
+ * Note: the code in this class keeps the IndexedMatrixValue objects themselves
+ * in an in-memory map and writes a block to disk only when it is evicted,
+ * i.e., it follows design choice 1.
+ *
+ * Thread-safety: all mutable state is guarded by the class monitor; put() and
+ * get() are static synchronized and the private helpers are only called from them.
+ */
+public class OOCEvictionManager {
+
+	// Configuration: fraction of the JVM max heap the OOC cache may occupy (0.15 = 15%)
+	private static final double OOC_BUFFER_PERCENTAGE = 0.15;
+
+	// Cache budget and current usage, both in bytes of exact serialized block size
+	private static long _limit;
+	private static long _size;
+
+	// Cache structures: map key -> block and eviction deque (head = oldest block)
+	private static final Map<String, IndexedMatrixValue> _cache = new HashMap<>();
+	private static final Deque<String> _evictDeque = new ArrayDeque<>();
+
+	// Spill directory for evicted blocks
+	private static String _spillDir;
+
+	/** Replacement policy: FIFO keeps insertion order, LRU moves a block to the tail on every cache hit. */
+	public enum RPolicy {
+		FIFO, LRU
+	}
+	private static RPolicy _policy = RPolicy.FIFO;
+
+	private OOCEvictionManager() {}
+
+	static {
+		// OOC_BUFFER_PERCENTAGE is already a fraction, so no further scaling is applied
+		// (the former extra factor of 0.01 reduced the budget to 0.15% of the heap)
+		_limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE);
+		_size = 0;
+		_spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream");
+		LocalFileUtils.createLocalFileIfNotExist(_spillDir);
+	}
+
+	/**
+	 * Store a block in the OOC cache. The block object is kept as is (no copy, no
+	 * serialization); older blocks are spilled to disk first if the new block would
+	 * push the cache over its limit. An existing entry with the same key is replaced.
+	 *
+	 * @param streamId id of the stream the block belongs to
+	 * @param blockId position of the block within the stream
+	 * @param value the block to cache; its value must be a MatrixBlock
+	 * @throws DMLRuntimeException if spilling an evicted block to disk fails
+	 */
+	public static synchronized void put(long streamId, int blockId, IndexedMatrixValue value) {
+		MatrixBlock mb = (MatrixBlock) value.getValue();
+		long size = estimateSerializedSize(mb);
+		String key = toKey(streamId, blockId);
+
+		// drop a previous entry under the same key so its size is not counted twice
+		IndexedMatrixValue old = _cache.remove(key);
+		if (old != null) {
+			_evictDeque.remove(key);
+			_size -= estimateSerializedSize((MatrixBlock) old.getValue());
+		}
+
+		try {
+			evict(size);
+		} catch (IOException e) {
+			throw new DMLRuntimeException(e);
+		}
+
+		_cache.put(key, value);
+		_evictDeque.addLast(key); // newest entry goes to the tail for FIFO/LRU
+		_size += size;
+	}
+
+	/**
+	 * Get a block from the OOC cache. A cached block is returned directly (and, under
+	 * LRU, marked as most recently used); otherwise the block is read back from its
+	 * spill file without being re-inserted into the cache.
+	 *
+	 * @param streamId id of the stream the block belongs to
+	 * @param blockId position of the block within the stream
+	 * @return the cached block, or a block freshly read from the spill file
+	 * @throws DMLRuntimeException if the block is neither cached nor readable from disk
+	 */
+	public static synchronized IndexedMatrixValue get(long streamId, int blockId) {
+		String key = toKey(streamId, blockId);
+		IndexedMatrixValue imv = _cache.get(key);
+
+		if (imv != null) {
+			if (_policy == RPolicy.LRU) {
+				// move to the tail, i.e., furthest away from eviction
+				_evictDeque.remove(key);
+				_evictDeque.addLast(key);
+			}
+			return imv;
+		}
+
+		try {
+			return loadFromDisk(streamId, blockId);
+		} catch (IOException e) {
+			throw new DMLRuntimeException(e);
+		}
+	}
+
+	/**
+	 * Spill blocks to disk, oldest first, until the required size fits into the
+	 * cache limit or no block is left to evict. Must be called while holding the
+	 * class monitor.
+	 *
+	 * @param requiredSize size in bytes of the block about to be inserted
+	 * @throws IOException if a block cannot be written to the spill directory
+	 */
+	private static void evict(long requiredSize) throws IOException {
+		while (_size + requiredSize > _limit && !_evictDeque.isEmpty()) {
+			// head of the deque is the oldest (FIFO) / least recently used (LRU) block
+			String oldKey = _evictDeque.removeFirst();
+			IndexedMatrixValue toEvict = _cache.remove(oldKey);
+			if (toEvict == null) {
+				continue;
+			}
+
+			MatrixBlock mbToEvict = (MatrixBlock) toEvict.getValue();
+			spillToDisk(oldKey, mbToEvict);
+			_size -= estimateSerializedSize(mbToEvict);
+		}
+	}
+
+	/**
+	 * Write a single block to its spill file, creating the spill directory if needed.
+	 *
+	 * @param key cache key of the block, also used as the file name
+	 * @param mb block to write
+	 * @throws IOException if the spill directory cannot be created or the write fails
+	 */
+	private static void spillToDisk(String key, MatrixBlock mb) throws IOException {
+		File spillDirFile = new File(_spillDir);
+		// re-check isDirectory() after mkdirs() because mkdirs() returns false if the
+		// directory already exists
+		if (!spillDirFile.isDirectory() && !spillDirFile.mkdirs() && !spillDirFile.isDirectory()) {
+			throw new IOException("Failed to create spill directory " + _spillDir);
+		}
+		LocalFileUtils.writeMatrixBlockToLocal(_spillDir + "/" + key, mb);
+	}
+
+	/**
+	 * Load a block from its spill file.
+	 *
+	 * @param streamId id of the stream the block belongs to
+	 * @param blockId position of the block within the stream
+	 * @return a new IndexedMatrixValue wrapping the block read from disk
+	 * @throws IOException if the spill file does not exist or cannot be read
+	 */
+	private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) throws IOException {
+		String filename = _spillDir + "/" + toKey(streamId, blockId);
+
+		if (!LocalFileUtils.isExisting(filename)) {
+			throw new IOException("File " + filename + " does not exist");
+		}
+
+		MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename);
+
+		// NOTE(review): the index is rebuilt as (blockId + 1, 1), not taken from the block
+		// that was put(); this presumably assumes a single column of row blocks in stream
+		// order -- confirm against the callers.
+		MatrixIndexes ix = new MatrixIndexes(blockId + 1, 1);
+
+		// deliberately not re-inserted: get() should not modify the cache contents
+		return new IndexedMatrixValue(ix, mb);
+	}
+
+	/** Build the cache key (also the spill file name) for a block of a stream. */
+	private static String toKey(long streamId, int blockId) {
+		return streamId + "_" + blockId;
+	}
+
+	/** Size in bytes used for cache accounting: the exact serialized size of the block. */
+	private static long estimateSerializedSize(MatrixBlock mb) {
+		return mb.getExactSerializedSize();
+	}
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java index 038e1a8b983..ebf2a94be5d 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java @@ -19,14 +19,17 @@ package org.apache.sysds.runtime.instructions.ooc; +import org.apache.sysds.runtime.controlprogram.caching.OOCEvictionManager; import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; -import java.util.ArrayList; /** * A wrapper around LocalTaskQueue to consume the source stream and reset to * consume again for other operators. + *

+ * Uses OOCEvictionManager for out-of-core caching. * */ public class ResettableStream extends LocalTaskQueue { @@ -34,16 +37,24 @@ public class ResettableStream extends LocalTaskQueue { // original live stream private final LocalTaskQueue _source; - // in-memory cache to store stream for re-play - private final ArrayList _cache; + private static final IDSequence _streamSeq = new IDSequence(); + // stream identifier + private final long _streamId; + + // block counter + private int _numBlocks = 0; + // state flags private boolean _cacheInProgress = true; // caching in progress, in the first pass. private int _replayPosition = 0; // slider position in the stream public ResettableStream(LocalTaskQueue source) { + this(source, _streamSeq.getNextID()); + } + public ResettableStream(LocalTaskQueue source, long streamId) { _source = source; - _cache = new ArrayList<>(); + _streamId = streamId; } /** @@ -51,7 +62,6 @@ public ResettableStream(LocalTaskQueue source) { * For subsequent passes it reads from the memory. * * @return The next matrix value in the stream, or NO_MORE_TASKS - * @throws InterruptedException */ @Override public synchronized IndexedMatrixValue dequeueTask() @@ -60,18 +70,23 @@ public synchronized IndexedMatrixValue dequeueTask() // First pass: Read value from the source and cache it, and return. 
IndexedMatrixValue task = _source.dequeueTask(); if (task != NO_MORE_TASKS) { - _cache.add(new IndexedMatrixValue(task)); + + OOCEvictionManager.put(_streamId, _numBlocks, task); + _numBlocks++; + + return task; } else { _cacheInProgress = false; // caching is complete _source.closeInput(); // close source stream + + // Notify all the waiting consumers waiting for cache to fill with this stream + notifyAll(); + return (IndexedMatrixValue) NO_MORE_TASKS; } - notifyAll(); // Notify all the waiting consumers waiting for cache to fill with this stream - return task; } else { - // Replay pass: read directly from in-memory cache - if (_replayPosition < _cache.size()) { - // Return a copy to ensure consumer won't modify the cache - return new IndexedMatrixValue(_cache.get(_replayPosition++)); + // Replay pass: read from the buffer + if (_replayPosition < _numBlocks) { + return OOCEvictionManager.get(_streamId, _replayPosition++); } else { return (IndexedMatrixValue) NO_MORE_TASKS; }