-
Notifications
You must be signed in to change notification settings - Fork 99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-1019628 update file cleaner for snowpipe ingest #807
SNOW-1019628 update file cleaner for snowpipe ingest #807
Conversation
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
Outdated
Show resolved
Hide resolved
src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java
Outdated
Show resolved
Hide resolved
src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java
Outdated
Show resolved
Hide resolved
src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/ProgressRegistryTelemetry.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeIngestionService.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeIngestionServiceV1.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets discuss this in our meeting for coming friday. High level idea is good, i just wish the PR was a bit smaller. (Breaking up in smaller chunks would have made reviewer's life easy)
src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
Outdated
Show resolved
Hide resolved
src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java
Outdated
Show resolved
Hide resolved
src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good. I'll wait for comment reply before the approval.
For the future please split the refactoring and the core changes to seprate PRs (or at least provide more details list of going through the changes).
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeIngestionServiceV1.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
Outdated
Show resolved
Hide resolved
|
||
@BeforeEach | ||
void setup() { | ||
manager = Mockito.mock(SimpleIngestManager.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we have our own fake (and abstraction over it SimpleIngestManager) instead of yet another mock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe, but there were none so far and I wanted to provide some basic testing for the service itself, without relying on the underlying SDK class
} | ||
cleanerServiceExecutor = | ||
new ScheduledThreadPoolExecutor( | ||
Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need more than 1 thread? It could impact the customer workload and since customers run multiple kafka connect clusters on the same node it could be too much to use the availableProcessors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this pool is reused by all partitions for given topic. should there be many partitions, it could potentially mean, that the single thread would be too busy serving cleanup - there are some potentially expensive I/O calls to fetch the history and stage.
anway - i've added additional configuration parameter, where one could potentially manually configure the pool size to fit their specifc needs.
in general - 1 thread per topic should be sufficient - this new algorithm does not block thread on sleep. only potential risk is duration of I/O calls.
…t is 1 now. some cleanup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Thanks.
* SNOW-922795 Added third-party-licenses to .zip distribution (snowflakedb#784) * PROD-39429 Add PR template (snowflakedb#786) * Run subset of e2e tests (snowflakedb#788) * SNOW-979849: Add offset verification logic (snowflakedb#795) SNOW-979849 Add offset verification logic as part of channel open. We're verifying that the current start offset = previous end offset +1, if not true, then it potentially means missing or duplicate data. Note that there are some false positives that we can't avoid like when SMT is used, but this is good enough to do blast radius analysis for any corruption issue. * SNOW-1117446 Snowpipe Streaming client provider map (snowflakedb#794) * NO-SNOW: upgrade JDBC to 3.14.5 (snowflakedb#799) NO-SNOW upgrade JDBC to 3.14.5 * SNOW-1049869 Cleanup streaming ingest threads when SinkTask stop() is called (snowflakedb#800) * [NO-SNOW] Upgrade dependency versions (snowflakedb#802) * Upgrade connect-api to 3.7.0 (snowflakedb#803) * SNOW-1057151: Add poc test setup for EmbeddedConnectCluster (snowflakedb#801) * SNOW-1049869 Rewrite client optimization unit tests (snowflakedb#804) * Release v2.2.1 (snowflakedb#805) * SNOW-1055524 E2E test with nullable SMT (snowflakedb#808) * SNOW-1057165 - Test-doubles for snowflake-ingest-java (snowflakedb#809) * SNOW-1055524 E2E SMT without schema evolution (snowflakedb#811) * SNOW-1055524 Check table schema in e2e smt (snowflakedb#813) * SNOW-1019628 update file cleaner for snowpipe ingest (snowflakedb#807) * SNOW-1055524 E2E SMT for a Snowpipe based connector (snowflakedb#812) * NO-SNOW Remove performance tests (snowflakedb#817) * SNOW-1055561: Check whether SMT returning null values no longer stops a data ingestion. (snowflakedb#816) * Update to release 2.2.2 (snowflakedb#819) --------- Co-authored-by: Michał Bobowski <145468486+sfc-gh-mbobowski@users.noreply.github.com> Co-authored-by: Jay Patel <jay.patel@snowflake.com> Co-authored-by: Toby Zhang <toby.zhang@snowflake.com> Co-authored-by: Xin Huang <xin.huang@snowflake.com> Co-authored-by: Wojciech Trefon <wojciech.trefon@snowflake.com> Co-authored-by: revi cheng <revi.cheng@snowflake.com> Co-authored-by: Lex Shcharbaty <147443583+sfc-gh-lshcharbaty@users.noreply.github.com> Co-authored-by: Artur Chyży <artur.chyzy@snowflake.com> Co-authored-by: Greg Jachimko <greg.jachimko@snowflake.com> Co-authored-by: Adrian Kowalczyk <85181931+sfc-gh-akowalczyk@users.noreply.github.com>
Overview
SNOW-1019628
This PR addresses reported issue of cleaner not properly handling file cleanup - in some scenarios, files will be kept in staging area for as long as there is no message in the topic.
This change modifies the file handling algorithm - it no longer depends on the first record to be received to start - it will start automatically for each partition.
To enable this new cleaner, provide following configuration entry to your connector config:
"snowflake.snowpipe.v2CleanerEnabled": true
Core processing logic happens in
StageFilesProcessor::trackFiles
This method is run in a worker thread. The only way to communicate with the cleaner, is through
ProgressRegister
interface, which allows adding new ingested files for tracking in a thread safe manner.Pre-review checklist
snowflake.snowpipe.v2CleanerEnabled
.Yes
- Added end to end and Unit Tests.No
- Suggest why it is not param protected