Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SYSTEMDS-3576] CLALib Combine Columngroups With Morphing #2162

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 215 additions & 10 deletions src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,27 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.compress.colgroup.ColGroupUtils.P;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex.SliceResult;
import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
import org.apache.sysds.runtime.compress.lib.CLALibCombineGroups;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.SparseBlockMCSR;
import org.apache.sysds.runtime.functionobjects.Plus;
import org.apache.sysds.runtime.instructions.cp.CM_COV_Object;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
Expand Down Expand Up @@ -164,14 +171,32 @@ public final void decompressToSparseBlock(SparseBlock sb, int rl, int ru) {
/**
* Decompress a range of rows into a dense block
*
* @param db Sparse Target block
* @param db Dense target block
* @param rl Row to start at
* @param ru Row to end at
*/
public final void decompressToDenseBlock(DenseBlock db, int rl, int ru) {
decompressToDenseBlock(db, rl, ru, 0, 0);
}

/**
* Decompress a range of rows into a dense transposed block.
*
* @param db Dense target block
* @param rl Row in this column group to start at.
* @param ru Row in this column group to end at.
*/
public abstract void decompressToDenseBlockTransposed(DenseBlock db, int rl, int ru);

/**
* Decompress the column group to the sparse transposed block. Note that the column groups would only need to
* decompress into specific sub rows of the Sparse block
*
* @param sb Sparse target block
* @param nColOut The number of columns in the sb.
*/
public abstract void decompressToSparseBlockTransposed(SparseBlockMCSR sb, int nColOut);

/**
* Serializes column group to data output.
*
Expand Down Expand Up @@ -320,7 +345,7 @@ public double get(int r, int c) {
*
* @param db Target DenseBlock
* @param rl Row to start decompression from
* @param ru Row to end decompression at
* @param ru Row to end decompression at (not inclusive)
* @param offR Row offset into the target to decompress
* @param offC Column offset into the target to decompress
*/
Expand All @@ -334,7 +359,7 @@ public double get(int r, int c) {
*
* @param sb Target SparseBlock
* @param rl Row to start decompression from
* @param ru Row to end decompression at
* @param ru Row to end decompression at (not inclusive)
* @param offR Row offset into the target to decompress
* @param offC Column offset into the target to decompress
*/
Expand All @@ -349,7 +374,7 @@ public double get(int r, int c) {
* @return The new Column Group or null that is the result of the matrix multiplication.
*/
public final AColGroup rightMultByMatrix(MatrixBlock right) {
return rightMultByMatrix(right, null);
return rightMultByMatrix(right, null, 1);
}

/**
Expand All @@ -360,9 +385,25 @@ public final AColGroup rightMultByMatrix(MatrixBlock right) {
* @param right The MatrixBlock on the right of this matrix multiplication
* @param allCols A pre-materialized list of all col indexes, that can be shared across all column groups if use
* full, can be set to null.
* @param k The parallelization degree allowed internally in this operation.
* @return The new Column Group or null that is the result of the matrix multiplication.
*/
public abstract AColGroup rightMultByMatrix(MatrixBlock right, IColIndex allCols);
public abstract AColGroup rightMultByMatrix(MatrixBlock right, IColIndex allCols, int k);

/**
* Right side Matrix multiplication, iterating though this column group and adding to the ret
*
* @param right Right side matrix to multiply with.
* @param ret The return matrix to add results to
* @param rl The row of this column group to multiply from
* @param ru The row of this column group to multiply to (not inclusive)
* @param crl The right hand side column lower
* @param cru The right hand side column upper
* @param nRows The number of rows in this column group
*/
public void rightDecompressingMult(MatrixBlock right, MatrixBlock ret, int rl, int ru, int nRows, int crl, int cru){
throw new NotImplementedException("not supporting right Decompressing Multiply on class: " + this.getClass().getSimpleName());
}

/**
* Do a transposed self matrix multiplication on the left side t(x) %*% x. but only with this column group.
Expand Down Expand Up @@ -671,11 +712,31 @@ public void clear() {
/**
* Recompress this column group into a new column group of the given type.
*
* @param ct The compressionType that the column group should morph into
* @param ct The compressionType that the column group should morph into
* @param nRow The number of rows in this columngroup.
* @return A new column group
*/
public AColGroup morph(CompressionType ct) {
throw new NotImplementedException();
public AColGroup morph(CompressionType ct, int nRow) {
if(ct == getCompType())
return this;
else if(ct == CompressionType.DDCFOR)
return this; // it does not make sense to change to FOR.
else if(ct == CompressionType.UNCOMPRESSED) {
AColGroup cgMoved = this.copyAndSet(ColIndexFactory.create(_colIndexes.size()));
final long nnz = getNumberNonZeros(nRow);
MatrixBlock newDict = new MatrixBlock(nRow, _colIndexes.size(), nnz);
newDict.allocateBlock();
if(newDict.isInSparseFormat())
cgMoved.decompressToSparseBlock(newDict.getSparseBlock(), 0, nRow);
else
cgMoved.decompressToDenseBlock(newDict.getDenseBlock(), 0, nRow);
newDict.setNonZeros(nnz);
AColGroup cgUC = ColGroupUncompressed.create(newDict);
return cgUC.copyAndSet(_colIndexes);
}
else {
throw new NotImplementedException("Morphing from : " + getCompType() + " to " + ct + " is not implemented");
}
}

/**
Expand All @@ -690,10 +751,11 @@ public AColGroup morph(CompressionType ct) {
* Combine this column group with another
*
* @param other The other column group to combine with.
* @param nRow The number of rows in both column groups.
* @return A combined representation as a column group.
*/
public AColGroup combine(AColGroup other) {
return CLALibCombineGroups.combine(this, other);
public AColGroup combine(AColGroup other, int nRow) {
return CLALibCombineGroups.combine(this, other, nRow);
}

/**
Expand Down Expand Up @@ -744,6 +806,13 @@ public final void selectionMultiply(MatrixBlock selection, P[] points, MatrixBlo
else
denseSelection(selection, points, ret, rl, ru);
}

/**
* Get an approximate sparsity of this column group
*
* @return the approximate sparsity of this columngroup
*/
public abstract double getSparsity();

/**
* Sparse selection (left matrix multiply)
Expand All @@ -767,6 +836,142 @@ public final void selectionMultiply(MatrixBlock selection, P[] points, MatrixBlo
*/
protected abstract void denseSelection(MatrixBlock selection, P[] points, MatrixBlock ret, int rl, int ru);

/**
* Method to determine if the columnGroup have the same index structure as another. Note that the column indexes and
* dictionaries are allowed to be different.
*
* @param that the other column group
* @return if the index is the same.
*/
public boolean sameIndexStructure(AColGroup that) {
return false;
}

/**
* C bind the list of column groups with this column group. the list of elements provided in the index of each list
* is guaranteed to have the same index structures
*
* @param nRow The number of rows contained in all right and this column group.
* @param nCol The number of columns to shift the right hand side column groups over when combining, this should
* only effect the column indexes
* @param right The right hand side column groups to combine. NOTE only the index offset of the second nested list
* should be used. The reason for providing this nested list is to avoid redundant allocations in
* calling methods.
* @return A combined compressed column group of the same type as this!.
*/
public AColGroup combineWithSameIndex(int nRow, int nCol, List<AColGroup> right) {
// default decompress... nasty !

IColIndex combinedColIndex = combineColIndexes(nCol, right);

MatrixBlock decompressTarget = new MatrixBlock(nRow, combinedColIndex.size(), false);

decompressTarget.allocateDenseBlock();
DenseBlock db = decompressTarget.getDenseBlock();
final int nColInThisGroup = _colIndexes.size();
this.copyAndSet(ColIndexFactory.create(nColInThisGroup)).decompressToDenseBlock(db, 0, nRow);

for(int i = 0; i < right.size(); i++) {
right.get(i).copyAndSet(ColIndexFactory.create(i * nColInThisGroup, i * nColInThisGroup + nColInThisGroup))
.decompressToDenseBlock(db, 0, nRow);
}

decompressTarget.setNonZeros(nRow * combinedColIndex.size());

CompressedSizeInfoColGroup ci = new CompressedSizeInfoColGroup(ColIndexFactory.create(combinedColIndex.size()),
nRow, nRow, CompressionType.DDC);
CompressedSizeInfo csi = new CompressedSizeInfo(ci);

CompressionSettings cs = new CompressionSettingsBuilder().create();
return ColGroupFactory.compressColGroups(decompressTarget, csi, cs).get(0).copyAndSet(combinedColIndex);
}

/**
* C bind the given column group to this.
*
* @param nRow The number of rows contained in the right and this column group.
* @param nCol The number of columns in this.
* @param right The column group to c-bind.
* @return a new combined column groups.
*/
public AColGroup combineWithSameIndex(int nRow, int nCol, AColGroup right) {

IColIndex combinedColIndex = _colIndexes.combine(right._colIndexes.shift(nCol));

MatrixBlock decompressTarget = new MatrixBlock(nRow, combinedColIndex.size(), false);

decompressTarget.allocateDenseBlock();
DenseBlock db = decompressTarget.getDenseBlock();
final int nColInThisGroup = _colIndexes.size();
this.copyAndSet(ColIndexFactory.create(nColInThisGroup)).decompressToDenseBlock(db, 0, nRow);

right.copyAndSet(ColIndexFactory.create(nColInThisGroup, nColInThisGroup + nColInThisGroup))
.decompressToDenseBlock(db, 0, nRow);

decompressTarget.setNonZeros(nRow * combinedColIndex.size());

CompressedSizeInfoColGroup ci = new CompressedSizeInfoColGroup(ColIndexFactory.create(combinedColIndex.size()),
nRow, nRow, CompressionType.DDC);
CompressedSizeInfo csi = new CompressedSizeInfo(ci);

CompressionSettings cs = new CompressionSettingsBuilder().create();
return ColGroupFactory.compressColGroups(decompressTarget, csi, cs).get(0).copyAndSet(combinedColIndex);
// throw new NotImplementedException("Combine of : " + this.getClass().getSimpleName() + " not implemented");
}

protected IColIndex combineColIndexes(final int nCol, List<AColGroup> right) {
IColIndex combinedColIndex = _colIndexes;
for(int i = 0; i < right.size(); i++)
combinedColIndex = combinedColIndex.combine(right.get(i).getColIndices().shift(nCol * i + nCol));
return combinedColIndex;
}

/**
* This method returns a list of column groups that are naive splits of this column group as if it is reshaped.
*
* This means the column groups rows are split into x number of other column groups where x is the multiplier.
*
* The indexes are assigned round robbin to each of the output groups, meaning the first index is assigned.
*
* If for instance the 4. column group is split by a 2 multiplier and there was 5 columns in total originally. The
* output becomes 2 column groups at column index 4 and one at 9.
*
* If possible the split column groups should reuse pointers back to the original dictionaries!
*
* @param multiplier The number of column groups to split into
* @param nRow The number of rows in this column group in case the underlying column group does not know
* @param nColOrg The number of overall columns in the host CompressedMatrixBlock.
* @return a list of split column groups
*/
public abstract AColGroup[] splitReshape(final int multiplier, final int nRow, final int nColOrg);

/**
* This method returns a list of column groups that are naive splits of this column group as if it is reshaped.
*
* This means the column groups rows are split into x number of other column groups where x is the multiplier.
*
* The indexes are assigned round robbin to each of the output groups, meaning the first index is assigned.
*
* If for instance the 4. column group is split by a 2 multiplier and there was 5 columns in total originally. The
* output becomes 2 column groups at column index 4 and one at 9.
*
* If possible the split column groups should reuse pointers back to the original dictionaries!
*
* This specific variation is pushing down the parallelization given via the executor service provided. If not
* overwritten the default is to call the normal split reshape
*
* @param multiplier The number of column groups to split into
* @param nRow The number of rows in this column group in case the underlying column group does not know
* @param nColOrg The number of overall columns in the host CompressedMatrixBlock
* @param pool The executor service to submit parallel tasks to
* @throws Exception In case there is an error we throw the exception out instead of handling it
* @return a list of split column groups
*/
public AColGroup[] splitReshapePushDown(final int multiplier, final int nRow, final int nColOrg,
final ExecutorService pool) throws Exception {
return splitReshape(multiplier, nRow, nColOrg);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ protected AColGroupCompressed(IColIndex colIndices) {

protected abstract double[] preAggBuiltinRows(Builtin builtin);

@Override
public boolean sameIndexStructure(AColGroup that) {
if(that instanceof AColGroupCompressed)
return sameIndexStructure((AColGroupCompressed) that);
else
return false;
}

public abstract boolean sameIndexStructure(AColGroupCompressed that);

public double[] preAggRows(ValueFunction fn) {
Expand Down Expand Up @@ -215,7 +223,8 @@ protected static void tsmm(double[] result, int numColumns, int[] counts, IDicti

}

protected static void tsmmDense(double[] result, int numColumns, double[] values, int[] counts, IColIndex colIndexes) {
protected static void tsmmDense(double[] result, int numColumns, double[] values, int[] counts,
IColIndex colIndexes) {
final int nCol = colIndexes.size();
final int nRow = counts.length;
for(int k = 0; k < nRow; k++) {
Expand All @@ -231,7 +240,8 @@ protected static void tsmmDense(double[] result, int numColumns, double[] values
}
}

protected static void tsmmSparse(double[] result, int numColumns, SparseBlock sb, int[] counts, IColIndex colIndexes) {
protected static void tsmmSparse(double[] result, int numColumns, SparseBlock sb, int[] counts,
IColIndex colIndexes) {
for(int row = 0; row < counts.length; row++) {
if(sb.isEmpty(row))
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,9 @@ public long getExactSizeOnDisk() {
public boolean containZerosTuples() {
return _zeros;
}

@Override
protected boolean allowShallowIdentityRightMult() {
return true;
}
}
Loading
Loading