Skip to content

Commit

Permalink
SNOW-1859651 Salt prefix for files mapped from different topics (#1035)
Browse files Browse the repository at this point in the history
Given two topics that are set up to be ingested into a single table using Snowpipe ingestion, each of the topics has an assigned cleaner for deleting already ingested files. This change prevents these cleaners from incorrectly cleaning files from the topic that should be handled by the other cleaner.
Any topic that is mapped by the topic2table.map should have its filename prefix salted with the topic name, as it is otherwise complicated (and unnecessary) to determine which other topic could have a clash on cleaning filenames.
  • Loading branch information
sfc-gh-dseweryn authored Jan 8, 2025
1 parent bf4ff0a commit ad29f14
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 30 deletions.
67 changes: 62 additions & 5 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,37 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config) {
return !isSnowpipeIngestion(config);
}

/**
 * Result of table-name resolution. When {@code isNameFromMap} is {@code true}, the name was taken
 * from the user-supplied topic-to-table map rather than derived from the topic itself; see {@link
 * Utils#generateTableName(String, Map)}.
 */
public static class GeneratedName {
  private final boolean isNameFromMap;
  private final String name;

  private GeneratedName(String name, boolean isNameFromMap) {
    this.name = name;
    this.isNameFromMap = isNameFromMap;
  }

  /** Creates an instance for a name that was computed rather than looked up in the map. */
  public static GeneratedName generated(String name) {
    return new GeneratedName(name, false);
  }

  /** Creates an instance for a name that was resolved from the topic-to-table map. */
  private static GeneratedName fromMap(String name) {
    return new GeneratedName(name, true);
  }

  /** @return the resolved table (or application) name */
  public String getName() {
    return name;
  }

  /** @return {@code true} when the name originated from the topic2table map */
  public boolean isNameFromMap() {
    return isNameFromMap;
  }
}

/**
* modify invalid application name in config and return the generated application name
*
Expand All @@ -438,7 +469,7 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config) {
/**
 * Modifies an invalid application name in the config and stores the generated valid name back
 * under {@code SnowflakeSinkConnectorConfig.NAME}.
 *
 * <p>Fix: the rendered diff contained both the old and the new declaration of {@code
 * validAppName} (a duplicate local variable, which does not compile); only the diamond-operator
 * version is kept.
 *
 * @param config connector configuration; must contain a non-empty NAME entry, otherwise the
 *     validation call below throws
 */
public static void convertAppName(Map<String, String> config) {
  String appName = config.getOrDefault(SnowflakeSinkConnectorConfig.NAME, "");
  // If appName is empty the following call will throw error
  String validAppName = generateValidName(appName, new HashMap<>());

  config.put(SnowflakeSinkConnectorConfig.NAME, validAppName);
}
Expand All @@ -454,6 +485,20 @@ public static String tableName(String topic, Map<String, String> topic2table) {
return generateValidName(topic, topic2table);
}

/**
 * Verifies the topic name and resolves it to a valid table name.
 *
 * <p>The returned {@link GeneratedName} carries an {@code isNameFromMap} flag indicating whether
 * the name came from the supplied topic2table map — a distinction that matters to {@code
 * SnowflakeSinkServiceV1} when choosing file-prefix salting.
 *
 * @param topic input topic name
 * @param topic2table topic to table map
 * @return a {@link GeneratedName} holding the valid table name and its origin flag
 */
public static GeneratedName generateTableName(String topic, Map<String, String> topic2table) {
  return generateValidNameFromMap(topic, topic2table);
}

/**
* verify topic name, and generate valid table/application name
*
Expand All @@ -462,23 +507,35 @@ public static String tableName(String topic, Map<String, String> topic2table) {
* @return valid table/application name
*/
/**
 * Verifies the topic name and generates a valid table/application name, discarding the
 * origin flag that {@link #generateTableName(String, Map)} would expose.
 *
 * @param topic input topic name
 * @param topic2table topic to table map
 * @return valid table/application name
 */
public static String generateValidName(String topic, Map<String, String> topic2table) {
  return generateValidNameFromMap(topic, topic2table).getName();
}

