Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1512047 Introduce independent per-table flushes when interleaving is disabled #788

Merged
merged 7 commits into from
Jul 26, 2024

Conversation

sfc-gh-alhuang
Copy link
Contributor

The current SDK behavior flushes all channels simultaneously when any buffer reaches its limit, potentially causing unnecessary small file flushes if interleaving is disabled (MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST = 1) and the ingestion throughput between tables is uneven.

As the WIP streaming to Iceberg table feature set MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST = 1. This PR introduces per-table flushing to avoid the above issue.

JIRA

@sfc-gh-alhuang sfc-gh-alhuang marked this pull request as ready for review July 3, 2024 23:55
@sfc-gh-alhuang sfc-gh-alhuang requested review from sfc-gh-tzhang and a team as code owners July 3, 2024 23:55
&& !tablesToFlush.isEmpty()) {
tablesToFlush.addAll(
this.channelCache.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toSet()));
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to do this, If the previous code block already picked up the minimal set of tables needing flush?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, even if interleaving is enabled, I'd prefer to keep the above logic for flushing and wait until the MaxClientLag for each channel

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I aimed to maintain the original interleaving behavior, where all channels are flushed if any channel needs it. With independent flushing intervals, we might miss the chance to combine multiple chunks into the same BDEC. A potential workaround is to discretize timestamps and reduce jitter on lastFlushTime in interleaving mode. This can increase the chances of combining multiple chunks into the same blob. What do you think?

@@ -33,6 +40,12 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
this.cache.computeIfAbsent(
channel.getFullyQualifiedTableName(), v -> new ConcurrentHashMap<>());

// Update the last flush time for the table, add jitter to avoid all channels flush at the same
// time when the blobs are not interleaved
this.lastFlushTime.putIfAbsent(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure i understand what this helps with. If someone does addChannel, and doesn't add any data for a minute, the first row that they add will trigger a flush since we'll mistakenly think its been a long time since the last flush.

Copy link
Contributor Author

@sfc-gh-alhuang sfc-gh-alhuang Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Should we change the logic to following?

  1. Don't edit lastFlushTime when creating a channel.
  2. When calling putRow or putRows, if lastFlushTime is null, set to current time.
  3. Whenever a table is flushed, set lastFlushTime to null, go to step 2.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed pl track this with a JIRA so we don't forget about it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jira created.

Copy link
Contributor

@sfc-gh-asen sfc-gh-asen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, had small comments on top of Hitesh's comments

&& !tablesToFlush.isEmpty()) {
tablesToFlush.addAll(
this.channelCache.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toSet()));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, even if interleaving is enabled, I'd prefer to keep the above logic for flushing and wait until the MaxClientLag for each channel

Copy link
Collaborator

@sfc-gh-hmadan sfc-gh-hmadan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, please get Alkin's or Toby's signoff before merging!

return;
logFlushTask(isForce, tablesToFlush, flushStartTime);
distributeFlushTasks(tablesToFlush);
long flushEndTime = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is confusing, this is the end time of the previous flush?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to prevFlushEndTime.

Comment on lines 305 to 306
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() != 1
&& !tablesToFlush.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do the check before populating tablesToFlush?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. Preserve the client level lastFlushTime and isNeedFlush to avoid checking table level flush info when interleaving is enabled which might cause performance change. Preserve old logging format when interleaving is enabled to avoid logging too much information.

cc: @sfc-gh-hmadan

Copy link
Contributor

@sfc-gh-tzhang sfc-gh-tzhang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments, otherwise LGTM

@@ -276,7 +276,7 @@ public void testDropChannel() throws Exception {
@Test
public void testParameterOverrides() throws Exception {
Map<String, Object> parameterMap = new HashMap<>();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3 sec");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this is changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ParameterProvider does not support "sec" (ref). The old code somehow ignore the exception thrown in thread.

@sfc-gh-alhuang sfc-gh-alhuang merged commit 164b030 into master Jul 26, 2024
48 checks passed
@sfc-gh-alhuang sfc-gh-alhuang deleted the alhuang-table-level-flush branch July 26, 2024 22:17
@jdcaperon
Copy link

Thanks for solving this issue :D #570

sfc-gh-kgaputis pushed a commit to sfc-gh-kgaputis/snowflake-ingest-java that referenced this pull request Sep 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants