Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
Expand All @@ -65,14 +63,8 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
public static final String FORMAT_CONFIG = "format";
private static final String FORMAT_DEFAULT = "";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELD_CONFIG, ConfigDef.Type.STRING, FIELD_DEFAULT, ConfigDef.Importance.HIGH,
"The field containing the timestamp, or empty if the entire value is a timestamp")
.define(TARGET_TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
"The desired timestamp representation: string, unix, Date, Time, or Timestamp")
.define(FORMAT_CONFIG, ConfigDef.Type.STRING, FORMAT_DEFAULT, ConfigDef.Importance.MEDIUM,
"A SimpleDateFormat-compatible format for the timestamp. Used to generate the output when type=string "
+ "or used to parse the input if the input is a string.");
public static final String UNIX_PRECISION_CONFIG = "unix.precision";
private static final String UNIX_PRECISION_DEFAULT = "milliseconds";

private static final String PURPOSE = "converting timestamp formats";

Expand All @@ -81,14 +73,37 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
private static final String TYPE_DATE = "Date";
private static final String TYPE_TIME = "Time";
private static final String TYPE_TIMESTAMP = "Timestamp";
private static final Set<String> VALID_TYPES = new HashSet<>(Arrays.asList(TYPE_STRING, TYPE_UNIX, TYPE_DATE, TYPE_TIME, TYPE_TIMESTAMP));

private static final String UNIX_PRECISION_MILLIS = "milliseconds";
private static final String UNIX_PRECISION_MICROS = "microseconds";
private static final String UNIX_PRECISION_NANOS = "nanoseconds";
private static final String UNIX_PRECISION_SECONDS = "seconds";

private static final TimeZone UTC = TimeZone.getTimeZone("UTC");

public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema();
public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema();
public static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema();

/**
 * Configuration definition for this transformation: the field to convert, the target
 * representation, the string format, and the precision used for Unix timestamps.
 */
public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(FIELD_CONFIG, ConfigDef.Type.STRING, FIELD_DEFAULT, ConfigDef.Importance.HIGH,
                "The field containing the timestamp, or empty if the entire value is a timestamp")
        // No default value: the target type has to be supplied and is validated against the known types.
        .define(TARGET_TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
                ConfigDef.ValidString.in(TYPE_STRING, TYPE_UNIX, TYPE_DATE, TYPE_TIME, TYPE_TIMESTAMP),
                ConfigDef.Importance.HIGH,
                "The desired timestamp representation: string, unix, Date, Time, or Timestamp")
        .define(FORMAT_CONFIG, ConfigDef.Type.STRING, FORMAT_DEFAULT, ConfigDef.Importance.MEDIUM,
                "A SimpleDateFormat-compatible format for the timestamp. Used to generate the output when type=string "
                        + "or used to parse the input if the input is a string.")
        // Optional; validated against the four supported precision names.
        .define(UNIX_PRECISION_CONFIG, ConfigDef.Type.STRING, UNIX_PRECISION_DEFAULT,
                ConfigDef.ValidString.in(
                        UNIX_PRECISION_NANOS, UNIX_PRECISION_MICROS,
                        UNIX_PRECISION_MILLIS, UNIX_PRECISION_SECONDS),
                ConfigDef.Importance.LOW,
                "The desired Unix precision for the timestamp: seconds, milliseconds, microseconds, or nanoseconds. " +
                        // Trailing space keeps this sentence separated from the note that follows.
                        "Used to generate the output when type=unix or used to parse the input if the input is a Long. " +
                        "Note: This SMT will cause precision loss during conversions from, and to, values with sub-millisecond components.");

