Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-5095] Flink: Stores a special watermark(flag) to identify the current progress of writing data #8753

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.common;

import org.apache.hudi.common.model.HoodieCommitMetadata;

/**
* Extra metadata key of {@link HoodieCommitMetadata}.
*/
public enum HoodieExtraMetadata {

/**
* Watermark extra metadata that represents the current watermark of commit metadata.
*/
WATERMARK("watermark");

private final String key;

HoodieExtraMetadata(String key) {
this.key = key;
}

/**
* Getter for key.
*
* @return string form of {@link HoodieExtraMetadata}.
*/
public String key() {
return key;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ private boolean flushBucket(DataBucket bucket) {
.writeStatus(writeStatus)
.lastBatch(false)
.endInput(false)
.watermark(getWatermark())
.build();

this.eventGateway.sendEventToCoordinator(event);
Expand Down Expand Up @@ -484,6 +485,7 @@ private void flushRemaining(boolean endInput) {
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(endInput)
.watermark(getWatermark())
.build();

this.eventGateway.sendEventToCoordinator(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.apache.hudi.adapter.OperatorCoordinatorAdapter;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieExtraMetadata;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
Expand All @@ -50,6 +52,7 @@
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -69,6 +72,7 @@
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.Option.fromJavaOptional;
import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;

/**
Expand Down Expand Up @@ -552,6 +556,16 @@ private void doCommit(String instant, List<WriteStatus> writeResults) {
+ totalErrorRecords + "/" + totalRecords);
}

// Does not advance watermark for commit on empty batch.
// Meanwhile, advance watermark for subtasks in which no data has written.
final long watermark = fromJavaOptional(Arrays.stream(eventBuffer)
.filter(event -> Objects.nonNull(event)
&& (OptionsResolver.allowCommitOnEmptyBatch(conf)
|| CollectionUtils.nonEmpty(event.getWriteStatuses())))
.map(WriteMetadataEvent::getWatermark)
.min((o1, o2) -> (int) (o1 - o2)))
.orElse(Watermark.UNINITIALIZED.getTimestamp());
checkpointCommitMetadata.put(HoodieExtraMetadata.WATERMARK.key(), String.valueOf(watermark));
final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
: Collections.emptyMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ private void flushData(boolean endInput) {
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(endInput)
.watermark(getWatermark())
.build();
this.eventGateway.sendEventToCoordinator(event);
// nullify the write helper for next ckp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
Expand Down Expand Up @@ -99,6 +100,11 @@ public class BulkInsertWriteFunction<I>
*/
private CkpMetadata ckpMetadata;

/**
* Watermark for the current checkpoint.
*/
private Watermark watermark;

/**
* Constructs a StreamingSinkFunction.
*
Expand All @@ -124,6 +130,11 @@ public void processElement(I value, Context ctx, Collector<Object> out) throws I
this.writerHelper.write((RowData) value);
}

@Override
public void processWatermark(Watermark watermark) {
this.watermark = watermark;
}

@Override
public void close() {
if (this.writeClient != null) {
Expand All @@ -143,6 +154,7 @@ public void endInput() {
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(true)
.watermark(getWatermark())
.build();
this.eventGateway.sendEventToCoordinator(event);
}
Expand Down Expand Up @@ -208,4 +220,11 @@ private String instantToWrite() {
}
return instant;
}

/**
* Returns the watermark for the current checkpoint.
*/
public long getWatermark() {
return watermark == null ? Watermark.UNINITIALIZED.getTimestamp() : watermark.getTimestamp();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -127,6 +128,11 @@ public abstract class AbstractStreamWriteFunction<I>
*/
private transient boolean inputEnded;

/**
* Watermark for the current checkpoint.
*/
protected Watermark watermark;

/**
* Constructs a StreamWriteFunctionBase.
*
Expand Down Expand Up @@ -159,6 +165,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
this.confirming = true;
}

@Override
public void processWatermark(Watermark watermark) {
this.watermark = watermark;
}

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
if (inputEnded) {
Expand Down Expand Up @@ -238,6 +249,7 @@ private void reloadWriteMetaState() throws Exception {
.instantTime(currentInstant)
.writeStatus(new ArrayList<>(writeStatuses))
.bootstrap(true)
.watermark(getWatermark())
.build();
this.writeMetadataState.add(event);
writeStatuses.clear();
Expand Down Expand Up @@ -294,4 +306,11 @@ protected String instantToWrite(boolean hasData) {
private boolean invalidInstant(String instant, boolean hasData) {
return instant.equals(this.currentInstant) && hasData;
}

/**
* Returns the watermark for the current checkpoint.
*/
public long getWatermark() {
return watermark == null ? Watermark.UNINITIALIZED.getTimestamp() : watermark.getTimestamp();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
* Base class for write function.
Expand All @@ -45,4 +46,10 @@ public abstract class AbstractWriteFunction<I> extends ProcessFunction<I, Object
* @param event The event
*/
public abstract void handleOperatorEvent(OperatorEvent event);

/**
* Processes the given {@link Watermark}.
* @param watermark The watermark to process.
*/
public abstract void processWatermark(Watermark watermark);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
* Base class for write operator.
Expand Down Expand Up @@ -52,4 +53,10 @@ public void endInput() {
public void handleOperatorEvent(OperatorEvent evt) {
this.function.handleOperatorEvent(evt);
}

@Override
public void processWatermark(Watermark watermark) throws Exception {
super.processWatermark(watermark);
this.function.processWatermark(watermark);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.common.util.ValidationUtils;

import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -54,6 +55,11 @@ public class WriteMetadataEvent implements OperatorEvent {
*/
private boolean bootstrap;

/**
* Watermark telling the progress in event time of a write function.
*/
private long watermark;

/**
* Creates an event.
*
Expand All @@ -65,20 +71,23 @@ public class WriteMetadataEvent implements OperatorEvent {
* if true, the whole data set of the checkpoint
* has been flushed successfully
* @param bootstrap Whether the event comes from the bootstrap
* @param watermark The progress in event time
*/
private WriteMetadataEvent(
int taskID,
String instantTime,
List<WriteStatus> writeStatuses,
boolean lastBatch,
boolean endInput,
boolean bootstrap) {
boolean bootstrap,
long watermark) {
this.taskID = taskID;
this.instantTime = instantTime;
this.writeStatuses = new ArrayList<>(writeStatuses);
this.lastBatch = lastBatch;
this.endInput = endInput;
this.bootstrap = bootstrap;
this.watermark = watermark;
}

// default constructor for efficient serialization
Expand Down Expand Up @@ -140,6 +149,14 @@ public void setLastBatch(boolean lastBatch) {
this.lastBatch = lastBatch;
}

public long getWatermark() {
return watermark;
}

public void setWatermark(long watermark) {
this.watermark = watermark;
}

/**
* Merges this event with given {@link WriteMetadataEvent} {@code other}.
*
Expand Down Expand Up @@ -172,6 +189,7 @@ public String toString() {
+ ", lastBatch=" + lastBatch
+ ", endInput=" + endInput
+ ", bootstrap=" + bootstrap
+ ", watermark=" + watermark
+ '}';
}

Expand Down Expand Up @@ -208,12 +226,13 @@ public static class Builder {
private boolean lastBatch = false;
private boolean endInput = false;
private boolean bootstrap = false;
private long watermark = Watermark.UNINITIALIZED.getTimestamp();

public WriteMetadataEvent build() {
Objects.requireNonNull(taskID);
Objects.requireNonNull(instantTime);
Objects.requireNonNull(writeStatus);
return new WriteMetadataEvent(taskID, instantTime, writeStatus, lastBatch, endInput, bootstrap);
return new WriteMetadataEvent(taskID, instantTime, writeStatus, lastBatch, endInput, bootstrap, watermark);
}

public Builder taskID(int taskID) {
Expand Down Expand Up @@ -245,5 +264,10 @@ public Builder bootstrap(boolean bootstrap) {
this.bootstrap = bootstrap;
return this;
}

public Builder watermark(long watermark) {
this.watermark = watermark;
return this;
}
}
}
Loading