SNOW-1512047 Introduce independent per-table flushes when interleaving is disabled #788

Merged · 7 commits · Jul 26, 2024
Changes from 3 commits
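
In brief: this diff replaces FlushService's client-level isNeedFlush and lastFlushTime fields with per-table state tracked in ChannelCache, so that when interleaving is disabled (max chunks in blob = 1) each table is flushed on its own schedule.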
File: ChannelCache.java
@@ -1,11 +1,13 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import java.util.Iterator;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
@@ -23,6 +25,12 @@ class ChannelCache<T> {
String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>
cache = new ConcurrentHashMap<>();

// Last flush time for each table; the key is FullyQualifiedTableName.
private final ConcurrentHashMap<String, Long> lastFlushTime = new ConcurrentHashMap<>();

// Need-flush flag for each table; the key is FullyQualifiedTableName.
private final ConcurrentHashMap<String, Boolean> needFlush = new ConcurrentHashMap<>();

/**
* Add a channel to the channel cache
*
@@ -33,6 +41,11 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
this.cache.computeIfAbsent(
channel.getFullyQualifiedTableName(), v -> new ConcurrentHashMap<>());

// Update the last flush time for the table; add jitter to avoid all channels flushing at the
// same time when blobs are not interleaved
this.lastFlushTime.putIfAbsent(
Collaborator:

Not sure I understand what this helps with. If someone does addChannel and doesn't add any data for a minute, the first row that they add will trigger a flush, since we'll mistakenly think it's been a long time since the last flush.

Contributor Author (@sfc-gh-alhuang, Jul 9, 2024):

Yes. Should we change the logic to the following?

  1. Don't edit lastFlushTime when creating a channel.
  2. When calling putRow or putRows, if lastFlushTime is null, set it to the current time.
  3. Whenever a table is flushed, set lastFlushTime to null, and go to step 2.
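
A minimal sketch of the proposed lazy initialization (hypothetical helper names, not part of this PR; assumes the surrounding ChannelCache class from the diff above):

    // Step 2 (sketch): called from putRow/putRows; starts the timer on first insert.
    void setLastFlushTimeIfAbsent(String fullyQualifiedTableName) {
      this.lastFlushTime.computeIfAbsent(
          fullyQualifiedTableName, t -> System.currentTimeMillis() + getRandomFlushJitter());
    }

    // Step 3 (sketch): called after a table flush, so the next insert restarts the timer.
    void invalidateLastFlushTime(String fullyQualifiedTableName) {
      this.lastFlushTime.remove(fullyQualifiedTableName);
    }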

Collaborator:

As discussed, please track this with a JIRA so we don't forget about it.

Contributor Author:

Jira created.

channel.getFullyQualifiedTableName(), System.currentTimeMillis() + getRandomFlushJitter());

SnowflakeStreamingIngestChannelInternal<T> oldChannel =
channels.put(channel.getName(), channel);
// Invalidate the old channel if it exists, to block new inserts and return errors to users earlier
@@ -44,13 +57,54 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
}

/**
* Returns an iterator over the (table, channels) in this map.
* Get the last flush time for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @return last flush time in milliseconds, or empty if the table has not been flushed yet
*/
Optional<Long> getLastFlushTime(String fullyQualifiedTableName) {
return Optional.ofNullable(this.lastFlushTime.get(fullyQualifiedTableName));
}

/**
* Set the last flush time for a table; random jitter is added to the given time
*
* @param fullyQualifiedTableName fully qualified table name
* @param lastFlushTime last flush time in milliseconds
*/
void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) {
this.lastFlushTime.put(fullyQualifiedTableName, lastFlushTime + getRandomFlushJitter());
}

/**
* Get need flush flag for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @return need flush flag
*/
Boolean getNeedFlush(String fullyQualifiedTableName) {
return this.needFlush.getOrDefault(fullyQualifiedTableName, false);
}

/**
* Set need flush flag for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @param needFlush need flush flag
*/
Iterator<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
iterator() {
return this.cache.entrySet().iterator();
void setNeedFlush(String fullyQualifiedTableName, Boolean needFlush) {
this.needFlush.put(fullyQualifiedTableName, needFlush);
}

/** Returns an immutable set view of the mappings contained in the channel cache. */
Set<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
entrySet() {
return Collections.unmodifiableSet(cache.entrySet());
}

/** Returns an immutable set view of the keys contained in the channel cache. */
Set<String> keySet() {
return Collections.unmodifiableSet(cache.keySet());
}

/** Close all channels in the channel cache */
@@ -101,4 +155,8 @@ void invalidateChannelIfSequencersMatch(
int getSize() {
return cache.size();
}

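// Random jitter of up to one second, used to spread per-table flush times apart.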
private long getRandomFlushJitter() {
return (long) (Math.random() * 1000);
}
}
File: FlushService.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,6 +36,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
@@ -108,12 +110,6 @@ List<List<ChannelData<T>>> getData() {
// Reference to register service
private final RegisterService<T> registerService;

// Indicates whether we need to schedule a flush
@VisibleForTesting volatile boolean isNeedFlush;

// Latest flush time
@VisibleForTesting volatile long lastFlushTime;

// Indicates whether it's running as part of the test
private final boolean isTestMode;

@@ -141,8 +137,6 @@ List<List<ChannelData<T>>> getData() {
this.targetStage = targetStage;
this.counter = new AtomicLong(0);
this.registerService = new RegisterService<>(client, isTestMode);
this.isNeedFlush = false;
this.lastFlushTime = System.currentTimeMillis();
this.isTestMode = isTestMode;
this.latencyTimerContextMap = new ConcurrentHashMap<>();
this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion();
@@ -175,8 +169,6 @@ List<List<ChannelData<T>>> getData() {

this.registerService = new RegisterService<>(client, isTestMode);
this.counter = new AtomicLong(0);
this.isNeedFlush = false;
this.lastFlushTime = System.currentTimeMillis();
this.isTestMode = isTestMode;
this.latencyTimerContextMap = new ConcurrentHashMap<>();
this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion();
@@ -204,36 +196,55 @@ private CompletableFuture<Void> statsFuture() {

/**
* @param isForce if true will flush regardless of other conditions
* @param timeDiffMillis Time in milliseconds since the last flush
* @param tablesToFlush list of tables to flush
* @param flushStartTime the time when the flush started
* @return a future that completes when the flush tasks have been distributed
*/
private CompletableFuture<Void> distributeFlush(boolean isForce, long timeDiffMillis) {
private CompletableFuture<Void> distributeFlush(
boolean isForce, Set<String> tablesToFlush, Long flushStartTime) {
return CompletableFuture.runAsync(
() -> {
logFlushTask(isForce, timeDiffMillis);
distributeFlushTasks();
this.isNeedFlush = false;
this.lastFlushTime = System.currentTimeMillis();
return;
logFlushTask(isForce, tablesToFlush, flushStartTime);
distributeFlushTasks(tablesToFlush);
long flushEndTime = System.currentTimeMillis();
Contributor:

The name is confusing: is this the end time of the previous flush?

Contributor Author:

Renamed to prevFlushEndTime.

tablesToFlush.forEach(
table -> {
this.channelCache.setLastFlushTime(table, flushEndTime);
this.channelCache.setNeedFlush(table, false);
});
},
this.flushWorker);
}

/** If tracing is enabled, always log the flush task; otherwise log only when a flush is needed or forced. */
private void logFlushTask(boolean isForce, long timeDiffMillis) {
private void logFlushTask(boolean isForce, Set<String> tablesToFlush, long flushStartTime) {
boolean isNeedFlush = tablesToFlush.stream().anyMatch(channelCache::getNeedFlush);
long currentTime = System.currentTimeMillis();

final String tablesToFlushLogFormat =
tablesToFlush.stream()
.map(
table ->
String.format(
"(name=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s)",
table,
channelCache.getNeedFlush(table),
channelCache.getLastFlushTime(table).isPresent()
? flushStartTime - channelCache.getLastFlushTime(table).get()
: "N/A",
channelCache.getLastFlushTime(table).isPresent()
? currentTime - channelCache.getLastFlushTime(table).get()
: "N/A"))
.collect(Collectors.joining(", "));

final String flushTaskLogFormat =
String.format(
"Submit forced or ad-hoc flush task on client=%s, isForce=%s,"
+ " isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s",
this.owningClient.getName(),
isForce,
this.isNeedFlush,
timeDiffMillis,
System.currentTimeMillis() - this.lastFlushTime);
"Submit forced or ad-hoc flush task on client=%s, isForce=%s," + " Tables=[%s]",
this.owningClient.getName(), isForce, tablesToFlushLogFormat);
if (logger.isTraceEnabled()) {
logger.logTrace(flushTaskLogFormat);
}
if (!logger.isTraceEnabled() && (this.isNeedFlush || isForce)) {
if (!logger.isTraceEnabled() && (isNeedFlush || isForce)) {
logger.logDebug(flushTaskLogFormat);
}
}
Expand All @@ -249,27 +260,56 @@ private CompletableFuture<Void> registerFuture() {
}

/**
* Kick off a flush job and distribute the tasks if one of the following conditions is met:
* <li>Flush is forced by the users
* <li>One or more buffers have reached the flush size
* <li>Periodical background flush when a time interval has reached
* Kick off a flush job and distribute the tasks. The flush service behaves differently based on
* the max chunks in blob:
*
* <ul>
* <li>The max chunks in blob is not 1 (interleaving is allowed), every channel will be flushed
* together if one of the following conditions is met:
* <ul>
* <li>Flush is forced by the users
* <li>One or more buffers have reached the flush size
* <li>Periodical background flush when a time interval has reached
* </ul>
* <li>The max chunks in blob is 1 (interleaving is not allowed), a channel will be flushed if
* one of the following conditions is met:
* <ul>
* <li>Flush is forced by the users
* <li>One or more buffers with the same target table as the channel have reached the
* flush size
* <li>Periodical background flush of the target table when a time interval has reached
* </ul>
* </ul>
*
* @param isForce if true, flush regardless of other conditions
* @return Completable future that completes when the blobs are registered successfully, or null
*     if none of the conditions above is met
*/
CompletableFuture<Void> flush(boolean isForce) {
long timeDiffMillis = System.currentTimeMillis() - this.lastFlushTime;

if (isForce
|| (!DISABLE_BACKGROUND_FLUSH
&& !isTestMode()
&& (this.isNeedFlush
|| timeDiffMillis
>= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()))) {
long flushStartTime = System.currentTimeMillis();

Set<String> tablesToFlush =
this.channelCache.keySet().stream()
.filter(
key ->
isForce
|| (this.channelCache.getLastFlushTime(key).isPresent()
&& flushStartTime - this.channelCache.getLastFlushTime(key).get()
>= this.owningClient
.getParameterProvider()
.getCachedMaxClientLagInMs())
|| this.channelCache.getNeedFlush(key))
.collect(Collectors.toSet());

// Flush all tables together when interleaving chunks in a blob is allowed
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() != 1
&& !tablesToFlush.isEmpty()) {
Contributor:

Can we do the check before populating tablesToFlush?

Contributor Author:

Discussed offline. We preserve the client-level lastFlushTime and isNeedFlush behavior to avoid checking table-level flush info when interleaving is enabled, which might cause a performance change, and we preserve the old logging format when interleaving is enabled to avoid logging too much information.

cc: @sfc-gh-hmadan

tablesToFlush.addAll(this.channelCache.keySet());
}
Collaborator:

Why do we need to do this, if the previous code block already picked up the minimal set of tables needing a flush?

Contributor:

+1, even if interleaving is enabled, I'd prefer to keep the above logic for flushing and wait until the MaxClientLag for each channel.

Contributor Author:

I aimed to maintain the original interleaving behavior, where all channels are flushed if any channel needs it. With independent flushing intervals, we might miss the chance to combine multiple chunks into the same BDEC. A potential workaround is to discretize timestamps and reduce jitter on lastFlushTime in interleaving mode, which would increase the chances of combining multiple chunks into the same blob. What do you think?
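
One possible sketch of that discretization idea (hypothetical, not part of this PR): round each table's flush deadline down to a shared bucket boundary, so tables that become due in the same interval are flushed, and hence blobbed, together.

    // Hypothetical: align flush deadlines to a fixed bucket so tables due in the
    // same interval end up in the same blob when interleaving is enabled.
    private static final long FLUSH_BUCKET_MS = 1000;

    private long discretizeFlushTime(long timestampMs) {
      return (timestampMs / FLUSH_BUCKET_MS) * FLUSH_BUCKET_MS;
    }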


if (isForce || (!DISABLE_BACKGROUND_FLUSH && !isTestMode() && !tablesToFlush.isEmpty())) {
return this.statsFuture()
.thenCompose((v) -> this.distributeFlush(isForce, timeDiffMillis))
.thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, flushStartTime))
.thenCompose((v) -> this.registerFuture());
}
return this.statsFuture();
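
Condensed, the per-table selection above reduces to the following predicate (a simplified restatement of the tablesToFlush filter in the diff; the method name is hypothetical):

    // Simplified restatement of the tablesToFlush filter (method name hypothetical).
    private boolean shouldFlushTable(String table, long flushStartTime, boolean isForce) {
      long maxClientLagMs =
          this.owningClient.getParameterProvider().getCachedMaxClientLagInMs();
      java.util.Optional<Long> lastFlushTime = this.channelCache.getLastFlushTime(table);
      return isForce
          || (lastFlushTime.isPresent()
              && flushStartTime - lastFlushTime.get() >= maxClientLagMs)
          || this.channelCache.getNeedFlush(table);
    }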
@@ -352,12 +392,17 @@ private void createWorkers() {
/**
* Distribute the flush tasks by iterating through all the channels in the channel cache and kick
* off a blob build task when a certain size is reached or we reach the end
*
* @param tablesToFlush list of tables to flush
*/
void distributeFlushTasks() {
void distributeFlushTasks(Set<String> tablesToFlush) {
Iterator<
Map.Entry<
String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
itr = this.channelCache.iterator();
itr =
this.channelCache.entrySet().stream()
.filter(e -> tablesToFlush.contains(e.getKey()))
.iterator();
List<Pair<BlobData<T>, CompletableFuture<BlobMetadata>>> blobs = new ArrayList<>();
List<ChannelData<T>> leftoverChannelsDataPerTable = new ArrayList<>();

@@ -630,9 +675,13 @@ void shutdown() throws InterruptedException {
}
}

/** Set the flag to indicate that a flush is needed */
void setNeedFlush() {
this.isNeedFlush = true;
/**
* Set the flag to indicate that a flush is needed
*
* @param fullyQualifiedTableName the fully qualified table name
*/
void setNeedFlush(String fullyQualifiedTableName) {
this.channelCache.setNeedFlush(fullyQualifiedTableName, true);
}

File: SnowflakeStreamingIngestChannelInternal.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -413,7 +413,7 @@ public InsertValidationResponse insertRows(
// if a large number of rows are inserted
if (this.rowBuffer.getSize()
>= this.owningClient.getParameterProvider().getMaxChannelSizeInBytes()) {
this.owningClient.setNeedFlush();
this.owningClient.setNeedFlush(this.channelFlushContext.getFullyQualifiedTableName());
}

return response;
File: SnowflakeStreamingIngestClientInternal.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -820,8 +820,8 @@ CompletableFuture<Void> flush(boolean closing) {
}

/** Set the flag to indicate that a flush is needed */
void setNeedFlush() {
this.flushService.setNeedFlush();
void setNeedFlush(String fullyQualifiedTableName) {
this.flushService.setNeedFlush(fullyQualifiedTableName);
}

/** Remove the channel in the channel cache if the channel sequencer matches */