-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid parsing a message if this message is too big #21090
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy to see this getting addressed!
I added some suggestions, but I don't think they are blocking.
@@ -199,7 +199,7 @@ static void consumeWriteStream(final AirbyteMessageConsumer consumer) throws Exc | |||
final Scanner input = new Scanner(System.in, StandardCharsets.UTF_8).useDelimiter("[\r\n]+"); | |||
consumer.start(); | |||
while (input.hasNext()) { | |||
consumeMessage(consumer, input.next()); | |||
consumeMessage(consumer, input.next(), Runtime.getRuntime().maxMemory()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not being good at ☕, is Runtime.getRuntime().maxMemory()
something we should memoize - if this makes a system call, it might be slow. It is in Ruby anyway...
if (inputString.getBytes().length > (runnerMaximumMemoryInBytes * 0.6)) { | ||
LOGGER.error("One messge is too big, the replication will fail in order to avoid OOM. The size of the message is: {} bytes", | ||
inputString.getBytes().length); | ||
throw new IllegalStateException("Invalid message, the message is too big"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this error will eventually bubble up to the user/UI. If that's true, perhaps we should provide a little more context to try and learn as much as we can about the message:
Airbyte has received a message from the source(!) at 2022-01-01 01:02:03(!) which is larger than xxxMB(!). The sync has been failed to prevent running out of memory.
Everything with (!) is dynamic. Ideally, I'd like to use a regular expression or similar to grab the stream name, but we can add that later if this becomes a problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this exception properly be attributed to the source and build a FailureReason?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll check that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
During my test, it turns out that adding that here will only be apply on the destination. I'll rework it and test the error message and make sure that the getMaxMemory is only call once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@evantahler , I made some change (it's a draft because I need to clean it up, I'll re-request a review when its ready). In my test it looked like
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
…t-parse-too-big-records
…t-parse-too-big-records
…t-parse-too-big-records
…t-parse-too-big-records
Affected Connector ReportNOTE
|
Connector | Version | Changelog | Publish |
---|---|---|---|
source-alloydb |
1.0.34 |
✅ | ✅ |
source-alloydb-strict-encrypt |
1.0.34 |
🔵 (ignored) |
🔵 (ignored) |
source-bigquery |
0.2.3 |
✅ | ✅ |
source-clickhouse |
0.1.14 |
✅ | ✅ |
source-clickhouse-strict-encrypt |
0.1.14 |
🔵 (ignored) |
🔵 (ignored) |
source-cockroachdb |
0.1.18 |
✅ | ✅ |
source-cockroachdb-strict-encrypt |
0.1.18 |
🔵 (ignored) |
🔵 (ignored) |
source-db2 |
0.1.16 |
✅ | ✅ |
source-db2-strict-encrypt |
0.1.16 |
🔵 (ignored) |
🔵 (ignored) |
source-dynamodb |
0.1.0 |
✅ | ✅ |
source-e2e-test |
2.1.3 |
✅ | ✅ |
source-e2e-test-cloud |
2.1.1 |
🔵 (ignored) |
🔵 (ignored) |
source-elasticsearch |
0.1.1 |
✅ | ✅ |
source-jdbc |
0.3.5 |
🔵 (ignored) |
🔵 (ignored) |
source-kafka |
0.2.3 |
✅ | ✅ |
source-mongodb-strict-encrypt |
0.1.19 |
🔵 (ignored) |
🔵 (ignored) |
source-mongodb-v2 |
0.1.19 |
✅ | ✅ |
source-mssql |
0.4.26 |
✅ | ✅ |
source-mssql-strict-encrypt |
0.4.26 |
🔵 (ignored) |
🔵 (ignored) |
source-mysql |
1.0.18 |
✅ | ✅ |
source-mysql-strict-encrypt |
1.0.18 |
🔵 (ignored) |
🔵 (ignored) |
source-oracle |
0.3.21 |
✅ | ✅ |
source-oracle-strict-encrypt |
0.3.21 |
🔵 (ignored) |
🔵 (ignored) |
source-postgres |
1.0.36 |
✅ | ✅ |
source-postgres-strict-encrypt |
1.0.36 |
🔵 (ignored) |
🔵 (ignored) |
source-redshift |
0.3.15 |
✅ | ✅ |
source-relational-db |
0.3.1 |
🔵 (ignored) |
🔵 (ignored) |
source-scaffold-java-jdbc |
0.1.0 |
🔵 (ignored) |
🔵 (ignored) |
source-sftp |
0.1.2 |
✅ | ✅ |
source-snowflake |
0.1.28 |
✅ | ✅ |
source-tidb |
0.2.1 |
✅ | ✅ |
- See "Actionable Items" below for how to resolve warnings and errors.
❌ Destinations (47)
Connector | Version | Changelog | Publish |
---|---|---|---|
destination-azure-blob-storage |
0.1.6 |
✅ | ✅ |
destination-bigquery |
1.2.9 |
✅ | ✅ |
destination-bigquery-denormalized |
1.2.10 |
✅ | ✅ |
destination-cassandra |
0.1.4 |
✅ | ✅ |
destination-clickhouse |
0.2.1 |
✅ | ✅ |
destination-clickhouse-strict-encrypt |
0.2.1 |
🔵 (ignored) |
🔵 (ignored) |
destination-csv |
1.0.0 |
❌ (changelog missing) |
✅ |
destination-databricks |
0.3.1 |
✅ | ✅ |
destination-dev-null |
0.2.7 |
🔵 (ignored) |
🔵 (ignored) |
destination-doris |
0.1.0 |
✅ | ✅ |
destination-dynamodb |
0.1.7 |
✅ | ✅ |
destination-e2e-test |
0.2.4 |
✅ | ✅ |
destination-elasticsearch |
0.1.6 |
✅ | ✅ |
destination-elasticsearch-strict-encrypt |
0.1.6 |
🔵 (ignored) |
🔵 (ignored) |
destination-gcs |
0.2.12 |
✅ | ✅ |
destination-iceberg |
0.1.0 |
✅ | ✅ |
destination-jdbc |
0.3.14 |
🔵 (ignored) |
🔵 (ignored) |
destination-kafka |
0.1.10 |
✅ | ✅ |
destination-keen |
0.2.4 |
✅ | ✅ |
destination-kinesis |
0.1.5 |
✅ | ✅ |
destination-local-json |
0.2.11 |
✅ | ✅ |
destination-mariadb-columnstore |
0.1.7 |
✅ | ✅ |
destination-mongodb |
0.1.9 |
✅ | ✅ |
destination-mongodb-strict-encrypt |
0.1.9 |
🔵 (ignored) |
🔵 (ignored) |
destination-mqtt |
0.1.3 |
✅ | ✅ |
destination-mssql |
0.1.22 |
✅ | ✅ |
destination-mssql-strict-encrypt |
0.1.22 |
🔵 (ignored) |
🔵 (ignored) |
destination-mysql |
0.1.20 |
✅ | ✅ |
destination-mysql-strict-encrypt |
❌ 0.1.21 (mismatch: 0.1.20 ) |
🔵 (ignored) |
🔵 (ignored) |
destination-oracle |
0.1.19 |
✅ | ✅ |
destination-oracle-strict-encrypt |
0.1.19 |
🔵 (ignored) |
🔵 (ignored) |
destination-postgres |
0.3.26 |
✅ | ✅ |
destination-postgres-strict-encrypt |
0.3.26 |
🔵 (ignored) |
🔵 (ignored) |
destination-pubsub |
0.2.0 |
✅ | ✅ |
destination-pulsar |
0.1.3 |
✅ | ✅ |
destination-r2 |
0.1.0 |
✅ | ✅ |
destination-redis |
0.1.4 |
✅ | ✅ |
destination-redpanda |
0.1.0 |
✅ | ✅ |
destination-redshift |
0.3.53 |
✅ | ✅ |
destination-rockset |
0.1.4 |
✅ | ✅ |
destination-s3 |
0.3.18 |
✅ | ✅ |
destination-s3-glue |
0.1.1 |
✅ | ✅ |
destination-scylla |
0.1.3 |
✅ | ✅ |
destination-snowflake |
0.4.41 |
✅ | ✅ |
destination-teradata |
0.1.0 |
✅ | ✅ |
destination-tidb |
0.1.0 |
✅ | ✅ |
destination-yugabytedb |
0.1.0 |
✅ | ✅ |
- See "Actionable Items" below for how to resolve warnings and errors.
✅ Other Modules (0)
Actionable Items
(click to expand)
Category | Status | Actionable Item |
---|---|---|
Version | ❌ mismatch |
The version of the connector is different from its normal variant. Please bump the version of the connector. |
⚠ doc not found |
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug. |
|
Changelog | ⚠ doc not found |
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug. |
❌ changelog missing |
There is no chnagelog for the current version of the connector. If you are the author of the current version, please add a changelog. | |
Publish | ⚠ not in seed |
The connector is not in the seed file (e.g. source_definitions.yaml ), so its publication status cannot be checked. This can be normal (e.g. some connectors are cloud-specific, and only listed in the cloud seed file). Please double-check to make sure that it is not a bug. |
❌ diff seed version |
The connector exists in the seed file, but the latest version is not listed there. This usually means that the latest version is not published. Please use the /publish command to publish the latest version. |
protocolValidator = protocolPredicate; | ||
this.logger = logger; | ||
this.containerLogMdcBuilder = containerLogMdcBuilder; | ||
this.exceptionClass = messageSizeExceptionClass; | ||
this.maxMemory = Runtime.getRuntime().maxMemory(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK maxMemory()
returns the total amount of memory reserved for the VM (including the unallocated memory). Judging by the rest of the code, there is an assumption here that 80% of that will be available for local variables - how is this assumption made and validated?
I might suggest that we use freeMemory()
instead
.peek(str -> metricClient.distribution(OssMetricsRegistry.JSON_STRING_LENGTH, str.getBytes(StandardCharsets.UTF_8).length)) | ||
.peek(str -> { | ||
if (exceptionClass.isPresent()) { | ||
final long messageSize = str.getBytes(StandardCharsets.UTF_8).length; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at this point, isn't str
already occupying as many as str.getBytes(StandardCharsets.UTF_8).length
bytes in memory?
This reverts commit 954ca75.
* Tmp * Format * TMP * TMP * Inject max memory * Clean up * Improve error message * PR comments * Unrelated changes * Fix pmd
What
Address: #17426
In order to avoid OOM when trying to parse a record, we need to check that the size of the messages received by the orchestrator are not too big. If they are, the replication is failed by throwing an
IllegalStateException
.