Skip to content

Commit

Permalink
[Optimization-1326][client] Optimize lineage to support watermark and…
Browse files Browse the repository at this point in the history
… udtf (#1327)

Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
  • Loading branch information
aiwenmo and aiwenmo authored Nov 25, 2022
1 parent 6eed6dd commit 11db8e5
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Snapshot;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
Expand Down Expand Up @@ -56,11 +55,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtMethod;
import javassist.Modifier;

/**
* LineageContext
*
Expand All @@ -77,33 +71,6 @@ public LineageContext(FlinkChainedProgram flinkChainedProgram, TableEnvironmentI
this.tableEnv = tableEnv;
}

/**
* Dynamic add getColumnOrigins method to class RelMdColumnOrigins by javassist:
*
* public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel,RelMetadataQuery mq, int iOutputColumn) {
* return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
* }
*/
static {
try {
ClassPool classPool = ClassPool.getDefault();
CtClass ctClass = classPool.getCtClass("org.apache.calcite.rel.metadata.RelMdColumnOrigins");

CtClass[] parameters = new CtClass[]{classPool.get(Snapshot.class.getName()),
classPool.get(RelMetadataQuery.class.getName()), CtClass.intType
};
// add method
CtMethod ctMethod = new CtMethod(classPool.get("java.util.Set"), "getColumnOrigins", parameters, ctClass);
ctMethod.setModifiers(Modifier.PUBLIC);
ctMethod.setBody("{return $2.getColumnOrigins($1.getInput(), $3);}");
ctClass.addMethod(ctMethod);
// load the class
ctClass.toClass();
} catch (Exception e) {
throw new TableException("Dynamic add getColumnOrigins() method exception.", e);
}
}

public List<LineageRel> getLineage(String statement) {
// 1. Generate original relNode tree
Tuple2<String, RelNode> parsed = parseStatement(statement);
Expand Down Expand Up @@ -178,9 +145,9 @@ public SqlExprToRexConverter create(RelDataType relDataType) {

@Override
public <C> C unwrap(Class<C> clazz) {
if(clazz.isInterface()){
if (clazz.isInterface()) {
return clazz.cast(this);
}else{
} else {
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Snapshot;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
Expand Down Expand Up @@ -56,11 +55,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtMethod;
import javassist.Modifier;

/**
* LineageContext
*
Expand All @@ -77,33 +71,6 @@ public LineageContext(FlinkChainedProgram flinkChainedProgram, TableEnvironmentI
this.tableEnv = tableEnv;
}

/**
* Dynamic add getColumnOrigins method to class RelMdColumnOrigins by javassist:
*
* public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel,RelMetadataQuery mq, int iOutputColumn) {
* return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
* }
*/
static {
try {
ClassPool classPool = ClassPool.getDefault();
CtClass ctClass = classPool.getCtClass("org.apache.calcite.rel.metadata.RelMdColumnOrigins");

CtClass[] parameters = new CtClass[]{classPool.get(Snapshot.class.getName()),
classPool.get(RelMetadataQuery.class.getName()), CtClass.intType
};
// add method
CtMethod ctMethod = new CtMethod(classPool.get("java.util.Set"), "getColumnOrigins", parameters, ctClass);
ctMethod.setModifiers(Modifier.PUBLIC);
ctMethod.setBody("{return $2.getColumnOrigins($1.getInput(), $3);}");
ctClass.addMethod(ctMethod);
// load the class
ctClass.toClass();
} catch (Exception e) {
throw new TableException("Dynamic add getColumnOrigins() method exception.", e);
}
}

public List<LineageRel> getLineage(String statement) {
// 1. Generate original relNode tree
Tuple2<String, RelNode> parsed = parseStatement(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Snapshot;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexBuilder;
Expand All @@ -48,11 +47,6 @@
import java.util.List;
import java.util.Set;

import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtMethod;
import javassist.Modifier;

/**
* LineageContext
*
Expand All @@ -69,33 +63,6 @@ public LineageContext(FlinkChainedProgram flinkChainedProgram, TableEnvironmentI
this.tableEnv = tableEnv;
}

/**
* Dynamic add getColumnOrigins method to class RelMdColumnOrigins by javassist:
*
* public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel,RelMetadataQuery mq, int iOutputColumn) {
* return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
* }
*/
static {
try {
ClassPool classPool = ClassPool.getDefault();
CtClass ctClass = classPool.getCtClass("org.apache.calcite.rel.metadata.RelMdColumnOrigins");

CtClass[] parameters = new CtClass[]{classPool.get(Snapshot.class.getName()),
classPool.get(RelMetadataQuery.class.getName()), CtClass.intType
};
// add method
CtMethod ctMethod = new CtMethod(classPool.get("java.util.Set"), "getColumnOrigins", parameters, ctClass);
ctMethod.setModifiers(Modifier.PUBLIC);
ctMethod.setBody("{return $2.getColumnOrigins($1.getInput(), $3);}");
ctClass.addMethod(ctMethod);
// load the class
ctClass.toClass();
} catch (Exception e) {
throw new TableException("Dynamic add getColumnOrigins() method exception.", e);
}
}

public List<LineageRel> getLineage(String statement) {
// 1. Generate original relNode tree
Tuple2<String, RelNode> parsed = parseStatement(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Snapshot;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -48,11 +47,6 @@
import java.util.List;
import java.util.Set;

import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtMethod;
import javassist.Modifier;

/**
* LineageContext
*
Expand All @@ -69,33 +63,6 @@ public LineageContext(FlinkChainedProgram flinkChainedProgram, TableEnvironmentI
this.tableEnv = tableEnv;
}

/**
* Dynamic add getColumnOrigins method to class RelMdColumnOrigins by javassist:
*
* public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel,RelMetadataQuery mq, int iOutputColumn) {
* return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
* }
*/
static {
try {
ClassPool classPool = ClassPool.getDefault();
CtClass ctClass = classPool.getCtClass("org.apache.calcite.rel.metadata.RelMdColumnOrigins");

CtClass[] parameters = new CtClass[]{classPool.get(Snapshot.class.getName()),
classPool.get(RelMetadataQuery.class.getName()), CtClass.intType
};
// add method
CtMethod ctMethod = new CtMethod(classPool.get("java.util.Set"), "getColumnOrigins", parameters, ctClass);
ctMethod.setModifiers(Modifier.PUBLIC);
ctMethod.setBody("{return $2.getColumnOrigins($1.getInput(), $3);}");
ctClass.addMethod(ctMethod);
// load the class
ctClass.toClass();
} catch (Exception e) {
throw new TableException("Dynamic add getColumnOrigins() method exception.", e);
}
}

public List<LineageRel> getLineage(String statement) {
// 1. Generate original relNode tree
Tuple2<String, RelNode> parsed = parseStatement(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Snapshot;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -49,11 +48,6 @@
import java.util.List;
import java.util.Set;

import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtMethod;
import javassist.Modifier;

/**
* LineageContext
*
Expand All @@ -70,33 +64,6 @@ public LineageContext(FlinkChainedProgram flinkChainedProgram, TableEnvironmentI
this.tableEnv = tableEnv;
}

/**
* Dynamic add getColumnOrigins method to class RelMdColumnOrigins by javassist:
*
* public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel,RelMetadataQuery mq, int iOutputColumn) {
* return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
* }
*/
static {
try {
ClassPool classPool = ClassPool.getDefault();
CtClass ctClass = classPool.getCtClass("org.apache.calcite.rel.metadata.RelMdColumnOrigins");

CtClass[] parameters = new CtClass[]{classPool.get(Snapshot.class.getName()),
classPool.get(RelMetadataQuery.class.getName()), CtClass.intType
};
// add method
CtMethod ctMethod = new CtMethod(classPool.get("java.util.Set"), "getColumnOrigins", parameters, ctClass);
ctMethod.setModifiers(Modifier.PUBLIC);
ctMethod.setBody("{return $2.getColumnOrigins($1.getInput(), $3);}");
ctClass.addMethod(ctMethod);
// load the class
ctClass.toClass();
} catch (Exception e) {
throw new TableException("Dynamic add getColumnOrigins() method exception.", e);
}
}

public List<LineageRel> getLineage(String statement) {
// 1. Generate original relNode tree
Tuple2<String, RelNode> parsed = parseStatement(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Snapshot;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -49,11 +48,6 @@
import java.util.List;
import java.util.Set;

import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtMethod;
import javassist.Modifier;

/**
* LineageContext
*
Expand All @@ -70,33 +64,6 @@ public LineageContext(FlinkChainedProgram flinkChainedProgram, TableEnvironmentI
this.tableEnv = tableEnv;
}

/**
* Dynamic add getColumnOrigins method to class RelMdColumnOrigins by javassist:
*
* public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel,RelMetadataQuery mq, int iOutputColumn) {
* return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
* }
*/
static {
try {
ClassPool classPool = ClassPool.getDefault();
CtClass ctClass = classPool.getCtClass("org.apache.calcite.rel.metadata.RelMdColumnOrigins");

CtClass[] parameters = new CtClass[]{classPool.get(Snapshot.class.getName()),
classPool.get(RelMetadataQuery.class.getName()), CtClass.intType
};
// add method
CtMethod ctMethod = new CtMethod(classPool.get("java.util.Set"), "getColumnOrigins", parameters, ctClass);
ctMethod.setModifiers(Modifier.PUBLIC);
ctMethod.setBody("{return $2.getColumnOrigins($1.getInput(), $3);}");
ctClass.addMethod(ctMethod);
// load the class
ctClass.toClass();
} catch (Exception e) {
throw new TableException("Dynamic add getColumnOrigins() method exception.", e);
}
}

public List<LineageRel> getLineage(String statement) {
// 1. Generate original relNode tree
Tuple2<String, RelNode> parsed = parseStatement(statement);
Expand Down
Loading

0 comments on commit 11db8e5

Please sign in to comment.