Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.IdentityDictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.PlaceHolderDict;
import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
Expand Down Expand Up @@ -79,7 +80,7 @@ private ColGroupConst(IColIndex colIndices, IDictionary dict) {
public static AColGroup create(IColIndex colIndices, IDictionary dict) {
if(dict == null)
return new ColGroupEmpty(colIndices);
else if(dict.getNumberOfValues(colIndices.size()) > 1) {
else if(dict.getNumberOfValues(colIndices.size()) > 1 && !(dict instanceof PlaceHolderDict)) {
// extract dict first row
final double[] nd = new double[colIndices.size()];
for(int i = 0; i < colIndices.size(); i++)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.BitSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -95,7 +98,6 @@ public final void setUnique(int nUnique) {
*/
public abstract int getIndex(int n);


/**
* Shortcut method to support Integer objects, not really efficient but for the purpose of reusing code.
*
Expand All @@ -116,6 +118,18 @@ public void set(int n, Integer v) {
*/
public abstract void set(int n, int v);

/**
* set a range of values from another map.
*
* The given tm must only contain supported values, and it is not verified.
*
* @param l lower bound
* @param u upper bound (not inclusive)
* @param off offset to take values from tm
* @param tm the other map to copy values from
*/
public abstract void set(int l, int u, int off, AMapToData tm);

/**
* Set the index to the value and get the contained value after.
*
Expand Down Expand Up @@ -411,8 +425,6 @@ public final int[] getCounts() {
* @param nCol The number of columns
*/
public final void preAggregateDDC_DDC(AMapToData tm, IDictionary td, Dictionary ret, int nCol) {
if(td.getNumberOfValues(nCol) != tm.nUnique)
throw new DMLCompressionException("Invalid map and dict combination");
if(nCol == 1)
preAggregateDDC_DDCSingleCol(tm, td.getValues(), ret.getValues());
else
Expand Down Expand Up @@ -788,9 +800,9 @@ public void preAggregateDDC_RLE(int[] ptr, char[] data, IDictionary td, Dictiona
*/
public void copy(AMapToData d) {
if(d.nUnique == 1)
return;
// else if(d instanceof MapToBit)
// copyBit((MapToBit) d);
fill(0);
else if(d instanceof MapToBit)
copyBit((MapToBit) d);
else if(d instanceof MapToInt)
copyInt((MapToInt) d);
else {
Expand All @@ -813,9 +825,18 @@ protected void copyInt(MapToInt d) {
*
* @param d The array to copy
*/
public abstract void copyInt(int[] d);
public void copyInt(int[] d) {
copyInt(d, 0, Math.min(d.length, size()));
}

public abstract void copyInt(int[] d, int start, int end);

public abstract void copyBit(BitSet d);
public void copyBit(MapToBit d) {
fill(0);
for(int i = d.nextSetBit(0); i >= 0; i = d.nextSetBit(i + 1)) {
set(i, 1);
}
}

public int getMax() {
int m = -1;
Expand All @@ -826,13 +847,6 @@ public int getMax() {
return m;
}

/**
* Get the maximum possible value to encode in this encoding. For instance in a bit you can encode 2 values
*
* @return The maximum number of distinct values to encode
*/
public abstract int getMaxPossible();

/**
* Reallocate the map, to a smaller instance if applicable. Note it does not change the length of the array, just the
* datatype.
Expand Down Expand Up @@ -887,7 +901,8 @@ public int countRuns(AOffset off) {

@Override
public boolean equals(Object e) {
return e instanceof AMapToData && (this == e || this.equals((AMapToData) e));
return this == e || // same object or
(e instanceof AMapToData && this.equals((AMapToData) e));
}

/**
Expand All @@ -903,7 +918,7 @@ public void verify() {
if(CompressedMatrixBlock.debug) {
for(int i = 0; i < size(); i++) {
if(getIndex(i) >= nUnique) {
throw new DMLCompressionException("invalid construction of Mapping data containing values above unique");
throw new DMLCompressionException("Invalid construction of Mapping data containing values above unique");
}
}
}
Expand Down Expand Up @@ -934,7 +949,7 @@ public void decompressToRange(double[] c, int rl, int ru, int offR, double[] val
decompressToRangeOff(c, rl, ru, offR, values);
}

public void decompressToRangeOff(double[] c, int rl, int ru, int offR, double[] values) {
protected void decompressToRangeOff(double[] c, int rl, int ru, int offR, double[] values) {
for(int i = rl, offT = rl + offR; i < ru; i++, offT++)
c[offT] += values[getIndex(i)];
}
Expand All @@ -950,14 +965,70 @@ protected void decompressToRangeNoOffBy8(double[] c, int r, double[] values) {
c[r + 7] += values[getIndex(r + 7)];
}

public void decompressToRangeNoOff(double[] c, int rl, int ru, double[] values) {
protected void decompressToRangeNoOff(double[] c, int rl, int ru, double[] values) {
final int h = (ru - rl) % 8;
for(int rc = rl; rc < rl + h; rc++)
c[rc] += values[getIndex(rc)];
for(int rc = rl + h; rc < ru; rc += 8)
decompressToRangeNoOffBy8(c, rc, values);
}

/**
* Split this mapping into x smaller mappings according to round robin.
*
* @param multiplier The number of smaller mappings to construct
* @return The list of smaller mappings
*/
public AMapToData[] splitReshapeDDC(final int multiplier) {

final int s = size();
final AMapToData[] ret = new AMapToData[multiplier];
final int eachSize = s / multiplier;
for(int i = 0; i < multiplier; i++)
ret[i] = MapToFactory.create(eachSize, getUnique());

final int blkz = Math.max(eachSize / 8, 2048) * multiplier;
for(int i = 0; i < s; i += blkz)
splitReshapeDDCBlock(ret, multiplier, i, Math.min(i + blkz, s));

return ret;
}

public AMapToData[] splitReshapeDDCPushDown(final int multiplier, final ExecutorService pool) throws Exception {

final int s = size();
final AMapToData[] ret = new AMapToData[multiplier];
final int eachSize = s / multiplier;
for(int i = 0; i < multiplier; i++)
ret[i] = MapToFactory.create(eachSize, getUnique());

final int blkz = Math.max(eachSize / 8, 2048) * multiplier;
List<Future<?>> tasks = new ArrayList<>();
for(int i = 0; i < s; i += blkz) {
final int start = i;
final int end = Math.min(i + blkz, s);
tasks.add(pool.submit(() -> splitReshapeDDCBlock(ret, multiplier, start, end)));
}

for(Future<?> t : tasks)
t.get();

return ret;
}

private void splitReshapeDDCBlock(final AMapToData[] ret, final int multiplier, final int start, final int end) {

for(int i = start; i < end; i += multiplier)
splitReshapeDDCRow(ret, multiplier, i);
}

private void splitReshapeDDCRow(final AMapToData[] ret, final int multiplier, final int i) {
final int off = i / multiplier;
final int end = i + multiplier;
for(int j = i; j < end; j++)
ret[j % multiplier].set(off, getIndex(j));
}

@Override
public String toString() {
final int sz = size();
Expand Down
Loading
Loading