Skip to content

Commit

Permalink
commenct
Browse files Browse the repository at this point in the history
  • Loading branch information
laglangyue committed Aug 17, 2023
1 parent 05ed58e commit 2378e77
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public MicroBatchReader createMicroBatchReader(
// 创建微批 读取器,spark streaming,即将被删除
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
Integer parallelism = options.getInt(CommonOptions.PARALLELISM.key(), 1);
// checkpoint
// checkpoint 配置
Integer checkpointInterval =
options.getInt(
EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

/**
* 一个分区读取器
*/
public class MicroBatchPartition implements InputPartition<InternalRow> {
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
protected final Integer parallelism;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 这个类只处理任务状态,真正的读取在父类
*/
public class ParallelMicroBatchPartitionReader extends ParallelBatchPartitionReader {
protected static final Integer CHECKPOINT_SLEEP_INTERVAL = 10;
protected static final Integer CHECKPOINT_RETRIES = 3;
Expand Down Expand Up @@ -91,6 +94,7 @@ protected void prepare() {

protected FileSystem getFileSystem()
throws URISyntaxException, IOException, InterruptedException {
// 状态读取
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", hdfsRoot);
if (StringUtils.isNotBlank(hdfsUser)) {
Expand Down

0 comments on commit 2378e77

Please sign in to comment.