diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java index c40b94cdb381d..a595055aa2fe6 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java @@ -85,11 +85,11 @@ public enum BackOffType { "Failure handling strategy in case a request to Elasticsearch fails") .list( text( - "\"fail\" (throws an exception if a request fails and thus causes a job failure),"), + "\"fail\" (throws an exception if a request fails and thus causes a job failure)"), text( - "\"ignore\" (ignores failures and drops the request),"), + "\"ignore\" (ignores failures and drops the request)"), text( - "\"retry-rejected\" (re-adds requests that have failed due to queue capacity saturation),"), + "\"retry-rejected\" (re-adds requests that have failed due to queue capacity saturation)"), text( "\"class name\" for failure handling with a ActionRequestFailureHandler subclass")) .build()); @@ -143,9 +143,8 @@ public enum BackOffType { .stringType() .defaultValue("json") .withDescription( - "Elasticsearch connector requires to specify a format.\n" - + "The format must produce a valid json document. \n" - + "By default uses built-in 'json' format. Please refer to Table Formats section for more details."); + "The format must produce a valid JSON document. " + + "Please refer to the documentation on formats for more details."); private ElasticsearchOptions() {} } diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java index 02ae1d8e28a64..55054016eb1db 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java @@ -58,28 +58,28 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam ConfigOptions.key("url") .stringType() .noDefaultValue() - .withDescription("the jdbc database url."); + .withDescription("The JDBC database URL."); public static final ConfigOption TABLE_NAME = ConfigOptions.key("table-name") .stringType() .noDefaultValue() - .withDescription("the jdbc table name."); + .withDescription("The JDBC table name."); public static final ConfigOption USERNAME = ConfigOptions.key("username") .stringType() .noDefaultValue() - .withDescription("the jdbc user name."); + .withDescription("The JDBC user name."); public static final ConfigOption PASSWORD = ConfigOptions.key("password") .stringType() .noDefaultValue() - .withDescription("the jdbc password."); + .withDescription("The JDBC password."); private static final ConfigOption DRIVER = ConfigOptions.key("driver") .stringType() .noDefaultValue() .withDescription( - "the class name of the JDBC driver to use to connect to this URL. " + "The class name of the JDBC driver to use to connect to this URL. 
" + "If not set, it will automatically be derived from the URL."); public static final ConfigOption MAX_RETRY_TIMEOUT = ConfigOptions.key("connection.max-retry-timeout") @@ -92,37 +92,35 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam ConfigOptions.key("scan.partition.column") .stringType() .noDefaultValue() - .withDescription("the column name used for partitioning the input."); + .withDescription("The column name used for partitioning the input."); private static final ConfigOption SCAN_PARTITION_NUM = ConfigOptions.key("scan.partition.num") .intType() .noDefaultValue() - .withDescription("the number of partitions."); + .withDescription("The number of partitions."); private static final ConfigOption SCAN_PARTITION_LOWER_BOUND = ConfigOptions.key("scan.partition.lower-bound") .longType() .noDefaultValue() - .withDescription("the smallest value of the first partition."); + .withDescription("The smallest value of the first partition."); private static final ConfigOption SCAN_PARTITION_UPPER_BOUND = ConfigOptions.key("scan.partition.upper-bound") .longType() .noDefaultValue() - .withDescription("the largest value of the last partition."); + .withDescription("The largest value of the last partition."); private static final ConfigOption SCAN_FETCH_SIZE = ConfigOptions.key("scan.fetch-size") .intType() .defaultValue(0) .withDescription( - "gives the reader a hint as to the number of rows that should be fetched, from" - + " the database when reading per round trip. If the value specified is zero, then the hint is ignored. The" - + " default value is zero."); + "Gives the reader a hint as to the number of rows that should be fetched " + + "from the database per round-trip when reading. " + + "If the value is zero, this hint is ignored."); private static final ConfigOption SCAN_AUTO_COMMIT = ConfigOptions.key("scan.auto-commit") .booleanType() .defaultValue(true) - .withDescription( - "sets whether the driver is in auto-commit mode. The default value is true, per" - + " the JDBC spec."); + .withDescription("Sets whether the driver is in auto-commit mode."); // look up config options private static final ConfigOption LOOKUP_CACHE_MAX_ROWS = @@ -130,19 +128,19 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam .longType() .defaultValue(-1L) .withDescription( - "the max number of rows of lookup cache, over this value, the oldest rows will " + "The max number of rows of lookup cache, over this value, the oldest rows will " + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is " - + "specified. 
Cache is not enabled as default."); + + "specified."); private static final ConfigOption LOOKUP_CACHE_TTL = ConfigOptions.key("lookup.cache.ttl") .durationType() .defaultValue(Duration.ofSeconds(10)) - .withDescription("the cache time to live."); + .withDescription("The cache time to live."); private static final ConfigOption LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries") .intType() .defaultValue(3) - .withDescription("the max retry times if lookup database failed."); + .withDescription("The max retry times if lookup database failed."); // write config options private static final ConfigOption SINK_BUFFER_FLUSH_MAX_ROWS = @@ -150,20 +148,19 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam .intType() .defaultValue(100) .withDescription( - "the flush max size (includes all append, upsert and delete records), over this number" - + " of records, will flush data. The default value is 100."); + "The flush max size (includes all append, upsert and delete records), over this number" + + " of records, will flush data."); private static final ConfigOption SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions.key("sink.buffer-flush.interval") .durationType() .defaultValue(Duration.ofSeconds(1)) .withDescription( - "the flush interval mills, over this time, asynchronous threads will flush data. The " - + "default value is 1s."); + "The flush interval mills, over this time, asynchronous threads will flush data."); private static final ConfigOption SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries") .intType() .defaultValue(3) - .withDescription("the max retry times if writing records to database failed."); + .withDescription("The max retry times if writing records to database failed."); @Override public DynamicTableSink createDynamicTableSink(Context context) { diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java index 993af275866a8..de8f45172b560 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.description.Description; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; @@ -51,6 +52,7 @@ import java.util.regex.Pattern; import java.util.stream.IntStream; +import static org.apache.flink.configuration.description.TextElement.text; import static org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.AT_LEAST_ONCE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.EXACTLY_ONCE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.NONE; @@ -97,32 +99,39 @@ private KafkaOptions() {} .enumType(ValueFieldsStrategy.class) .defaultValue(ValueFieldsStrategy.ALL) .withDescription( - "Defines a strategy how to deal with key columns in the data type of " - + "the value format. 
By default, '" - + ValueFieldsStrategy.ALL - + "' physical " - + "columns of the table schema will be included in the value format which " - + "means that key columns appear in the data type for both the key and value " - + "format."); + String.format( + "Defines a strategy how to deal with key columns in the data type " + + "of the value format. By default, '%s' physical columns of the table schema " + + "will be included in the value format which means that the key columns " + + "appear in the data type for both the key and value format.", + ValueFieldsStrategy.ALL)); public static final ConfigOption KEY_FIELDS_PREFIX = ConfigOptions.key("key.fields-prefix") .stringType() .noDefaultValue() .withDescription( - "Defines a custom prefix for all fields of the key format to avoid " - + "name clashes with fields of the value format. By default, the prefix is empty. " - + "If a custom prefix is defined, both the table schema and " - + "'" - + KEY_FIELDS.key() - + "' will work with prefixed names. When constructing " - + "the data type of the key format, the prefix will be removed and the " - + "non-prefixed names will be used within the key format. Please note that this " - + "option requires that '" - + VALUE_FIELDS_INCLUDE.key() - + "' must be '" - + ValueFieldsStrategy.EXCEPT_KEY - + "'."); + Description.builder() + .text( + "Defines a custom prefix for all fields of the key format to avoid " + + "name clashes with fields of the value format. " + + "By default, the prefix is empty.") + .linebreak() + .text( + String.format( + "If a custom prefix is defined, both the table schema and '%s' will work with prefixed names.", + KEY_FIELDS.key())) + .linebreak() + .text( + "When constructing the data type of the key format, the prefix " + + "will be removed and the non-prefixed names will be used within the key format.") + .linebreak() + .text( + String.format( + "Please note that this option requires that '%s' must be '%s'.", + VALUE_FIELDS_INCLUDE.key(), + ValueFieldsStrategy.EXCEPT_KEY)) + .build()); // -------------------------------------------------------------------------------------------- // Kafka specific options @@ -166,9 +175,16 @@ private KafkaOptions() {} .stringType() .defaultValue("group-offsets") .withDescription( - "Optional startup mode for Kafka consumer, valid enumerations are " - + "\"earliest-offset\", \"latest-offset\", \"group-offsets\", \"timestamp\"\n" - + "or \"specific-offsets\""); + Description.builder() + .text( + "Optional startup mode for Kafka consumer, valid enumerations are") + .list( + text("'earliest-offset'"), + text("'latest-offset'"), + text("'group-offsets'"), + text("'timestamp'"), + text("'specific-offsets'")) + .build()); public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions.key("scan.startup.specific-offsets") @@ -200,19 +216,30 @@ private KafkaOptions() {} .stringType() .defaultValue("default") .withDescription( - "Optional output partitioning from Flink's partitions\n" - + "into Kafka's partitions valid enumerations are\n" - + "\"default\": (use kafka default partitioner to partition records),\n" - + "\"fixed\": (each Flink partition ends up in at most one Kafka partition),\n" - + "\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin when 'key.fields' is not specified.)\n" - + "\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)"); + Description.builder() + .text( + "Optional output partitioning from Flink's partitions into Kafka's partitions. 
Valid enumerations are") + .list( + text( + "'default' (use kafka default partitioner to partition records)"), + text( + "'fixed' (each Flink partition ends up in at most one Kafka partition)"), + text( + "'round-robin' (a Flink partition is distributed to Kafka partitions round-robin when 'key.fields' is not specified)"), + text( + "custom class name (use custom FlinkKafkaPartitioner subclass)")) + .build()); public static final ConfigOption SINK_SEMANTIC = ConfigOptions.key("sink.semantic") .stringType() .defaultValue("at-least-once") .withDescription( - "Optional semantic when commit. Valid enumerationns are [\"at-least-once\", \"exactly-once\", \"none\"]"); + Description.builder() + .text( + "Optional semantic when committing. Valid enumerations are") + .list(text("at-least-once"), text("exactly-once"), text("none")) + .build()); // Disable this feature by default public static final ConfigOption SINK_BUFFER_FLUSH_MAX_ROWS = @@ -220,12 +247,19 @@ private KafkaOptions() {} .intType() .defaultValue(0) .withDescription( - "The max size of buffered records before flush. " - + "When the sink receives many updates on the same key, the buffer will retain the last record of the same key. " - + "This can help to reduce data shuffling and avoid possible tombstone messages to Kafka topic." - + "Can be set to '0' to disable it." - + "Note both 'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval' " - + "must be set to be greater than zero to enable sink buffer flushing."); + Description.builder() + .text( + "The max size of buffered records before flushing. " + + "When the sink receives many updates on the same key, " + + "the buffer will retain the last records of the same key. " + + "This can help to reduce data shuffling and avoid possible tombstone messages to the Kafka topic.") + .linebreak() + .text("Can be set to '0' to disable it.") + .linebreak() + .text( + "Note both 'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval' " + + "must be set to be greater than zero to enable sink buffer flushing.") + .build()); // Disable this feature by default public static final ConfigOption SINK_BUFFER_FLUSH_INTERVAL = @@ -233,12 +267,18 @@ private KafkaOptions() {} .durationType() .defaultValue(Duration.ofSeconds(0)) .withDescription( - "The flush interval mills, over this time, asynchronous threads will flush data. " - + "When the sink receives many updates on the same key, the buffer will retain the last record of the same key. " - + "This can help to reduce data shuffling and avoid possible tombstone messages to Kafka topic." - + "Can be set to '0' to disable it. " - + "Note both 'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval' " - + "must be set to be greater than zero to enable sink buffer flushing."); + Description.builder() + .text( + "The flush interval millis. Over this time, asynchronous threads " + + "will flush data. 
When the sink receives many updates on the same key, " + + "the buffer will retain the last record of the same key.") + .linebreak() + .text("Can be set to '0' to disable it.") + .linebreak() + .text( + "Note both 'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval' " + + "must be set to be greater than zero to enable sink buffer flushing.") + .build()); private static final ConfigOption SCHEMA_REGISTRY_SUBJECT = ConfigOptions.key("schema-registry.subject").stringType().noDefaultValue(); diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisOptions.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisOptions.java index 5e50a21598b46..b30d40127d292 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisOptions.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisOptions.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.description.Description; import org.apache.flink.streaming.connectors.kinesis.FixedKinesisPartitioner; import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner; import org.apache.flink.streaming.connectors.kinesis.RandomKinesisPartitioner; @@ -40,6 +41,9 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.flink.configuration.description.TextElement.code; +import static org.apache.flink.configuration.description.TextElement.text; + /** * Options for Kinesis tables supported by the {@code CREATE TABLE ... WITH ...} clause of the Flink * SQL dialect and the Flink Table API. @@ -86,7 +90,7 @@ private KinesisOptions() {} ConfigOptions.key("stream") .stringType() .noDefaultValue() - .withDescription("Name of the Kinesis stream backing this table (required)"); + .withDescription("Name of the Kinesis stream backing this table."); // ----------------------------------------------------------------------------------------- // Sink specific options @@ -97,26 +101,33 @@ private KinesisOptions() {} .stringType() .noDefaultValue() .withDescription( - "Optional output partitioning from Flink's partitions into Kinesis shards. " - + "Sinks that write to tables defined with the PARTITION BY clause " - + "always use a field-based partitioner and cannot define this option. " - + "Valid enumerations are: \n" - + "\"random\":" - + " (use a random partition key),\n" - + "\"fixed\":" - + " (each Flink partition ends up in at most one Kinesis shard),\n" - + "\"custom class name\":" - + " (use a custom " - + KinesisPartitioner.class.getName() - + " subclass)"); + Description.builder() + .text( + "Optional output partitioning from Flink's partitions into Kinesis shards. 
" + + "Sinks that write to tables defined with the %s clause always use a " + + "field-based partitioner and cannot define this option.", + code("PARTITION BY")) + .linebreak() + .text("Valid enumerations are") + .list( + text("random (use a random partition key)"), + text( + "fixed (each Flink partition ends up in at most one Kinesis shard)"), + text( + "custom class name (use a custom %s subclass)", + text(KinesisPartitioner.class.getName()))) + .build()); public static final ConfigOption SINK_PARTITIONER_FIELD_DELIMITER = ConfigOptions.key("sink.partitioner-field-delimiter") .stringType() .defaultValue("|") .withDescription( - "Optional field delimiter for fields-based partitioner " - + "derived from a PARTITION BY clause (\"|\" by default)"); + Description.builder() + .text( + "Optional field delimiter for fields-based partitioner derived from a %s clause", + code("PARTITION BY")) + .build()); // ----------------------------------------------------------------------------------------- // Option enumerations diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java index 0f985d6ac0210..bfc17e487162b 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java @@ -20,11 +20,13 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.description.Description; import org.apache.flink.table.factories.FactoryUtil; import java.time.Duration; import static org.apache.flink.configuration.ConfigOptions.key; +import static org.apache.flink.configuration.description.TextElement.text; /** This class holds configuration constants used by filesystem(Including hive) connector. */ public class FileSystemOptions { @@ -38,14 +40,13 @@ public class FileSystemOptions { .defaultValue("__DEFAULT_PARTITION__") .withDescription( "The default partition name in case the dynamic partition" - + " column value is null/empty string"); + + " column value is null/empty string."); public static final ConfigOption SINK_ROLLING_POLICY_FILE_SIZE = key("sink.rolling-policy.file-size") .memoryType() .defaultValue(MemorySize.ofMebiBytes(128)) - .withDescription( - "The maximum part file size before rolling (by default 128MB)."); + .withDescription("The maximum part file size before rolling."); public static final ConfigOption SINK_ROLLING_POLICY_ROLLOVER_INTERVAL = key("sink.rolling-policy.rollover-interval") @@ -53,7 +54,7 @@ public class FileSystemOptions { .defaultValue(Duration.ofMinutes(30)) .withDescription( "The maximum time duration a part file can stay open before rolling" - + " (by default 30 min to avoid to many small files). The frequency at which" + + " (by default long enough to avoid too many small files). 
The frequency at which" + " this is checked is controlled by the 'sink.rolling-policy.check-interval' option."); public static final ConfigOption SINK_ROLLING_POLICY_CHECK_INTERVAL = @@ -71,28 +72,34 @@ public class FileSystemOptions { .withDescription( "The option to enable shuffle data by dynamic partition fields in sink" + " phase, this can greatly reduce the number of file for filesystem sink but may" - + " lead data skew, the default value is disabled."); + + " lead data skew."); public static final ConfigOption STREAMING_SOURCE_ENABLE = key("streaming-source.enable") .booleanType() .defaultValue(false) .withDescription( - "Enable streaming source or not.\n" - + " NOTES: Please make sure that each partition/file should be written" - + " atomically, otherwise the reader may get incomplete data."); + Description.builder() + .text("Enable streaming source or not.") + .linebreak() + .text( + " NOTES: Please make sure that each partition/file should be written" + + " atomically, otherwise the reader may get incomplete data.") + .build()); public static final ConfigOption STREAMING_SOURCE_PARTITION_INCLUDE = key("streaming-source.partition.include") .stringType() .defaultValue("all") .withDescription( - "Option to set the partitions to read, the supported values " - + "are \"all\" and \"latest\"," - + " the \"all\" means read all partitions; the \"latest\" means read latest " - + "partition in order of streaming-source.partition.order, the \"latest\" only works" - + " when the streaming hive source table used as temporal table. " - + "By default the option is \"all\".\n."); + Description.builder() + .text( + "Option to set the partitions to read, supported values are") + .list( + text("all (read all partitions)"), + text( + "latest (read latest partition in order of 'streaming-source.partition.order', this only works when a streaming Hive source table is used as a temporal table)")) + .build()); public static final ConfigOption STREAMING_SOURCE_MONITOR_INTERVAL = key("streaming-source.monitor-interval") @@ -106,39 +113,47 @@ public class FileSystemOptions { .defaultValue("partition-name") .withDeprecatedKeys("streaming-source.consume-order") .withDescription( - "The partition order of streaming source," - + " support \"create-time\", \"partition-time\" and \"partition-name\"." - + " \"create-time\" compares partition/file creation time, this is not the" - + " partition create time in Hive metaStore, but the folder/file modification" - + " time in filesystem, if the partition folder somehow gets updated," - + " e.g. add new file into folder, it can affect how the data is consumed." - + " \"partition-time\" compares the time extracted from partition name." - + " \"partition-name\" compares partition name's alphabetical order." 
- + " This option is equality with deprecated option \"streaming-source.consume-order\"."); + Description.builder() + .text( + "The partition order of the streaming source, supported values are") + .list( + text( + "create-time (compares partition/file creation time, which is not the partition creation time in the Hive metastore, " + + "but the folder/file modification time in the filesystem; e.g., adding a new file into " + + "the folder may affect how the data is consumed)"), + text( + "partition-time (compares the time extracted from the partition name)"), + text( + "partition-name (compares partition names lexicographically)")) + .text( + "This is a synonym for the deprecated 'streaming-source.consume-order' option.") + .build()); public static final ConfigOption STREAMING_SOURCE_CONSUME_START_OFFSET = key("streaming-source.consume-start-offset") .stringType() .noDefaultValue() .withDescription( - "Start offset for streaming consuming." - + " How to parse and compare offsets depends on your order." - + " For create-time and partition-time, should be a timestamp" - + " string (yyyy-[m]m-[d]d [hh:mm:ss])." - + " For partition-time, will use partition time extractor to" - + " extract time from partition." - + " For partition-name, is the partition name string, e.g.:" - + " pt_year=2020/pt_mon=10/pt_day=01"); + Description.builder() + .text( + "Start offset for streaming consuming. How to parse and compare offsets depends on 'streaming-source.partition-order'.") + .list( + text( + "For 'create-time' and 'partition-time' it should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss])."), + text( + "For 'partition-time' it will use a partition time extractor to extract the time from the partition."), + text( + "For 'partition-name' it is the name of the partition, e.g. 'pt_year=2020/pt_mon=10/pt_day=01'.")) + .build()); public static final ConfigOption PARTITION_TIME_EXTRACTOR_KIND = key("partition.time-extractor.kind") .stringType() .defaultValue("default") .withDescription( - "Time extractor to extract time from partition values." - + " Support default and custom." - + " For default, can configure timestamp pattern." - + " For custom, should configure extractor class."); + "Time extractor to extract time from partition values. " + + "This can either be 'default' or a custom extractor class. " + + "For 'default', you can configure a timestamp pattern."); public static final ConfigOption PARTITION_TIME_EXTRACTOR_CLASS = key("partition.time-extractor.class") @@ -152,43 +167,56 @@ public class FileSystemOptions { .stringType() .noDefaultValue() .withDescription( - "The 'default' construction way allows users to use partition" - + " fields to get a legal timestamp pattern." - + " Default support 'yyyy-mm-dd hh:mm:ss' from first field." - + " If timestamp in partition is single field 'dt', can configure: '$dt'." - + " If timestamp in partition is year, month, day, hour," - + " can configure: '$year-$month-$day $hour:00:00'." 
- + " If timestamp in partition is dt and hour, can configure: '$dt $hour:00:00'."); + Description.builder() + .text( + "When 'partition.time-extractor.kind' is set to 'default', " + + "you can specify a pattern to get a timestamp from partitions.") + .list( + text( + "By default, a format of 'yyyy-mm-dd hh:mm:ss' is read from the first field."), + text( + "If the timestamp in the partition is a single field called 'dt', you can use '$dt'."), + text( + "If it is spread across multiple fields for year, month, day, and hour, you can use '$year-$month-$day $hour:00:00'."), + text( + "If the timestamp is in fields dt and hour, you can use '$dt $hour:00:00'.")) + .build()); public static final ConfigOption LOOKUP_JOIN_CACHE_TTL = key("lookup.join.cache.ttl") .durationType() .defaultValue(Duration.ofMinutes(60)) .withDescription( - "The cache TTL (e.g. 10min) for the build table in lookup join. " - + "By default the TTL is 60 minutes."); + "The cache TTL (e.g. 10min) for the build table in lookup join."); public static final ConfigOption SINK_PARTITION_COMMIT_TRIGGER = key("sink.partition-commit.trigger") .stringType() .defaultValue("process-time") .withDescription( - "Trigger type for partition commit:\n" - + " 'process-time': based on the time of the machine, it neither requires" - + " partition time extraction nor watermark generation. Commit partition" - + " once the 'current system time' passes 'partition creation system time' plus 'delay'.\n" - + " 'partition-time': based on the time that extracted from partition values," - + " it requires watermark generation. Commit partition once the 'watermark'" - + " passes 'time extracted from partition values' plus 'delay'."); + Description.builder() + .text("Trigger type for partition commit, supported values are") + .list( + text( + "process-time (based on the time of the machine, requires " + + "neither partition time extraction nor watermark generation; " + + "commits partition once the current system time passes partition creation system time plus delay)"), + text( + "partition-time (based on the time extracted from partition values, " + + "requires watermark generation; commits partition once " + + "the watermark passes the time extracted from partition values plus delay)")) + .build()); public static final ConfigOption SINK_PARTITION_COMMIT_DELAY = key("sink.partition-commit.delay") .durationType() .defaultValue(Duration.ofMillis(0)) .withDescription( - "The partition will not commit until the delay time." - + " if it is a day partition, should be '1 d'," - + " if it is a hour partition, should be '1 h'"); + Description.builder() + .text( + "The partition will not commit until the delay time. " + + "The value should be '1 d' for day partitions and '1 h' for hour partitions.") + .build()); public static final ConfigOption SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE = key("sink.partition-commit.watermark-time-zone")