/**
* verify topic name, and generate valid table/application name
*
* @param topic input topic name
* @param topic2table topic to table map
* @return valid generated table/application name
*/
private static GeneratedName generateValidNameFromMap(
String topic, Map<String, String> topic2table) {
final String PLACE_HOLDER = "_";
if (topic == null || topic.isEmpty()) {
throw SnowflakeErrors.ERROR_0020.getException("topic name: " + topic);
}
if (topic2table.containsKey(topic)) {
return topic2table.get(topic);
return GeneratedName.fromMap(topic2table.get(topic));
}

// try matching regex tables
for (String regexTopic : topic2table.keySet()) {
if (topic.matches(regexTopic)) {
return topic2table.get(regexTopic);
return GeneratedName.fromMap(topic2table.get(regexTopic));
}
}

if (Utils.isValidSnowflakeObjectIdentifier(topic)) {
return topic;
return GeneratedName.generated(topic);
}
int hash = Math.abs(topic.hashCode());

Expand Down Expand Up @@ -507,7 +564,7 @@ public static String generateValidName(String topic, Map<String, String> topic2t
result.append(PLACE_HOLDER);
result.append(hash);

return result.toString();
return GeneratedName.generated(result.toString());
}

public static Map<String, String> parseTopicToTableMap(String input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ private static BigInteger calculatePartitionPart(String topic, int partition) {
BigInteger partitionPart = BigInteger.valueOf(partition);
if (!Strings.isNullOrEmpty(topic)) {
// if topic is provided as part of the file prefix,
// 1. lets calculate stable hash code out of it,
// 1. let's calculate stable hash code out of it,
// 2. bit shift it by 16 bits left,
// 3. add 0x8000 (light up 15th bit as a marker)
// 4. add partition id (which should in production use cases never reach a value above 5.000
// partitions pers topic).
// partitions per topic).
// In theory - we would support 32767 partitions, which is more than any reasonable value for
// a single topic
byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode;
import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
Expand Down Expand Up @@ -131,6 +130,17 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService {
*/
@Override
public void startPartition(final String tableName, final TopicPartition topicPartition) {
Utils.GeneratedName generatedTableName =
Utils.generateTableName(topicPartition.topic(), topic2TableMap);
if (!tableName.equals(generatedTableName.getName())) {
LOGGER.warn(
"tableNames do not match, this is acceptable in tests but not in production! Resorting to"
+ " originalName and assuming no potential clashes on file prefixes. original={},"
+ " recalculated={}",
tableName,
generatedTableName.getName());
generatedTableName = Utils.GeneratedName.generated(tableName);
}
String stageName = Utils.stageName(conn.getConnectorName(), tableName);
String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition());
if (pipes.containsKey(nameIndex)) {
Expand All @@ -142,7 +152,7 @@ public void startPartition(final String tableName, final TopicPartition topicPar
pipes.put(
nameIndex,
new ServiceContext(
tableName,
generatedTableName,
stageName,
pipeName,
topicPartition.topic(),
Expand Down Expand Up @@ -486,44 +496,38 @@ private class ServiceContext {
private boolean forceCleanerFileReset = false;

private ServiceContext(
String tableName,
Utils.GeneratedName generatedTableName,
String stageName,
String pipeName,
String topicName,
SnowflakeConnectionService conn,
int partition,
ScheduledExecutorService v2CleanerExecutor) {
this.pipeName = pipeName;
this.tableName = tableName;
this.tableName = generatedTableName.getName();
this.stageName = stageName;
this.conn = conn;
this.fileNames = new LinkedList<>();
this.cleanerFileNames = new LinkedList<>();
this.buffer = new SnowpipeBuffer();
this.ingestionService = conn.buildIngestService(stageName, pipeName);
// SNOW-1642799 = if multiple topics load data into single table, we need to ensure prefix is
// unique per table - otherwise, file cleaners for different channels may run into race
// condition
TopicToTableModeExtractor.Topic2TableMode mode =
determineTopic2TableMode(topic2TableMap, topicName);
if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
&& !enableStageFilePrefixExtension) {
// SNOW-1642799 = if multiple topics load data into single table, we need to ensure the file
// prefix is unique per topic - otherwise, file cleaners for different topics will try to
// clean the same prefixed files creating a race condition and a potential to delete
// not yet ingested files created by another topic
if (generatedTableName.isNameFromMap() && !enableStageFilePrefixExtension) {
LOGGER.warn(
"The table {} is used as ingestion target by multiple topics - including this one"
+ " '{}'.\n"
+ "To prevent potential data loss consider setting"
+ " '"
+ SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED
+ "' to true",
"The table {} may be used as ingestion target by multiple topics - including this one"
+ " '{}'.\nTo prevent potential data loss consider setting '{}' to true",
tableName,
topicName,
tableName);
SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED);
}
if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
&& enableStageFilePrefixExtension) {
{
final String topicForPrefix =
generatedTableName.isNameFromMap() && enableStageFilePrefixExtension ? topicName : "";
this.prefix =
FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
} else {
this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, "", partition);
FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicForPrefix, partition);
}
this.processedOffset = new AtomicLong(-1);
this.flushedOffset = new AtomicLong(-1);
Expand Down
31 changes: 31 additions & 0 deletions src/test/java/com/snowflake/kafka/connector/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,37 @@ public void testTableName() {
assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode()));
}

@Test
public void testGenerateTableName() {
  Map<String, String> topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, 1234:_1234");

  // Topics present in the map resolve directly and are flagged as map-derived.
  Utils.GeneratedName fromMapExact = Utils.generateTableName("ab@cd", topic2table);
  Assert.assertEquals("abcd", fromMapExact.getName());
  Assert.assertTrue(fromMapExact.isNameFromMap());

  Utils.GeneratedName fromMapNumeric = Utils.generateTableName("1234", topic2table);
  Assert.assertEquals("_1234", fromMapNumeric.getName());
  Assert.assertTrue(fromMapNumeric.isNameFromMap());

  // Unmapped topics get sanitized, hash-suffixed names and are flagged as generated.
  String topicWithSymbol = "bc*def";
  Utils.GeneratedName generatedSanitized = Utils.generateTableName(topicWithSymbol, topic2table);
  Assert.assertEquals(
      "bc_def_" + Math.abs(topicWithSymbol.hashCode()), generatedSanitized.getName());
  Assert.assertFalse(generatedSanitized.isNameFromMap());

  String topicNumeric = "12345";
  Utils.GeneratedName generatedNumeric = Utils.generateTableName(topicNumeric, topic2table);
  Assert.assertEquals(
      "_12345_" + Math.abs(topicNumeric.hashCode()), generatedNumeric.getName());
  Assert.assertFalse(generatedNumeric.isNameFromMap());

  // Empty and null topics are rejected with ERROR_0020.
  TestUtils.assertError(
      SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName("", topic2table));
  //noinspection DataFlowIssue
  TestUtils.assertError(
      SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName(null, topic2table));
}

@Test
public void testTableNameRegex() {
String catTable = "cat_table";
Expand Down

0 comments on commit ad29f14

Please sign in to comment.