Merge pull request #986 from Nitish1814/fabric-fix
Added a check before setting the checkpoint directory
sonalgoyal authored Dec 17, 2024
2 parents 176804a + 7ad79c4 commit 1f2d833
Showing 3 changed files with 25 additions and 21 deletions.
spark/client/src/main/java/zingg/spark/client/SparkClient.java (13 changes: 10 additions & 3 deletions)
@@ -1,5 +1,6 @@
 package zingg.spark.client;
 
+import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
@@ -79,12 +80,18 @@ public SparkSession getSession() {
         SparkSession s = SparkSession
             .builder()
             .appName("Zingg")
-            .getOrCreate();
-        JavaSparkContext ctx = JavaSparkContext.fromSparkContext(s.sparkContext());
+            .getOrCreate();
+        SparkContext sparkContext = s.sparkContext();
+        if (sparkContext.getCheckpointDir().isEmpty()) {
+            sparkContext.setCheckpointDir("/tmp/checkpoint");
+        }
+        JavaSparkContext ctx = JavaSparkContext.fromSparkContext(sparkContext);
         JavaSparkContext.jarOfClass(IZingg.class);
         LOG.debug("Context " + ctx.toString());
         //initHashFns();
-        ctx.setCheckpointDir("/tmp/checkpoint");
+        if (!ctx.getCheckpointDir().isPresent()) {
+            ctx.setCheckpointDir(String.valueOf(sparkContext.getCheckpointDir()));
+        }
         setSession(s);
         return s;
     }
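The getSession() change above stops Zingg from overwriting a checkpoint directory that the runtime has already configured (the branch name fabric-fix suggests Microsoft Fabric, where /tmp/checkpoint is not a usable default); /tmp/checkpoint is now only a fallback. A minimal standalone sketch of the same guard follows, using only stock Spark APIs; the class and method names below are illustrative, not part of Zingg.

import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;

// Illustrative example (not Zingg code): build a session and set a fallback
// checkpoint directory only when the environment has not configured one already.
public class CheckpointGuardExample {

    public static SparkSession sessionWithCheckpointFallback(String fallbackDir) {
        SparkSession session = SparkSession.builder()
                .appName("checkpoint-guard-example")
                .master("local[*]") // local master only so the example runs standalone
                .getOrCreate();
        SparkContext sc = session.sparkContext();
        // SparkContext.getCheckpointDir() returns a Scala Option; isEmpty() means
        // no checkpoint directory has been set yet, so the fallback is safe to apply.
        if (sc.getCheckpointDir().isEmpty()) {
            sc.setCheckpointDir(fallbackDir);
        }
        return session;
    }

    public static void main(String[] args) {
        SparkSession s = sessionWithCheckpointFallback("/tmp/checkpoint");
        System.out.println("checkpoint dir: " + s.sparkContext().getCheckpointDir());
    }
}

On a runtime that pre-sets a checkpoint location, the if branch is skipped and the existing setting is kept.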
(second changed file: TestSparkExecutorsCompound)
@@ -4,25 +4,26 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataType;
 
-import zingg.common.client.IZingg;
+import org.junit.jupiter.api.extension.ExtendWith;
 import zingg.common.client.ZinggClientException;
 import zingg.common.client.util.DFObjectUtil;
 import zingg.common.client.util.IWithSession;
 import zingg.common.client.util.WithSession;
 import zingg.common.core.executor.TestExecutorsCompound;
 import zingg.common.core.executor.TrainMatcher;
 import zingg.spark.client.util.SparkDFObjectUtil;
+import zingg.spark.core.TestSparkBase;
 import zingg.spark.core.context.ZinggSparkContext;
 import zingg.spark.core.executor.labeller.ProgrammaticSparkLabeller;
 import zingg.spark.core.executor.validate.SparkTrainMatchValidator;
 
+@ExtendWith(TestSparkBase.class)
 public class TestSparkExecutorsCompound extends TestExecutorsCompound<SparkSession,Dataset<Row>,Row,Column,DataType> {
     protected static final String CONFIG_FILE = "zingg/spark/core/executor/configSparkIntTest.json";
     protected static final String TEST_DATA_FILE = "zingg/spark/core/executor/test.csv";
@@ -31,22 +32,11 @@ public class TestSparkExecutorsCompound extends TestExecutorsCompound<SparkSessi
 
     protected ZinggSparkContext ctx;
 
-    public TestSparkExecutorsCompound() throws IOException, ZinggClientException {
-        SparkSession spark = SparkSession
-            .builder()
-            .master("local[*]")
-            .appName("Zingg" + "Junit")
-            .getOrCreate();
-
-        JavaSparkContext ctx1 = new JavaSparkContext(spark.sparkContext());
-        JavaSparkContext.jarOfClass(IZingg.class);
-        ctx1.setCheckpointDir("/tmp/checkpoint");
-
+    public TestSparkExecutorsCompound(SparkSession sparkSession) throws IOException, ZinggClientException {
         this.ctx = new ZinggSparkContext();
-        this.ctx.setSession(spark);
+        this.ctx.setSession(sparkSession);
         this.ctx.setUtils();
-        init(spark);
-        //setupArgs();
+        init(sparkSession);
     }
 
     @Override
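In the test above, the constructor no longer builds its own SparkSession; the session is injected through the new @ExtendWith(TestSparkBase.class) annotation. This commit does not show TestSparkBase itself, so the sketch below is only the generic JUnit 5 ParameterResolver pattern such an extension typically uses; the class name SparkSessionResolverExample and its session settings are illustrative assumptions, not Zingg's actual TestSparkBase.

import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;

// Hypothetical extension, for illustration only: supplies a shared SparkSession
// to any test constructor or method that declares a SparkSession parameter.
public class SparkSessionResolverExample implements ParameterResolver {

    private static final SparkSession SESSION = SparkSession.builder()
            .master("local[*]")
            .appName("ZinggJunit")
            .getOrCreate();

    @Override
    public boolean supportsParameter(ParameterContext parameterContext,
                                     ExtensionContext extensionContext)
            throws ParameterResolutionException {
        // Only resolve parameters whose declared type is SparkSession.
        return parameterContext.getParameter().getType() == SparkSession.class;
    }

    @Override
    public Object resolveParameter(ParameterContext parameterContext,
                                   ExtensionContext extensionContext)
            throws ParameterResolutionException {
        return SESSION;
    }
}

A test class would then opt in with @ExtendWith(SparkSessionResolverExample.class) and declare a SparkSession constructor parameter, exactly as TestSparkExecutorsCompound now does with TestSparkBase.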
(third changed file)
@@ -2,6 +2,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import zingg.common.client.Arguments;
@@ -28,9 +29,15 @@ private void initializeSession() {
             .appName("ZinggJunit")
             .config("spark.debug.maxToStringFields", 100)
             .getOrCreate();
-        javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
+        SparkContext sparkContext = sparkSession.sparkContext();
+        if (sparkContext.getCheckpointDir().isEmpty()) {
+            sparkContext.setCheckpointDir("/tmp/checkpoint");
+        }
+        javaSparkContext = new JavaSparkContext(sparkContext);
         JavaSparkContext.jarOfClass(IZingg.class);
-        javaSparkContext.setCheckpointDir("/tmp/checkpoint");
+        if (!javaSparkContext.getCheckpointDir().isPresent()) {
+            javaSparkContext.setCheckpointDir(String.valueOf(sparkContext.getCheckpointDir()));
+        }
         args = new Arguments();
         zinggSparkContext = new ZinggSparkContext();
         zinggSparkContext.init(sparkSession);