private interface TimestampTranslator {
/**
* Convert from the type-specific format to the universal java.util.Date format
Expand Down Expand Up @@ -139,7 +154,18 @@ public String toType(Config config, Date orig) {
public Date toRaw(Config config, Object orig) {
    // Converts a Unix epoch value, expressed in the configured precision, to a java.util.Date.
    // Throws DataException when the value is not a Long.
    if (!(orig instanceof Long))
        throw new DataException("Expected Unix timestamp to be a Long, but found " + orig.getClass());

    // Unbox once; every TimeUnit conversion below works on primitives.
    final long unixTime = (Long) orig;

    // Normalise to milliseconds. Conversions from finer units truncate, so any
    // sub-millisecond component of the input is dropped here.
    final long unixTimeMillis;
    switch (config.unixPrecision) {
        case UNIX_PRECISION_SECONDS:
            unixTimeMillis = TimeUnit.SECONDS.toMillis(unixTime);
            break;
        case UNIX_PRECISION_MICROS:
            unixTimeMillis = TimeUnit.MICROSECONDS.toMillis(unixTime);
            break;
        case UNIX_PRECISION_NANOS:
            unixTimeMillis = TimeUnit.NANOSECONDS.toMillis(unixTime);
            break;
        case UNIX_PRECISION_MILLIS:
        default:
            // Milliseconds (or any unrecognised value) are used as-is.
            unixTimeMillis = unixTime;
            break;
    }
    return Timestamp.toLogical(Timestamp.SCHEMA, unixTimeMillis);
}

@Override
Expand All @@ -149,7 +175,18 @@ public Schema typeSchema(boolean isOptional) {

/**
 * Convert a java.util.Date to a Unix epoch value in the configured precision.
 *
 * @param config transformation settings; {@code unixPrecision} selects the output unit
 * @param orig the timestamp to convert
 * @return the epoch value in seconds, milliseconds, microseconds or nanoseconds
 */
@Override
public Long toType(Config config, Date orig) {
    // Kept as a primitive: the value is only passed on to TimeUnit conversions.
    final long unixTimeMillis = Timestamp.fromLogical(Timestamp.SCHEMA, orig);
    switch (config.unixPrecision) {
        case UNIX_PRECISION_SECONDS:
            // Coarser unit: the millisecond part is truncated.
            return TimeUnit.MILLISECONDS.toSeconds(unixTimeMillis);
        case UNIX_PRECISION_MICROS:
            return TimeUnit.MILLISECONDS.toMicros(unixTimeMillis);
        case UNIX_PRECISION_NANOS:
            return TimeUnit.MILLISECONDS.toNanos(unixTimeMillis);
        case UNIX_PRECISION_MILLIS:
        default:
            // Milliseconds (or any unrecognised value) are returned unchanged.
            return unixTimeMillis;
    }
}
});

Expand Down Expand Up @@ -230,14 +267,16 @@ public Date toType(Config config, Date orig) {
// This is a bit unusual, but allows the transformation config to be passed to static anonymous classes to customize
// their behavior
private static class Config {
Config(String field, String type, SimpleDateFormat format) {
Config(String field, String type, SimpleDateFormat format, String unixPrecision) {
this.field = field;
this.type = type;
this.format = format;
this.unixPrecision = unixPrecision;
}
String field;
String type;
SimpleDateFormat format;
String unixPrecision;
}
private Config config;
private Cache<Schema, Schema> schemaUpdateCache;
Expand All @@ -249,12 +288,9 @@ public void configure(Map<String, ?> configs) {
final String field = simpleConfig.getString(FIELD_CONFIG);
final String type = simpleConfig.getString(TARGET_TYPE_CONFIG);
String formatPattern = simpleConfig.getString(FORMAT_CONFIG);
final String unixPrecision = simpleConfig.getString(UNIX_PRECISION_CONFIG);
schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));

if (!VALID_TYPES.contains(type)) {
throw new ConfigException("Unknown timestamp type in TimestampConverter: " + type + ". Valid values are "
+ Utils.join(VALID_TYPES, ", ") + ".");
}
if (type.equals(TYPE_STRING) && Utils.isBlank(formatPattern)) {
throw new ConfigException("TimestampConverter requires format option to be specified when using string timestamps");
}
Expand All @@ -268,7 +304,7 @@ public void configure(Map<String, ?> configs) {
+ formatPattern, e);
}
}
config = new Config(field, type, format);
config = new Config(field, type, format, unixPrecision);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

