Changes from all commits (32 commits)
aaa4c60  Flink-CDC checkin 2 (voonhous, Aug 8, 2025)
dac5e01  Checkpoint 34 - Fix checkstyle and RAT (voonhous, Oct 13, 2025)
46d90fd  Checkpoint 35 - Remove unused code (voonhous, Oct 13, 2025)
c236bb4  Checkpoint 36 - Add restore with checkpoint test (voonhous, Oct 22, 2025)
43f4844  Checkpoint 36 - Fix spotless (voonhous, Oct 22, 2025)
d5cdb75  Checkpoint 37 - Fix spotless and import errors (voonhous, Oct 22, 2025)
76722f7  Checkpoint 38 - Start-stop-checkpoint fix (voonhous, Oct 22, 2025)
aeb4f3b  Checkpoint 39 - Enable MDT (voonhous, Oct 23, 2025)
b3f38bc  Checkpoint 40 - Remove MDT configs (voonhous, Oct 23, 2025)
be3f4d2  Checkpoint 41 - Fix testSyncWholeDb (voonhous, Oct 23, 2025)
de2f698  Checkpoint 42 - Add compaction scheduling support (voonhous, Oct 24, 2025)
a62aeb5  Checkpoint 43 - Change compaction to be event driven (voonhous, Oct 28, 2025)
410be89  Checkpoint 44 - Remove reflection call (voonhous, Nov 3, 2025)
e6460d8  Checkpoint 45 - Change partitioning logic to avoid skew (voonhous, Nov 3, 2025)
70600b2  Checkpoint 45 - Update naming convention to reduce confusion. (voonhous, Nov 3, 2025)
ed733aa  Checkpoint 46 - Add partition path extractor (voonhous, Nov 3, 2025)
6679b9e  Checkpoint 47 - Use HoodieFlinkInternalRow (voonhous, Nov 3, 2025)
bccb947  Checkpoint 48 - Fix partitioning issue for non-partitioned tables (voonhous, Nov 4, 2025)
26dadf0  Checkpoint 49 - Use RowDataKeyGen (voonhous, Nov 4, 2025)
41aa916  Checkpoint 50 - Remove code duplication via overloading (voonhous, Nov 4, 2025)
8a96e23  Checkpoint 51 - Refactor and remove Event*Functions (voonhous, Nov 5, 2025)
7132370  Checkpoint 52 - Use RowDataKeyGen implementations of RowDataUtils hel… (voonhous, Nov 5, 2025)
19568cf  Checkpoint 53 - Fix checkstyle issues (voonhous, Nov 5, 2025)
dc04d27  Checkpoint 54 - Remove reflection usage (voonhous, Nov 5, 2025)
b5400c0  Checkpoint 55 - Address comments (voonhous, Nov 6, 2025)
c527244  Checkpoint 56 - Address comments 2 (voonhous, Nov 6, 2025)
5559469  Checkpoint 57 - Remove manual embedded timeline server management (voonhous, Nov 7, 2025)
13a2921  Checkpoint 58 - Remove unnecessary changes (voonhous, Nov 7, 2025)
2712125  Checkpoint 59 - Bump hudi version to 1.1.0 (voonhous, Nov 21, 2025)
db61821  Checkpoint 60 - Fix java8 usage of Optional from isEmpty to !isPresent (voonhous, Nov 21, 2025)
e566daf  Checkpoint 61 - fix table initialization (cshuo, Nov 24, 2025)
314b7ef  fix style (cshuo, Nov 24, 2025)
HudiConfig.java (new file):
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.hudi.sink;

import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.ConfigOptions;
import org.apache.flink.configuration.description.Description;

import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.configuration.FlinkOptions;

/**
* A utility class that holds all the configuration options for the Hudi sink. It wraps Hudi's
* {@link FlinkOptions} to provide a consistent interface within the CDC framework, using helper
* methods to reduce boilerplate.
*/
public class HudiConfig {

// ----- Helper Methods for Option Creation -----

private static ConfigOption<String> stringOption(String key, Description description) {
return ConfigOptions.key(key)
.stringType()
.noDefaultValue()
.withDescription(description.toString());
}

private static ConfigOption<String> stringOption(
String key, String defaultValue, Description description) {
return ConfigOptions.key(key)
.stringType()
.defaultValue(defaultValue)
.withDescription(description.toString());
}

private static ConfigOption<Integer> intOption(String key, Description description) {
return ConfigOptions.key(key)
.intType()
.noDefaultValue()
.withDescription(description.toString());
}

private static ConfigOption<Boolean> booleanOption(
String key, boolean defaultValue, Description description) {
return ConfigOptions.key(key)
.booleanType()
.defaultValue(defaultValue)
.withDescription(description.toString());
}

// ----- Public Configuration Options -----

// Core Hudi Options
public static final ConfigOption<String> PATH =
stringOption(FlinkOptions.PATH.key(), FlinkOptions.PATH.description());

// Uses the literal "hoodie.table.type" key rather than FlinkOptions.TABLE_TYPE.key(),
// while reusing Hudi's default value and description.
public static final ConfigOption<String> TABLE_TYPE =
stringOption(
"hoodie.table.type",
FlinkOptions.TABLE_TYPE.defaultValue(),
FlinkOptions.TABLE_TYPE.description());

// Required Fields for CDC
public static final ConfigOption<String> RECORD_KEY_FIELD =
stringOption(
FlinkOptions.RECORD_KEY_FIELD.key(),
FlinkOptions.RECORD_KEY_FIELD.description());

public static final ConfigOption<String> ORDERING_FIELDS =
stringOption(
FlinkOptions.ORDERING_FIELDS.key(), FlinkOptions.ORDERING_FIELDS.description());

public static final ConfigOption<String> PARTITION_PATH_FIELD =
stringOption(
FlinkOptions.PARTITION_PATH_FIELD.key(),
"",
FlinkOptions.PARTITION_PATH_FIELD.description());

// Bucket Index Options
public static final ConfigOption<String> INDEX_TYPE =
stringOption(
FlinkOptions.INDEX_TYPE.key(), "BUCKET", FlinkOptions.INDEX_TYPE.description());

public static final ConfigOption<String> INDEX_BUCKET_TARGET =
stringOption(
FlinkOptions.INDEX_KEY_FIELD.key(), FlinkOptions.INDEX_KEY_FIELD.description());

public static final ConfigOption<Integer> BUCKET_INDEX_NUM_BUCKETS =
intOption(
FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(),
FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.description());

// Hive Sync Options
public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED =
booleanOption(
FlinkOptions.HIVE_SYNC_ENABLED.key(),
false,
FlinkOptions.HIVE_SYNC_ENABLED.description());

public static final ConfigOption<String> HIVE_SYNC_METASTORE_URIS =
stringOption(
FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(),
FlinkOptions.HIVE_SYNC_METASTORE_URIS.description());

public static final ConfigOption<String> HIVE_SYNC_DB =
stringOption(FlinkOptions.HIVE_SYNC_DB.key(), FlinkOptions.HIVE_SYNC_DB.description());

public static final ConfigOption<String> HIVE_SYNC_TABLE =
stringOption(
FlinkOptions.HIVE_SYNC_TABLE.key(), FlinkOptions.HIVE_SYNC_TABLE.description());

public static final ConfigOption<String> SCHEMA_OPERATOR_UID =
ConfigOptions.key("schema.operator.uid")
.stringType()
.defaultValue("schema-operator-uid")
.withDescription(
"A unique ID for the schema operator, used by the BucketAssignerOperator to create a SchemaEvolutionClient.");

public static final ConfigOption<String> TABLE_SCHEMA =
ConfigOptions.key("table.schema")
.stringType()
.noDefaultValue()
.withDescription("The table schema in JSON format for the Hudi table.");

public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS =
intOption(
FlinkOptions.BUCKET_ASSIGN_TASKS.key(),
FlinkOptions.BUCKET_ASSIGN_TASKS.description());

public static final ConfigOption<Integer> WRITE_TASKS =
intOption(FlinkOptions.WRITE_TASKS.key(), FlinkOptions.WRITE_TASKS.description());

public static final ConfigOption<Boolean> SCHEMA_ON_READ_ENABLE =
booleanOption(
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(),
false,
Description.builder().build());

public static final ConfigOption<Integer> COMPACTION_DELTA_COMMITS =
ConfigOptions.key("compaction.delta_commits")
.intType()
.defaultValue(5)
.withDescription(
"Max delta commits needed to trigger compaction, default 5 commits");
}
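
For reference, a minimal sketch of how these wrapped options resolve, assuming the CDC Configuration exposes fromMap and typed get as in Flink's own Configuration; the map values here are hypothetical:

import org.apache.flink.cdc.common.configuration.Configuration;

import java.util.HashMap;
import java.util.Map;

public class HudiConfigExample {
    public static void main(String[] args) {
        // Hypothetical sink options, as a pipeline definition might supply them.
        Map<String, String> raw = new HashMap<>();
        raw.put("path", "file:///tmp/hudi_sink");
        raw.put("hoodie.table.type", "MERGE_ON_READ");
        raw.put("compaction.delta_commits", "3");
        Configuration config = Configuration.fromMap(raw);

        // Options declared with noDefaultValue() resolve to null when absent;
        // options declared with a default fall back to it.
        System.out.println(config.get(HudiConfig.PATH));                     // file:///tmp/hudi_sink
        System.out.println(config.get(HudiConfig.TABLE_TYPE));               // MERGE_ON_READ
        System.out.println(config.get(HudiConfig.COMPACTION_DELTA_COMMITS)); // 3
        System.out.println(config.get(HudiConfig.INDEX_TYPE));               // BUCKET (the default)
    }
}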
HudiDataSink.java (new file):
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.hudi.sink;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.EventSinkProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.connectors.hudi.sink.v2.HudiSink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.time.ZoneId;

/**
* A {@link DataSink} for Apache Hudi that provides the main entry point for the Flink CDC
* framework.
*/
public class HudiDataSink implements DataSink, Serializable {

private static final Logger LOG = LoggerFactory.getLogger(HudiDataSink.class);

private final Configuration config;

private final String schemaOperatorUid;

public HudiDataSink(Configuration config, String schemaOperatorUid) {
LOG.info("Creating HudiDataSink with universal configuration {}", config);
this.config = config;
this.schemaOperatorUid = schemaOperatorUid;
}

/** Provides the core sink implementation that handles the data flow of events. */
@Override
public EventSinkProvider getEventSinkProvider() {
LOG.info("Creating HudiDataSinkProvider with universal configuration {}", config);
// CDC pipelines have no pre-configured schema since tables are created dynamically,
// so we use a multi-table sink that handles schema discovery and table creation.

// Convert CDC configuration to Flink configuration for HoodieSink
org.apache.flink.configuration.Configuration flinkConfig = toFlinkConfig(config);

// Create the HudiSink with multi-table support via the wrapper pattern.
HudiSink hudiSink = new HudiSink(flinkConfig, schemaOperatorUid, ZoneId.systemDefault());

return FlinkSinkProvider.of(hudiSink);
}

/**
* Provides the metadata applier. In our design, this has a passive role (e.g., logging), as
* transactional metadata operations are handled by the HudiCommitter.
*/
@Override
public MetadataApplier getMetadataApplier() {
return new HudiMetadataApplier(config);
}

/**
* Converts a {@link org.apache.flink.cdc.common.configuration.Configuration} to a {@link
* org.apache.flink.configuration.Configuration}.
*
* @param cdcConfig The input CDC configuration.
* @return A new Flink configuration containing the same key-value pairs.
*/
private static org.apache.flink.configuration.Configuration toFlinkConfig(
Configuration cdcConfig) {
final org.apache.flink.configuration.Configuration flinkConfig =
new org.apache.flink.configuration.Configuration();
if (cdcConfig != null) {
cdcConfig.toMap().forEach(flinkConfig::setString);
}
return flinkConfig;
}
}
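
The conversion in toFlinkConfig is a plain string copy with no key translation; typed parsing happens later when Hudi reads the options. A minimal sketch of the same round trip, with hypothetical values:

import org.apache.flink.cdc.common.configuration.Configuration;

import java.util.HashMap;
import java.util.Map;

public class ConfigConversionExample {
    public static void main(String[] args) {
        // CDC-side configuration, e.g. parsed from the sink block of a pipeline definition.
        Map<String, String> raw = new HashMap<>();
        raw.put("path", "file:///tmp/hudi_sink");
        raw.put("write.tasks", "4");
        Configuration cdcConfig = Configuration.fromMap(raw);

        // The same copy HudiDataSink#toFlinkConfig performs: every entry is
        // transferred verbatim as a string.
        org.apache.flink.configuration.Configuration flinkConfig =
                new org.apache.flink.configuration.Configuration();
        cdcConfig.toMap().forEach(flinkConfig::setString);

        System.out.println(flinkConfig.getString("write.tasks", "1")); // prints 4
    }
}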
HudiDataSinkFactory.java (new file):
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.hudi.sink;

import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;

import static org.apache.flink.cdc.connectors.hudi.sink.HudiDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
import static org.apache.flink.cdc.connectors.hudi.sink.HudiDataSinkOptions.PREFIX_TABLE_PROPERTIES;

/**
* Factory for creating {@link HudiDataSink}. This class defines the configuration options and
* instantiates the sink by delegating option definitions to {@link HudiConfig}.
*/
public class HudiDataSinkFactory implements DataSinkFactory {

private static final Logger LOG = LoggerFactory.getLogger(HudiDataSinkFactory.class);

public static final String IDENTIFIER = "hudi";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public HudiDataSink createDataSink(Context context) {
LOG.info("Creating HudiDataSink for {}", context);

FactoryHelper.createFactoryHelper(this, context)
.validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);

FactoryHelper.DefaultContext factoryContext = (FactoryHelper.DefaultContext) context;
Configuration config = factoryContext.getFactoryConfiguration();
Review comment: can we add some validity check here? like index type check.

Author reply: Added a validation to ensure that the index type is BUCKET, as that is the only index type we support.


// Validate that only BUCKET index type is used
String indexType = config.get(HudiConfig.INDEX_TYPE);
if (indexType != null && !indexType.equalsIgnoreCase("BUCKET")) {
throw new IllegalArgumentException(
String.format(
"Unsupported index type '%s'. Currently only 'BUCKET' index type is supported. "
+ "Other index types (e.g., FLINK_STATE, BLOOM, SIMPLE) are not yet implemented "
+ "for multi-table CDC pipelines.",
indexType));
}

String schemaOperatorUid =
context.getPipelineConfiguration()
.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);

return new HudiDataSink(config, schemaOperatorUid);
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HudiConfig.PATH);
options.add(HudiConfig.RECORD_KEY_FIELD);
return options;
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HudiConfig.TABLE_TYPE);
options.add(HudiConfig.PARTITION_PATH_FIELD);
options.add(HudiConfig.INDEX_TYPE);
options.add(HudiConfig.INDEX_BUCKET_TARGET);
options.add(HudiConfig.HIVE_SYNC_ENABLED);
options.add(HudiConfig.HIVE_SYNC_METASTORE_URIS);
options.add(HudiConfig.HIVE_SYNC_DB);
options.add(HudiConfig.HIVE_SYNC_TABLE);

options.add(HudiConfig.WRITE_TASKS);
options.add(HudiConfig.BUCKET_ASSIGN_TASKS);
options.add(HudiConfig.SCHEMA_ON_READ_ENABLE);

// Compaction settings
options.add(HudiConfig.COMPACTION_DELTA_COMMITS);
return options;
}
}
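
The BUCKET-only guard added in response to the review comment can be exercised on its own; a standalone sketch of the same check, with the factory context elided:

public class IndexTypeValidationExample {
    // Mirrors the guard in HudiDataSinkFactory#createDataSink: anything other
    // than BUCKET (or an unset value, which falls back to the default) is rejected.
    static void validateIndexType(String indexType) {
        if (indexType != null && !indexType.equalsIgnoreCase("BUCKET")) {
            throw new IllegalArgumentException(
                    String.format(
                            "Unsupported index type '%s'. Currently only 'BUCKET' index type is supported.",
                            indexType));
        }
    }

    public static void main(String[] args) {
        validateIndexType(null);     // passes; the default (BUCKET) applies downstream
        validateIndexType("bucket"); // passes; the check is case-insensitive
        try {
            validateIndexType("FLINK_STATE");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // rejected
        }
    }
}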