Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ Flink SQL> SELECT * FROM orders;
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。</td>
<td>连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。</td>
</tr>
<tr>
<td>connect.max-retries</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ pipeline:
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。</td>
<td>连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。</td>
</tr>
<tr>
<td>connect.max-retries</td>
Expand Down
3 changes: 2 additions & 1 deletion docs/content/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ During a snapshot operation, the connector will query each included table to pro
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.</td>
<td>The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.
This value cannot be less than 250ms.</td>
</tr>
<tr>
<td>connect.max-retries</td>
Expand Down
3 changes: 2 additions & 1 deletion docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ pipeline:
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.</td>
<td>The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.
This value cannot be less than 250ms.</td>
</tr>
<tr>
<td>connect.max-retries</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public class MySqlSourceOptions {
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.");
"The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. This value cannot be less than 250ms.");

public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
ConfigOptions.key("connection.pool.size")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
validateIntegerOption(MySqlSourceOptions.CONNECT_MAX_RETRIES, connectMaxRetries, 0);
validateDistributionFactorUpper(distributionFactorUpper);
validateDistributionFactorLower(distributionFactorLower);
validateDurationOption(
MySqlSourceOptions.CONNECT_TIMEOUT, connectTimeout, Duration.ofMillis(250));
}

OptionUtils.printOptions(IDENTIFIER, ((Configuration) config).toMap());
Expand Down Expand Up @@ -316,6 +318,16 @@ private void validateIntegerOption(
option.key(), exclusiveMin, optionValue));
}

/** Checks the value of given duration option is valid. */
private void validateDurationOption(
ConfigOption<Duration> option, Duration optionValue, Duration exclusiveMin) {
checkState(
optionValue.toMillis() > exclusiveMin.toMillis(),
String.format(
"The value of option '%s' cannot be less than %s, but actual is %s",
option.key(), exclusiveMin, optionValue));
}

/**
* Checks the given regular expression's syntax is valid.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,22 @@ public void testValidation() {
.isPresent());
}

// validate illegal connect.timeout
try {
Map<String, String> properties = getAllOptions();
properties.put("scan.incremental.snapshot.enabled", "true");
properties.put("connect.timeout", "240ms");

createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"The value of option 'connect.timeout' cannot be less than PT0.25S, but actual is PT0.24S")
.isPresent());
}

// validate illegal split size
try {
Map<String, String> properties = getAllOptions();
Expand Down