Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SYSTEMDS-2650] Standardize deduplicated lineage trace serialization … #2072

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt)
@Override
protected FrameBlock reconstructByLineage(LineageItem li) throws IOException {
return ((FrameObject) LineageRecomputeUtils
.parseNComputeLineageTrace(li.getData(), null))
.parseNComputeLineageTrace(li.getData()))
.acquireReadAndRelease();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String output
@Override
protected MatrixBlock reconstructByLineage(LineageItem li) throws IOException {
return ((MatrixObject) LineageRecomputeUtils
.parseNComputeLineageTrace(Explain.explain(li), null))
.parseNComputeLineageTrace(Explain.explain(li)))
.acquireReadAndRelease();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt)
@Override
protected TensorBlock reconstructByLineage(LineageItem li) throws IOException {
return ((TensorObject) LineageRecomputeUtils
.parseNComputeLineageTrace(li.getData(), null))
.parseNComputeLineageTrace(li.getData()))
.acquireReadAndRelease();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ else if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() ) {

LineageItem li = ec.getLineageItem(input1);
String out = !DMLScript.LINEAGE_DEDUP ? Explain.explain(li) :
Explain.explain(li) + LineageDedupUtils.mergeExplainDedupBlocks(ec);
Explain.explain(li) + "\n" + LineageDedupUtils.mergeExplainDedupBlocks(ec);
ec.setScalarOutput(outputName, new StringObject(out));
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,9 @@ private void processWriteLI(CPOperand input1, CPOperand input2, ExecutionContext
LineageItem li = get(input1);
String fName = ec.getScalarInput(input2.getName(), Types.ValueType.STRING, input2.isLiteral()).getStringValue();

if (DMLScript.LINEAGE_DEDUP) {
// gracefully serialize the dedup maps without decompressing
LineageItemUtils.writeTraceToHDFS(LineageDedupUtils.mergeExplainDedupBlocks(ec), fName + ".lineage.dedup");
}
LineageItemUtils.writeTraceToHDFS(Explain.explain(li), fName + ".lineage");
// Combine the global trace and dedup patches in a single file.
String out = !DMLScript.LINEAGE_DEDUP ? Explain.explain(li) :
Explain.explain(li) + "\n" + LineageDedupUtils.mergeExplainDedupBlocks(ec);
LineageItemUtils.writeTraceToHDFS(out, fName + ".lineage");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private static LineageItem parseLineageInstruction(Long id, String str, Map<Long
}
return new LineageItem(id, "", opcode, inputs.toArray(new LineageItem[0]), specialValueBits);
}

protected static void parseLineageTraceDedup(String str) {
str.replaceAll("\r\n", "\n");
String[] allPatches = str.split("\n\n");
Expand All @@ -145,4 +145,22 @@ protected static void parseLineageTraceDedup(String str) {
loopItem.patchLiMap.get(pathId).put(parts[1], patchLi);
}
}

protected static String[] separateMainAndDedupPatches(String str) {
str.replaceAll("\r\n", "\n");
String[] allPatches = str.split("\n\n");
if (allPatches.length == 1) //no dedup patches
return allPatches;

// Merge the dedup patches into a single string
String[] patches = new String[2];
patches[0] = allPatches[0];
StringBuilder sb = new StringBuilder();
for (int i=1; i<allPatches.length; i++) {
sb.append(allPatches[i]);
sb.append("\n\n");
}
patches[1] = sb.toString();
return patches;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,18 @@
public class LineageRecomputeUtils {
private static final String LVARPREFIX = "lvar";
public static final String LPLACEHOLDER = "IN#";
private static final boolean DEBUG = false;
private static final boolean DEBUG = true;
public static Map<String, DedupLoopItem> loopPatchMap = new HashMap<>();

public static Data parseNComputeLineageTrace(String mainTrace, String dedupPatches) {
if (DEBUG) {
public static Data parseNComputeLineageTrace(String mainTrace) {
if (DEBUG)
System.out.println(mainTrace);
System.out.println(dedupPatches);
}
LineageItem root = LineageParser.parseLineageTrace(mainTrace);
if (dedupPatches != null)
LineageParser.parseLineageTraceDedup(dedupPatches);

// Separate the global trace and the dedup patches
String[] patches = LineageParser.separateMainAndDedupPatches(mainTrace);
LineageItem root = LineageParser.parseLineageTrace(patches[0]); //global trace
if (patches.length > 1)
LineageParser.parseLineageTraceDedup(patches[1]);

// Disable GPU execution. TODO: Support GPU
boolean GPUenabled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private void testLineageTrace(String testname) {

//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);
HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
TestUtils.compareMatrices(dmlfile, tmp, 1e-6);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private void testLineageTraceBuiltin(String testname) {

//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);

HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ public void testLineageTrace(String testname) {

//deserialize, generate program and execute
String Rtrace = readDMLLineageFromHDFS("R");
String RDedupPatches = readDMLLineageDedupFromHDFS("R");
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, RDedupPatches);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);

//match the original and recomputed results
HashMap<CellIndex, Double> orig = readDMLMatrixFromOutputDir("R");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ public void testLineageTraceSpark(String testname) {
TestUtils.compareScalars(Y_lineage, Explain.explain(Y_li));

//generate program
Data X_data = LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage, null);
Data X_data = LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage);
HashMap<MatrixValue.CellIndex, Double> X_dmlfile = readDMLMatrixFromOutputDir("X");
MatrixBlock X_tmp = ((MatrixObject)X_data).acquireReadAndRelease();
TestUtils.compareMatrices(X_dmlfile, X_tmp, 1e-6);

Data Y_data = LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage, null);
Data Y_data = LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage);
HashMap<MatrixValue.CellIndex, Double> Y_dmlfile = readDMLMatrixFromOutputDir("Y");
MatrixBlock Y_tmp = ((MatrixObject)Y_data).acquireReadAndRelease();
TestUtils.compareMatrices(Y_dmlfile, Y_tmp, 1e-6);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private void testLineageTraceExec(String testname) {

//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);

if( testname.equals(TEST_NAME2) || testname.equals(TEST_NAME5)) {
double val1 = readDMLScalarFromOutputDir("R").get(new CellIndex(1,1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private void testLineageTraceFunction(String testname) {

//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);

HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private void testLineageTraceExec(String testname) {
String Rtrace = readDMLLineageFromHDFS("R");
AutomatedTestBase.TEST_GPU = false;
//NOTE: the generated program is CP-only.
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);

HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private void testLineageTraceParFor(int ncol, String testname) {

//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);

HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject) ret).acquireReadAndRelease();
Expand Down
Loading