public class TimestampConverterTest {
private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
Expand All @@ -47,6 +48,9 @@ public class TimestampConverterTest {
private static final Calendar DATE;
private static final Calendar DATE_PLUS_TIME;
private static final long DATE_PLUS_TIME_UNIX;
private static final long DATE_PLUS_TIME_UNIX_MICROS;
private static final long DATE_PLUS_TIME_UNIX_NANOS;
private static final long DATE_PLUS_TIME_UNIX_SECONDS;
private static final String STRING_DATE_FMT = "yyyy MM dd HH mm ss SSS z";
private static final String DATE_PLUS_TIME_STRING;

Expand All @@ -70,8 +74,14 @@ public class TimestampConverterTest {
DATE_PLUS_TIME.setTimeInMillis(0L);
DATE_PLUS_TIME.add(Calendar.DATE, 1);
DATE_PLUS_TIME.add(Calendar.MILLISECOND, 1234);

// 86 401 234 milliseconds
DATE_PLUS_TIME_UNIX = DATE_PLUS_TIME.getTime().getTime();
// 86 401 234 123 microseconds
DATE_PLUS_TIME_UNIX_MICROS = DATE_PLUS_TIME_UNIX * 1000 + 123;
// 86 401 234 123 456 nanoseconds
DATE_PLUS_TIME_UNIX_NANOS = DATE_PLUS_TIME_UNIX_MICROS * 1000 + 456;
// 86401 seconds
DATE_PLUS_TIME_UNIX_SECONDS = DATE_PLUS_TIME.getTimeInMillis() / 1000;
DATE_PLUS_TIME_STRING = "1970 01 02 00 00 01 234 UTC";
}

Expand All @@ -95,6 +105,22 @@ public void testConfigInvalidTargetType() {
() -> xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "invalid")));
}

@Test
public void testConfigInvalidUnixPrecision() {
    // An unknown unix.precision value must be rejected when the transformation is configured.
    Map<String, String> props = new HashMap<>();
    props.put(TimestampConverter.UNIX_PRECISION_CONFIG, "invalid");
    props.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix");

    assertThrows(ConfigException.class, () -> xformValue.configure(props));
}

@Test
public void testConfigValidUnixPrecision() {
    // A supported unix.precision value must be accepted when the transformation is configured.
    Map<String, String> props = new HashMap<>();
    props.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds");
    props.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix");

    assertDoesNotThrow(() -> xformValue.configure(props));
}

