Skip to content

Commit 2ac5c43

Browse files
authored
Flink: make FLIP-27 default in SQL and mark the old FlinkSource as deprecated (#11345)
1 parent fd06438 commit 2ac5c43

File tree

8 files changed

+35
-10
lines changed

8 files changed

+35
-10
lines changed

docs/docs/flink-queries.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,17 @@ There are some options that could be set in Flink SQL hint options for streaming
6666

6767
### FLIP-27 source for SQL
6868

69-
Here are the SQL settings for the [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) source. All other SQL settings and options documented above are applicable to the FLIP-27 source.
69+
Here is the SQL setting to opt in or out of the
70+
[FLIP-27 source](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface).
7071

7172
```sql
72-
-- Opt in the FLIP-27 source. Default is false.
73-
SET table.exec.iceberg.use-flip27-source = true;
73+
-- Opt out the FLIP-27 source.
74+
-- Default is false for Flink 1.19 and below, and true for Flink 1.20 and above.
75+
SET table.exec.iceberg.use-flip27-source = false;
7476
```
7577

78+
All other SQL settings and options documented above are applicable to the FLIP-27 source.
79+
7680
### Reading branches and tags with SQL
7781
Branch and tags can be read via SQL by specifying options. For more details
7882
refer to [Flink Configuration](flink-configuration.md#read-options)

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.configuration.ReadableConfig;
2929
import org.apache.flink.streaming.api.datastream.DataStream;
3030
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
31+
import org.apache.flink.streaming.api.functions.source.SourceFunction;
3132
import org.apache.flink.table.api.TableSchema;
3233
import org.apache.flink.table.data.RowData;
3334
import org.apache.iceberg.Schema;
@@ -46,6 +47,14 @@
4647
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
4748
import org.apache.iceberg.util.PropertyUtil;
4849

50+
/**
51+
* Flink source builder for old {@link SourceFunction} implementation.
52+
*
53+
* @deprecated since 1.7.0, will be removed in 2.0.0. Use {@link IcebergSource} instead, which
54+
* implement the newer FLIP-27 source interface. This class implements the old {@link
55+
* SourceFunction} that has been marked as deprecated in Flink since Aug 2023.
56+
*/
57+
@Deprecated
4958
public class FlinkSource {
5059
private FlinkSource() {}
5160

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.TimeUnit;
2929
import javax.annotation.Nullable;
30-
import org.apache.flink.annotation.Experimental;
3130
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
3231
import org.apache.flink.api.common.typeinfo.TypeInformation;
3332
import org.apache.flink.api.connector.source.Boundedness;
@@ -86,7 +85,6 @@
8685
import org.slf4j.Logger;
8786
import org.slf4j.LoggerFactory;
8887

89-
@Experimental
9088
public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
9189
private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);
9290

flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.configuration.ReadableConfig;
2929
import org.apache.flink.streaming.api.datastream.DataStream;
3030
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
31+
import org.apache.flink.streaming.api.functions.source.SourceFunction;
3132
import org.apache.flink.table.api.TableSchema;
3233
import org.apache.flink.table.data.RowData;
3334
import org.apache.iceberg.Schema;
@@ -46,6 +47,14 @@
4647
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
4748
import org.apache.iceberg.util.PropertyUtil;
4849

50+
/**
51+
* Flink source builder for old {@link SourceFunction} implementation.
52+
*
53+
* @deprecated since 1.7.0, will be removed in 2.0.0. Use {@link IcebergSource} instead, which
54+
* implement the newer FLIP-27 source interface. This class implements the old {@link
55+
* SourceFunction} that has been marked as deprecated in Flink since Aug 2023.
56+
*/
57+
@Deprecated
4958
public class FlinkSource {
5059
private FlinkSource() {}
5160

flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.TimeUnit;
2929
import javax.annotation.Nullable;
30-
import org.apache.flink.annotation.Experimental;
3130
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
3231
import org.apache.flink.api.common.typeinfo.TypeInformation;
3332
import org.apache.flink.api.connector.source.Boundedness;
@@ -86,7 +85,6 @@
8685
import org.slf4j.Logger;
8786
import org.slf4j.LoggerFactory;
8887

89-
@Experimental
9088
public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
9189
private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);
9290

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ private FlinkConfigOptions() {}
8888
public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE =
8989
ConfigOptions.key("table.exec.iceberg.use-flip27-source")
9090
.booleanType()
91-
.defaultValue(false)
91+
.defaultValue(true)
9292
.withDescription("Use the FLIP-27 based Iceberg source implementation.");
9393

9494
public static final ConfigOption<SplitAssignerType> TABLE_EXEC_SPLIT_ASSIGNER_TYPE =

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.configuration.ReadableConfig;
2929
import org.apache.flink.streaming.api.datastream.DataStream;
3030
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
31+
import org.apache.flink.streaming.api.functions.source.SourceFunction;
3132
import org.apache.flink.table.api.TableSchema;
3233
import org.apache.flink.table.data.RowData;
3334
import org.apache.iceberg.Schema;
@@ -46,6 +47,14 @@
4647
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
4748
import org.apache.iceberg.util.PropertyUtil;
4849

50+
/**
51+
* /** Flink source builder for old {@link SourceFunction} implementation.
52+
*
53+
* @deprecated since 1.7.0, will be removed in 2.0.0. Use {@link IcebergSource} instead, which
54+
* implement the newer FLIP-27 source interface. This class implements the old {@link
55+
* SourceFunction} that has been marked as deprecated in Flink since Aug 2023.
56+
*/
57+
@Deprecated
4958
public class FlinkSource {
5059
private FlinkSource() {}
5160

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.TimeUnit;
2929
import javax.annotation.Nullable;
30-
import org.apache.flink.annotation.Experimental;
3130
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
3231
import org.apache.flink.api.common.typeinfo.TypeInformation;
3332
import org.apache.flink.api.connector.source.Boundedness;
@@ -86,7 +85,6 @@
8685
import org.slf4j.Logger;
8786
import org.slf4j.LoggerFactory;
8887

89-
@Experimental
9088
public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
9189
private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);
9290

0 commit comments

Comments
 (0)