diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java index 8c8a64b0225..c87b3c99cf2 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java @@ -36,10 +36,8 @@ import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.meta.DataCharacteristics; -import org.apache.sysds.runtime.util.CommonThreadPool; import java.util.HashMap; -import java.util.concurrent.ExecutorService; public class AggregateUnaryOOCInstruction extends ComputationOOCInstruction { private AggregateOperator _aop = null; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java index 82ad12ae554..1dfc99be811 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java @@ -19,8 +19,6 @@ package org.apache.sysds.runtime.instructions.ooc; -import java.util.concurrent.ExecutorService; - import org.apache.sysds.common.Types.DataType; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; @@ -33,7 +31,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.matrix.operators.ScalarOperator; -import org.apache.sysds.runtime.util.CommonThreadPool; public class BinaryOOCInstruction extends ComputationOOCInstruction { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java index c1d1ed6ace7..aa215e83e90 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java @@ -20,7 +20,6 @@ package org.apache.sysds.runtime.instructions.ooc; import java.util.HashMap; -import java.util.concurrent.ExecutorService; import org.apache.sysds.common.Opcodes; import org.apache.sysds.conf.ConfigurationManager; @@ -39,7 +38,6 @@ import org.apache.sysds.runtime.matrix.operators.AggregateOperator; import org.apache.sysds.runtime.matrix.operators.BinaryOperator; import org.apache.sysds.runtime.matrix.operators.Operator; -import org.apache.sysds.runtime.util.CommonThreadPool; public class MatrixVectorBinaryOOCInstruction extends ComputationOOCInstruction { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java new file mode 100644 index 00000000000..747167d5102 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.instructions.ooc; + +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.LocalFileUtils; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Eviction Manager for the Out-Of-Core stream cache + * This is the base implementation for LRU, FIFO + * + * Design choice 1: Pure JVM-memory cache + * What: Store MatrixBlock objects in a synchronized in-memory cache + * (Map + Deque for LRU/FIFO). Spill to disk by serializing MatrixBlock + * only when evicting. + * Pros: Simple to implement; no off-heap management; easy to debug; + * no serialization race since you serialize only when evicting; + * fast cache hits (direct object access). + * Cons: Heap usage counted roughly via serialized-size estimate — actual + * JVM object overhead not accounted; risk of GC pressure and OOM if + * estimates are off or if many small objects cause fragmentation; + * eviction may be more expensive (serialize on eviction). + *

+ * Design choice 2: + *

+ * This manager runtime memory management by caching serialized + * ByteBuffers and spilling them to disk when needed. + *

+ * * core function: Caches ByteBuffers (off-heap/direct) and + * spills them to disk + * * Eviction: Evicts a ByteBuffer by writing its contents to a file + * * Granularity: Evicts one IndexedMatrixValue block at a time + * * Data replay: get() will always return the data either from memory or + * by falling back to the disk + * * Memory: Since the datablocks are off-heap (in ByteBuffer) or disk, + * there won't be OOM. + * + * Pros: Avoids heap OOM by keeping large data off-heap; predictable + * memory usage; good for very large blocks. + * Cons: More complex synchronization; need robust off-heap allocator/free; + * must ensure serialization finishes before adding to queue or make evict + * wait on serialization; careful with native memory leaks. + */ +public class OOCEvictionManager { + + // Configuration: OOC buffer limit as percentage of heap + private static final double OOC_BUFFER_PERCENTAGE = 0.00015; // 15% of heap + + // Memory limit for ByteBuffers + private static long _limit; + private static long _size; + + // Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block) + private static LinkedHashMap _cache = new LinkedHashMap<>(); + + // Spill directory for evicted blocks + private static String _spillDir; + + public enum RPolicy { + FIFO, LRU + } + private static RPolicy _policy = RPolicy.FIFO; + + static { + _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE); // e.g., 20% of heap + _size = 0; + _spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream"); + LocalFileUtils.createLocalFileIfNotExist(_spillDir); + } + + /** + * Store a block in the OOC cache (serialize once) + */ + public static synchronized void put(long streamId, int blockId, IndexedMatrixValue value) { + MatrixBlock mb = (MatrixBlock) value.getValue(); + long size = estimateSerializedSize(mb); + String key = streamId + "_" + blockId; + + IndexedMatrixValue old = _cache.remove(key); // remove old value + if (old != null) { + _size -= estimateSerializedSize((MatrixBlock) old.getValue()); + } + + //make room if needed + evict(size); + + _cache.put(key, value); // put new value last + _size += size; + } + + /** + * Get a block from the OOC cache (deserialize on read) + */ + public static synchronized IndexedMatrixValue get(long streamId, int blockId) { + String key = streamId + "_" + blockId; + IndexedMatrixValue imv = _cache.get(key); + + if (imv != null && _policy == RPolicy.LRU) { + _cache.remove(key); + _cache.put(key, imv); //add last semantic + } + + //restore if needed + return (imv.getValue() != null) ? imv : + loadFromDisk(streamId, blockId); + } + + /** + * Evict ByteBuffers to disk + */ + private static void evict(long requiredSize) { + try { + int pos = 0; + while(_size + requiredSize > _limit && pos++ < _cache.size()) { + //System.out.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); + Map.Entry tmp = removeFirstFromCache(); + if( tmp == null || tmp.getValue().getValue() == null ) { + if( tmp != null ) + _cache.put(tmp.getKey(), tmp.getValue()); + continue; + } + + // Spill to disk + String filename = _spillDir + "/" + tmp.getKey(); + File spillDirFile = new File(_spillDir); + if (!spillDirFile.exists()) { + spillDirFile.mkdirs(); + } + LocalFileUtils.writeMatrixBlockToLocal(filename, (MatrixBlock)tmp.getValue().getValue()); + + // Evict from memory + long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().getValue()); + tmp.getValue().setValue(null); + _cache.put(tmp.getKey(), tmp.getValue()); // add last semantic + _size -= freedSize; + } + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + } + + /** + * Load block from spill file + */ + private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { + String key = streamId + "_" + blockId; + String filename = _spillDir + "/" + key; + + try { + // check if file exists + if (!LocalFileUtils.isExisting(filename)) { + throw new IOException("File " + filename + " does not exist"); + } + + // Read from disk and put into original indexed matrix value + MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); + IndexedMatrixValue imv = _cache.get(key); + imv.setValue(mb); + return imv; + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + } + + private static long estimateSerializedSize(MatrixBlock mb) { + return mb.getExactSerializedSize(); + } + + private static Map.Entry removeFirstFromCache() { + //move iterator to first entry + Iterator> iter = _cache.entrySet().iterator(); + Map.Entry entry = iter.next(); + + //remove current iterator entry + iter.remove(); + + return entry; + } +} diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java index 06386c5d66c..3c78879b45d 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java @@ -19,8 +19,6 @@ package org.apache.sysds.runtime.instructions.ooc; -import java.util.concurrent.ExecutorService; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; @@ -40,7 +38,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.meta.DataCharacteristics; -import org.apache.sysds.runtime.util.CommonThreadPool; public class ReblockOOCInstruction extends ComputationOOCInstruction { private int blen; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java index 038e1a8b983..6179811f7a7 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java @@ -20,13 +20,15 @@ package org.apache.sysds.runtime.instructions.ooc; import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; -import java.util.ArrayList; /** * A wrapper around LocalTaskQueue to consume the source stream and reset to * consume again for other operators. + *

+ * Uses OOCEvictionManager for out-of-core caching. * */ public class ResettableStream extends LocalTaskQueue { @@ -34,16 +36,24 @@ public class ResettableStream extends LocalTaskQueue { // original live stream private final LocalTaskQueue _source; - // in-memory cache to store stream for re-play - private final ArrayList _cache; + private static final IDSequence _streamSeq = new IDSequence(); + // stream identifier + private final long _streamId; + + // block counter + private int _numBlocks = 0; + // state flags private boolean _cacheInProgress = true; // caching in progress, in the first pass. private int _replayPosition = 0; // slider position in the stream public ResettableStream(LocalTaskQueue source) { + this(source, _streamSeq.getNextID()); + } + public ResettableStream(LocalTaskQueue source, long streamId) { _source = source; - _cache = new ArrayList<>(); + _streamId = streamId; } /** @@ -51,7 +61,6 @@ public ResettableStream(LocalTaskQueue source) { * For subsequent passes it reads from the memory. * * @return The next matrix value in the stream, or NO_MORE_TASKS - * @throws InterruptedException */ @Override public synchronized IndexedMatrixValue dequeueTask() @@ -60,18 +69,23 @@ public synchronized IndexedMatrixValue dequeueTask() // First pass: Read value from the source and cache it, and return. IndexedMatrixValue task = _source.dequeueTask(); if (task != NO_MORE_TASKS) { - _cache.add(new IndexedMatrixValue(task)); + + OOCEvictionManager.put(_streamId, _numBlocks, task); + _numBlocks++; + + return task; } else { _cacheInProgress = false; // caching is complete _source.closeInput(); // close source stream + + // Notify all the waiting consumers waiting for cache to fill with this stream + notifyAll(); + return (IndexedMatrixValue) NO_MORE_TASKS; } - notifyAll(); // Notify all the waiting consumers waiting for cache to fill with this stream - return task; } else { - // Replay pass: read directly from in-memory cache - if (_replayPosition < _cache.size()) { - // Return a copy to ensure consumer won't modify the cache - return new IndexedMatrixValue(_cache.get(_replayPosition++)); + // Replay pass: read from the buffer + if (_replayPosition < _numBlocks) { + return OOCEvictionManager.get(_streamId, _replayPosition++); } else { return (IndexedMatrixValue) NO_MORE_TASKS; } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java index fce5408960e..05e31830a56 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java @@ -30,9 +30,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.matrix.operators.ReorgOperator; -import org.apache.sysds.runtime.util.CommonThreadPool; - -import java.util.concurrent.ExecutorService; public class TransposeOOCInstruction extends ComputationOOCInstruction { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index 63f42f5bf15..173486844a6 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -28,9 +28,6 @@ import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.UnaryOperator; -import org.apache.sysds.runtime.util.CommonThreadPool; - -import java.util.concurrent.ExecutorService; public class UnaryOOCInstruction extends ComputationOOCInstruction { private UnaryOperator _uop = null; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java index 8f82a99abff..7b20fe2f9e5 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java @@ -66,6 +66,10 @@ public MatrixIndexes getIndexes() { public MatrixValue getValue() { return _value; } + + public void setValue(MatrixValue value) { + _value = value; + } public void set(MatrixIndexes indexes2, MatrixValue block2) { _indexes.setIndexes(indexes2); diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java index 5a3d0a64306..7f0f3b7a658 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java @@ -108,6 +108,11 @@ public int hashCode() { public String toString() { return "("+_row+", "+_col+")"; } + + public MatrixIndexes fromString(String ix) { + String[] parts = ix.substring(1, ix.length()-1).split(","); + return new MatrixIndexes(Long.parseLong(parts[0]), Long.parseLong(parts[1].trim())); + } //////////////////////////////////////////////////// // implementation of Writable read/write