@Test
public void testConfigMissingFormat() {
assertThrows(ConfigException.class,
Expand Down Expand Up @@ -526,6 +552,123 @@ public void testWithSchemaFieldConversion() {
assertEquals("test", ((Struct) transformed.value()).get("other"));
}

@Test
public void testWithSchemaFieldConversion_Micros() {
    Map<String, String> props = new HashMap<>();
    props.put(TimestampConverter.UNIX_PRECISION_CONFIG, "microseconds");
    props.put(TimestampConverter.FIELD_CONFIG, "ts");
    props.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
    xformValue.configure(props);

    // Input: the "ts" field holds an INT64 Unix timestamp with microsecond precision.
    Schema inputSchema = SchemaBuilder.struct().field("ts", Schema.INT64_SCHEMA).build();
    Struct input = new Struct(inputSchema);
    input.put("ts", DATE_PLUS_TIME_UNIX_MICROS);

    // Expected output: the same struct shape with "ts" as a Timestamp.
    Schema expectedSchema = SchemaBuilder.struct().field("ts", Timestamp.SCHEMA).build();

    SourceRecord result = xformValue.apply(createRecordWithSchema(inputSchema, input));

    assertEquals(expectedSchema, result.valueSchema());
    Struct resultValue = (Struct) result.value();
    assertEquals(DATE_PLUS_TIME.getTime(), resultValue.get("ts"));
}

@Test
public void testWithSchemaFieldConversion_Nanos() {
    Map<String, String> props = new HashMap<>();
    props.put(TimestampConverter.UNIX_PRECISION_CONFIG, "nanoseconds");
    props.put(TimestampConverter.FIELD_CONFIG, "ts");
    props.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
    xformValue.configure(props);

    // Input: the "ts" field holds an INT64 Unix timestamp with nanosecond precision.
    Schema inputSchema = SchemaBuilder.struct().field("ts", Schema.INT64_SCHEMA).build();
    Struct input = new Struct(inputSchema);
    input.put("ts", DATE_PLUS_TIME_UNIX_NANOS);

    // Expected output: the same struct shape with "ts" as a Timestamp.
    Schema expectedSchema = SchemaBuilder.struct().field("ts", Timestamp.SCHEMA).build();

    SourceRecord result = xformValue.apply(createRecordWithSchema(inputSchema, input));

    assertEquals(expectedSchema, result.valueSchema());
    Struct resultValue = (Struct) result.value();
    assertEquals(DATE_PLUS_TIME.getTime(), resultValue.get("ts"));
}

@Test
public void testWithSchemaFieldConversion_Seconds() {
    Map<String, String> props = new HashMap<>();
    props.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds");
    props.put(TimestampConverter.FIELD_CONFIG, "ts");
    props.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
    xformValue.configure(props);

    // Input: the "ts" field holds an INT64 Unix timestamp with seconds precision.
    Schema inputSchema = SchemaBuilder.struct().field("ts", Schema.INT64_SCHEMA).build();
    Struct input = new Struct(inputSchema);
    input.put("ts", DATE_PLUS_TIME_UNIX_SECONDS);

    // Expected instant: the epoch plus one day and one second, with no millisecond part.
    Calendar expectedInstant = GregorianCalendar.getInstance(UTC);
    expectedInstant.setTimeInMillis(0L);
    expectedInstant.add(Calendar.DATE, 1);
    expectedInstant.add(Calendar.SECOND, 1);
    Schema expectedSchema = SchemaBuilder.struct().field("ts", Timestamp.SCHEMA).build();

    SourceRecord result = xformValue.apply(createRecordWithSchema(inputSchema, input));

    assertEquals(expectedSchema, result.valueSchema());
    Struct resultValue = (Struct) result.value();
    assertEquals(expectedInstant.getTime(), resultValue.get("ts"));
}

@Test
public void testSchemalessStringToUnix_Micros() {
    Map<String, String> props = new HashMap<>();
    props.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
    props.put(TimestampConverter.UNIX_PRECISION_CONFIG, "microseconds");
    props.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix");
    xformValue.configure(props);

    SourceRecord result = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_STRING));

    assertNull(result.valueSchema());
    // The intermediate java.util.Date only keeps milliseconds, so the microsecond digits come out as zeros.
    assertEquals(86401234000L, result.value());
}

@Test
public void testSchemalessStringToUnix_Nanos() {
    Map<String, String> props = new HashMap<>();
    props.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
    props.put(TimestampConverter.UNIX_PRECISION_CONFIG, "nanoseconds");
    props.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix");
    xformValue.configure(props);

    SourceRecord result = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_STRING));

    assertNull(result.valueSchema());
    // The intermediate java.util.Date only keeps milliseconds, so the sub-millisecond digits come out as zeros.
    assertEquals(86401234000000L, result.value());
}

@Test
public void testSchemalessStringToUnix_Seconds() {
    Map<String, String> props = new HashMap<>();
    props.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
    props.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds");
    props.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix");
    xformValue.configure(props);

    // Schemaless string input converted to a Unix timestamp in whole seconds.
    SourceRecord result = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_STRING));

    assertNull(result.valueSchema());
    assertEquals(DATE_PLUS_TIME_UNIX_SECONDS, result.value());
}

// Validate Key implementation in addition to Value

Expand Down