diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java index a8d5cecb64a3b..3afd11522c70e 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java @@ -37,14 +37,12 @@ import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.Arrays; import java.util.Calendar; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.TimeZone; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; @@ -65,14 +63,8 @@ public abstract class TimestampConverter> implements public static final String FORMAT_CONFIG = "format"; private static final String FORMAT_DEFAULT = ""; - public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(FIELD_CONFIG, ConfigDef.Type.STRING, FIELD_DEFAULT, ConfigDef.Importance.HIGH, - "The field containing the timestamp, or empty if the entire value is a timestamp") - .define(TARGET_TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, - "The desired timestamp representation: string, unix, Date, Time, or Timestamp") - .define(FORMAT_CONFIG, ConfigDef.Type.STRING, FORMAT_DEFAULT, ConfigDef.Importance.MEDIUM, - "A SimpleDateFormat-compatible format for the timestamp. 
Used to generate the output when type=string " - + "or used to parse the input if the input is a string."); + public static final String UNIX_PRECISION_CONFIG = "unix.precision"; + private static final String UNIX_PRECISION_DEFAULT = "milliseconds"; private static final String PURPOSE = "converting timestamp formats"; @@ -81,7 +73,11 @@ public abstract class TimestampConverter> implements private static final String TYPE_DATE = "Date"; private static final String TYPE_TIME = "Time"; private static final String TYPE_TIMESTAMP = "Timestamp"; - private static final Set VALID_TYPES = new HashSet<>(Arrays.asList(TYPE_STRING, TYPE_UNIX, TYPE_DATE, TYPE_TIME, TYPE_TIMESTAMP)); + + private static final String UNIX_PRECISION_MILLIS = "milliseconds"; + private static final String UNIX_PRECISION_MICROS = "microseconds"; + private static final String UNIX_PRECISION_NANOS = "nanoseconds"; + private static final String UNIX_PRECISION_SECONDS = "seconds"; private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); @@ -89,6 +85,25 @@ public abstract class TimestampConverter> implements public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema(); public static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema(); + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(FIELD_CONFIG, ConfigDef.Type.STRING, FIELD_DEFAULT, ConfigDef.Importance.HIGH, + "The field containing the timestamp, or empty if the entire value is a timestamp") + .define(TARGET_TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, + ConfigDef.ValidString.in(TYPE_STRING, TYPE_UNIX, TYPE_DATE, TYPE_TIME, TYPE_TIMESTAMP), + ConfigDef.Importance.HIGH, + "The desired timestamp representation: string, unix, Date, Time, or Timestamp") + .define(FORMAT_CONFIG, ConfigDef.Type.STRING, FORMAT_DEFAULT, ConfigDef.Importance.MEDIUM, + "A SimpleDateFormat-compatible format for the timestamp. 
Used to generate the output when type=string " + + "or used to parse the input if the input is a string.") + .define(UNIX_PRECISION_CONFIG, ConfigDef.Type.STRING, UNIX_PRECISION_DEFAULT, + ConfigDef.ValidString.in( + UNIX_PRECISION_NANOS, UNIX_PRECISION_MICROS, + UNIX_PRECISION_MILLIS, UNIX_PRECISION_SECONDS), + ConfigDef.Importance.LOW, + "The desired Unix precision for the timestamp: seconds, milliseconds, microseconds, or nanoseconds. " + + "Used to generate the output when type=unix or used to parse the input if the input is a Long. " + + "Note: This SMT will cause precision loss during conversions from, and to, values with sub-millisecond components."); private interface TimestampTranslator { /** * Convert from the type-specific format to the universal java.util.Date format @@ -139,7 +154,18 @@ public String toType(Config config, Date orig) { public Date toRaw(Config config, Object orig) { if (!(orig instanceof Long)) throw new DataException("Expected Unix timestamp to be a Long, but found " + orig.getClass()); - return Timestamp.toLogical(Timestamp.SCHEMA, (Long) orig); + Long unixTime = (Long) orig; + switch (config.unixPrecision) { + case UNIX_PRECISION_SECONDS: + return Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.SECONDS.toMillis(unixTime)); + case UNIX_PRECISION_MICROS: + return Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.MICROSECONDS.toMillis(unixTime)); + case UNIX_PRECISION_NANOS: + return Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.NANOSECONDS.toMillis(unixTime)); + case UNIX_PRECISION_MILLIS: + default: + return Timestamp.toLogical(Timestamp.SCHEMA, unixTime); + } } @Override @@ -149,7 +175,18 @@ public Schema typeSchema(boolean isOptional) { @Override public Long toType(Config config, Date orig) { - return Timestamp.fromLogical(Timestamp.SCHEMA, orig); + Long unixTimeMillis = Timestamp.fromLogical(Timestamp.SCHEMA, orig); + switch (config.unixPrecision) { + case UNIX_PRECISION_SECONDS: + return 
TimeUnit.MILLISECONDS.toSeconds(unixTimeMillis); + case UNIX_PRECISION_MICROS: + return TimeUnit.MILLISECONDS.toMicros(unixTimeMillis); + case UNIX_PRECISION_NANOS: + return TimeUnit.MILLISECONDS.toNanos(unixTimeMillis); + case UNIX_PRECISION_MILLIS: + default: + return unixTimeMillis; + } } }); @@ -230,14 +267,16 @@ public Date toType(Config config, Date orig) { // This is a bit unusual, but allows the transformation config to be passed to static anonymous classes to customize // their behavior private static class Config { - Config(String field, String type, SimpleDateFormat format) { + Config(String field, String type, SimpleDateFormat format, String unixPrecision) { this.field = field; this.type = type; this.format = format; + this.unixPrecision = unixPrecision; } String field; String type; SimpleDateFormat format; + String unixPrecision; } private Config config; private Cache schemaUpdateCache; @@ -249,12 +288,9 @@ public void configure(Map configs) { final String field = simpleConfig.getString(FIELD_CONFIG); final String type = simpleConfig.getString(TARGET_TYPE_CONFIG); String formatPattern = simpleConfig.getString(FORMAT_CONFIG); + final String unixPrecision = simpleConfig.getString(UNIX_PRECISION_CONFIG); schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16)); - if (!VALID_TYPES.contains(type)) { - throw new ConfigException("Unknown timestamp type in TimestampConverter: " + type + ". 
Valid values are " - + Utils.join(VALID_TYPES, ", ") + "."); - } if (type.equals(TYPE_STRING) && Utils.isBlank(formatPattern)) { throw new ConfigException("TimestampConverter requires format option to be specified when using string timestamps"); } @@ -268,7 +304,7 @@ public void configure(Map configs) { + formatPattern, e); } } - config = new Config(field, type, format); + config = new Config(field, type, format, unixPrecision); } @Override diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java index 212b9eefb444a..2c746d3f154e8 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java @@ -39,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; public class TimestampConverterTest { private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); @@ -47,6 +48,9 @@ public class TimestampConverterTest { private static final Calendar DATE; private static final Calendar DATE_PLUS_TIME; private static final long DATE_PLUS_TIME_UNIX; + private static final long DATE_PLUS_TIME_UNIX_MICROS; + private static final long DATE_PLUS_TIME_UNIX_NANOS; + private static final long DATE_PLUS_TIME_UNIX_SECONDS; private static final String STRING_DATE_FMT = "yyyy MM dd HH mm ss SSS z"; private static final String DATE_PLUS_TIME_STRING; @@ -70,8 +74,14 @@ public class TimestampConverterTest { DATE_PLUS_TIME.setTimeInMillis(0L); DATE_PLUS_TIME.add(Calendar.DATE, 1); DATE_PLUS_TIME.add(Calendar.MILLISECOND, 1234); - + // 86 401 234 milliseconds DATE_PLUS_TIME_UNIX = 
DATE_PLUS_TIME.getTime().getTime(); + // 86 401 234 123 microseconds + DATE_PLUS_TIME_UNIX_MICROS = DATE_PLUS_TIME_UNIX * 1000 + 123; + // 86 401 234 123 456 nanoseconds + DATE_PLUS_TIME_UNIX_NANOS = DATE_PLUS_TIME_UNIX_MICROS * 1000 + 456; + // 86401 seconds + DATE_PLUS_TIME_UNIX_SECONDS = DATE_PLUS_TIME.getTimeInMillis() / 1000; DATE_PLUS_TIME_STRING = "1970 01 02 00 00 01 234 UTC"; } @@ -95,6 +105,22 @@ public void testConfigInvalidTargetType() { () -> xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "invalid"))); } + @Test + public void testConfigInvalidUnixPrecision() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); + config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "invalid"); + assertThrows(ConfigException.class, () -> xformValue.configure(config)); + } + + @Test + public void testConfigValidUnixPrecision() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); + config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds"); + assertDoesNotThrow(() -> xformValue.configure(config)); + } + @Test public void testConfigMissingFormat() { assertThrows(ConfigException.class, @@ -526,6 +552,123 @@ public void testWithSchemaFieldConversion() { assertEquals("test", ((Struct) transformed.value()).get("other")); } + @Test + public void testWithSchemaFieldConversion_Micros() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); + config.put(TimestampConverter.FIELD_CONFIG, "ts"); + config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "microseconds"); + xformValue.configure(config); + + // ts field is a unix timestamp with microseconds precision + Schema structWithTimestampFieldSchema = SchemaBuilder.struct() + .field("ts", Schema.INT64_SCHEMA) + .build(); + Struct original = new Struct(structWithTimestampFieldSchema); + original.put("ts", DATE_PLUS_TIME_UNIX_MICROS); + + SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(structWithTimestampFieldSchema, original)); + + Schema expectedSchema = SchemaBuilder.struct() + .field("ts", Timestamp.SCHEMA) + .build(); + assertEquals(expectedSchema, transformed.valueSchema()); + assertEquals(DATE_PLUS_TIME.getTime(), ((Struct) transformed.value()).get("ts")); + } + + @Test + public void testWithSchemaFieldConversion_Nanos() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); + config.put(TimestampConverter.FIELD_CONFIG, "ts"); + config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "nanoseconds"); + xformValue.configure(config); + + // ts field is a unix timestamp with nanoseconds precision + Schema structWithTimestampFieldSchema = SchemaBuilder.struct() + .field("ts", Schema.INT64_SCHEMA) + .build(); + Struct original = new Struct(structWithTimestampFieldSchema); + original.put("ts", DATE_PLUS_TIME_UNIX_NANOS); + + SourceRecord transformed = xformValue.apply(createRecordWithSchema(structWithTimestampFieldSchema, original)); + + Schema expectedSchema = SchemaBuilder.struct() + .field("ts", Timestamp.SCHEMA) + .build(); + assertEquals(expectedSchema, transformed.valueSchema()); + assertEquals(DATE_PLUS_TIME.getTime(), ((Struct) transformed.value()).get("ts")); + } + + @Test + public void testWithSchemaFieldConversion_Seconds() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); + config.put(TimestampConverter.FIELD_CONFIG, "ts"); + config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds"); + xformValue.configure(config); + + // ts field is a unix timestamp with seconds precision + Schema structWithTimestampFieldSchema = SchemaBuilder.struct() + .field("ts", Schema.INT64_SCHEMA) + .build(); + Struct original = new Struct(structWithTimestampFieldSchema); + original.put("ts", DATE_PLUS_TIME_UNIX_SECONDS); + + SourceRecord transformed = xformValue.apply(createRecordWithSchema(structWithTimestampFieldSchema, 
original)); + + Calendar expectedDate = GregorianCalendar.getInstance(UTC); + expectedDate.setTimeInMillis(0L); + expectedDate.add(Calendar.DATE, 1); + expectedDate.add(Calendar.SECOND, 1); + + Schema expectedSchema = SchemaBuilder.struct() + .field("ts", Timestamp.SCHEMA) + .build(); + assertEquals(expectedSchema, transformed.valueSchema()); + assertEquals(expectedDate.getTime(), ((Struct) transformed.value()).get("ts")); + } + + @Test + public void testSchemalessStringToUnix_Micros() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); + config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "microseconds"); + config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); + xformValue.configure(config); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_STRING)); + + assertNull(transformed.valueSchema()); + // Conversion loss as expected, sub-millisecond part is not stored in pivot java.util.Date + assertEquals(86401234000L, transformed.value()); + } + + @Test + public void testSchemalessStringToUnix_Nanos() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); + config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "nanoseconds"); + config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); + xformValue.configure(config); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_STRING)); + + assertNull(transformed.valueSchema()); + // Conversion loss as expected, sub-millisecond part is not stored in pivot java.util.Date + assertEquals(86401234000000L, transformed.value()); + } + + @Test + public void testSchemalessStringToUnix_Seconds() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); + config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds"); + config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); + xformValue.configure(config); + SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_STRING)); + + assertNull(transformed.valueSchema()); + assertEquals(DATE_PLUS_TIME_UNIX_SECONDS, transformed.value()); + } // Validate Key implementation in addition to Value