Commit

[spark] fix spark streaming task
laglangyue committed Jul 26, 2023
1 parent 32b7f2b commit da5ac41
Showing 12 changed files with 323 additions and 77 deletions.
@@ -31,12 +31,6 @@

<dependencies>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.2.4.0.version}</version>
<scope>${spark.scope}</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
@@ -18,7 +18,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
@@ -43,13 +43,6 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.3.3.0.version}</version>
<scope>${spark.scope}</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.core.starter.spark.execution;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.spark.SparkConf;

import java.util.HashMap;
import java.util.Map;

public class CheckpointConfig {

public static final String CHECKPOINT_LOCATION = "checkpointLocation";
public static final String ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled";
public static final String ASYNC_PROGRESS_CHECK_POINTING_INTERVAL =
"asyncProgressCheckpointingInterval";

private static final String DEFAULT_CHECKPOINT_LOCATION = "/tmp";
private static final long DEFAULT_CHECKPOINT_INTERVAL = 60000L;
private static final String KEY_CHECK_POINTING_INTERVAL = "checkpoint.interval";
private static final String KEY_TRIGGER_PROGRESS_INTERVAL = "triggerProcessingInterval";

/**
* checkpoint location for the streaming query
*/
private String checkpointLocation;
/**
* async checkpoint for microBatch
*/
private boolean asyncProgressTrackingEnabled;
/**
* async checkpoint interval for microBatch; takes effect only when asyncProgressTrackingEnabled
* is true; the unit is minutes
*/
private Long asyncProgressCheckpointingInterval;

/**
* micro-batch processing data interval; <br>
* if asyncProgressTrackingEnabled = false, checkpointInterval = processingInterval and the
* semantics are exactly-once <br>
* if asyncProgressTrackingEnabled = true, the semantics are at-most-once <br>
*/
private Long triggerProcessingInterval;

public String getCheckpointLocation() {
return checkpointLocation;
}

public boolean asyncProgressTrackingEnabled() {
return asyncProgressTrackingEnabled;
}

public Long asyncProgressCheckpointingInterval() {
return asyncProgressCheckpointingInterval;
}

public Long getTriggerProcessingInterval() {
return triggerProcessingInterval;
}

public CheckpointConfig(Config env, SparkConf sparkConf) {
// checkpoint location
if (env.hasPath(CHECKPOINT_LOCATION)) {
this.checkpointLocation = env.getString(CHECKPOINT_LOCATION);
} else {
this.checkpointLocation =
sparkConf.get(CHECKPOINT_LOCATION, DEFAULT_CHECKPOINT_LOCATION);
}
// checkpoint interval default value
if (env.hasPath(KEY_CHECK_POINTING_INTERVAL)) {
this.triggerProcessingInterval = env.getLong(KEY_CHECK_POINTING_INTERVAL);
this.asyncProgressCheckpointingInterval = env.getLong(KEY_CHECK_POINTING_INTERVAL) / 60000;
} else {
this.triggerProcessingInterval = sparkConf.getLong(KEY_CHECK_POINTING_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL);
this.asyncProgressCheckpointingInterval = sparkConf.getLong(KEY_CHECK_POINTING_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL / 60000);
}
this.asyncProgressTrackingEnabled = false;

// process interval
if (env.hasPath(KEY_TRIGGER_PROGRESS_INTERVAL)) {
this.triggerProcessingInterval = env.getLong(KEY_TRIGGER_PROGRESS_INTERVAL);
}
// async
if (env.hasPath(ASYNC_PROGRESS_TRACKING_ENABLED)) {
this.asyncProgressTrackingEnabled = env.getBoolean(ASYNC_PROGRESS_TRACKING_ENABLED);
}
if (env.hasPath(ASYNC_PROGRESS_CHECK_POINTING_INTERVAL)) {
this.asyncProgressCheckpointingInterval = env.getLong(ASYNC_PROGRESS_CHECK_POINTING_INTERVAL) / 60000;
}
}


public Map<String, String> microBatchConf() {
// it supports Trigger.ProcessingTime(checkpointInterval)
HashMap<String, String> checkpointConf = new HashMap<>();
checkpointConf.put(CHECKPOINT_LOCATION, checkpointLocation);
if (asyncProgressTrackingEnabled) {
checkpointConf.put(ASYNC_PROGRESS_TRACKING_ENABLED, Boolean.TRUE.toString());
checkpointConf.put(ASYNC_PROGRESS_CHECK_POINTING_INTERVAL, asyncProgressCheckpointingInterval.toString());
}
return checkpointConf;

}

public Map<String, String> continuousConf() {
// todo: support continuous streaming
HashMap<String, String> checkpointConf = new HashMap<>();
checkpointConf.put(CHECKPOINT_LOCATION, checkpointLocation);
return checkpointConf;
}

public Map<String, String> batchConf() {
HashMap<String, String> checkpointConf = new HashMap<>();
checkpointConf.put(CHECKPOINT_LOCATION, checkpointLocation);
return checkpointConf;
}

}
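A minimal usage sketch (not part of the commit) of how the new CheckpointConfig resolves its values, assuming the shaded Typesafe ConfigFactory is relocated alongside Config; the key names come from the class above, while the sample values and the class name CheckpointConfigExample are hypothetical.

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.spark.SparkConf;

import java.util.Map;

public class CheckpointConfigExample {

    public static void main(String[] args) {
        // hypothetical env block; only the keys read by CheckpointConfig matter here
        Config env =
                ConfigFactory.parseString(
                        "checkpoint.interval = 120000\n"
                                + "asyncProgressTrackingEnabled = true\n"
                                + "checkpointLocation = \"/tmp/seatunnel/checkpoint\"");
        SparkConf sparkConf = new SparkConf();

        CheckpointConfig checkpointConfig = new CheckpointConfig(env, sparkConf);
        // triggerProcessingInterval = 120000 ms; asyncProgressCheckpointingInterval = 2 (minutes)
        Map<String, String> options = checkpointConfig.microBatchConf();
        options.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}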
@@ -25,6 +25,7 @@
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -35,28 +36,33 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import com.google.common.collect.Lists;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public class SinkExecuteProcessor
extends SparkAbstractPluginExecuteProcessor<SeaTunnelSink<?, ?, ?, ?>> {

private static final String PLUGIN_TYPE = PluginType.SINK.getType();

protected SinkExecuteProcessor(
SparkRuntimeEnvironment sparkRuntimeEnvironment,
JobContext jobContext,
List<? extends Config> pluginConfigs) {
super(sparkRuntimeEnvironment, jobContext, pluginConfigs);
}

@Override
protected List<SeaTunnelSink<?, ?, ?, ?>> initializePlugins(
List<? extends Config> pluginConfigs) {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
List<SeaTunnelSink<?, ?, ?, ?>> sinks =
@@ -86,39 +92,66 @@ protected SinkExecuteProcessor(

@Override
public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
throws TaskExecuteException {
Dataset<Row> input = upstreamDataStreams.get(0);
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
Dataset<Row> dataset =
fromSourceTable(sinkConfig, sparkRuntimeEnvironment).orElse(input);
int parallelism;
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
} else {
parallelism =
sparkRuntimeEnvironment
.getSparkConf()
.getInt(
CommonOptions.PARALLELISM.key(),
CommonOptions.PARALLELISM.defaultValue());
}
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), parallelism);
// TODO modify checkpoint location
seaTunnelSink.setTypeInfo(
(SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
SparkSinkInjector.inject(dataset.write(), seaTunnelSink)
.option("checkpointLocation", "/tmp")
.mode(SaveMode.Append)
.save();
if (jobContext.getJobMode() == JobMode.BATCH) {
batch(dataset, seaTunnelSink, parallelism);
} else {
streaming(dataset, seaTunnelSink, parallelism);
}
}
// the sink is the last stream
return null;
}

private void batch(Dataset<Row> dataset, SeaTunnelSink<?, ?, ?, ?> seaTunnelSink, int parallelism) {
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), parallelism);
SparkSinkInjector.inject(dataset.write(), seaTunnelSink)
.options(sparkRuntimeEnvironment.getCheckpointConfig().batchConf())
.mode(SaveMode.Append)
.save();
}

private void streaming(Dataset<Row> dataset, SeaTunnelSink<?, ?, ?, ?> seaTunnelSink, int parallelism) {
dataset.sparkSession().readStream().option(CommonOptions.PARALLELISM.key(), parallelism);
try {
SparkSinkInjector.inject(dataset.writeStream(), seaTunnelSink)
.outputMode(OutputMode.Append())
.options(sparkRuntimeEnvironment.getCheckpointConfig().microBatchConf())
.trigger(
Trigger.ProcessingTime(
sparkRuntimeEnvironment
.getCheckpointConfig()
.getTriggerProcessingInterval()))
.start()
.awaitTermination();
} catch (StreamingQueryException | TimeoutException e) {
throw new RuntimeException(e);
} finally {
dataset.sparkSession().close();
}
}
}
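For reference, the streaming branch above follows the standard Spark Structured Streaming write pattern. A standalone sketch of that pattern with plain Spark APIs (no SeaTunnel injector; the rate source, console sink, and 60 s trigger are stand-ins chosen purely for illustration):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import java.util.concurrent.TimeoutException;

public class StreamingWriteSketch {

    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        SparkSession spark =
                SparkSession.builder().master("local[*]").appName("streaming-write-sketch").getOrCreate();
        // the rate source just generates rows, standing in for a SeaTunnel streaming source
        Dataset<Row> dataset =
                spark.readStream().format("rate").option("rowsPerSecond", 1).load();
        dataset.writeStream()
                .format("console") // stands in for the injected SeaTunnel sink
                .outputMode(OutputMode.Append())
                .option("checkpointLocation", "/tmp/sketch-checkpoint") // supplied by CheckpointConfig in the real code
                .trigger(Trigger.ProcessingTime(60000L)) // triggerProcessingInterval, in milliseconds
                .start()
                .awaitTermination();
    }
}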
@@ -23,6 +23,7 @@
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -68,19 +69,12 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {
CommonOptions.PARALLELISM.key(),
CommonOptions.PARALLELISM.defaultValue());
}
Dataset<Row> dataset =
sparkRuntimeEnvironment
.getSparkSession()
.read()
.format(SeaTunnelSource.class.getSimpleName())
.option(CommonOptions.PARALLELISM.key(), parallelism)
.option(
Constants.SOURCE_SERIALIZATION,
SerializationUtils.objectToString(source))
.schema(
(StructType)
TypeConverterUtils.convert(source.getProducedType()))
.load();
Dataset<Row> dataset;
if (jobContext.getJobMode() == JobMode.BATCH) {
dataset = batch(parallelism, source);
} else {
dataset = streaming(parallelism, source);
}
sources.add(dataset);
registerInputTempView(pluginConfigs.get(i), dataset);
}
@@ -108,4 +102,26 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {
sparkRuntimeEnvironment.registerPlugin(new ArrayList<>(jars));
return sources;
}

private Dataset<Row> batch(int parallelism, SeaTunnelSource<?, ?, ?> source) {
return sparkRuntimeEnvironment
.getSparkSession()
.read()
.format(SeaTunnelSource.class.getSimpleName())
.option(CommonOptions.PARALLELISM.key(), parallelism)
.option(Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source))
.schema((StructType) TypeConverterUtils.convert(source.getProducedType()))
.load();
}

private Dataset<Row> streaming(int parallelism, SeaTunnelSource<?, ?, ?> source) {
return sparkRuntimeEnvironment
.getSparkSession()
.readStream()
.format(SeaTunnelSource.class.getSimpleName())
.option(CommonOptions.PARALLELISM.key(), parallelism)
.option(Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source))
.schema((StructType) TypeConverterUtils.convert(source.getProducedType()))
.load();
}
}