Skip to content

Commit

Permalink
Flink Delta Source PR 12.1 - DeltaSource::option(...) value type conv…
Browse files Browse the repository at this point in the history
…ersion - bug fix and more tests (delta-io#365)

* PR ColumnsFromDeltaLog - get table schema from Delta Log.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR ColumnsFromDeltaLog - get table schema from Delta Log. Using SnapshotVersion in Enumerator factory.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 10.1 ColumnsFromDeltaLog_Tests - extra test for Delta table Schema discovery.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 11 - Partition support using Delta Log Metadata

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 10 - Changes after code review.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 10.1 - Added Delta - Flink - Delta type conversion test.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 10 ColumnsFromDeltaLog - Prevent user to set internal options via source builder.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 10 ColumnsFromDeltaLog - Changes after code review

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 10.1 Adding tests

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 10.1 Adding tests

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 11 Partition Support

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12 Option validation - Get BATCH_SIZE for Format builder from Source options. Add validation for option names and values.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12 Option validation - Get BATCH_SIZE for Format builder from Source options. Add validation for option names and values.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 10.1 Changes after code review.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 10.1 Added tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 11 Test fix after merge

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 10.1 cleanup.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 11 Fix after merge conflicts from master.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 11 Make RowDataFormat constructor package protected.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 11 Changes after Code review

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12 Fix compilation error after merge from base branch.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12 - Validation for Inapplicable Option Used + tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12 - Javadocs

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12 - Adding option type safety

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12 - Adding option type safety tests

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 11 - Changes after code review.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 11 - Changes after code review.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 11 - Changes after code review.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 11 - Changes after code review.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 11 - Changes after code review.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12 - test for options

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12 - Type conversion for Timestamp based options and adding tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12 - Change TODO's from PR 12 to PR 12.1

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12.1 - Option validation, throw DeltaSourceValidationException + tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12 - changes after code review

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12.1 - Validation for option setting and more tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12.1 - Validation for option setting and more tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12.1 - Validation for set options - BugFixes, tests, changes after code review.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* PR 12.1 - Changes after code review.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

Co-authored-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
  • Loading branch information
kristoffSC and Krzysztof Chmielewski authored Jun 8, 2022
1 parent e151fde commit 153993d
Show file tree
Hide file tree
Showing 17 changed files with 770 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import java.util.HashMap;
import java.util.Map;

import io.delta.flink.source.internal.builder.BooleanOptionTypeConverter;
import io.delta.flink.source.internal.builder.DeltaConfigOption;
import io.delta.flink.source.internal.builder.NonNegativeNumberTypeConverter;
import io.delta.flink.source.internal.builder.StartingVersionOptionTypeConverter;
import io.delta.flink.source.internal.builder.TimestampOptionTypeConverter;
import org.apache.flink.configuration.ConfigOptions;

Expand Down Expand Up @@ -59,7 +62,8 @@ public class DeltaSourceOptions {
public static final DeltaConfigOption<Long> VERSION_AS_OF =
DeltaConfigOption.of(
ConfigOptions.key("versionAsOf").longType().noDefaultValue(),
Long.class);
Long.class,
new NonNegativeNumberTypeConverter<>());

/**
* An option that allow time travel to the latest {@link io.delta.standalone.Snapshot} that was
Expand Down Expand Up @@ -88,7 +92,8 @@ public class DeltaSourceOptions {
public static final DeltaConfigOption<String> STARTING_VERSION =
DeltaConfigOption.of(
ConfigOptions.key("startingVersion").stringType().noDefaultValue(),
String.class);
String.class,
new StartingVersionOptionTypeConverter());

/**
* An option used to read only changes from {@link io.delta.standalone.Snapshot} that was
Expand Down Expand Up @@ -117,7 +122,8 @@ public class DeltaSourceOptions {
public static final DeltaConfigOption<Long> UPDATE_CHECK_INTERVAL =
DeltaConfigOption.of(
ConfigOptions.key("updateCheckIntervalMillis").longType().defaultValue(5000L),
Long.class);
Long.class,
new NonNegativeNumberTypeConverter<>());

/**
* An option to specify initial delay (in milliseconds) for starting periodical Delta table
Expand All @@ -132,7 +138,8 @@ public class DeltaSourceOptions {
public static final DeltaConfigOption<Long> UPDATE_CHECK_INITIAL_DELAY =
DeltaConfigOption.of(
ConfigOptions.key("updateCheckDelayMillis").longType().defaultValue(1000L),
Long.class);
Long.class,
new NonNegativeNumberTypeConverter<>());

/**
* An option used to allow processing Delta table versions containing only {@link
Expand All @@ -149,7 +156,8 @@ public class DeltaSourceOptions {
public static final DeltaConfigOption<Boolean> IGNORE_DELETES =
DeltaConfigOption.of(
ConfigOptions.key("ignoreDeletes").booleanType().defaultValue(false),
Boolean.class);
Boolean.class,
new BooleanOptionTypeConverter());

/**
* An option used to allow processing Delta table versions containing both {@link
Expand All @@ -169,7 +177,8 @@ public class DeltaSourceOptions {
public static final DeltaConfigOption<Boolean> IGNORE_CHANGES =
DeltaConfigOption.of(
ConfigOptions.key("ignoreChanges").booleanType().defaultValue(false),
Boolean.class);
Boolean.class,
new BooleanOptionTypeConverter());

/**
* An option to set the number of rows read per Parquet Reader per batch from underlying Parquet
Expand All @@ -179,7 +188,8 @@ public class DeltaSourceOptions {
public static final DeltaConfigOption<Integer> PARQUET_BATCH_SIZE =
DeltaConfigOption.of(
ConfigOptions.key("parquetBatchSize").intType().defaultValue(2048),
Integer.class);
Integer.class,
new NonNegativeNumberTypeConverter<>());

// ----- INNER ONLY OPTIONS ----- //
// Inner options should not be set by user, and they are used internally by Flin connector.
Expand Down Expand Up @@ -211,7 +221,6 @@ public class DeltaSourceOptions {

// ----------------------------- //

// TODO PR 12.1 test all allowed options
static {
USER_FACING_SOURCE_OPTIONS.put(VERSION_AS_OF.key(), VERSION_AS_OF);
USER_FACING_SOURCE_OPTIONS.put(TIMESTAMP_AS_OF.key(), TIMESTAMP_AS_OF);
Expand All @@ -226,7 +235,6 @@ public class DeltaSourceOptions {
USER_FACING_SOURCE_OPTIONS.put(PARQUET_BATCH_SIZE.key(), PARQUET_BATCH_SIZE);
}

// TODO PR 12.1 test all allowed options
static {
INNER_SOURCE_OPTIONS.put(LOADED_SCHEMA_SNAPSHOT_VERSION.key(),
LOADED_SCHEMA_SNAPSHOT_VERSION);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.delta.flink.source.internal.builder;

public abstract class BaseOptionTypeConverter
implements OptionTypeConverter {
public abstract class BaseOptionTypeConverter<TYPE>
implements OptionTypeConverter<TYPE> {

/**
* Converts an Integer valueToConvert to desired type of
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.delta.flink.source.internal.builder;

/**
* Implementation of {@link OptionTypeConverter} that validates values for
* {@link DeltaConfigOption} with type Boolean.
*/
public class BooleanOptionTypeConverter extends BaseOptionTypeConverter<Boolean> {

/**
* Converts String values for {@link DeltaConfigOption} with Boolean value type.
* Strings "true" and "false" will be converted to Boolean true and false values.
*
* @param desiredOption The {@link DeltaConfigOption} instance we want to do the conversion
* for.
* @param valueToConvert String value to convert.
* @return A String representing Boolean value for given {@code valueToConvert} parameter.
* @throws IllegalArgumentException in case of conversion failure.
*/
@Override
@SuppressWarnings("unchecked")
public <T> T convertType(DeltaConfigOption<T> desiredOption, String valueToConvert) {
Class<T> decoratedType = desiredOption.getValueType();
OptionType type = OptionType.instanceFrom(decoratedType);

if (type == OptionType.BOOLEAN) {

if ("true".equalsIgnoreCase(valueToConvert) ||
"false".equalsIgnoreCase(valueToConvert)) {
return (T) Boolean.valueOf(valueToConvert);
}

throw invalidValueException(desiredOption.key(), valueToConvert);
}

throw new IllegalArgumentException(
String.format(
"BooleanOptionTypeConverter used with a incompatible DeltaConfigOption "
+ "option type. This converter must be used only for "
+ "DeltaConfigOption::Boolean however it was used for '%s' with option '%s'",
desiredOption.getValueType(), desiredOption.key())
);
}

private IllegalArgumentException invalidValueException(
String optionName,
String valueToConvert) {
return new IllegalArgumentException(
String.format(
"Illegal value used for [%s] option. Expected values "
+ "\"true\" or \"false\" keywords (case insensitive) or boolean true,"
+ " false values. Used value was [%s]",
optionName, valueToConvert)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import io.delta.flink.source.internal.enumerator.BoundedSplitEnumeratorProvider;
import io.delta.flink.source.internal.enumerator.supplier.BoundedSnapshotSupplierFactory;
import io.delta.flink.source.internal.enumerator.supplier.TimestampFormatConverter;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;
import static io.delta.flink.source.internal.DeltaSourceOptions.PARQUET_BATCH_SIZE;
Expand Down Expand Up @@ -51,13 +50,12 @@ public BoundedDeltaSourceBuilder(
}

public SELF versionAsOf(long snapshotVersion) {
sourceConfiguration.addOption(VERSION_AS_OF, snapshotVersion);
tryToSetOption(() -> VERSION_AS_OF.setOnConfig(sourceConfiguration, snapshotVersion));
return self();
}

public SELF timestampAsOf(String snapshotTimestamp) {
long toTimestamp = TimestampFormatConverter.convertToTimestamp(snapshotTimestamp);
sourceConfiguration.addOption(TIMESTAMP_AS_OF, toTimestamp);
tryToSetOption(() -> TIMESTAMP_AS_OF.setOnConfig(sourceConfiguration, snapshotTimestamp));
return self();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import io.delta.flink.source.internal.enumerator.ContinuousSplitEnumeratorProvider;
import io.delta.flink.source.internal.enumerator.supplier.ContinuousSnapshotSupplierFactory;
import io.delta.flink.source.internal.enumerator.supplier.TimestampFormatConverter;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;
import static io.delta.flink.source.internal.DeltaSourceOptions.IGNORE_CHANGES;
Expand Down Expand Up @@ -60,33 +59,36 @@ public ContinuousDeltaSourceBuilder(
}

public SELF startingVersion(String startingVersion) {
sourceConfiguration.addOption(STARTING_VERSION, startingVersion);
tryToSetOption(() -> STARTING_VERSION.setOnConfig(sourceConfiguration, startingVersion));
return self();
}

public SELF startingVersion(long startingVersion) {
startingVersion(String.valueOf(startingVersion));
tryToSetOption(() -> STARTING_VERSION.setOnConfig(sourceConfiguration, startingVersion));
return self();
}

public SELF startingTimestamp(String startingTimestamp) {
long toTimestamp = TimestampFormatConverter.convertToTimestamp(startingTimestamp);
sourceConfiguration.addOption(STARTING_TIMESTAMP, toTimestamp);
tryToSetOption(
() -> STARTING_TIMESTAMP.setOnConfig(sourceConfiguration, startingTimestamp)
);
return self();
}

public SELF updateCheckIntervalMillis(long updateCheckInterval) {
sourceConfiguration.addOption(UPDATE_CHECK_INTERVAL, updateCheckInterval);
tryToSetOption(
() -> UPDATE_CHECK_INTERVAL.setOnConfig(sourceConfiguration, updateCheckInterval)
);
return self();
}

public SELF ignoreDeletes(boolean ignoreDeletes) {
sourceConfiguration.addOption(IGNORE_DELETES, ignoreDeletes);
tryToSetOption(() -> IGNORE_DELETES.setOnConfig(sourceConfiguration, ignoreDeletes));
return self();
}

public SELF ignoreChanges(boolean ignoreChanges) {
sourceConfiguration.addOption(IGNORE_CHANGES, ignoreChanges);
tryToSetOption(() -> IGNORE_CHANGES.setOnConfig(sourceConfiguration, ignoreChanges));
return self();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* An implementation of {@link OptionTypeConverter} interface to convert {@link DeltaConfigOption}
* values to desired {@link Class} type.
*/
public final class DefaultOptionTypeConverter extends BaseOptionTypeConverter {
public final class DefaultOptionTypeConverter extends BaseOptionTypeConverter<Object> {

private static final String TYPE_EXCEPTION_MSG = "Unsupported value type {%s] for option [%s]";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,30 @@ public class DeltaConfigOption<T> {
/**
* Value type converter for this configuration option.
*/
private final OptionTypeConverter typeConverter;
private final OptionTypeConverter<T> typeConverter;

private DeltaConfigOption(
ConfigOption<T> decoratedOption,
Class<T> type,
OptionTypeConverter typeConverter) {
OptionTypeConverter<T> typeConverter) {
this.decoratedOption = decoratedOption;
this.decoratedType = type;
this.typeConverter = typeConverter;
}

@SuppressWarnings("unchecked")
public static <T> DeltaConfigOption<T> of(ConfigOption<T> configOption, Class<T> type) {
return new DeltaConfigOption<>(configOption, type, new DefaultOptionTypeConverter());
return new DeltaConfigOption<>(
configOption,
type,
(OptionTypeConverter<T>) new DefaultOptionTypeConverter()
);
}

public static <T> DeltaConfigOption<T> of(
ConfigOption<T> configOption,
Class<T> type,
OptionTypeConverter typeConverter) {
OptionTypeConverter<T> typeConverter) {
return new DeltaConfigOption<>(configOption, type, typeConverter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,47 +108,54 @@ protected DeltaSourceBuilderBase(
* Sets a {@link List} of column names that should be read from Delta table.
*/
public SELF columnNames(List<String> columnNames) {
this.userColumnNames = columnNames;
tryToSetOption(() -> this.userColumnNames = columnNames);
return self();
}

/**
* Sets a configuration option.
*/
public SELF option(String optionName, String optionValue) {
DeltaConfigOption<?> configOption = validateOptionName(optionName);
configOption.setOnConfig(sourceConfiguration, optionValue);
tryToSetOption(() -> {
DeltaConfigOption<?> configOption = validateOptionName(optionName);
configOption.setOnConfig(sourceConfiguration, optionValue);
});
return self();
}

/**
* Sets a configuration option.
*/
public SELF option(String optionName, boolean optionValue) {
DeltaConfigOption<?> configOption = validateOptionName(optionName);
configOption.setOnConfig(sourceConfiguration, optionValue);
tryToSetOption(() -> {
DeltaConfigOption<?> configOption = validateOptionName(optionName);
configOption.setOnConfig(sourceConfiguration, optionValue);
});
return self();
}

/**
* Sets a configuration option.
*/
public SELF option(String optionName, int optionValue) {
DeltaConfigOption<?> configOption = validateOptionName(optionName);
configOption.setOnConfig(sourceConfiguration, optionValue);
tryToSetOption(() -> {
DeltaConfigOption<?> configOption = validateOptionName(optionName);
configOption.setOnConfig(sourceConfiguration, optionValue);
});
return self();
}

/**
* Sets a configuration option.
*/
public SELF option(String optionName, long optionValue) {
DeltaConfigOption<?> configOption = validateOptionName(optionName);
configOption.setOnConfig(sourceConfiguration, optionValue);
tryToSetOption(() -> {
DeltaConfigOption<?> configOption = validateOptionName(optionName);
configOption.setOnConfig(sourceConfiguration, optionValue);
});
return self();
}

// TODO PR 12.1 test immutability
/**
* @return A copy of {@link DeltaSourceConfiguration} used by builder. The changes made on
* returned copy do not change the state of builder's configuration.
Expand Down Expand Up @@ -299,8 +306,36 @@ protected SourceSchema getSourceSchema() {
}
}

/**
* Try to set option on Builder configuration. The option's value conversion, validation and
* logic for adding it to builder's configuration is wrapped with {@link Executable}. If {@link
* Executable#execute()} call throws eny exception, this exception will be wrapped in {@link
* DeltaSourceValidationException} and re-throw.
*
* @param argument the {@link Executable} wrapping any logic for converting and setting {@link
* DeltaConfigOption} value.
*/
protected void tryToSetOption(Executable argument) {
try {
argument.execute();
} catch (Exception e) {
throw DeltaSourceExceptions.optionValidationException(
SourceUtils.pathToString(tablePath),
e
);
}
}

@SuppressWarnings("unchecked")
protected SELF self() {
return (SELF) this;
}

/**
* A functional interface to execute logic that takes no argument nor returns any value.
*/
@FunctionalInterface
protected interface Executable {
void execute();
}
}
Loading

0 comments on commit 153993d

Please sign in to comment.