Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ public enum BackOffType {
"Failure handling strategy in case a request to Elasticsearch fails")
.list(
text(
"\"fail\" (throws an exception if a request fails and thus causes a job failure),"),
"\"fail\" (throws an exception if a request fails and thus causes a job failure)"),
text(
"\"ignore\" (ignores failures and drops the request),"),
"\"ignore\" (ignores failures and drops the request)"),
text(
"\"retry-rejected\" (re-adds requests that have failed due to queue capacity saturation),"),
"\"retry-rejected\" (re-adds requests that have failed due to queue capacity saturation)"),
text(
"\"class name\" for failure handling with a ActionRequestFailureHandler subclass"))
.build());
Expand Down Expand Up @@ -143,9 +143,8 @@ public enum BackOffType {
.stringType()
.defaultValue("json")
.withDescription(
"Elasticsearch connector requires to specify a format.\n"
+ "The format must produce a valid json document. \n"
+ "By default uses built-in 'json' format. Please refer to Table Formats section for more details.");
"The format must produce a valid JSON document. "
+ "Please refer to the documentation on formats for more details.");

private ElasticsearchOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,28 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
ConfigOptions.key("url")
.stringType()
.noDefaultValue()
.withDescription("the jdbc database url.");
.withDescription("The JDBC database URL.");
public static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("the jdbc table name.");
.withDescription("The JDBC table name.");
public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription("the jdbc user name.");
.withDescription("The JDBC user name.");
public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription("the jdbc password.");
.withDescription("The JDBC password.");
private static final ConfigOption<String> DRIVER =
ConfigOptions.key("driver")
.stringType()
.noDefaultValue()
.withDescription(
"the class name of the JDBC driver to use to connect to this URL. "
"The class name of the JDBC driver to use to connect to this URL. "
+ "If not set, it will automatically be derived from the URL.");
public static final ConfigOption<Duration> MAX_RETRY_TIMEOUT =
ConfigOptions.key("connection.max-retry-timeout")
Expand All @@ -92,78 +92,75 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
ConfigOptions.key("scan.partition.column")
.stringType()
.noDefaultValue()
.withDescription("the column name used for partitioning the input.");
.withDescription("The column name used for partitioning the input.");
private static final ConfigOption<Integer> SCAN_PARTITION_NUM =
ConfigOptions.key("scan.partition.num")
.intType()
.noDefaultValue()
.withDescription("the number of partitions.");
.withDescription("The number of partitions.");
private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND =
ConfigOptions.key("scan.partition.lower-bound")
.longType()
.noDefaultValue()
.withDescription("the smallest value of the first partition.");
.withDescription("The smallest value of the first partition.");
private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND =
ConfigOptions.key("scan.partition.upper-bound")
.longType()
.noDefaultValue()
.withDescription("the largest value of the last partition.");
.withDescription("The largest value of the last partition.");
private static final ConfigOption<Integer> SCAN_FETCH_SIZE =
ConfigOptions.key("scan.fetch-size")
.intType()
.defaultValue(0)
.withDescription(
"gives the reader a hint as to the number of rows that should be fetched, from"
+ " the database when reading per round trip. If the value specified is zero, then the hint is ignored. The"
+ " default value is zero.");
"Gives the reader a hint as to the number of rows that should be fetched "
+ "from the database per round-trip when reading. "
+ "If the value is zero, this hint is ignored.");
private static final ConfigOption<Boolean> SCAN_AUTO_COMMIT =
ConfigOptions.key("scan.auto-commit")
.booleanType()
.defaultValue(true)
.withDescription(
"sets whether the driver is in auto-commit mode. The default value is true, per"
+ " the JDBC spec.");
.withDescription("Sets whether the driver is in auto-commit mode.");

// look up config options
private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
ConfigOptions.key("lookup.cache.max-rows")
.longType()
.defaultValue(-1L)
.withDescription(
"the max number of rows of lookup cache, over this value, the oldest rows will "
"The max number of rows of lookup cache, over this value, the oldest rows will "
+ "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is "
+ "specified. Cache is not enabled as default.");
+ "specified.");
private static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
ConfigOptions.key("lookup.cache.ttl")
.durationType()
.defaultValue(Duration.ofSeconds(10))
.withDescription("the cache time to live.");
.withDescription("The cache time to live.");
private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
ConfigOptions.key("lookup.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if lookup database failed.");
.withDescription("The max retry times if lookup database failed.");

// write config options
private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
ConfigOptions.key("sink.buffer-flush.max-rows")
.intType()
.defaultValue(100)
.withDescription(
"the flush max size (includes all append, upsert and delete records), over this number"
+ " of records, will flush data. The default value is 100.");
"The flush max size (includes all append, upsert and delete records), over this number"
+ " of records, will flush data.");
private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
ConfigOptions.key("sink.buffer-flush.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription(
"the flush interval mills, over this time, asynchronous threads will flush data. The "
+ "default value is 1s.");
"The flush interval mills, over this time, asynchronous threads will flush data.");
private static final ConfigOption<Integer> SINK_MAX_RETRIES =
ConfigOptions.key("sink.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if writing records to database failed.");
.withDescription("The max retry times if writing records to database failed.");

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
Expand Down
Loading