
fix(spark-lineage): remove need for sparksession.stop call (#4940)
Co-authored-by: Shirshanka Das <shirshanka@apache.org>
MugdhaHardikar-GSLab and shirshanka authored May 29, 2022
1 parent 96f923e commit 1479c35
Showing 3 changed files with 47 additions and 39 deletions.
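Background for the change: each SQL execution start event used to be handed to a per-application thread pool, and that pool was only shut down in onApplicationEnd, so in practice a script had to call spark.stop() (or spark.sparkContext.stop()) before lineage work was reliably flushed. This commit drops the pool and runs the task inline on the listener thread. A minimal, self-contained sketch of the difference, using a generic Runnable and pool rather than the DataHub classes:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SyncVsAsyncSketch {
    public static void main(String[] args) {
        Runnable task = () -> System.out.println("emit lineage for one SQL execution");

        // Old pattern: the task is queued on a fixed pool (THREAD_CNT = 16 in the
        // removed code). Nothing guarantees it has finished before the driver exits
        // unless the pool is shut down, and that only happened on application end,
        // i.e. when the script called spark.stop().
        ExecutorService pool = Executors.newFixedThreadPool(16);
        pool.execute(task);
        pool.shutdown();

        // New pattern: run the task directly on the calling (listener) thread, so the
        // event is fully processed before the listener returns and no stop/shutdown
        // call is needed to flush pending work.
        task.run();
    }
}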
@@ -19,9 +19,7 @@
#InsertIntoHadoopFsRelationCommand
df.write.mode('overwrite').csv(DATA_DIR + "/" + TEST_CASE_NAME+"/out.csv")

spark.sparkContext.stop()

spark.stop()



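For context, a job wired up with the listener looks roughly like this after the change. This is a hedged sketch in Java rather than the Python test script above; the server URL is a placeholder, the spark-lineage jar is assumed to be on the classpath, and the config keys mirror the constants visible in this diff (the spark.datahub prefix and rest.server):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LineageJobSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("lineage-smoke-test")
            .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
            .config("spark.datahub.rest.server", "http://localhost:8080") // placeholder GMS URL
            .getOrCreate();

        Dataset<Row> df = spark.read().option("header", "true").csv("in.csv");
        df.write().mode("overwrite").csv("out.csv"); // triggers InsertIntoHadoopFsRelationCommand

        // With this commit, lineage for the write above is emitted synchronously by the
        // listener, so the script no longer needs spark.stop() or spark.sparkContext().stop()
        // just to flush lineage events.
    }
}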
@@ -12,7 +12,6 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@@ -32,7 +31,6 @@
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;

import com.google.common.base.Splitter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.typesafe.config.Config;

import datahub.spark.consumer.impl.McpEmitter;
@@ -52,7 +50,6 @@
@Slf4j
public class DatahubSparkListener extends SparkListener {

private static final int THREAD_CNT = 16;
public static final String CONSUMER_TYPE_KEY = "spark.datahub.lineage.consumerTypes";
public static final String DATAHUB_EMITTER = "mcpEmitter";
public static final String DATABRICKS_CLUSTER_KEY = "databricks.cluster";
@@ -61,15 +58,14 @@ public class DatahubSparkListener extends SparkListener {

private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
private final Map<String, Map<Long, SQLQueryExecStartEvent>> appSqlDetails = new ConcurrentHashMap<>();
private final Map<String, ExecutorService> appPoolDetails = new ConcurrentHashMap<>();
private final Map<String, McpEmitter> appEmitters = new ConcurrentHashMap<>();
private final Map<String, Config> appConfig = new ConcurrentHashMap<>();

public DatahubSparkListener() {
log.info("DatahubSparkListener initialised.");
}

private class SqlStartTask implements Runnable {
private class SqlStartTask {

private final SparkListenerSQLExecutionStart sqlStart;
private final SparkContext ctx;
@@ -81,7 +77,6 @@ public SqlStartTask(SparkListenerSQLExecutionStart sqlStart, LogicalPlan plan, S
this.ctx = ctx;
}

@Override
public void run() {
appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
@@ -166,7 +161,7 @@ public void onApplicationStart(SparkListenerApplicationStart applicationStart) {

@Override
public Void apply(SparkContext sc) {
getOrCreateApplicationSetup(sc);
checkOrCreateApplicationSetup(sc);
return null;
}
});
@@ -190,7 +185,6 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
public Void apply(SparkContext sc) {
log.info("Application ended : {} {}", sc.appName(), sc.applicationId());
AppStartEvent start = appDetails.remove(sc.applicationId());
appPoolDetails.remove(sc.applicationId()).shutdown();
appSqlDetails.remove(sc.applicationId());
if (start == null) {
log.error("Application end event received, but start event missing for appId " + sc.applicationId());
@@ -274,9 +268,8 @@ public Void apply(SparkContext sc) {
}
});
}

private synchronized ExecutorService getOrCreateApplicationSetup(SparkContext ctx) {


private synchronized void checkOrCreateApplicationSetup(SparkContext ctx) {
ExecutorService pool = null;
String appId = ctx.applicationId();
Config datahubConfig = appConfig.get(appId);
@@ -292,16 +285,7 @@ private synchronized ExecutorService getOrCreateApplicationSetup(SparkContext ct
consumers().forEach(c -> c.accept(evt));
appDetails.put(appId, evt);
appSqlDetails.put(appId, new ConcurrentHashMap<>());
pool = Executors.newFixedThreadPool(THREAD_CNT,
new ThreadFactoryBuilder().setNameFormat("datahub-emit-pool").build());
appPoolDetails.put(appId, pool);
log.debug("Execution thread pool initialised for {}", appId);
} else {
pool = appPoolDetails.get(appId);
}

return pool;

}
}

private String getPipelineName(SparkContext cx) {
@@ -329,8 +313,7 @@ private void processExecution(SparkListenerSQLExecutionStart sqlStart) {
LogicalPlan plan = queryExec.optimizedPlan();
SparkSession sess = queryExec.sparkSession();
SparkContext ctx = sess.sparkContext();
ExecutorService pool = getOrCreateApplicationSetup(ctx);
pool.execute(new SqlStartTask(sqlStart, plan, ctx));
(new SqlStartTask(sqlStart, plan, ctx)).run();
}

private List<LineageConsumer> consumers() {
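A condensed view (simplified stand-in state, hypothetical names) of the check-or-create pattern the listener keeps after the pool removal: per-application state is keyed by applicationId and initialised at most once, and the method no longer returns an ExecutorService because there is no emit pool to hand back:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ApplicationSetupSketch {
    private final Map<String, Long> appStartTimes = new ConcurrentHashMap<>();

    synchronized void checkOrCreateApplicationSetup(String appId) {
        if (!appStartTimes.containsKey(appId)) {
            // first event seen for this application: record its start details once
            appStartTimes.put(appId, System.currentTimeMillis());
        }
        // nothing to return: SQL-start tasks now run inline rather than on a pool
    }
}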
@@ -1,30 +1,51 @@
package datahub.spark.consumer.impl;

import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import datahub.client.Emitter;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

import com.typesafe.config.Config;

import datahub.client.Emitter;
import datahub.client.rest.RestEmitter;
import datahub.client.rest.RestEmitterConfig;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class McpEmitter implements LineageConsumer {

private final Optional<Emitter> emitter;
private String emitterType;
private Optional<RestEmitterConfig> restEmitterConfig;
private static final String TRANSPORT_KEY = "transport";
private static final String GMS_URL_KEY = "rest.server";
private static final String GMS_AUTH_TOKEN = "rest.token";

private Optional<Emitter> getEmitter() {
Optional<Emitter> emitter = Optional.empty();
switch (emitterType) {
case "rest":
if (restEmitterConfig.isPresent()) {
emitter = Optional.of(new RestEmitter(restEmitterConfig.get()));
}
break;

default:
log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", emitterType);
break;

}
return emitter;
}

private void emit(List<MetadataChangeProposalWrapper> mcpws) {
Optional<Emitter> emitter = getEmitter();
if (emitter.isPresent()) {
mcpws.stream().map(mcpw -> {
try {
@@ -42,11 +63,16 @@ private void emit(List<MetadataChangeProposalWrapper> mcpws) {
log.error("Failed to emit metadata to DataHub", e);
}
});
try {
emitter.get().close();
} catch (IOException e) {
log.error("Issue while closing emitter" + e);
}
}
}

public McpEmitter(Config datahubConf) {
String emitterType = datahubConf.hasPath(TRANSPORT_KEY) ? datahubConf.getString(TRANSPORT_KEY) : "rest";
emitterType = datahubConf.hasPath(TRANSPORT_KEY) ? datahubConf.getString(TRANSPORT_KEY) : "rest";
switch (emitterType) {
case "rest":
String gmsUrl = datahubConf.hasPath(GMS_URL_KEY) ? datahubConf.getString(GMS_URL_KEY)
@@ -57,10 +83,10 @@ public McpEmitter(Config datahubConf) {
if (token != null) {
log.info("REST Emitter Configuration: Token {}", (token != null) ? "XXXXX" : "(empty)");
}
emitter = Optional.of(RestEmitter.create($ -> $.server(gmsUrl).token(token)));
restEmitterConfig = Optional.of(RestEmitterConfig.builder().server(gmsUrl).token(token).build());

break;
default:
emitter = Optional.empty();
log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", emitterType);
break;
}
@@ -73,8 +99,9 @@ public void accept(LineageEvent evt) {

@Override
public void close() throws IOException {
if (emitter.isPresent()) {
emitter.get().close();
}
// Nothing to close at this point

}
}


}
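The McpEmitter side follows the same idea: instead of holding one long-lived Emitter that had to be closed when the consumer was closed, the consumer now stores only the RestEmitterConfig, builds a short-lived emitter for each batch of proposals and closes it as soon as the batch has been sent, which is why close() becomes a no-op. A condensed sketch with simplified stand-in types, not the DataHub classes:

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;

class PerBatchEmitterSketch {

    // stand-in for datahub.client.rest.RestEmitterConfig
    static class RestConfig {
        final String server;
        final String token;
        RestConfig(String server, String token) { this.server = server; this.token = token; }
    }

    // stand-in for datahub.client.Emitter
    static class SimpleEmitter implements Closeable {
        void emit(String proposal) { System.out.println("POST " + proposal); }
        @Override public void close() throws IOException { System.out.println("connection closed"); }
    }

    private final Optional<RestConfig> restEmitterConfig;

    PerBatchEmitterSketch(RestConfig config) { this.restEmitterConfig = Optional.ofNullable(config); }

    void emit(List<String> proposals) {
        if (!restEmitterConfig.isPresent()) {
            return; // transport not configured or not recognised: emission is disabled
        }
        SimpleEmitter emitter = new SimpleEmitter(); // fresh emitter per batch
        try {
            proposals.forEach(emitter::emit);
        } finally {
            try {
                emitter.close(); // closed right after the batch, so LineageConsumer.close() has nothing left to do
            } catch (IOException e) {
                System.err.println("Issue while closing emitter: " + e);
            }
        }
    }
}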
