Skip to content

Commit

Permalink
[HUDI-5095] Flink: Stores a special watermark(flag) to identify the c…
Browse files Browse the repository at this point in the history
…urrent progress of writing data
  • Loading branch information
SteNicholas committed May 18, 2023
1 parent 213940a commit b4fe07f
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.common;

import org.apache.hudi.common.model.HoodieCommitMetadata;

/**
 * Keys of the extra metadata entries attached to a {@link HoodieCommitMetadata}.
 */
public enum HoodieExtraMetadata {

  /**
   * Entry that carries the current watermark of the commit metadata.
   */
  WATERMARK("watermark");

  /** String key under which the entry is stored. */
  private final String key;

  HoodieExtraMetadata(final String metadataKey) {
    key = metadataKey;
  }

  /**
   * Returns the string key of this extra metadata entry.
   *
   * @return the key in its string form
   */
  public String key() {
    return this.key;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ private boolean flushBucket(DataBucket bucket) {
.writeStatus(writeStatus)
.lastBatch(false)
.endInput(false)
.watermark(getWatermark())
.build();

this.eventGateway.sendEventToCoordinator(event);
Expand Down Expand Up @@ -484,6 +485,7 @@ private void flushRemaining(boolean endInput) {
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(endInput)
.watermark(getWatermark())
.build();

this.eventGateway.sendEventToCoordinator(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.apache.hudi.adapter.OperatorCoordinatorAdapter;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieExtraMetadata;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
Expand All @@ -50,6 +52,7 @@
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -69,6 +72,7 @@
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.Option.fromJavaOptional;
import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;

/**
Expand Down Expand Up @@ -552,6 +556,16 @@ private void doCommit(String instant, List<WriteStatus> writeResults) {
+ totalErrorRecords + "/" + totalRecords);
}

// Stores the minimum watermark of the buffered events in the commit metadata.
// Events without write statuses are only considered when committing on an empty batch is allowed,
// so a commit on an empty batch does not advance the watermark; meanwhile, subtasks in which
// no data has been written still contribute their watermark in that case.
// When no event qualifies, the timestamp of Watermark.UNINITIALIZED is stored instead.
final long watermark = fromJavaOptional(Arrays.stream(eventBuffer)
    .filter(event -> Objects.nonNull(event)
        && (OptionsResolver.allowCommitOnEmptyBatch(conf)
        || CollectionUtils.nonEmpty(event.getWriteStatuses())))
    .map(WriteMetadataEvent::getWatermark)
    // Long::compare rather than "(int) (o1 - o2)": the long difference can overflow and the
    // narrowing cast drops the upper 32 bits, which yields a wrong sign or a bogus "equal"
    // result and therefore a wrong minimum.
    .min(Long::compare))
    .orElse(Watermark.UNINITIALIZED.getTimestamp());
checkpointCommitMetadata.put(HoodieExtraMetadata.WATERMARK.key(), String.valueOf(watermark));
final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
: Collections.emptyMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ private void flushData(boolean endInput) {
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(endInput)
.watermark(getWatermark())
.build();
this.eventGateway.sendEventToCoordinator(event);
// nullify the write helper for next ckp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
Expand Down Expand Up @@ -99,6 +100,11 @@ public class BulkInsertWriteFunction<I>
*/
private CkpMetadata ckpMetadata;

/**
* Watermark for the current checkpoint.
*/
private Watermark watermark;

/**
* Constructs a StreamingSinkFunction.
*
Expand All @@ -124,6 +130,11 @@ public void processElement(I value, Context ctx, Collector<Object> out) throws I
this.writerHelper.write((RowData) value);
}

/**
 * Remembers the given watermark so that {@link #getWatermark()} can report its timestamp.
 *
 * @param watermark The watermark to remember
 */
@Override
public void processWatermark(Watermark watermark) {
this.watermark = watermark;
}

@Override
public void close() {
if (this.writeClient != null) {
Expand All @@ -143,6 +154,7 @@ public void endInput() {
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(true)
.watermark(getWatermark())
.build();
this.eventGateway.sendEventToCoordinator(event);
}
Expand Down Expand Up @@ -208,4 +220,11 @@ private String instantToWrite() {
}
return instant;
}

/**
 * Returns the timestamp of the stored watermark for the current checkpoint.
 *
 * @return the timestamp of the stored watermark, or the timestamp of
 *         {@link Watermark#UNINITIALIZED} when the stored watermark is {@code null}
 */
public long getWatermark() {
  if (watermark != null) {
    return watermark.getTimestamp();
  }
  return Watermark.UNINITIALIZED.getTimestamp();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -127,6 +128,11 @@ public abstract class AbstractStreamWriteFunction<I>
*/
private transient boolean inputEnded;

/**
 * Watermark for the current checkpoint.
 *
 * <p>Declared {@code transient}, consistent with the other runtime-only state of this
 * function: the value is only assigned while the function is running and a {@code null}
 * value is handled by {@code getWatermark()}.
 * NOTE(review): Flink's {@code Watermark} does not appear to be {@code Serializable} - confirm.
 */
protected transient Watermark watermark;

/**
* Constructs a StreamWriteFunctionBase.
*
Expand Down Expand Up @@ -159,6 +165,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
this.confirming = true;
}

/**
 * Remembers the given watermark so that {@link #getWatermark()} can report its timestamp.
 *
 * @param watermark The watermark to remember
 */
@Override
public void processWatermark(Watermark watermark) {
this.watermark = watermark;
}

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
if (inputEnded) {
Expand Down Expand Up @@ -238,6 +249,7 @@ private void reloadWriteMetaState() throws Exception {
.instantTime(currentInstant)
.writeStatus(new ArrayList<>(writeStatuses))
.bootstrap(true)
.watermark(getWatermark())
.build();
this.writeMetadataState.add(event);
writeStatuses.clear();
Expand Down Expand Up @@ -294,4 +306,11 @@ protected String instantToWrite(boolean hasData) {
private boolean invalidInstant(String instant, boolean hasData) {
return instant.equals(this.currentInstant) && hasData;
}

/**
 * Returns the timestamp of the stored watermark for the current checkpoint.
 *
 * @return the timestamp of the stored watermark, or the timestamp of
 *         {@link Watermark#UNINITIALIZED} when the stored watermark is {@code null}
 */
public long getWatermark() {
  if (watermark != null) {
    return watermark.getTimestamp();
  }
  return Watermark.UNINITIALIZED.getTimestamp();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
* Base class for write function.
Expand All @@ -45,4 +46,10 @@ public abstract class AbstractWriteFunction<I> extends ProcessFunction<I, Object
* @param event The event
*/
public abstract void handleOperatorEvent(OperatorEvent event);

/**
 * Processes the given {@link Watermark}.
 *
 * <p>Implementations decide what to do with the watermark; this base class only declares
 * the hook.
 *
 * @param watermark The watermark to process.
 */
public abstract void processWatermark(Watermark watermark);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
* Base class for write operator.
Expand Down Expand Up @@ -52,4 +53,10 @@ public void endInput() {
public void handleOperatorEvent(OperatorEvent evt) {
this.function.handleOperatorEvent(evt);
}

/**
 * Lets the parent operator process the watermark first, then hands the same watermark
 * to the wrapped write function.
 *
 * @param watermark The watermark to process
 * @throws Exception if the parent operator's watermark processing fails
 */
@Override
public void processWatermark(Watermark watermark) throws Exception {
super.processWatermark(watermark);
this.function.processWatermark(watermark);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.common.util.ValidationUtils;

import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -54,6 +55,11 @@ public class WriteMetadataEvent implements OperatorEvent {
*/
private boolean bootstrap;

/**
* Watermark telling the progress in event time of a write function.
*/
private long watermark;

/**
* Creates an event.
*
Expand All @@ -65,20 +71,23 @@ public class WriteMetadataEvent implements OperatorEvent {
* if true, the whole data set of the checkpoint
* has been flushed successfully
* @param bootstrap Whether the event comes from the bootstrap
* @param watermark The progress in event time
*/
private WriteMetadataEvent(
int taskID,
String instantTime,
List<WriteStatus> writeStatuses,
boolean lastBatch,
boolean endInput,
boolean bootstrap) {
boolean bootstrap,
long watermark) {
this.taskID = taskID;
this.instantTime = instantTime;
this.writeStatuses = new ArrayList<>(writeStatuses);
this.lastBatch = lastBatch;
this.endInput = endInput;
this.bootstrap = bootstrap;
this.watermark = watermark;
}

// default constructor for efficient serialization
Expand Down Expand Up @@ -140,6 +149,14 @@ public void setLastBatch(boolean lastBatch) {
this.lastBatch = lastBatch;
}

/**
 * Returns the watermark carried by this event.
 *
 * @return the watermark value stored in this event
 */
public long getWatermark() {
return watermark;
}

/**
 * Replaces the watermark carried by this event.
 *
 * @param watermark The new watermark value
 */
public void setWatermark(long watermark) {
this.watermark = watermark;
}

/**
* Merges this event with given {@link WriteMetadataEvent} {@code other}.
*
Expand All @@ -165,14 +182,15 @@ public boolean isReady(String currentInstant) {

@Override
public String toString() {
return "WriteMetadataEvent{"
+ "writeStatusesSize=" + writeStatuses.size()
+ ", taskID=" + taskID
+ ", instantTime='" + instantTime + '\''
+ ", lastBatch=" + lastBatch
+ ", endInput=" + endInput
+ ", bootstrap=" + bootstrap
+ '}';
return "WriteMetadataEvent{" +
"writeStatuses=" + writeStatuses +
", taskID=" + taskID +
", instantTime='" + instantTime + '\'' +
", lastBatch=" + lastBatch +
", endInput=" + endInput +
", bootstrap=" + bootstrap +
", watermark=" + watermark +
'}';
}

// -------------------------------------------------------------------------
Expand Down Expand Up @@ -208,12 +226,13 @@ public static class Builder {
private boolean lastBatch = false;
private boolean endInput = false;
private boolean bootstrap = false;
private long watermark = Watermark.UNINITIALIZED.getTimestamp();

public WriteMetadataEvent build() {
Objects.requireNonNull(taskID);
Objects.requireNonNull(instantTime);
Objects.requireNonNull(writeStatus);
return new WriteMetadataEvent(taskID, instantTime, writeStatus, lastBatch, endInput, bootstrap);
return new WriteMetadataEvent(taskID, instantTime, writeStatus, lastBatch, endInput, bootstrap, watermark);
}

public Builder taskID(int taskID) {
Expand Down Expand Up @@ -245,5 +264,10 @@ public Builder bootstrap(boolean bootstrap) {
this.bootstrap = bootstrap;
return this;
}

/**
 * Sets the watermark of the event to build.
 *
 * @param watermark The watermark value to use
 * @return this builder, to allow call chaining
 */
public Builder watermark(long watermark) {
this.watermark = watermark;
return this;
}
}
}
Loading

0 comments on commit b4fe07f

Please sign in to comment.