Skip to content

Commit

Permalink
[MINOR] Mapping Add A Range Setting
Browse files Browse the repository at this point in the history
This commit adds a range setting function for mappings,
to enable subsequent parallel setting from integer arrays.

While adding the range support, the commit also cleans up some edge-case
logic in specific map types, for instance fixing the support for 127 vs. 128
unique values in MapToUByte.

Closes #2164

Signed-off-by: Sebastian Baunsgaard <baunsgaard@apache.org>
  • Loading branch information
Baunsgaard committed Dec 29, 2024
1 parent aaa0192 commit 809490f
Show file tree
Hide file tree
Showing 18 changed files with 1,168 additions and 312 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.IdentityDictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.PlaceHolderDict;
import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
Expand Down Expand Up @@ -79,7 +80,7 @@ private ColGroupConst(IColIndex colIndices, IDictionary dict) {
public static AColGroup create(IColIndex colIndices, IDictionary dict) {
if(dict == null)
return new ColGroupEmpty(colIndices);
else if(dict.getNumberOfValues(colIndices.size()) > 1) {
else if(dict.getNumberOfValues(colIndices.size()) > 1 && !(dict instanceof PlaceHolderDict)) {
// extract dict first row
final double[] nd = new double[colIndices.size()];
for(int i = 0; i < colIndices.size(); i++)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.BitSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -95,7 +98,6 @@ public final void setUnique(int nUnique) {
*/
public abstract int getIndex(int n);


/**
* Shortcut method to support Integer objects, not really efficient but for the purpose of reusing code.
*
Expand All @@ -116,6 +118,18 @@ public void set(int n, Integer v) {
*/
public abstract void set(int n, int v);

/**
 * Set a range of values in this map from another map.
 *
 * The given tm must only contain values supported by this map type; this is not verified.
 *
 * NOTE(review): presumably index i of this map is assigned from tm at index (i + off) -- confirm against the
 * implementations.
 *
 * @param l   lower bound
 * @param u   upper bound (not inclusive)
 * @param off offset to take values from tm
 * @param tm  the other map to copy values from
 */
public abstract void set(int l, int u, int off, AMapToData tm);

/**
* Set the index to the value and get the contained value after.
*
Expand Down Expand Up @@ -411,8 +425,6 @@ public final int[] getCounts() {
* @param nCol The number of columns
*/
public final void preAggregateDDC_DDC(AMapToData tm, IDictionary td, Dictionary ret, int nCol) {
if(td.getNumberOfValues(nCol) != tm.nUnique)
throw new DMLCompressionException("Invalid map and dict combination");
if(nCol == 1)
preAggregateDDC_DDCSingleCol(tm, td.getValues(), ret.getValues());
else
Expand Down Expand Up @@ -788,9 +800,9 @@ public void preAggregateDDC_RLE(int[] ptr, char[] data, IDictionary td, Dictiona
*/
public void copy(AMapToData d) {
if(d.nUnique == 1)
return;
// else if(d instanceof MapToBit)
// copyBit((MapToBit) d);
fill(0);
else if(d instanceof MapToBit)
copyBit((MapToBit) d);
else if(d instanceof MapToInt)
copyInt((MapToInt) d);
else {
Expand All @@ -813,9 +825,18 @@ protected void copyInt(MapToInt d) {
*
* @param d The array to copy
*/
public abstract void copyInt(int[] d);
public void copyInt(int[] d) {
	// Never copy past the end of either the given array or this map.
	final int limit = Math.min(d.length, size());
	copyInt(d, 0, limit);
}

/**
 * Copy the values of the given int array into this map, restricted to an index range.
 *
 * NOTE(review): the single array overload calls this with start 0 and end min(d.length, size()), so end is
 * presumably exclusive -- confirm against the implementations.
 *
 * @param d     The array to copy
 * @param start The first index of the range
 * @param end   The end index of the range
 */
public abstract void copyInt(int[] d, int start, int end);

public abstract void copyBit(BitSet d);
/**
 * Copy the contents of a bit map into this map, by filling this map with 0 and then setting value 1 at each index
 * reported by nextSetBit of the given map.
 *
 * @param d The bit map to copy
 */
public void copyBit(MapToBit d) {
	// Reset everything first, then only visit the positions that are set in d.
	fill(0);
	int pos = d.nextSetBit(0);
	while(pos >= 0) {
		set(pos, 1);
		pos = d.nextSetBit(pos + 1);
	}
}

public int getMax() {
int m = -1;
Expand All @@ -826,13 +847,6 @@ public int getMax() {
return m;
}

/**
* Get the maximum possible value to encode in this encoding. For instance in a bit you can encode 2 values
*
* @return The maximum number of distinct values to encode
*/
public abstract int getMaxPossible();

/**
* Reallocate the map, to a smaller instance if applicable. Note it does not change the length of the array, just the
* datatype.
Expand Down Expand Up @@ -887,7 +901,8 @@ public int countRuns(AOffset off) {

@Override
public boolean equals(Object e) {
return e instanceof AMapToData && (this == e || this.equals((AMapToData) e));
return this == e || // same object or
(e instanceof AMapToData && this.equals((AMapToData) e));
}

/**
Expand All @@ -903,7 +918,7 @@ public void verify() {
if(CompressedMatrixBlock.debug) {
for(int i = 0; i < size(); i++) {
if(getIndex(i) >= nUnique) {
throw new DMLCompressionException("invalid construction of Mapping data containing values above unique");
throw new DMLCompressionException("Invalid construction of Mapping data containing values above unique");
}
}
}
Expand Down Expand Up @@ -934,7 +949,7 @@ public void decompressToRange(double[] c, int rl, int ru, int offR, double[] val
decompressToRangeOff(c, rl, ru, offR, values);
}

public void decompressToRangeOff(double[] c, int rl, int ru, int offR, double[] values) {
protected void decompressToRangeOff(double[] c, int rl, int ru, int offR, double[] values) {
for(int i = rl, offT = rl + offR; i < ru; i++, offT++)
c[offT] += values[getIndex(i)];
}
Expand All @@ -950,14 +965,70 @@ protected void decompressToRangeNoOffBy8(double[] c, int r, double[] values) {
c[r + 7] += values[getIndex(r + 7)];
}

public void decompressToRangeNoOff(double[] c, int rl, int ru, double[] values) {
protected void decompressToRangeNoOff(double[] c, int rl, int ru, double[] values) {
final int h = (ru - rl) % 8;
for(int rc = rl; rc < rl + h; rc++)
c[rc] += values[getIndex(rc)];
for(int rc = rl + h; rc < ru; rc += 8)
decompressToRangeNoOffBy8(c, rc, values);
}

/**
 * Distribute the entries of this mapping round robin over a number of new mappings.
 *
 * Each returned mapping is created via MapToFactory.create with size() / multiplier as the size argument and the
 * unique count of this mapping. The entries are processed block wise, where each block is a whole multiple of
 * multiplier long.
 *
 * @param multiplier The number of smaller mappings to construct
 * @return An array of length multiplier containing the smaller mappings
 */
public AMapToData[] splitReshapeDDC(final int multiplier) {
	final int nEntries = size();
	final AMapToData[] parts = new AMapToData[multiplier];
	final int partSize = nEntries / multiplier;
	int p = 0;
	while(p < multiplier) {
		parts[p] = MapToFactory.create(partSize, getUnique());
		p++;
	}

	// Block length is a multiple of multiplier, hence every block starts at the first entry of a group.
	final int blockLen = multiplier * Math.max(2048, partSize / 8);
	for(int from = 0; from < nEntries; from += blockLen) {
		final int to = Math.min(from + blockLen, nEntries);
		splitReshapeDDCBlock(parts, multiplier, from, to);
	}

	return parts;
}

/**
 * Distribute the entries of this mapping round robin over a number of new mappings, where each block of entries
 * is submitted as a separate task to the given executor service.
 *
 * The method waits for all submitted tasks before returning.
 *
 * NOTE(review): all tasks write to the same target mappings, at different indexes. Whether that is safe for every
 * mapping type depends on the set implementations, which are not visible here -- confirm.
 *
 * @param multiplier The number of smaller mappings to construct
 * @param pool       The executor service the block tasks are submitted to
 * @return An array of length multiplier containing the smaller mappings
 * @throws Exception If waiting for one of the submitted tasks fails
 */
public AMapToData[] splitReshapeDDCPushDown(final int multiplier, final ExecutorService pool) throws Exception {
	final int nEntries = size();
	final AMapToData[] parts = new AMapToData[multiplier];
	final int partSize = nEntries / multiplier;
	for(int p = 0; p < multiplier; p++)
		parts[p] = MapToFactory.create(partSize, getUnique());

	// Block length is a multiple of multiplier, hence every task starts at the first entry of a group.
	final int blockLen = multiplier * Math.max(2048, partSize / 8);
	final List<Future<?>> pending = new ArrayList<>();
	int from = 0;
	while(from < nEntries) {
		final int blockStart = from;
		final int blockEnd = Math.min(from + blockLen, nEntries);
		pending.add(pool.submit(() -> splitReshapeDDCBlock(parts, multiplier, blockStart, blockEnd)));
		from += blockLen;
	}

	// Wait for every block in submission order.
	for(Future<?> f : pending)
		f.get();

	return parts;
}

/**
 * Split one block of entries, by processing the groups starting at start, start + multiplier, ... below end.
 *
 * @param ret        The target mappings to write into
 * @param multiplier The number of target mappings, and the number of entries per group
 * @param start      The index of the first entry in the block
 * @param end        The end index of the block (not inclusive)
 */
private void splitReshapeDDCBlock(final AMapToData[] ret, final int multiplier, final int start, final int end) {
	int groupStart = start;
	while(groupStart < end) {
		splitReshapeDDCRow(ret, multiplier, groupStart);
		groupStart += multiplier;
	}
}

/**
 * Split one group of multiplier consecutive entries starting at i. Entry j of the group is written to target
 * mapping (j % multiplier) at position (i / multiplier).
 *
 * @param ret        The target mappings to write into
 * @param multiplier The number of target mappings, and the number of entries in the group
 * @param i          The index of the first entry in the group
 */
private void splitReshapeDDCRow(final AMapToData[] ret, final int multiplier, final int i) {
	final int targetPos = i / multiplier;
	final int stop = i + multiplier;
	int src = i;
	while(src < stop) {
		// Select the target first and read the source value after, same order as a chained call.
		final AMapToData target = ret[src % multiplier];
		target.set(targetPos, getIndex(src));
		src++;
	}
}

@Override
public String toString() {
final int sz = size();
Expand Down
Loading

0 comments on commit 809490f

Please sign in to comment.