Skip to content

Commit

Permalink
py work on peek model.
Browse files Browse the repository at this point in the history
  • Loading branch information
sonalgoyal committed Mar 10, 2023
1 parent 180afef commit 55708e1
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 13 deletions.
5 changes: 5 additions & 0 deletions common/core/src/main/java/zingg/common/core/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public interface Context <S,D, R, C,T> extends Serializable {

public void init(String license)
throws ZinggClientException;

/**convenience method to set all utils
* especially useful when you dont want to create the connection/spark context etc
* */
public void setUtils();

public S getSession();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ public SparkClient(Arguments args, ClientOptions options) throws ZinggClientExce
}

public SparkClient() {
SparkSession session = SparkSession
/*SparkSession session = SparkSession
.builder()
.appName("Zingg")
.getOrCreate();
JavaSparkContext ctx = new JavaSparkContext(session.sparkContext());
JavaSparkContext ctx = JavaSparkContext.fromSparkContext(session.sparkContext());
JavaSparkContext.jarOfClass(IZingg.class);
*/

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,35 @@
import org.apache.commons.logging.LogFactory;
import org.apache.spark.deploy.PythonRunner;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;

import zingg.common.client.Arguments;
import zingg.common.client.ClientOptions;
import zingg.common.client.ZinggClientException;
import zingg.common.client.ZinggOptions;
import zingg.common.core.executor.ZinggBase;

public class PeekModel extends ZinggBase{
public class SparkPeekModel extends ZinggBase<SparkSession, Dataset<Row>, Row, Column, DataType>{

protected static String name = "zingg.spark.core.executor.PeekModel";
public static final Log LOG = LogFactory.getLog(PeekModel.class);
protected static String name = "zingg.spark.core.executor.SparkPeekModel";
public static final Log LOG = LogFactory.getLog(SparkPeekModel.class);

public PeekModel() {
public SparkPeekModel() {
setZinggOptions(ZinggOptions.PEEK_MODEL);
setContext(new ZinggSparkContext());
}

@Override
public void init(Arguments args, String license)
throws ZinggClientException {
startTime = System.currentTimeMillis();
this.args = args;
this.args = args;
//dont call init here as spark session already created
getContext().setUtils();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public SparkZFactory() {}
zinggers.put(ZinggOptions.UPDATE_LABEL, SparkLabelUpdater.name);
zinggers.put(ZinggOptions.FIND_AND_LABEL, SparkFindAndLabeller.name);
zinggers.put(ZinggOptions.RECOMMEND, SparkRecommender.name);
zinggers.put(ZinggOptions.PEEK_MODEL, SparkPeekModel.name);
}

public IZingg get(ZinggOptions z) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,24 @@ public void init(String license)
//initHashFns();

ctx.setCheckpointDir("/tmp/checkpoint");
setPipeUtil(new SparkPipeUtil(spark));
setDSUtil(new SparkDSUtil(spark));
setHashUtil(new SparkHashUtil(spark));
setGraphUtil(new SparkGraphUtil());
setModelUtil(new SparkModelUtil(spark));
setBlockingTreeUtil(new SparkBlockingTreeUtil(spark, getPipeUtil()));
setUtils();
}
catch(Throwable e) {
if (LOG.isDebugEnabled()) e.printStackTrace();
throw new ZinggClientException(e.getMessage());
}
}

@Override
public void setUtils() {
setPipeUtil(new SparkPipeUtil(spark));
setDSUtil(new SparkDSUtil(spark));
setHashUtil(new SparkHashUtil(spark));
setGraphUtil(new SparkGraphUtil());
setModelUtil(new SparkModelUtil(spark));
setBlockingTreeUtil(new SparkBlockingTreeUtil(spark, getPipeUtil()));
}

/**
public void initHashFns() throws ZinggClientException {
try {
Expand Down

0 comments on commit 55708e1

Please sign in to comment.