From ebd4f6cf3b8dbb06efd29632d7aa88fc3482174a Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Sun, 6 Oct 2019 00:41:17 +0900
Subject: [PATCH 01/19] Create FUNDING.yml

---
 .github/FUNDING.yml | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 .github/FUNDING.yml

diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml
new file mode 100644
index 0000000..0c97a49
--- /dev/null
+++ b/.github/FUNDING.yml
@@ -0,0 +1 @@
+custom: https://www.paypal.me/civitaspo

From 410e9d258795c643f5bffa6c0b3385034d4c4073 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Thu, 31 Oct 2019 09:38:53 +0900
Subject: [PATCH 02/19] Update FUNDING.yml

---
 .github/FUNDING.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml
index 0c97a49..0b8e4cf 100644
--- a/.github/FUNDING.yml
+++ b/.github/FUNDING.yml
@@ -1 +1 @@
-custom: https://www.paypal.me/civitaspo
+github: civitaspo

From 139d8e27fe6b658ee9d9f2c6802810a4b16c8956 Mon Sep 17 00:00:00 2001
From: Ryo Okubo
Date: Tue, 5 Nov 2019 01:21:29 +0900
Subject: [PATCH 03/19] Support Logical Types older representations(OriginalTypes) roughly

---
 example/with_logicaltypes.yml                 | 35 +++++++++
 .../s3_parquet/S3ParquetOutputPlugin.scala    | 18 ++++-
 .../parquet/EmbulkMessageType.scala           | 39 +++++++---
 .../parquet/LogicalTypeHandler.scala          | 58 ++++++++++++++
 .../parquet/LogicalTypeHandlerStore.scala     | 78 +++++++++++++++++++
 .../parquet/ParquetFileWriteSupport.scala     |  6 +-
 .../parquet/ParquetFileWriter.scala           | 39 +++++++---
 7 files changed, 247 insertions(+), 26 deletions(-)
 create mode 100644 example/with_logicaltypes.yml
 create mode 100644 src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala
 create mode 100644 src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala

diff --git a/example/with_logicaltypes.yml b/example/with_logicaltypes.yml
new file mode 100644
index 0000000..c0b5748
--- /dev/null
+++ b/example/with_logicaltypes.yml
@@ -0,0 +1,35 @@
+
+in:
+  type: file
+  path_prefix: ./example/data.tsv
+  parser:
+    type: csv
+    delimiter: "\t"
+    skip_header_lines: 0
+    null_string: ""
+    columns:
+      - { name: id, type: long }
+      - { name: description, type: string }
+      - { name: name, type: string }
+      - { name: t, type: timestamp, format: "%Y-%m-%d %H:%M:%S %z"}
+      - { name: payload, type: json}
+  stop_on_invalid_record: true
+
+out:
+  type: s3_parquet
+  bucket: my-bucket
+  path_prefix: path/to/my-obj.
+  file_ext: snappy.parquet
+  compression_codec: snappy
+  default_timezone: Asia/Tokyo
+  canned_acl: bucket-owner-full-control
+  column_options:
+    # It has higher priority than type_options.timestamp
+    t:
+      logical_type: "timestamp-micros"
+  type_options:
+    json:
+      logical_type: "json"
+    # It should be ignored by column_options on 't' column
+    timestamp:
+      logical_type: "timestamp-millis"
diff --git a/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala b/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala
index 2c6b520..5ed9012 100644
--- a/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala
+++ b/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala
@@ -11,7 +11,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.embulk.config.{Config, ConfigDefault, ConfigDiff, ConfigException, ConfigSource, Task, TaskReport, TaskSource}
 import org.embulk.output.s3_parquet.S3ParquetOutputPlugin.PluginTask
 import org.embulk.output.s3_parquet.aws.Aws
-import org.embulk.output.s3_parquet.parquet.ParquetFileWriter
+import org.embulk.output.s3_parquet.parquet.{LogicalTypeHandlerStore, ParquetFileWriter}
 import org.embulk.spi.{Exec, OutputPlugin, PageReader, Schema, TransactionalPageOutput}
 import org.embulk.spi.time.TimestampFormatter
 import org.embulk.spi.time.TimestampFormatter.TimestampColumnOption
@@ -53,7 +53,7 @@ object S3ParquetOutputPlugin
 
         @Config("column_options")
         @ConfigDefault("{}")
-        def getColumnOptions: JMap[String, TimestampColumnOption]
+        def getColumnOptions: JMap[String, ColumnOptionTask]
 
         @Config("canned_acl")
         @ConfigDefault("\"private\"")
@@ -86,8 +86,20 @@ object S3ParquetOutputPlugin
         @Config("catalog")
         @ConfigDefault("null")
         def getCatalog: Optional[CatalogRegistrator.Task]
+
+        @Config("type_options")
+        @ConfigDefault("{}")
+        def getTypeOptions: JMap[String, TypeOptionTask]
     }
 
+    trait ColumnOptionTask extends Task with TimestampColumnOption with LogicalTypeOption
+
+    trait TypeOptionTask extends Task with LogicalTypeOption
+
+    trait LogicalTypeOption {
+        @Config("logical_type")
+        def getLogicalType: Optional[String]
+    }
 }
 
 class S3ParquetOutputPlugin
@@ -198,9 +210,11 @@ class S3ParquetOutputPlugin
         val pageReader: PageReader = new PageReader(schema)
         val aws: Aws = Aws(task)
         val timestampFormatters: Seq[TimestampFormatter] = Timestamps.newTimestampColumnFormatters(task, schema, task.getColumnOptions).toSeq
+        val logicalTypeHandlers = LogicalTypeHandlerStore.fromEmbulkOptions(task.getTypeOptions, task.getColumnOptions)
         val parquetWriter: ParquetWriter[PageReader] = ParquetFileWriter.builder()
             .withPath(bufferFile)
             .withSchema(schema)
+            .withLogicalTypeHandlers(logicalTypeHandlers)
             .withTimestampFormatters(timestampFormatters)
             .withCompressionCodec(task.getCompressionCodec)
             .withDictionaryEncoding(task.getEnableDictionaryEncoding.orElse(ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED))
diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala
index 31906d4..2e57d66 100644
--- a/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala
+++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala
@@ -1,6 +1,5 @@
 package org.embulk.output.s3_parquet.parquet
 
-
 import com.google.common.collect.ImmutableList
 import org.apache.parquet.schema.{MessageType, OriginalType, PrimitiveType, Type}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
@@ -16,30 +15,36 @@ object EmbulkMessageType } case class Builder(name: String = "embulk", - schema: Schema = Schema.builder().build()) + schema: Schema = Schema.builder().build(), + logicalTypeHandlers: LogicalTypeHandlerStore = LogicalTypeHandlerStore.empty) { def withName(name: String): Builder = { - Builder(name = name, schema = schema) + Builder(name = name, schema = schema, logicalTypeHandlers = logicalTypeHandlers) } def withSchema(schema: Schema): Builder = { - Builder(name = name, schema = schema) + Builder(name = name, schema = schema, logicalTypeHandlers = logicalTypeHandlers) + } + + def withLogicalTypeHandlers(logicalTypeHandlers: LogicalTypeHandlerStore): Builder = + { + Builder(name = name, schema = schema, logicalTypeHandlers = logicalTypeHandlers) } def build(): MessageType = { val builder: ImmutableList.Builder[Type] = ImmutableList.builder[Type]() - schema.visitColumns(EmbulkMessageTypeColumnVisitor(builder)) + schema.visitColumns(EmbulkMessageTypeColumnVisitor(builder, logicalTypeHandlers)) new MessageType("embulk", builder.build()) - } } - private case class EmbulkMessageTypeColumnVisitor(builder: ImmutableList.Builder[Type]) + private case class EmbulkMessageTypeColumnVisitor(builder: ImmutableList.Builder[Type], + logicalTypeHandlers: LogicalTypeHandlerStore = LogicalTypeHandlerStore.empty) extends ColumnVisitor { @@ -65,14 +70,26 @@ object EmbulkMessageType override def timestampColumn(column: Column): Unit = { - // TODO: Support OriginalType.TIME* ? - builder.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, column.getName, OriginalType.UTF8)) + val name = column.getName + + val t = logicalTypeHandlers.get(column.getType, name) match { + case Some(h) => h.newSchemaFieldType(name) + case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.UTF8) + } + + builder.add(t) } override def jsonColumn(column: Column): Unit = { - // TODO: does this work? - builder.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, column.getName, OriginalType.UTF8)) + val name = column.getName + + val t = logicalTypeHandlers.get(column.getType, name) match { + case Some(h) => h.newSchemaFieldType(name) + case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.UTF8) + } + + builder.add(t) } } diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala new file mode 100644 index 0000000..5b33a9b --- /dev/null +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala @@ -0,0 +1,58 @@ +package org.embulk.output.s3_parquet.parquet + +import org.apache.parquet.io.api.{Binary, RecordConsumer} +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.{OriginalType, PrimitiveType, Type} +import org.embulk.spi.time.Timestamp +import org.msgpack.value.Value + +/** + * Handle Apache Parquet 'Logical Types' on schema/value conversion. + * ref. https://github.com/apache/parquet-format/blob/master/LogicalTypes.md + * + * It focuses on only older representation because newer supported since 1.11 is not used actually yet. + * TODO Support both of older and newer representation after 1.11+ is published and other middleware supports it. 
+ * + */ +trait LogicalTypeHandler { + def newSchemaFieldType(name: String): PrimitiveType + + def consume(orig: AnyRef, recordConsumer: RecordConsumer): Unit +} + +case class TimestampMillisLogicalTypeHandler() extends LogicalTypeHandler { + override def newSchemaFieldType(name: String): PrimitiveType = + new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, OriginalType.TIMESTAMP_MILLIS) + + override def consume(orig: AnyRef, recordConsumer: RecordConsumer): Unit = + orig match { + case ts: Timestamp => recordConsumer.addLong(ts.toEpochMilli) + case _ => throw new IllegalArgumentException("given mismatched type value") + } +} + +case class TimestampMicrosLogicalTypeHandler() extends LogicalTypeHandler { + override def newSchemaFieldType(name: String): PrimitiveType = + new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, OriginalType.TIMESTAMP_MICROS) + + override def consume(orig: AnyRef, recordConsumer: RecordConsumer): Unit = + orig match { + case ts: Timestamp => + val v = (ts.getEpochSecond * 1_000_000L) + (ts.getNano.asInstanceOf[Long] / 1_000L) + recordConsumer.addLong(v) + case _ => throw new IllegalArgumentException("given mismatched type value") + } +} + +case class JsonLogicalTypeHandler() extends LogicalTypeHandler { + override def newSchemaFieldType(name: String): PrimitiveType = + new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.JSON) + + override def consume(orig: AnyRef, recordConsumer: RecordConsumer): Unit = + orig match { + case msgPack: Value => + val bin = Binary.fromString(msgPack.toJson) + recordConsumer.addBinary(bin) + case _ => throw new IllegalArgumentException("given mismatched type value") + } +} diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala new file mode 100644 index 0000000..02f236e --- /dev/null +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala @@ -0,0 +1,78 @@ +package org.embulk.output.s3_parquet.parquet + +import org.embulk.spi.`type`.{Type, Types} +import java.util.{Map => JMap} + +import org.embulk.output.s3_parquet.S3ParquetOutputPlugin.{ColumnOptionTask, TypeOptionTask} + +import scala.jdk.CollectionConverters._ + +/** + * A storage has mapping from logical type query (column name, type) to handler. 
+ * + * @param fromEmbulkType + * @param fromColumnName + */ +case class LogicalTypeHandlerStore private (fromEmbulkType: Map[Type, LogicalTypeHandler], + fromColumnName: Map[String, LogicalTypeHandler]) { + + // Try column name lookup, then column type + def get(n: String, t: Type): Option[LogicalTypeHandler] = + get(n) match { + case Some(h) => Some(h) + case _ => + get(t) match { + case Some(h) => Some(h) + case _ => None + } + } + + def get(t: Type): Option[LogicalTypeHandler] = + fromEmbulkType.get(t) + + def get(n: String): Option[LogicalTypeHandler] = + fromColumnName.get(n) +} + +object LogicalTypeHandlerStore { + private val STRING_TO_EMBULK_TYPE = Map[String, Type]( + "boolean" -> Types.BOOLEAN, + "long" -> Types.LONG, + "double" -> Types.DOUBLE, + "string" -> Types.STRING, + "timestamp" -> Types.TIMESTAMP, + "json" -> Types.JSON + ) + + // Listed only older logical types that we can convert from embulk type + private val STRING_TO_LOGICAL_TYPE = Map[String, LogicalTypeHandler]( + "timestamp-millis" -> TimestampMillisLogicalTypeHandler(), + "timestamp-micros" -> TimestampMicrosLogicalTypeHandler(), + "json" -> JsonLogicalTypeHandler() + // TODO other types ... + ) + + def empty: LogicalTypeHandlerStore = + LogicalTypeHandlerStore(Map.empty[Type, LogicalTypeHandler], Map.empty[String, LogicalTypeHandler]) + + def fromEmbulkOptions(typeOpts: JMap[String, TypeOptionTask], columnOpts: JMap[String, ColumnOptionTask]): LogicalTypeHandlerStore = { + val fromEmbulkType = typeOpts.asScala + .filter(_._2.getLogicalType.isPresent) + .map[Type, LogicalTypeHandler] { case (k, v) => + val t = STRING_TO_EMBULK_TYPE(k) + val h = STRING_TO_LOGICAL_TYPE(v.getLogicalType.get) + (t, h) + } + .toMap + + val fromColumnName = columnOpts.asScala + .filter(_._2.getLogicalType.isPresent) + .map[String, LogicalTypeHandler] { case (k, v) => + val h = STRING_TO_LOGICAL_TYPE(v.getLogicalType.get) + (k, h) + } + .toMap + + LogicalTypeHandlerStore(fromEmbulkType, fromColumnName) + } +} diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriteSupport.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriteSupport.scala index b140ad8..b0deccf 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriteSupport.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriteSupport.scala @@ -13,7 +13,8 @@ import scala.jdk.CollectionConverters._ private[parquet] case class ParquetFileWriteSupport(schema: Schema, - timestampFormatters: Seq[TimestampFormatter]) + timestampFormatters: Seq[TimestampFormatter], + logicalTypeHandlers: LogicalTypeHandlerStore = LogicalTypeHandlerStore.empty) extends WriteSupport[PageReader] { @@ -23,6 +24,7 @@ private[parquet] case class ParquetFileWriteSupport(schema: Schema, { val messageType: MessageType = EmbulkMessageType.builder() .withSchema(schema) + .withLogicalTypeHandlers(logicalTypeHandlers) .build() val metadata: Map[String, String] = Map.empty // NOTE: When is this used? 
new WriteContext(messageType, metadata.asJava) @@ -30,7 +32,7 @@ private[parquet] case class ParquetFileWriteSupport(schema: Schema, override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { - currentParquetFileWriter = ParquetFileWriter(recordConsumer, schema, timestampFormatters) + currentParquetFileWriter = ParquetFileWriter(recordConsumer, schema, timestampFormatters, logicalTypeHandlers) } override def write(record: PageReader): Unit = diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriter.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriter.scala index 0d0dd26..444270e 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriter.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriter.scala @@ -15,7 +15,8 @@ object ParquetFileWriter case class Builder(path: Path = null, schema: Schema = null, - timestampFormatters: Seq[TimestampFormatter] = null) + timestampFormatters: Seq[TimestampFormatter] = null, + logicalTypeHandlers: LogicalTypeHandlerStore = LogicalTypeHandlerStore.empty) extends ParquetWriter.Builder[PageReader, Builder](path) { @@ -39,6 +40,11 @@ object ParquetFileWriter copy(timestampFormatters = timestampFormatters) } + def withLogicalTypeHandlers(logicalTypeHandlers: LogicalTypeHandlerStore): Builder = + { + copy(logicalTypeHandlers = logicalTypeHandlers) + } + override def self(): Builder = { this @@ -46,7 +52,7 @@ object ParquetFileWriter override def getWriteSupport(conf: Configuration): WriteSupport[PageReader] = { - ParquetFileWriteSupport(schema, timestampFormatters) + ParquetFileWriteSupport(schema, timestampFormatters, logicalTypeHandlers) } } @@ -60,10 +66,11 @@ object ParquetFileWriter private[parquet] case class ParquetFileWriter(recordConsumer: RecordConsumer, schema: Schema, - timestampFormatters: Seq[TimestampFormatter]) + timestampFormatters: Seq[TimestampFormatter], + logicalTypeHandlers: LogicalTypeHandlerStore = LogicalTypeHandlerStore.empty) { - def write(record: PageReader): Unit = + def write(record: PageReader): Unit = { recordConsumer.startMessage() writeRecord(record) @@ -117,11 +124,16 @@ private[parquet] case class ParquetFileWriter(recordConsumer: RecordConsumer, { nullOr(column, { withWriteFieldContext(column, { - // TODO: is a correct way to convert for parquet ? val t = record.getTimestamp(column) - val ft = timestampFormatters(column.getIndex).format(t) - val bin = Binary.fromString(ft) - recordConsumer.addBinary(bin) + + logicalTypeHandlers.get(column.getName, column.getType) match { + case Some(h) => + h.consume(t, recordConsumer) + case _ => + val ft = timestampFormatters(column.getIndex).format(t) + val bin = Binary.fromString(ft) + recordConsumer.addBinary(bin) + } }) }) } @@ -130,10 +142,15 @@ private[parquet] case class ParquetFileWriter(recordConsumer: RecordConsumer, { nullOr(column, { withWriteFieldContext(column, { - // TODO: is a correct way to convert for parquet ? 
val msgPack = record.getJson(column) - val bin = Binary.fromString(msgPack.toJson) - recordConsumer.addBinary(bin) + + logicalTypeHandlers.get(column.getName, column.getType) match { + case Some(h) => + h.consume(msgPack, recordConsumer) + case _ => + val bin = Binary.fromString(msgPack.toJson) + recordConsumer.addBinary(bin) + } }) }) } From d77d63b41a497c365728d18d40e5581e02ff70b7 Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Tue, 12 Nov 2019 01:42:04 +0900 Subject: [PATCH 04/19] Use DataException on processing actual datum --- .../output/s3_parquet/parquet/LogicalTypeHandler.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala index 5b33a9b..e1c1865 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala @@ -3,6 +3,7 @@ package org.embulk.output.s3_parquet.parquet import org.apache.parquet.io.api.{Binary, RecordConsumer} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.{OriginalType, PrimitiveType, Type} +import org.embulk.spi.DataException import org.embulk.spi.time.Timestamp import org.msgpack.value.Value @@ -27,7 +28,7 @@ case class TimestampMillisLogicalTypeHandler() extends LogicalTypeHandler { override def consume(orig: AnyRef, recordConsumer: RecordConsumer): Unit = orig match { case ts: Timestamp => recordConsumer.addLong(ts.toEpochMilli) - case _ => throw new IllegalArgumentException("given mismatched type value") + case _ => throw new DataException("given mismatched type value") } } @@ -40,7 +41,7 @@ case class TimestampMicrosLogicalTypeHandler() extends LogicalTypeHandler { case ts: Timestamp => val v = (ts.getEpochSecond * 1_000_000L) + (ts.getNano.asInstanceOf[Long] / 1_000L) recordConsumer.addLong(v) - case _ => throw new IllegalArgumentException("given mismatched type value") + case _ => throw new DataException("given mismatched type value") } } @@ -53,6 +54,6 @@ case class JsonLogicalTypeHandler() extends LogicalTypeHandler { case msgPack: Value => val bin = Binary.fromString(msgPack.toJson) recordConsumer.addBinary(bin) - case _ => throw new IllegalArgumentException("given mismatched type value") + case _ => throw new DataException("given mismatched type value") } } From 95af1f028af99731782fef29975cada956d65e85 Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Tue, 12 Nov 2019 01:42:53 +0900 Subject: [PATCH 05/19] Fix broken order of arguments --- .../embulk/output/s3_parquet/parquet/EmbulkMessageType.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala index 2e57d66..51da4f8 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala @@ -72,7 +72,7 @@ object EmbulkMessageType { val name = column.getName - val t = logicalTypeHandlers.get(column.getType, name) match { + val t = logicalTypeHandlers.get(name, column.getType) match { case Some(h) => h.newSchemaFieldType(name) case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.UTF8) } @@ -84,7 +84,7 @@ object EmbulkMessageType { val name = column.getName - 
val t = logicalTypeHandlers.get(column.getType, name) match { + val t = logicalTypeHandlers.get(name, column.getType) match { case Some(h) => h.newSchemaFieldType(name) case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.UTF8) } From d30cd2bd382b531e1eb4ab88a61b4410a4199878 Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Tue, 12 Nov 2019 23:56:12 +0900 Subject: [PATCH 06/19] Support logical types extends long --- .../parquet/LogicalTypeHandler.scala | 35 +++++++++++++++---- .../parquet/LogicalTypeHandlerStore.scala | 9 ++++- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala index e1c1865..a34990a 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala @@ -18,17 +18,28 @@ import org.msgpack.value.Value trait LogicalTypeHandler { def newSchemaFieldType(name: String): PrimitiveType - def consume(orig: AnyRef, recordConsumer: RecordConsumer): Unit + def consume(orig: Any, recordConsumer: RecordConsumer): Unit +} + +private case class IntLogicalTypeHandler(ot: OriginalType) extends LogicalTypeHandler { + override def newSchemaFieldType(name: String): PrimitiveType = + new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, ot) + + override def consume(orig: Any, recordConsumer: RecordConsumer): Unit = + orig match { + case v: Long => recordConsumer.addLong(v) + case _ => throw new DataException("given mismatched type value; expected type is long") + } } case class TimestampMillisLogicalTypeHandler() extends LogicalTypeHandler { override def newSchemaFieldType(name: String): PrimitiveType = new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, OriginalType.TIMESTAMP_MILLIS) - override def consume(orig: AnyRef, recordConsumer: RecordConsumer): Unit = + override def consume(orig: Any, recordConsumer: RecordConsumer): Unit = orig match { case ts: Timestamp => recordConsumer.addLong(ts.toEpochMilli) - case _ => throw new DataException("given mismatched type value") + case _ => throw new DataException("given mismatched type value; expected type is timestamp") } } @@ -36,24 +47,34 @@ case class TimestampMicrosLogicalTypeHandler() extends LogicalTypeHandler { override def newSchemaFieldType(name: String): PrimitiveType = new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, OriginalType.TIMESTAMP_MICROS) - override def consume(orig: AnyRef, recordConsumer: RecordConsumer): Unit = + override def consume(orig: Any, recordConsumer: RecordConsumer): Unit = orig match { case ts: Timestamp => val v = (ts.getEpochSecond * 1_000_000L) + (ts.getNano.asInstanceOf[Long] / 1_000L) recordConsumer.addLong(v) - case _ => throw new DataException("given mismatched type value") + case _ => throw new DataException("given mismatched type value; expected type is timestamp") } } +case class Int8LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.INT_8) +case class Int16LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.INT_16) +case class Int32LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.INT_32) +case class Int64LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.INT_64) + +case class Uint8LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.UINT_8) +case class 
Uint16LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.UINT_16) +case class Uint32LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.UINT_32) +case class Uint64LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.UINT_64) + case class JsonLogicalTypeHandler() extends LogicalTypeHandler { override def newSchemaFieldType(name: String): PrimitiveType = new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.JSON) - override def consume(orig: AnyRef, recordConsumer: RecordConsumer): Unit = + override def consume(orig: Any, recordConsumer: RecordConsumer): Unit = orig match { case msgPack: Value => val bin = Binary.fromString(msgPack.toJson) recordConsumer.addBinary(bin) - case _ => throw new DataException("given mismatched type value") + case _ => throw new DataException("given mismatched type value; expected type is json") } } diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala index 02f236e..28be635 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala @@ -48,8 +48,15 @@ object LogicalTypeHandlerStore { private val STRING_TO_LOGICAL_TYPE = Map[String, LogicalTypeHandler]( "timestamp-millis" -> TimestampMillisLogicalTypeHandler(), "timestamp-micros" -> TimestampMicrosLogicalTypeHandler(), + "int8" -> Int8LogicalTypeHandler(), + "int16" -> Int16LogicalTypeHandler(), + "int32" -> Int32LogicalTypeHandler(), + "int64" -> Int64LogicalTypeHandler(), + "uint8" -> Uint8LogicalTypeHandler(), + "uint16" -> Uint16LogicalTypeHandler(), + "uint32" -> Uint32LogicalTypeHandler(), + "uint64" -> Uint64LogicalTypeHandler(), "json" -> JsonLogicalTypeHandler() - // TODO other types ... 
) def empty: LogicalTypeHandlerStore = From cb2e51287436d0784501cb9f7305a13b64399a5d Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Fri, 15 Nov 2019 01:11:07 +0900 Subject: [PATCH 07/19] Add unit tests for logica type handlers --- .../parquet/EmbulkMessageType.scala | 20 ++- .../parquet/LogicalTypeHandler.scala | 49 ++++--- .../parquet/LogicalTypeHandlerStore.scala | 39 ++--- .../parquet/TestLogicalTypeHandler.scala | 74 ++++++++++ .../parquet/TestLogicalTypeHandlerStore.scala | 135 ++++++++++++++++++ 5 files changed, 278 insertions(+), 39 deletions(-) create mode 100644 src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala create mode 100644 src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala index 51da4f8..50d132e 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala @@ -55,7 +55,15 @@ object EmbulkMessageType override def longColumn(column: Column): Unit = { - builder.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, column.getName)) + val name = column.getName + val et = column.getType + + val t = logicalTypeHandlers.get(name, et) match { + case Some(h) if h.isConvertible(et) => h.newSchemaFieldType(name) + case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, column.getName) + } + + builder.add(t) } override def doubleColumn(column: Column): Unit = @@ -71,9 +79,10 @@ object EmbulkMessageType override def timestampColumn(column: Column): Unit = { val name = column.getName + val et = column.getType - val t = logicalTypeHandlers.get(name, column.getType) match { - case Some(h) => h.newSchemaFieldType(name) + val t = logicalTypeHandlers.get(name, et) match { + case Some(h) if h.isConvertible(et) => h.newSchemaFieldType(name) case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.UTF8) } @@ -83,9 +92,10 @@ object EmbulkMessageType override def jsonColumn(column: Column): Unit = { val name = column.getName + val et = column.getType - val t = logicalTypeHandlers.get(name, column.getType) match { - case Some(h) => h.newSchemaFieldType(name) + val t = logicalTypeHandlers.get(name, et) match { + case Some(h) if h.isConvertible(et) => h.newSchemaFieldType(name) case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.UTF8) } diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala index a34990a..08a903e 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala @@ -2,8 +2,11 @@ package org.embulk.output.s3_parquet.parquet import org.apache.parquet.io.api.{Binary, RecordConsumer} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName -import org.apache.parquet.schema.{OriginalType, PrimitiveType, Type} +import org.apache.parquet.schema.{Type => PType} +import org.apache.parquet.schema.{OriginalType, PrimitiveType} import org.embulk.spi.DataException +import org.embulk.spi.`type`.{Type => EType} +import org.embulk.spi.`type`.Types import org.embulk.spi.time.Timestamp import 
org.msgpack.value.Value @@ -15,15 +18,19 @@ import org.msgpack.value.Value * TODO Support both of older and newer representation after 1.11+ is published and other middleware supports it. * */ -trait LogicalTypeHandler { +sealed trait LogicalTypeHandler { + def isConvertible(t: EType): Boolean + def newSchemaFieldType(name: String): PrimitiveType def consume(orig: Any, recordConsumer: RecordConsumer): Unit } -private case class IntLogicalTypeHandler(ot: OriginalType) extends LogicalTypeHandler { +abstract class IntLogicalTypeHandler(ot: OriginalType) extends LogicalTypeHandler { + override def isConvertible(t: EType): Boolean = t == Types.LONG + override def newSchemaFieldType(name: String): PrimitiveType = - new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, ot) + new PrimitiveType(PType.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, ot) override def consume(orig: Any, recordConsumer: RecordConsumer): Unit = orig match { @@ -32,9 +39,11 @@ private case class IntLogicalTypeHandler(ot: OriginalType) extends LogicalTypeHa } } -case class TimestampMillisLogicalTypeHandler() extends LogicalTypeHandler { +object TimestampMillisLogicalTypeHandler extends LogicalTypeHandler { + override def isConvertible(t: EType): Boolean = t == Types.TIMESTAMP + override def newSchemaFieldType(name: String): PrimitiveType = - new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, OriginalType.TIMESTAMP_MILLIS) + new PrimitiveType(PType.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, OriginalType.TIMESTAMP_MILLIS) override def consume(orig: Any, recordConsumer: RecordConsumer): Unit = orig match { @@ -43,9 +52,11 @@ case class TimestampMillisLogicalTypeHandler() extends LogicalTypeHandler { } } -case class TimestampMicrosLogicalTypeHandler() extends LogicalTypeHandler { +object TimestampMicrosLogicalTypeHandler extends LogicalTypeHandler { + override def isConvertible(t: EType): Boolean = t == Types.TIMESTAMP + override def newSchemaFieldType(name: String): PrimitiveType = - new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, OriginalType.TIMESTAMP_MICROS) + new PrimitiveType(PType.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, OriginalType.TIMESTAMP_MICROS) override def consume(orig: Any, recordConsumer: RecordConsumer): Unit = orig match { @@ -56,19 +67,21 @@ case class TimestampMicrosLogicalTypeHandler() extends LogicalTypeHandler { } } -case class Int8LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.INT_8) -case class Int16LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.INT_16) -case class Int32LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.INT_32) -case class Int64LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.INT_64) +object Int8LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.INT_8) +object Int16LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.INT_16) +object Int32LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.INT_32) +object Int64LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.INT_64) + +object Uint8LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.UINT_8) +object Uint16LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.UINT_16) +object Uint32LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.UINT_32) +object Uint64LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.UINT_64) -case class Uint8LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.UINT_8) 
-case class Uint16LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.UINT_16) -case class Uint32LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.UINT_32) -case class Uint64LogicalTypeHandler() extends IntLogicalTypeHandler(OriginalType.UINT_64) +object JsonLogicalTypeHandler extends LogicalTypeHandler { + override def isConvertible(t: EType): Boolean = t == Types.JSON -case class JsonLogicalTypeHandler() extends LogicalTypeHandler { override def newSchemaFieldType(name: String): PrimitiveType = - new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.JSON) + new PrimitiveType(PType.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.JSON) override def consume(orig: Any, recordConsumer: RecordConsumer): Unit = orig match { diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala index 28be635..f00bc3f 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala @@ -3,6 +3,7 @@ package org.embulk.output.s3_parquet.parquet import org.embulk.spi.`type`.{Type, Types} import java.util.{Map => JMap} +import org.embulk.config.ConfigException import org.embulk.output.s3_parquet.S3ParquetOutputPlugin.{ColumnOptionTask, TypeOptionTask} import scala.jdk.CollectionConverters._ @@ -46,17 +47,17 @@ object LogicalTypeHandlerStore { // Listed only older logical types that we can convert from embulk type private val STRING_TO_LOGICAL_TYPE = Map[String, LogicalTypeHandler]( - "timestamp-millis" -> TimestampMillisLogicalTypeHandler(), - "timestamp-micros" -> TimestampMicrosLogicalTypeHandler(), - "int8" -> Int8LogicalTypeHandler(), - "int16" -> Int16LogicalTypeHandler(), - "int32" -> Int32LogicalTypeHandler(), - "int64" -> Int64LogicalTypeHandler(), - "uint8" -> Uint8LogicalTypeHandler(), - "uint16" -> Uint16LogicalTypeHandler(), - "uint32" -> Uint32LogicalTypeHandler(), - "uint64" -> Uint64LogicalTypeHandler(), - "json" -> JsonLogicalTypeHandler() + "timestamp-millis" -> TimestampMillisLogicalTypeHandler, + "timestamp-micros" -> TimestampMicrosLogicalTypeHandler, + "int8" -> Int8LogicalTypeHandler, + "int16" -> Int16LogicalTypeHandler, + "int32" -> Int32LogicalTypeHandler, + "int64" -> Int64LogicalTypeHandler, + "uint8" -> Uint8LogicalTypeHandler, + "uint16" -> Uint16LogicalTypeHandler, + "uint32" -> Uint32LogicalTypeHandler, + "uint64" -> Uint64LogicalTypeHandler, + "json" -> JsonLogicalTypeHandler ) def empty: LogicalTypeHandlerStore = @@ -66,17 +67,23 @@ object LogicalTypeHandlerStore { val fromEmbulkType = typeOpts.asScala .filter(_._2.getLogicalType.isPresent) .map[Type, LogicalTypeHandler] { case (k, v) => - val t = STRING_TO_EMBULK_TYPE(k) - val h = STRING_TO_LOGICAL_TYPE(v.getLogicalType.get) - (t, h) + val t = STRING_TO_EMBULK_TYPE.get(k) + val h = STRING_TO_LOGICAL_TYPE.get(v.getLogicalType.get) + (t, h) match { + case (Some(tt), Some(hh)) => (tt, hh) + case _ => throw new ConfigException("invalid logical types in type_options") + } } .toMap val fromColumnName = columnOpts.asScala .filter(_._2.getLogicalType.isPresent) .map[String, LogicalTypeHandler] { case (k, v) => - val h = STRING_TO_LOGICAL_TYPE(v.getLogicalType.get) - (k, h) + val h = STRING_TO_LOGICAL_TYPE.get(v.getLogicalType.get) + h match { + case Some(hh) => (k, hh) + case _ => throw new ConfigException("invalid logical 
types in column_options") + } } .toMap diff --git a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala new file mode 100644 index 0000000..0c5d100 --- /dev/null +++ b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala @@ -0,0 +1,74 @@ +package org.embulk.output.s3_parquet.parquet + +import org.embulk.spi.DataException +import org.embulk.spi.`type`.Types +import org.junit.runner.RunWith +import org.scalatest.FunSuite +import org.scalatest.junit.JUnitRunner + +import scala.util.Try + +@RunWith(classOf[JUnitRunner]) +class TestLogicalTypeHandler extends FunSuite { + + test("IntLogicalTypeHandler.isConvertible() returns true for long") { + val h = Int8LogicalTypeHandler + + assert(h.isConvertible(Types.LONG)) + assert(!h.isConvertible(Types.BOOLEAN)) + } + + test("IntLogicalTypeHandler.consume() raises DataException if given type is not long") { + val h = Int8LogicalTypeHandler + val actual = Try(h.consume("invalid", null)) + + assert(actual.isFailure) + assert(actual.failed.get.isInstanceOf[DataException]) + } + + + test("TimestampMillisLogicalTypeHandler.isConvertible() returns true for timestamp") { + val h = TimestampMillisLogicalTypeHandler + + assert(h.isConvertible(Types.TIMESTAMP)) + assert(!h.isConvertible(Types.BOOLEAN)) + } + + test("TimestampMillisLogicalTypeHandler.consume() raises DataException if given type is not timestamp") { + val h = TimestampMillisLogicalTypeHandler + val actual = Try(h.consume("invalid", null)) + + assert(actual.isFailure) + assert(actual.failed.get.isInstanceOf[DataException]) + } + + + test("TimestampMicrosLogicalTypeHandler.isConvertible() returns true for timestamp") { + val h = TimestampMicrosLogicalTypeHandler + + assert(h.isConvertible(Types.TIMESTAMP)) + assert(!h.isConvertible(Types.BOOLEAN)) + } + + test("TimestampMicrosLogicalTypeHandler.consume() raises DataException if given type is not timestamp") { + val h = TimestampMicrosLogicalTypeHandler + val actual = Try(h.consume("invalid", null)) + + assert(actual.isFailure) + assert(actual.failed.get.isInstanceOf[DataException]) + } + + test("JsonLogicalTypeHandler.isConvertible() returns true for json") { + val h = JsonLogicalTypeHandler + + assert(h.isConvertible(Types.JSON)) + assert(!h.isConvertible(Types.BOOLEAN)) + } + + test("JsonLogicalTypeHandler.consume() raises DataException if given type is not json") { + val h = JsonLogicalTypeHandler + val actual = Try(h.consume("invalid", null)) + assert(actual.isFailure) + assert(actual.failed.get.isInstanceOf[DataException]) + } +} diff --git a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala new file mode 100644 index 0000000..19d7afd --- /dev/null +++ b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala @@ -0,0 +1,135 @@ +package org.embulk.output.s3_parquet.parquet + +import java.util.Optional + +import com.google.common.base.{Optional => GOptional} +import org.embulk.config.{ConfigException, TaskSource} +import org.embulk.output.s3_parquet.S3ParquetOutputPlugin.{ColumnOptionTask, TypeOptionTask} +import org.embulk.spi.`type`.{Type => EType} +import org.embulk.spi.`type`.Types +import org.junit.runner.RunWith +import org.scalatest.FunSuite +import org.scalatest.junit.JUnitRunner + +import scala.jdk.CollectionConverters._ +import scala.util.Try + 
+@RunWith(classOf[JUnitRunner]) +class TestLogicalTypeHandlerStore extends FunSuite { + test("empty() returns empty maps") { + val rv = LogicalTypeHandlerStore.empty + + assert(rv.fromColumnName.isEmpty) + assert(rv.fromEmbulkType.isEmpty) + } + + test("fromEmbulkOptions() returns handlers for valid option tasks") { + val typeOpts = Map[String, TypeOptionTask]( + "timestamp" -> DummyTypeOptionTask(Optional.of[String]("timestamp-millis")), + ).asJava + val columnOpts = Map[String, ColumnOptionTask]( + "col1" -> DummyColumnOptionTask(Optional.of[String]("timestamp-micros")), + ).asJava + + val expected1 = Map[EType, LogicalTypeHandler]( + Types.TIMESTAMP -> TimestampMillisLogicalTypeHandler, + ) + val expected2 = Map[String, LogicalTypeHandler]( + "col1" -> TimestampMicrosLogicalTypeHandler, + ) + + val rv = LogicalTypeHandlerStore.fromEmbulkOptions(typeOpts, columnOpts) + + assert(rv.fromEmbulkType == expected1) + assert(rv.fromColumnName == expected2) + } + + test("fromEmbulkOptions() raises ConfigException if invalid option tasks given") { + val emptyTypeOpts = Map.empty[String, TypeOptionTask].asJava + val emptyColumnOpts = Map.empty[String, ColumnOptionTask].asJava + + val invalidTypeOpts = Map[String, TypeOptionTask]( + "unknown-embulk-type-name" -> DummyTypeOptionTask(Optional.of[String]("timestamp-millis")), + "timestamp" -> DummyTypeOptionTask(Optional.of[String]("unknown-parquet-logical-type-name")), + ).asJava + val invalidColumnOpts = Map[String, ColumnOptionTask]( + "col1" -> DummyColumnOptionTask(Optional.of[String]("unknown-parquet-logical-type-name")), + ).asJava + + val try1 = Try(LogicalTypeHandlerStore.fromEmbulkOptions(invalidTypeOpts, emptyColumnOpts)) + assert(try1.isFailure) + assert(try1.failed.get.isInstanceOf[ConfigException]) + + val try2 = Try(LogicalTypeHandlerStore.fromEmbulkOptions(emptyTypeOpts, invalidColumnOpts)) + assert(try2.isFailure) + assert(try2.failed.get.isInstanceOf[ConfigException]) + + val try3 = Try(LogicalTypeHandlerStore.fromEmbulkOptions(invalidTypeOpts, invalidColumnOpts)) + assert(try3.isFailure) + assert(try3.failed.get.isInstanceOf[ConfigException]) + } + + test("get() returns a handler matched with primary column name condition") { + val typeOpts = Map[String, TypeOptionTask]( + "timestamp" -> DummyTypeOptionTask(Optional.of[String]("timestamp-millis")), + ).asJava + val columnOpts = Map[String, ColumnOptionTask]( + "col1" -> DummyColumnOptionTask(Optional.of[String]("timestamp-micros")), + ).asJava + + val handlers = LogicalTypeHandlerStore.fromEmbulkOptions(typeOpts, columnOpts) + + // It matches both of column name and embulk type, and column name should be primary + val expected = Some(TimestampMicrosLogicalTypeHandler) + val actual = handlers.get("col1", Types.TIMESTAMP) + + assert(actual == expected) + } + + test("get() returns a handler matched with type name condition") { + val typeOpts = Map[String, TypeOptionTask]( + "timestamp" -> DummyTypeOptionTask(Optional.of[String]("timestamp-millis")), + ).asJava + val columnOpts = Map.empty[String, ColumnOptionTask].asJava + + val handlers = LogicalTypeHandlerStore.fromEmbulkOptions(typeOpts, columnOpts) + + // It matches column name + val expected = Some(TimestampMillisLogicalTypeHandler) + val actual = handlers.get("col1", Types.TIMESTAMP) + + assert(actual == expected) + } + + test("get() returns None if not matched") { + val typeOpts = Map.empty[String, TypeOptionTask].asJava + val columnOpts = Map.empty[String, ColumnOptionTask].asJava + + val handlers = 
LogicalTypeHandlerStore.fromEmbulkOptions(typeOpts, columnOpts) + + // It matches embulk type + val actual = handlers.get("col1", Types.TIMESTAMP) + + assert(actual.isEmpty) + } + + private case class DummyTypeOptionTask(lt: Optional[String]) extends TypeOptionTask { + override def getLogicalType: Optional[String] = lt + + override def validate(): Unit = {} + + override def dump(): TaskSource = null + } + + private case class DummyColumnOptionTask(lt: Optional[String]) extends ColumnOptionTask { + override def getTimeZoneId: GOptional[String] = GOptional.absent[String] + + override def getFormat: GOptional[String] = GOptional.absent[String] + + override def getLogicalType: Optional[String] = lt + + override def validate(): Unit = {} + + override def dump(): TaskSource = null + } +} From ddb9ac29f72f986314a0152516d96a7f1dd00e4d Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Fri, 15 Nov 2019 02:01:48 +0900 Subject: [PATCH 08/19] Allow non-timestamp column_option elements --- .../embulk/output/s3_parquet/S3ParquetOutputPlugin.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala b/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala index 5ed9012..2229624 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala @@ -9,7 +9,7 @@ import org.apache.parquet.column.ParquetProperties import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.embulk.config.{Config, ConfigDefault, ConfigDiff, ConfigException, ConfigSource, Task, TaskReport, TaskSource} -import org.embulk.output.s3_parquet.S3ParquetOutputPlugin.PluginTask +import org.embulk.output.s3_parquet.S3ParquetOutputPlugin.{ColumnOptionTask, PluginTask} import org.embulk.output.s3_parquet.aws.Aws import org.embulk.output.s3_parquet.parquet.{LogicalTypeHandlerStore, ParquetFileWriter} import org.embulk.spi.{Exec, OutputPlugin, PageReader, Schema, TransactionalPageOutput} @@ -160,9 +160,12 @@ class S3ParquetOutputPlugin // column_options task.getColumnOptions.forEach { (k: String, - _) => + opt: ColumnOptionTask) => val c = schema.lookupColumn(k) - if (!c.getType.getName.equals("timestamp")) throw new ConfigException(s"column:$k is not 'timestamp' type.") + val useTimestampOption = opt.getFormat.isPresent || opt.getTimeZoneId.isPresent + if (!c.getType.getName.equals("timestamp") && useTimestampOption) { + throw new ConfigException(s"column:$k is not 'timestamp' type.") + } } // canned_acl From 3685ee585119f4c1eb8b2d66d148dbff37cc3068 Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Fri, 15 Nov 2019 02:41:23 +0900 Subject: [PATCH 09/19] Apply style --- .../s3_parquet/S3ParquetOutputPlugin.scala | 9 +- .../parquet/EmbulkMessageType.scala | 7 +- .../parquet/LogicalTypeHandler.scala | 108 ++++++-- .../parquet/LogicalTypeHandlerStore.scala | 37 ++- .../parquet/ParquetFileWriter.scala | 82 +++--- .../parquet/TestLogicalTypeHandler.scala | 94 ++++--- .../parquet/TestLogicalTypeHandlerStore.scala | 262 ++++++++++-------- 7 files changed, 351 insertions(+), 248 deletions(-) diff --git a/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala b/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala index 2229624..2892bcb 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala +++ 
b/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala @@ -92,11 +92,14 @@ object S3ParquetOutputPlugin def getTypeOptions: JMap[String, TypeOptionTask] } - trait ColumnOptionTask extends Task with TimestampColumnOption with LogicalTypeOption + trait ColumnOptionTask + extends Task with TimestampColumnOption with LogicalTypeOption - trait TypeOptionTask extends Task with LogicalTypeOption + trait TypeOptionTask + extends Task with LogicalTypeOption - trait LogicalTypeOption { + trait LogicalTypeOption + { @Config("logical_type") def getLogicalType: Optional[String] } diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala index 50d132e..a4c9892 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/EmbulkMessageType.scala @@ -1,5 +1,6 @@ package org.embulk.output.s3_parquet.parquet + import com.google.common.collect.ImmutableList import org.apache.parquet.schema.{MessageType, OriginalType, PrimitiveType, Type} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -60,7 +61,7 @@ object EmbulkMessageType val t = logicalTypeHandlers.get(name, et) match { case Some(h) if h.isConvertible(et) => h.newSchemaFieldType(name) - case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, column.getName) + case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, column.getName) } builder.add(t) @@ -83,7 +84,7 @@ object EmbulkMessageType val t = logicalTypeHandlers.get(name, et) match { case Some(h) if h.isConvertible(et) => h.newSchemaFieldType(name) - case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.UTF8) + case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.UTF8) } builder.add(t) @@ -96,7 +97,7 @@ object EmbulkMessageType val t = logicalTypeHandlers.get(name, et) match { case Some(h) if h.isConvertible(et) => h.newSchemaFieldType(name) - case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.UTF8) + case _ => new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.UTF8) } builder.add(t) diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala index 08a903e..108bcae 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandler.scala @@ -1,5 +1,6 @@ package org.embulk.output.s3_parquet.parquet + import org.apache.parquet.io.api.{Binary, RecordConsumer} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.{Type => PType} @@ -10,6 +11,7 @@ import org.embulk.spi.`type`.Types import org.embulk.spi.time.Timestamp import org.msgpack.value.Value + /** * Handle Apache Parquet 'Logical Types' on schema/value conversion. * ref. https://github.com/apache/parquet-format/blob/master/LogicalTypes.md @@ -18,76 +20,126 @@ import org.msgpack.value.Value * TODO Support both of older and newer representation after 1.11+ is published and other middleware supports it. 
* */ -sealed trait LogicalTypeHandler { +sealed trait LogicalTypeHandler +{ def isConvertible(t: EType): Boolean def newSchemaFieldType(name: String): PrimitiveType - def consume(orig: Any, recordConsumer: RecordConsumer): Unit + def consume(orig: Any, + recordConsumer: RecordConsumer): Unit } -abstract class IntLogicalTypeHandler(ot: OriginalType) extends LogicalTypeHandler { - override def isConvertible(t: EType): Boolean = t == Types.LONG +abstract class IntLogicalTypeHandler(ot: OriginalType) + extends LogicalTypeHandler +{ + override def isConvertible(t: EType): Boolean = + { + t == Types.LONG + } override def newSchemaFieldType(name: String): PrimitiveType = + { new PrimitiveType(PType.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, ot) + } - override def consume(orig: Any, recordConsumer: RecordConsumer): Unit = + override def consume(orig: Any, + recordConsumer: RecordConsumer): Unit = + { orig match { case v: Long => recordConsumer.addLong(v) - case _ => throw new DataException("given mismatched type value; expected type is long") + case _ => throw new DataException("given mismatched type value; expected type is long") } + } } -object TimestampMillisLogicalTypeHandler extends LogicalTypeHandler { - override def isConvertible(t: EType): Boolean = t == Types.TIMESTAMP +object TimestampMillisLogicalTypeHandler + extends LogicalTypeHandler +{ + override def isConvertible(t: EType): Boolean = + { + t == Types.TIMESTAMP + } override def newSchemaFieldType(name: String): PrimitiveType = + { new PrimitiveType(PType.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, OriginalType.TIMESTAMP_MILLIS) + } - override def consume(orig: Any, recordConsumer: RecordConsumer): Unit = + override def consume(orig: Any, + recordConsumer: RecordConsumer): Unit = + { orig match { case ts: Timestamp => recordConsumer.addLong(ts.toEpochMilli) - case _ => throw new DataException("given mismatched type value; expected type is timestamp") + case _ => throw new DataException("given mismatched type value; expected type is timestamp") } + } } -object TimestampMicrosLogicalTypeHandler extends LogicalTypeHandler { - override def isConvertible(t: EType): Boolean = t == Types.TIMESTAMP +object TimestampMicrosLogicalTypeHandler + extends LogicalTypeHandler +{ + override def isConvertible(t: EType): Boolean = + { + t == Types.TIMESTAMP + } override def newSchemaFieldType(name: String): PrimitiveType = + { new PrimitiveType(PType.Repetition.OPTIONAL, PrimitiveTypeName.INT64, name, OriginalType.TIMESTAMP_MICROS) + } - override def consume(orig: Any, recordConsumer: RecordConsumer): Unit = + override def consume(orig: Any, + recordConsumer: RecordConsumer): Unit = + { orig match { case ts: Timestamp => val v = (ts.getEpochSecond * 1_000_000L) + (ts.getNano.asInstanceOf[Long] / 1_000L) recordConsumer.addLong(v) - case _ => throw new DataException("given mismatched type value; expected type is timestamp") + case _ => throw new DataException("given mismatched type value; expected type is timestamp") } + } } -object Int8LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.INT_8) -object Int16LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.INT_16) -object Int32LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.INT_32) -object Int64LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.INT_64) - -object Uint8LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.UINT_8) -object Uint16LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.UINT_16) -object 
Uint32LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.UINT_32) -object Uint64LogicalTypeHandler extends IntLogicalTypeHandler(OriginalType.UINT_64) - -object JsonLogicalTypeHandler extends LogicalTypeHandler { - override def isConvertible(t: EType): Boolean = t == Types.JSON +object Int8LogicalTypeHandler + extends IntLogicalTypeHandler(OriginalType.INT_8) +object Int16LogicalTypeHandler + extends IntLogicalTypeHandler(OriginalType.INT_16) +object Int32LogicalTypeHandler + extends IntLogicalTypeHandler(OriginalType.INT_32) +object Int64LogicalTypeHandler + extends IntLogicalTypeHandler(OriginalType.INT_64) + +object Uint8LogicalTypeHandler + extends IntLogicalTypeHandler(OriginalType.UINT_8) +object Uint16LogicalTypeHandler + extends IntLogicalTypeHandler(OriginalType.UINT_16) +object Uint32LogicalTypeHandler + extends IntLogicalTypeHandler(OriginalType.UINT_32) +object Uint64LogicalTypeHandler + extends IntLogicalTypeHandler(OriginalType.UINT_64) + +object JsonLogicalTypeHandler + extends LogicalTypeHandler +{ + override def isConvertible(t: EType): Boolean = + { + t == Types.JSON + } override def newSchemaFieldType(name: String): PrimitiveType = + { new PrimitiveType(PType.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, name, OriginalType.JSON) + } - override def consume(orig: Any, recordConsumer: RecordConsumer): Unit = + override def consume(orig: Any, + recordConsumer: RecordConsumer): Unit = + { orig match { case msgPack: Value => val bin = Binary.fromString(msgPack.toJson) recordConsumer.addBinary(bin) - case _ => throw new DataException("given mismatched type value; expected type is json") + case _ => throw new DataException("given mismatched type value; expected type is json") } + } } diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala index f00bc3f..d2c2d91 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/LogicalTypeHandlerStore.scala @@ -1,5 +1,6 @@ package org.embulk.output.s3_parquet.parquet + import org.embulk.spi.`type`.{Type, Types} import java.util.{Map => JMap} @@ -8,34 +9,44 @@ import org.embulk.output.s3_parquet.S3ParquetOutputPlugin.{ColumnOptionTask, Typ import scala.jdk.CollectionConverters._ + /** * A storage has mapping from logical type query (column name, type) to handler. 
* * @param fromEmbulkType * @param fromColumnName */ -case class LogicalTypeHandlerStore private (fromEmbulkType: Map[Type, LogicalTypeHandler], - fromColumnName: Map[String, LogicalTypeHandler]) { +case class LogicalTypeHandlerStore private(fromEmbulkType: Map[Type, LogicalTypeHandler], + fromColumnName: Map[String, LogicalTypeHandler]) +{ // Try column name lookup, then column type - def get(n: String, t: Type): Option[LogicalTypeHandler] = + def get(n: String, + t: Type): Option[LogicalTypeHandler] = + { get(n) match { case Some(h) => Some(h) - case _ => + case _ => get(t) match { case Some(h) => Some(h) - case _ => None + case _ => None } } + } def get(t: Type): Option[LogicalTypeHandler] = + { fromEmbulkType.get(t) + } def get(n: String): Option[LogicalTypeHandler] = + { fromColumnName.get(n) + } } -object LogicalTypeHandlerStore { +object LogicalTypeHandlerStore +{ private val STRING_TO_EMBULK_TYPE = Map[String, Type]( "boolean" -> Types.BOOLEAN, "long" -> Types.LONG, @@ -43,7 +54,7 @@ object LogicalTypeHandlerStore { "string" -> Types.STRING, "timestamp" -> Types.TIMESTAMP, "json" -> Types.JSON - ) + ) // Listed only older logical types that we can convert from embulk type private val STRING_TO_LOGICAL_TYPE = Map[String, LogicalTypeHandler]( @@ -58,12 +69,16 @@ object LogicalTypeHandlerStore { "uint32" -> Uint32LogicalTypeHandler, "uint64" -> Uint64LogicalTypeHandler, "json" -> JsonLogicalTypeHandler - ) + ) def empty: LogicalTypeHandlerStore = + { LogicalTypeHandlerStore(Map.empty[Type, LogicalTypeHandler], Map.empty[String, LogicalTypeHandler]) + } - def fromEmbulkOptions(typeOpts: JMap[String, TypeOptionTask], columnOpts: JMap[String, ColumnOptionTask]): LogicalTypeHandlerStore = { + def fromEmbulkOptions(typeOpts: JMap[String, TypeOptionTask], + columnOpts: JMap[String, ColumnOptionTask]): LogicalTypeHandlerStore = + { val fromEmbulkType = typeOpts.asScala .filter(_._2.getLogicalType.isPresent) .map[Type, LogicalTypeHandler] { case (k, v) => @@ -71,7 +86,7 @@ object LogicalTypeHandlerStore { val h = STRING_TO_LOGICAL_TYPE.get(v.getLogicalType.get) (t, h) match { case (Some(tt), Some(hh)) => (tt, hh) - case _ => throw new ConfigException("invalid logical types in type_options") + case _ => throw new ConfigException("invalid logical types in type_options") } } .toMap @@ -82,7 +97,7 @@ object LogicalTypeHandlerStore { val h = STRING_TO_LOGICAL_TYPE.get(v.getLogicalType.get) h match { case Some(hh) => (k, hh) - case _ => throw new ConfigException("invalid logical types in column_options") + case _ => throw new ConfigException("invalid logical types in column_options") } } .toMap diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriter.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriter.scala index 444270e..a9f88c9 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriter.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriter.scala @@ -20,35 +20,35 @@ object ParquetFileWriter extends ParquetWriter.Builder[PageReader, Builder](path) { - def withPath(path: Path): Builder = - { - copy(path = path) - } - - def withPath(pathString: String): Builder = - { - copy(path = new Path(pathString)) - } - - def withSchema(schema: Schema): Builder = - { - copy(schema = schema) - } - - def withTimestampFormatters(timestampFormatters: Seq[TimestampFormatter]): Builder = - { - copy(timestampFormatters = timestampFormatters) - } - - def withLogicalTypeHandlers(logicalTypeHandlers: 
LogicalTypeHandlerStore): Builder = - { - copy(logicalTypeHandlers = logicalTypeHandlers) - } - - override def self(): Builder = - { - this - } + def withPath(path: Path): Builder = + { + copy(path = path) + } + + def withPath(pathString: String): Builder = + { + copy(path = new Path(pathString)) + } + + def withSchema(schema: Schema): Builder = + { + copy(schema = schema) + } + + def withTimestampFormatters(timestampFormatters: Seq[TimestampFormatter]): Builder = + { + copy(timestampFormatters = timestampFormatters) + } + + def withLogicalTypeHandlers(logicalTypeHandlers: LogicalTypeHandlerStore): Builder = + { + copy(logicalTypeHandlers = logicalTypeHandlers) + } + + override def self(): Builder = + { + this + } override def getWriteSupport(conf: Configuration): WriteSupport[PageReader] = { @@ -56,10 +56,10 @@ object ParquetFileWriter } } - def builder(): Builder = - { - Builder() - } + def builder(): Builder = + { + Builder() + } } @@ -70,7 +70,7 @@ private[parquet] case class ParquetFileWriter(recordConsumer: RecordConsumer, logicalTypeHandlers: LogicalTypeHandlerStore = LogicalTypeHandlerStore.empty) { - def write(record: PageReader): Unit = + def write(record: PageReader): Unit = { recordConsumer.startMessage() writeRecord(record) @@ -126,13 +126,13 @@ private[parquet] case class ParquetFileWriter(recordConsumer: RecordConsumer, withWriteFieldContext(column, { val t = record.getTimestamp(column) - logicalTypeHandlers.get(column.getName, column.getType) match { + logicalTypeHandlers.get(column.getName, column.getType) match { case Some(h) => - h.consume(t, recordConsumer) - case _ => - val ft = timestampFormatters(column.getIndex).format(t) - val bin = Binary.fromString(ft) - recordConsumer.addBinary(bin) + h.consume(t, recordConsumer) + case _ => + val ft = timestampFormatters(column.getIndex).format(t) + val bin = Binary.fromString(ft) + recordConsumer.addBinary(bin) } }) }) @@ -147,7 +147,7 @@ private[parquet] case class ParquetFileWriter(recordConsumer: RecordConsumer, logicalTypeHandlers.get(column.getName, column.getType) match { case Some(h) => h.consume(msgPack, recordConsumer) - case _ => + case _ => val bin = Binary.fromString(msgPack.toJson) recordConsumer.addBinary(bin) } diff --git a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala index 0c5d100..9cde6d1 100644 --- a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala +++ b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala @@ -1,5 +1,6 @@ package org.embulk.output.s3_parquet.parquet + import org.embulk.spi.DataException import org.embulk.spi.`type`.Types import org.junit.runner.RunWith @@ -8,67 +9,70 @@ import org.scalatest.junit.JUnitRunner import scala.util.Try + @RunWith(classOf[JUnitRunner]) -class TestLogicalTypeHandler extends FunSuite { +class TestLogicalTypeHandler + extends FunSuite +{ - test("IntLogicalTypeHandler.isConvertible() returns true for long") { - val h = Int8LogicalTypeHandler + test("IntLogicalTypeHandler.isConvertible() returns true for long") { + val h = Int8LogicalTypeHandler - assert(h.isConvertible(Types.LONG)) - assert(!h.isConvertible(Types.BOOLEAN)) - } + assert(h.isConvertible(Types.LONG)) + assert(!h.isConvertible(Types.BOOLEAN)) + } - test("IntLogicalTypeHandler.consume() raises DataException if given type is not long") { - val h = Int8LogicalTypeHandler - val actual = Try(h.consume("invalid", null)) + 
test("IntLogicalTypeHandler.consume() raises DataException if given type is not long") { + val h = Int8LogicalTypeHandler + val actual = Try(h.consume("invalid", null)) - assert(actual.isFailure) - assert(actual.failed.get.isInstanceOf[DataException]) - } + assert(actual.isFailure) + assert(actual.failed.get.isInstanceOf[DataException]) + } - test("TimestampMillisLogicalTypeHandler.isConvertible() returns true for timestamp") { - val h = TimestampMillisLogicalTypeHandler + test("TimestampMillisLogicalTypeHandler.isConvertible() returns true for timestamp") { + val h = TimestampMillisLogicalTypeHandler - assert(h.isConvertible(Types.TIMESTAMP)) - assert(!h.isConvertible(Types.BOOLEAN)) - } + assert(h.isConvertible(Types.TIMESTAMP)) + assert(!h.isConvertible(Types.BOOLEAN)) + } - test("TimestampMillisLogicalTypeHandler.consume() raises DataException if given type is not timestamp") { - val h = TimestampMillisLogicalTypeHandler - val actual = Try(h.consume("invalid", null)) + test("TimestampMillisLogicalTypeHandler.consume() raises DataException if given type is not timestamp") { + val h = TimestampMillisLogicalTypeHandler + val actual = Try(h.consume("invalid", null)) - assert(actual.isFailure) - assert(actual.failed.get.isInstanceOf[DataException]) - } + assert(actual.isFailure) + assert(actual.failed.get.isInstanceOf[DataException]) + } - test("TimestampMicrosLogicalTypeHandler.isConvertible() returns true for timestamp") { - val h = TimestampMicrosLogicalTypeHandler + test("TimestampMicrosLogicalTypeHandler.isConvertible() returns true for timestamp") { + val h = TimestampMicrosLogicalTypeHandler - assert(h.isConvertible(Types.TIMESTAMP)) - assert(!h.isConvertible(Types.BOOLEAN)) - } + assert(h.isConvertible(Types.TIMESTAMP)) + assert(!h.isConvertible(Types.BOOLEAN)) + } - test("TimestampMicrosLogicalTypeHandler.consume() raises DataException if given type is not timestamp") { - val h = TimestampMicrosLogicalTypeHandler - val actual = Try(h.consume("invalid", null)) + test("TimestampMicrosLogicalTypeHandler.consume() raises DataException if given type is not timestamp") { + val h = TimestampMicrosLogicalTypeHandler + val actual = Try(h.consume("invalid", null)) - assert(actual.isFailure) - assert(actual.failed.get.isInstanceOf[DataException]) - } + assert(actual.isFailure) + assert(actual.failed.get.isInstanceOf[DataException]) + } - test("JsonLogicalTypeHandler.isConvertible() returns true for json") { - val h = JsonLogicalTypeHandler + test("JsonLogicalTypeHandler.isConvertible() returns true for json") { + val h = JsonLogicalTypeHandler - assert(h.isConvertible(Types.JSON)) - assert(!h.isConvertible(Types.BOOLEAN)) - } + assert(h.isConvertible(Types.JSON)) + assert(!h.isConvertible(Types.BOOLEAN)) + } - test("JsonLogicalTypeHandler.consume() raises DataException if given type is not json") { - val h = JsonLogicalTypeHandler - val actual = Try(h.consume("invalid", null)) - assert(actual.isFailure) - assert(actual.failed.get.isInstanceOf[DataException]) - } + test("JsonLogicalTypeHandler.consume() raises DataException if given type is not json") { + val h = JsonLogicalTypeHandler + val actual = Try(h.consume("invalid", null)) + assert(actual.isFailure) + assert(actual.failed.get.isInstanceOf[DataException]) + } } diff --git a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala index 19d7afd..9a35038 100644 --- 
a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala +++ b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala @@ -1,5 +1,6 @@ package org.embulk.output.s3_parquet.parquet + import java.util.Optional import com.google.common.base.{Optional => GOptional} @@ -14,122 +15,149 @@ import org.scalatest.junit.JUnitRunner import scala.jdk.CollectionConverters._ import scala.util.Try -@RunWith(classOf[JUnitRunner]) -class TestLogicalTypeHandlerStore extends FunSuite { - test("empty() returns empty maps") { - val rv = LogicalTypeHandlerStore.empty - - assert(rv.fromColumnName.isEmpty) - assert(rv.fromEmbulkType.isEmpty) - } - - test("fromEmbulkOptions() returns handlers for valid option tasks") { - val typeOpts = Map[String, TypeOptionTask]( - "timestamp" -> DummyTypeOptionTask(Optional.of[String]("timestamp-millis")), - ).asJava - val columnOpts = Map[String, ColumnOptionTask]( - "col1" -> DummyColumnOptionTask(Optional.of[String]("timestamp-micros")), - ).asJava - - val expected1 = Map[EType, LogicalTypeHandler]( - Types.TIMESTAMP -> TimestampMillisLogicalTypeHandler, - ) - val expected2 = Map[String, LogicalTypeHandler]( - "col1" -> TimestampMicrosLogicalTypeHandler, - ) - - val rv = LogicalTypeHandlerStore.fromEmbulkOptions(typeOpts, columnOpts) - - assert(rv.fromEmbulkType == expected1) - assert(rv.fromColumnName == expected2) - } - - test("fromEmbulkOptions() raises ConfigException if invalid option tasks given") { - val emptyTypeOpts = Map.empty[String, TypeOptionTask].asJava - val emptyColumnOpts = Map.empty[String, ColumnOptionTask].asJava - - val invalidTypeOpts = Map[String, TypeOptionTask]( - "unknown-embulk-type-name" -> DummyTypeOptionTask(Optional.of[String]("timestamp-millis")), - "timestamp" -> DummyTypeOptionTask(Optional.of[String]("unknown-parquet-logical-type-name")), - ).asJava - val invalidColumnOpts = Map[String, ColumnOptionTask]( - "col1" -> DummyColumnOptionTask(Optional.of[String]("unknown-parquet-logical-type-name")), - ).asJava - - val try1 = Try(LogicalTypeHandlerStore.fromEmbulkOptions(invalidTypeOpts, emptyColumnOpts)) - assert(try1.isFailure) - assert(try1.failed.get.isInstanceOf[ConfigException]) - - val try2 = Try(LogicalTypeHandlerStore.fromEmbulkOptions(emptyTypeOpts, invalidColumnOpts)) - assert(try2.isFailure) - assert(try2.failed.get.isInstanceOf[ConfigException]) - - val try3 = Try(LogicalTypeHandlerStore.fromEmbulkOptions(invalidTypeOpts, invalidColumnOpts)) - assert(try3.isFailure) - assert(try3.failed.get.isInstanceOf[ConfigException]) - } - - test("get() returns a handler matched with primary column name condition") { - val typeOpts = Map[String, TypeOptionTask]( - "timestamp" -> DummyTypeOptionTask(Optional.of[String]("timestamp-millis")), - ).asJava - val columnOpts = Map[String, ColumnOptionTask]( - "col1" -> DummyColumnOptionTask(Optional.of[String]("timestamp-micros")), - ).asJava - - val handlers = LogicalTypeHandlerStore.fromEmbulkOptions(typeOpts, columnOpts) - - // It matches both of column name and embulk type, and column name should be primary - val expected = Some(TimestampMicrosLogicalTypeHandler) - val actual = handlers.get("col1", Types.TIMESTAMP) - - assert(actual == expected) - } - - test("get() returns a handler matched with type name condition") { - val typeOpts = Map[String, TypeOptionTask]( - "timestamp" -> DummyTypeOptionTask(Optional.of[String]("timestamp-millis")), - ).asJava - val columnOpts = Map.empty[String, ColumnOptionTask].asJava - val handlers = 
LogicalTypeHandlerStore.fromEmbulkOptions(typeOpts, columnOpts) - - // It matches column name - val expected = Some(TimestampMillisLogicalTypeHandler) - val actual = handlers.get("col1", Types.TIMESTAMP) - - assert(actual == expected) - } - - test("get() returns None if not matched") { - val typeOpts = Map.empty[String, TypeOptionTask].asJava - val columnOpts = Map.empty[String, ColumnOptionTask].asJava - - val handlers = LogicalTypeHandlerStore.fromEmbulkOptions(typeOpts, columnOpts) - - // It matches embulk type - val actual = handlers.get("col1", Types.TIMESTAMP) - - assert(actual.isEmpty) - } - - private case class DummyTypeOptionTask(lt: Optional[String]) extends TypeOptionTask { - override def getLogicalType: Optional[String] = lt - - override def validate(): Unit = {} - - override def dump(): TaskSource = null - } - - private case class DummyColumnOptionTask(lt: Optional[String]) extends ColumnOptionTask { - override def getTimeZoneId: GOptional[String] = GOptional.absent[String] - - override def getFormat: GOptional[String] = GOptional.absent[String] - - override def getLogicalType: Optional[String] = lt - - override def validate(): Unit = {} - - override def dump(): TaskSource = null - } +@RunWith(classOf[JUnitRunner]) +class TestLogicalTypeHandlerStore + extends FunSuite +{ + test("empty() returns empty maps") { + val rv = LogicalTypeHandlerStore.empty + + assert(rv.fromColumnName.isEmpty) + assert(rv.fromEmbulkType.isEmpty) + } + + test("fromEmbulkOptions() returns handlers for valid option tasks") { + val typeOpts = Map[String, TypeOptionTask]( + "timestamp" -> DummyTypeOptionTask(Optional.of[String]("timestamp-millis")), + ).asJava + val columnOpts = Map[String, ColumnOptionTask]( + "col1" -> DummyColumnOptionTask(Optional.of[String]("timestamp-micros")), + ).asJava + + val expected1 = Map[EType, LogicalTypeHandler]( + Types.TIMESTAMP -> TimestampMillisLogicalTypeHandler, + ) + val expected2 = Map[String, LogicalTypeHandler]( + "col1" -> TimestampMicrosLogicalTypeHandler, + ) + + val rv = LogicalTypeHandlerStore.fromEmbulkOptions(typeOpts, columnOpts) + + assert(rv.fromEmbulkType == expected1) + assert(rv.fromColumnName == expected2) + } + + test("fromEmbulkOptions() raises ConfigException if invalid option tasks given") { + val emptyTypeOpts = Map.empty[String, TypeOptionTask].asJava + val emptyColumnOpts = Map.empty[String, ColumnOptionTask].asJava + + val invalidTypeOpts = Map[String, TypeOptionTask]( + "unknown-embulk-type-name" -> DummyTypeOptionTask(Optional.of[String]("timestamp-millis")), + "timestamp" -> DummyTypeOptionTask(Optional.of[String]("unknown-parquet-logical-type-name")), + ).asJava + val invalidColumnOpts = Map[String, ColumnOptionTask]( + "col1" -> DummyColumnOptionTask(Optional.of[String]("unknown-parquet-logical-type-name")), + ).asJava + + val try1 = Try(LogicalTypeHandlerStore.fromEmbulkOptions(invalidTypeOpts, emptyColumnOpts)) + assert(try1.isFailure) + assert(try1.failed.get.isInstanceOf[ConfigException]) + + val try2 = Try(LogicalTypeHandlerStore.fromEmbulkOptions(emptyTypeOpts, invalidColumnOpts)) + assert(try2.isFailure) + assert(try2.failed.get.isInstanceOf[ConfigException]) + + val try3 = Try(LogicalTypeHandlerStore.fromEmbulkOptions(invalidTypeOpts, invalidColumnOpts)) + assert(try3.isFailure) + assert(try3.failed.get.isInstanceOf[ConfigException]) + } + + test("get() returns a handler matched with primary column name condition") { + val typeOpts = Map[String, TypeOptionTask]( + "timestamp" -> 
DummyTypeOptionTask(Optional.of[String]("timestamp-millis")), + ).asJava + val columnOpts = Map[String, ColumnOptionTask]( + "col1" -> DummyColumnOptionTask(Optional.of[String]("timestamp-micros")), + ).asJava + + val handlers = LogicalTypeHandlerStore.fromEmbulkOptions(typeOpts, columnOpts) + + // It matches both of column name and embulk type, and column name should be primary + val expected = Some(TimestampMicrosLogicalTypeHandler) + val actual = handlers.get("col1", Types.TIMESTAMP) + + assert(actual == expected) + } + + test("get() returns a handler matched with type name condition") { + val typeOpts = Map[String, TypeOptionTask]( + "timestamp" -> DummyTypeOptionTask(Optional.of[String]("timestamp-millis")), + ).asJava + val columnOpts = Map.empty[String, ColumnOptionTask].asJava + + val handlers = LogicalTypeHandlerStore.fromEmbulkOptions(typeOpts, columnOpts) + + // It matches column name + val expected = Some(TimestampMillisLogicalTypeHandler) + val actual = handlers.get("col1", Types.TIMESTAMP) + + assert(actual == expected) + } + + test("get() returns None if not matched") { + val typeOpts = Map.empty[String, TypeOptionTask].asJava + val columnOpts = Map.empty[String, ColumnOptionTask].asJava + + val handlers = LogicalTypeHandlerStore.fromEmbulkOptions(typeOpts, columnOpts) + + // It matches embulk type + val actual = handlers.get("col1", Types.TIMESTAMP) + + assert(actual.isEmpty) + } + + private case class DummyTypeOptionTask(lt: Optional[String]) + extends TypeOptionTask + { + override def getLogicalType: Optional[String] = + { + lt + } + + override def validate(): Unit = + {} + + override def dump(): TaskSource = + { + null + } + } + + private case class DummyColumnOptionTask(lt: Optional[String]) + extends ColumnOptionTask + { + override def getTimeZoneId: GOptional[String] = + { + GOptional.absent[String] + } + + override def getFormat: GOptional[String] = + { + GOptional.absent[String] + } + + override def getLogicalType: Optional[String] = + { + lt + } + + override def validate(): Unit = + {} + + override def dump(): TaskSource = + { + null + } + } } From 46dc85dddeba2c96ffc53d30b3b87f4d7ae38330 Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Fri, 15 Nov 2019 02:42:13 +0900 Subject: [PATCH 10/19] documentations --- README.md | 6 +++++- build.gradle | 2 +- example/with_logicaltypes.yml | 10 +++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 9dae260..a011337 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ - **column_options**: a map whose keys are name of columns, and values are configuration with following parameters (optional) - **timezone**: timezone if type of this column is timestamp. If not set, **default_timezone** is used. (string, optional) - **format**: timestamp format if type of this column is timestamp. If not set, **default_timestamp_format**: is used. (string, optional) + - **logical_type**: a Parquet logical type name (`timestamp-millis`, `timestamp-micros`, `json`, `int*`, `uint*`) (string, optional) - **canned_acl**: grants one of [canned ACLs](https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#CannedACL) for created objects (string, default: `private`) - **block_size**: The block size is the size of a row group being buffered in memory. This limits the memory usage when writing. Larger values will improve the I/O when reading but consume more memory when writing. (int, default: `134217728` (128MB)) - **page_size**: The page size is for compression. 
When reading, each page can be decompressed independently. A block is composed of pages. The page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. (int, default: `1048576` (1MB)) @@ -75,6 +76,8 @@ - **user** proxy user (string, optional) - **password** proxy password (string, optional) - **buffer_dir**: buffer directory for parquet files to be uploaded on S3 (string, default: Create a Temporary Directory) +- **type_options**: a map whose keys are name of embulk type(`boolean`, `long`, `double`, `string`, `timestamp`, `json`), and values are configuration with following parameters (optional) + - **logical_type**: a Parquet logical type name (`timestamp-millis`, `timestamp-micros`, `json`, `int*`, `uint*`) (string, optional) ## Example @@ -92,7 +95,8 @@ out: ## Note -* The current implementation does not support [LogicalTypes](https://github.com/apache/parquet-format/blob/2b38663/LogicalTypes.md). I will implement them later as **column_options**. So, currently **timestamp** type and **json** type are stored as UTF-8 String. Please be careful. +* The current Parquet [LogicalTypes](https://github.com/apache/parquet-format/blob/2b38663/LogicalTypes.md) implementation does only old representation. +* Some kind of LogicalTypes are sometimes not supported on your middleware. Be careful to giving logical type name. ## Development diff --git a/build.gradle b/build.gradle index d385bb1..389305d 100644 --- a/build.gradle +++ b/build.gradle @@ -13,7 +13,7 @@ configurations { provided } -version = "0.0.3" +version = "0.0.4" sourceCompatibility = 1.8 targetCompatibility = 1.8 diff --git a/example/with_logicaltypes.yml b/example/with_logicaltypes.yml index c0b5748..37f8b46 100644 --- a/example/with_logicaltypes.yml +++ b/example/with_logicaltypes.yml @@ -18,18 +18,14 @@ in: out: type: s3_parquet bucket: my-bucket - path_prefix: path/to/my-obj. + path_prefix: path/to/my-obj-2. 
file_ext: snappy.parquet compression_codec: snappy default_timezone: Asia/Tokyo canned_acl: bucket-owner-full-control column_options: - # It has higher priority than type_options.timestamp - t: - logical_type: "timestamp-micros" + id: + logical_type: "uint64" type_options: - json: - logical_type: "json" - # It should be ignored by column_options on 't' column timestamp: logical_type: "timestamp-millis" From 0fcab5cd6b70bba7bafcb7de6bdae10f29f63734 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 17 Nov 2019 13:02:45 +0900 Subject: [PATCH 11/19] Add Github Actions CI settings --- .github/workflows/release.yml | 33 +++++++++++++++++++++++++++++++++ .github/workflows/test.yml | 19 +++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 .github/workflows/release.yml create mode 100644 .github/workflows/test.yml diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..8483eff --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,33 @@ +name: Release CI + +on: + pull_request: + branches: + - master + types: + - closed + +jobs: + release: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v1 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + - name: Test with Gradle + if: github.event.pull_request.merged == true + run: ./gradlew test + - name: Release the new gem + if: github.event.pull_request.merged == true + run: | + mkdir -p $HOME/.gem + touch $HOME/.gem/credentials + chmod 0600 $HOME/.gem/credentials + printf -- "---\n:rubygems_api_key: ${RUBYGEMS_API_KEY}\n" > $HOME/.gem/credentials + ./gradlew gemPush + env: + RUBYGEMS_API_KEY: ${{secrets.RUBYGEMS_API_KEY}} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..1029d4c --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,19 @@ +name: Test CI + +on: + - push + +jobs: + test: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v1 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + - name: Test with Gradle + run: ./gradlew test + From 8c7271c5b920f3701044c0c24f12a8f8474ac857 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 17 Nov 2019 13:55:03 +0900 Subject: [PATCH 12/19] Use localstack docker directly, not Java API. 
--- README.md | 3 +- build.gradle | 1 - .../TestS3ParquetOutputPlugin.scala | 61 ++++++++++--------- 3 files changed, 35 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index a011337..acedb9c 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,8 @@ $ embulk run example/config.yml -Ilib ### Run test: ```shell +## Run fake S3 with localstack +$ docker run -it --rm -p 4572:4572 -e SERVICES=s3 localstack/localstack $ ./gradlew test ``` @@ -125,7 +127,6 @@ Fix [build.gradle](./build.gradle), then ```shell $ ./gradlew gemPush - ``` ## ChangeLog diff --git a/build.gradle b/build.gradle index 389305d..b837d2e 100644 --- a/build.gradle +++ b/build.gradle @@ -35,7 +35,6 @@ dependencies { testCompile 'org.scalatest:scalatest_2.13:3.0.8' testCompile 'org.embulk:embulk-test:0.9.17' testCompile 'org.embulk:embulk-standards:0.9.17' - testCompile 'cloud.localstack:localstack-utils:0.1.15' testCompile 'org.apache.parquet:parquet-tools:1.10.1' testCompile 'org.apache.hadoop:hadoop-client:2.9.2' } diff --git a/src/test/scala/org/embulk/output/s3_parquet/TestS3ParquetOutputPlugin.scala b/src/test/scala/org/embulk/output/s3_parquet/TestS3ParquetOutputPlugin.scala index ee7f5c8..7f328e3 100644 --- a/src/test/scala/org/embulk/output/s3_parquet/TestS3ParquetOutputPlugin.scala +++ b/src/test/scala/org/embulk/output/s3_parquet/TestS3ParquetOutputPlugin.scala @@ -4,9 +4,9 @@ package org.embulk.output.s3_parquet import java.io.File import java.nio.file.FileSystems -import cloud.localstack.{DockerTestUtils, Localstack, TestUtils} -import cloud.localstack.docker.LocalstackDocker -import cloud.localstack.docker.annotation.LocalstackDockerConfiguration +import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials} +import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration +import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} import com.amazonaws.services.s3.transfer.TransferManagerBuilder import com.google.common.io.Resources import org.apache.hadoop.fs.{Path => HadoopPath} @@ -33,22 +33,11 @@ class TestS3ParquetOutputPlugin { val RESOURCE_NAME_PREFIX: String = "org/embulk/output/s3_parquet/" - val BUCKET_NAME: String = "my-bucket" - - val LOCALSTACK_DOCKER: LocalstackDocker = LocalstackDocker.INSTANCE - - override protected def beforeAll(): Unit = - { - Localstack.teardownInfrastructure() - LOCALSTACK_DOCKER.startup(LocalstackDockerConfiguration.DEFAULT) - super.beforeAll() - } - - override protected def afterAll(): Unit = - { - LOCALSTACK_DOCKER.stop() - super.afterAll() - } + val TEST_S3_ENDPOINT: String = "http://localhost:4572" + val TEST_S3_REGION: String = "us-east-1" + val TEST_S3_ACCESS_KEY_ID: String = "test" + val TEST_S3_SECRET_ACCESS_KEY: String = "test" + val TEST_BUCKET_NAME: String = "my-bucket" @(Rule@getter) val embulk: TestingEmbulk = TestingEmbulk.builder() @@ -56,7 +45,11 @@ class TestS3ParquetOutputPlugin .build() before { - DockerTestUtils.getClientS3.createBucket(BUCKET_NAME) + withLocalStackS3Client(_.createBucket(TEST_BUCKET_NAME)) + } + + after { + withLocalStackS3Client(_.deleteBucket(TEST_BUCKET_NAME)) } def defaultOutConfig(): ConfigSource = @@ -64,11 +57,11 @@ class TestS3ParquetOutputPlugin embulk.newConfig() .set("type", "s3_parquet") .set("endpoint", "http://localhost:4572") // See https://github.com/localstack/localstack#overview - .set("bucket", BUCKET_NAME) + .set("bucket", TEST_BUCKET_NAME) .set("path_prefix", "path/to/p") .set("auth_method", "basic") - .set("access_key_id", TestUtils.TEST_ACCESS_KEY) - 
.set("secret_access_key", TestUtils.TEST_SECRET_KEY) + .set("access_key_id", TEST_S3_ACCESS_KEY_ID) + .set("secret_access_key", TEST_S3_SECRET_ACCESS_KEY) .set("path_style_access_enabled", true) .set("default_timezone", "Asia/Tokyo") } @@ -111,12 +104,14 @@ class TestS3ParquetOutputPlugin def readParquetFile(bucket: String, key: String): Seq[Map[String, String]] = { - val xfer = TransferManagerBuilder.standard() - .withS3Client(DockerTestUtils.getClientS3) - .build() val createdParquetFile = embulk.createTempFile("in") - try xfer.download(bucket, key, createdParquetFile.toFile).waitForCompletion() - finally xfer.shutdownNow() + withLocalStackS3Client {s3 => + val xfer = TransferManagerBuilder.standard() + .withS3Client(s3) + .build() + try xfer.download(bucket, key, createdParquetFile.toFile).waitForCompletion() + finally xfer.shutdownNow() + } val reader: ParquetReader[SimpleRecord] = ParquetReader .builder(new SimpleReadSupport(), new HadoopPath(createdParquetFile.toString)) @@ -146,4 +141,14 @@ class TestS3ParquetOutputPlugin FileSystems.getDefault.getPath(new File(url.toURI).getAbsolutePath) } + private def withLocalStackS3Client[A](f: AmazonS3 => A): A = { + val client: AmazonS3 = AmazonS3ClientBuilder.standard + .withEndpointConfiguration(new EndpointConfiguration(TEST_S3_ENDPOINT, TEST_S3_REGION)) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(TEST_S3_ACCESS_KEY_ID, TEST_S3_SECRET_ACCESS_KEY))) + .withPathStyleAccessEnabled(true) + .build() + + try f(client) + finally client.shutdown() + } } From 50a145867831fe0d3b884834c02d78bbd0a994d9 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 17 Nov 2019 13:59:52 +0900 Subject: [PATCH 13/19] Github Actions run with localstack --- .github/workflows/release.yml | 7 +++++++ .github/workflows/test.yml | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8483eff..8ba2595 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -11,6 +11,13 @@ jobs: release: runs-on: ubuntu-latest + services: + localstack: + image: localstack/localstack + ports: + - 4572:4572 + env: + SERVICES: s3 steps: - uses: actions/checkout@v1 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1029d4c..4c77bab 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,6 +7,13 @@ jobs: test: runs-on: ubuntu-latest + services: + localstack: + image: localstack/localstack + ports: + - 4572:4572 + env: + SERVICES: s3 steps: - uses: actions/checkout@v1 From d066f9c3c6a3afccb44a13670353d092b4c8d0f6 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 17 Nov 2019 14:09:11 +0900 Subject: [PATCH 14/19] Add status badges --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index acedb9c..52a8b6c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # S3 Parquet output plugin for Embulk +![Release CI Status Badge](https://github.com/civitaspo/embulk-output-s3_parquet/workflows/Release%20CI/badge.svg) ![Test CI Status Badge](https://github.com/civitaspo/embulk-output-s3_parquet/workflows/Test%20CI/badge.svg) + [Embulk](https://github.com/embulk/embulk/) output plugin to dump records as [Apache Parquet](https://parquet.apache.org/) files on S3. 
## Overview From b90633b1c1bb19b9c65375279bce139636eaf078 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 17 Nov 2019 16:40:38 +0900 Subject: [PATCH 15/19] Support LogicalTypes for Glue Data Catalog --- README.md | 30 ++++++++++++++-- example/with_catalog.yml | 36 +++++++++++++++++++ .../s3_parquet/CatalogRegistrator.scala | 34 +++++++++++++++--- .../s3_parquet/S3ParquetOutputPlugin.scala | 20 ++++++++++- 4 files changed, 111 insertions(+), 9 deletions(-) create mode 100644 example/with_catalog.yml diff --git a/README.md b/README.md index 52a8b6c..8bdfaa6 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ - **column_options**: a map whose keys are name of columns, and values are configuration with following parameters (optional) - **timezone**: timezone if type of this column is timestamp. If not set, **default_timezone** is used. (string, optional) - **format**: timestamp format if type of this column is timestamp. If not set, **default_timestamp_format**: is used. (string, optional) - - **logical_type**: a Parquet logical type name (`timestamp-millis`, `timestamp-micros`, `json`, `int*`, `uint*`) (string, optional) + - **logical_type**: a Parquet logical type name (`timestamp-millis`, `timestamp-micros`, `json`, `int8`, `int16`, `int32`, `int64`, `uint8`, `uint16`, `uint32`, `uint64`) (string, optional) - **canned_acl**: grants one of [canned ACLs](https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#CannedACL) for created objects (string, default: `private`) - **block_size**: The block size is the size of a row group being buffered in memory. This limits the memory usage when writing. Larger values will improve the I/O when reading but consume more memory when writing. (int, default: `134217728` (128MB)) - **page_size**: The page size is for compression. When reading, each page can be decompressed independently. A block is composed of pages. The page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. (int, default: `1048576` (1MB)) @@ -67,7 +67,31 @@ - **database**: The name of the database (string, required) - **table**: The name of the table (string, required) - **column_options**: a key-value pairs where key is a column name and value is options for the column. (string to options map, default: `{}`) - - **type**: type of a column when this plugin creates new tables (e.g. `STRING`, `BIGINT`) (string, default: depends on input column type. `BIGINT` if input column type is `long`, `BOOLEAN` if boolean, `DOUBLE` if `double`, `STRING` if `string`, `STRING` if `timestamp`, `STRING` if `json`) + - **type**: type of column when this plugin creates new tables (e.g. `string`, `bigint`) (string, default: depends on the input embulk column type, or the parquet logical type. 
See the below table) + + |embulk column type|glue data type| + |:---|:---| + |long|bigint| + |boolean|boolean| + |double|double| + |string|string| + |timestamp|string| + |json|string| + + |parquet logical type|glue data type|note| + |:---|:---|:---| + |timestamp-millis|timestamp|| + |timestamp-micros|long|Glue cannot recognize timestamp-micros.| + |int8|tinyint|| + |int16|smallint|| + |int32|int|| + |int64|bigint|| + |uint8|smallint|Glue tinyint is a minimum value of -2^7 and a maximum value of 2^7-1| + |uint16|int|Glue smallint is a minimum value of -2^15 and a maximum value of 2^15-1.| + |uint32|bigint|Glue int is a minimum value of-2^31 and a maximum value of 2^31-1.| + |uint64|ConfigException|Glue bigint supports only a 64-bit signed integer.| + |json|string|| + - **operation_if_exists**: operation if the table already exist. Available operations are `"delete"` and `"skip"` (string, default: `"delete"`) - **endpoint**: The AWS Service endpoint (string, optional) - **region**: The AWS region (string, optional) @@ -79,7 +103,7 @@ - **password** proxy password (string, optional) - **buffer_dir**: buffer directory for parquet files to be uploaded on S3 (string, default: Create a Temporary Directory) - **type_options**: a map whose keys are name of embulk type(`boolean`, `long`, `double`, `string`, `timestamp`, `json`), and values are configuration with following parameters (optional) - - **logical_type**: a Parquet logical type name (`timestamp-millis`, `timestamp-micros`, `json`, `int*`, `uint*`) (string, optional) + - **logical_type**: a Parquet logical type name (`timestamp-millis`, `timestamp-micros`, `json`, `int8`, `int16`, `int32`, `int64`, `uint8`, `uint16`, `uint32`, `uint64`) (string, optional) ## Example diff --git a/example/with_catalog.yml b/example/with_catalog.yml new file mode 100644 index 0000000..6431f52 --- /dev/null +++ b/example/with_catalog.yml @@ -0,0 +1,36 @@ + +in: + type: file + path_prefix: ./example/data.tsv + parser: + type: csv + delimiter: "\t" + skip_header_lines: 0 + null_string: "" + columns: + - { name: id, type: long } + - { name: description, type: string } + - { name: name, type: string } + - { name: t, type: timestamp, format: "%Y-%m-%d %H:%M:%S %z"} + - { name: payload, type: json} + stop_on_invalid_record: true + +out: + type: s3_parquet + bucket: dev-baikal-workspace + path_prefix: path/to/my-obj-2. 
+ file_ext: snappy.parquet + compression_codec: snappy + default_timezone: Asia/Tokyo + canned_acl: bucket-owner-full-control + column_options: + id: + logical_type: "int64" + payload: + logical_type: "json" + type_options: + timestamp: + logical_type: "timestamp-millis" + catalog: + database: example_db + table: example_tbl diff --git a/src/main/scala/org/embulk/output/s3_parquet/CatalogRegistrator.scala b/src/main/scala/org/embulk/output/s3_parquet/CatalogRegistrator.scala index e6817cb..27ff4de 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/CatalogRegistrator.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/CatalogRegistrator.scala @@ -51,9 +51,10 @@ object CatalogRegistrator schema: Schema, location: String, compressionCodec: CompressionCodecName, - loggerOption: Option[Logger] = None): CatalogRegistrator = + loggerOption: Option[Logger] = None, + parquetColumnLogicalTypes: Map[String, String] = Map.empty): CatalogRegistrator = { - new CatalogRegistrator(aws, task, schema, location, compressionCodec, loggerOption) + new CatalogRegistrator(aws, task, schema, location, compressionCodec, loggerOption, parquetColumnLogicalTypes) } } @@ -62,7 +63,8 @@ class CatalogRegistrator(aws: Aws, schema: Schema, location: String, compressionCodec: CompressionCodecName, - loggerOption: Option[Logger] = None) + loggerOption: Option[Logger] = None, + parquetColumnLogicalTypes: Map[String, String] = Map.empty) { val logger: Logger = loggerOption.getOrElse(LoggerFactory.getLogger(classOf[CatalogRegistrator])) @@ -150,14 +152,36 @@ class CatalogRegistrator(aws: Aws, schema.getColumns.asScala.toSeq.map { c => val cType: String = if (columnOptions.contains(c.getName)) columnOptions(c.getName).getType - else convertEmbulkType2GlueType(c.getType) + else if (parquetColumnLogicalTypes.contains(c.getName)) convertParquetLogicalTypeToGlueType(parquetColumnLogicalTypes(c.getName)) + else convertEmbulkTypeToGlueType(c.getType) new Column() .withName(c.getName) .withType(cType) } } - private def convertEmbulkType2GlueType(t: Type): String = + private def convertParquetLogicalTypeToGlueType(t: String): String = + { + t match { + case "timestamp-millis" => "timestamp" + case "timestamp-micros" => "bigint" // Glue cannot recognize timestamp-micros. + case "int8" => "tinyint" + case "int16" => "smallint" + case "int32" => "int" + case "int64" => "bigint" + case "uint8" => "smallint" // Glue tinyint is a minimum value of -2^7 and a maximum value of 2^7-1 + case "uint16" => "int" // Glue smallint is a minimum value of -2^15 and a maximum value of 2^15-1. + case "uint32" => "bigint" // Glue int is a minimum value of-2^31 and a maximum value of 2^31-1. + case "uint64" => throw new ConfigException("Cannot convert uint64 to Glue data types automatically" + + " because the Glue bigint supports a 64-bit signed integer." + + " Please use `catalog.column_options` to define the type.") + case "json" => "string" + case _ => throw new ConfigException(s"Unsupported a parquet logical type: $t. 
Please use `catalog.column_options` to define the type.") + } + + } + + private def convertEmbulkTypeToGlueType(t: Type): String = { t match { case _: BooleanType => "boolean" diff --git a/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala b/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala index 2892bcb..53b623f 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/S3ParquetOutputPlugin.scala @@ -18,6 +18,9 @@ import org.embulk.spi.time.TimestampFormatter.TimestampColumnOption import org.embulk.spi.util.Timestamps import org.slf4j.{Logger, LoggerFactory} +import scala.jdk.CollectionConverters._ +import scala.util.chaining._ + object S3ParquetOutputPlugin { @@ -132,11 +135,26 @@ class S3ParquetOutputPlugin } task.getCatalog.ifPresent { catalog => val location = s"s3://${task.getBucket}/${task.getPathPrefix.replaceFirst("(.*/)[^/]+$", "$1")}" + val parquetColumnLogicalTypes: Map[String, String] = Map.newBuilder[String, String].pipe {builder => + val cOptions = task.getColumnOptions.asScala + val tOptions = task.getTypeOptions.asScala + schema.getColumns.asScala.foreach {c => + cOptions.get(c.getName) + if (cOptions.contains(c.getName) && cOptions(c.getName).getLogicalType.isPresent) { + builder.addOne(c.getName -> cOptions(c.getName).getLogicalType.get()) + } + else if (tOptions.contains(c.getType.getName) && tOptions(c.getType.getName).getLogicalType.isPresent) { + builder.addOne(c.getName -> tOptions(c.getType.getName).getLogicalType.get()) + } + } + builder.result() + } val cr = CatalogRegistrator(aws = Aws(task), task = catalog, schema = schema, location = location, - compressionCodec = task.getCompressionCodec) + compressionCodec = task.getCompressionCodec, + parquetColumnLogicalTypes = parquetColumnLogicalTypes) cr.run() } From f672696af78e456106ef2e788acb3e759d5f14d1 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 17 Nov 2019 21:09:39 +0900 Subject: [PATCH 16/19] Update dependencies (embulk 0.9.17 -> 0.9.20, scala 2.13.0 -> 2.13.1, aws-sdk 1.11.592 -> 1.11.676) --- build.gradle | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/build.gradle b/build.gradle index b837d2e..8cd8a32 100644 --- a/build.gradle +++ b/build.gradle @@ -19,12 +19,12 @@ sourceCompatibility = 1.8 targetCompatibility = 1.8 dependencies { - compile "org.embulk:embulk-core:0.9.17" - provided "org.embulk:embulk-core:0.9.17" + compile "org.embulk:embulk-core:0.9.20" + provided "org.embulk:embulk-core:0.9.20" - compile 'org.scala-lang:scala-library:2.13.0' + compile 'org.scala-lang:scala-library:2.13.1' ['glue', 's3', 'sts'].each { v -> - compile "com.amazonaws:aws-java-sdk-${v}:1.11.592" + compile "com.amazonaws:aws-java-sdk-${v}:1.11.676" } ['column', 'common', 'encoding', 'format', 'hadoop', 'jackson'].each { v -> compile "org.apache.parquet:parquet-${v}:1.10.1" @@ -33,8 +33,8 @@ dependencies { compile 'org.xerial.snappy:snappy-java:1.1.7.3' testCompile 'org.scalatest:scalatest_2.13:3.0.8' - testCompile 'org.embulk:embulk-test:0.9.17' - testCompile 'org.embulk:embulk-standards:0.9.17' + testCompile 'org.embulk:embulk-test:0.9.20' + testCompile 'org.embulk:embulk-standards:0.9.20' testCompile 'org.apache.parquet:parquet-tools:1.10.1' testCompile 'org.apache.hadoop:hadoop-client:2.9.2' } From c686abf00986e4da925ccad672af4d21d684512f Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 17 Nov 2019 22:11:01 +0900 Subject: [PATCH 17/19] Support auth_method: 
web_identity_token for the EKS "IAM Roles for Service Accounts" feature. --- README.md | 10 ++++++---- .../output/s3_parquet/aws/AwsCredentials.scala | 12 +++++++++++- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 8bdfaa6..50c4c08 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ - **page_size**: The page size is for compression. When reading, each page can be decompressed independently. A block is composed of pages. The page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. (int, default: `1048576` (1MB)) - **max_padding_size**: The max size (bytes) to write as padding and the min size of a row group (int, default: `8388608` (8MB)) - **enable_dictionary_encoding**: The boolean value is to enable/disable dictionary encoding. (boolean, default: `true`) -- **auth_method**: name of mechanism to authenticate requests (`"basic"`, `"env"`, `"instance"`, `"profile"`, `"properties"`, `"anonymous"`, or `"session"`, default: `"default"`) +- **auth_method**: name of mechanism to authenticate requests (`"basic"`, `"env"`, `"instance"`, `"profile"`, `"properties"`, `"anonymous"`, `"session"`, `"web_identity_token"`, default: `"default"`) - `"basic"`: uses **access_key_id** and **secret_access_key** to authenticate. - `"env"`: uses `AWS_ACCESS_KEY_ID` (or `AWS_ACCESS_KEY`) and `AWS_SECRET_KEY` (or `AWS_SECRET_ACCESS_KEY`) environment variables. - `"instance"`: uses EC2 instance profile or attached ECS task role. @@ -47,6 +47,7 @@ - `"anonymous"`: uses anonymous access. This auth method can access only public files. - `"session"`: uses temporary-generated **access_key_id**, **secret_access_key** and **session_token**. - `"assume_role"`: uses temporary-generated credentials by assuming **role_arn** role. + - `"web_identity_token"`: uses temporary-generated credentials by assuming **role_arn** role with web identity. - `"default"`: uses AWS SDK's default strategy to look up available credentials from runtime environment. This method behaves like the combination of the following methods. 1. `"env"` 1. `"properties"` @@ -57,10 +58,11 @@ - **access_key_id**: aws access key id. this is required when **auth_method** is `"basic"` or `"session"`. (string, optional) - **secret_access_key**: aws secret access key. this is required when **auth_method** is `"basic"` or `"session"`. (string, optional) - **session_token**: aws session token. this is required when **auth_method** is `"session"`. (string, optional) -- **role_arn**: arn of the role to assume. this is required for **auth_method** is `"assume_role"`. (string, optional) -- **role_session_name**: an identifier for the assumed role session. this is required when **auth_method** is `"assume_role"`. (string, optional) +- **role_arn**: arn of the role to assume. this is required for **auth_method** is `"assume_role"` or `"web_identity_token"`. (string, optional) +- **role_session_name**: an identifier for the assumed role session. this is required when **auth_method** is `"assume_role"` or `"web_identity_token"`. (string, optional) - **role_external_id**: a unique identifier that is used by third parties when assuming roles in their customers' accounts. this is optionally used for **auth_method**: `"assume_role"`. (string, optional) -- **role_session_duration_seconds**: duration, in seconds, of the role session. this is optionally used for **auth_method**: `"assume_role"`. 
(int, optional) +- **role_session_duration_seconds**: duration, in seconds, of the role session. this is optionally used for **auth_method**: `"assume_role"`. (int, optional) +- **web_identity_token_file**: the absolute path to the web identity token file. this is required when **auth_method** is `"web_identity_token"`. (string, optional) - **scope_down_policy**: an iam policy in json format. this is optionally used for **auth_method**: `"assume_role"`. (string, optional) - **catalog**: Register a table if this option is specified (optional) - **catalog_id**: glue data catalog id if you use a catalog different from account/region default catalog. (string, optional) diff --git a/src/main/scala/org/embulk/output/s3_parquet/aws/AwsCredentials.scala b/src/main/scala/org/embulk/output/s3_parquet/aws/AwsCredentials.scala index 19f823d..d20177a 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/aws/AwsCredentials.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/aws/AwsCredentials.scala @@ -3,7 +3,7 @@ package org.embulk.output.s3_parquet.aws import java.util.Optional -import com.amazonaws.auth.{AnonymousAWSCredentials, AWSCredentialsProvider, AWSStaticCredentialsProvider, BasicAWSCredentials, BasicSessionCredentials, DefaultAWSCredentialsProviderChain, EC2ContainerCredentialsProviderWrapper, EnvironmentVariableCredentialsProvider, STSAssumeRoleSessionCredentialsProvider, SystemPropertiesCredentialsProvider} +import com.amazonaws.auth.{AnonymousAWSCredentials, AWSCredentialsProvider, AWSStaticCredentialsProvider, BasicAWSCredentials, BasicSessionCredentials, DefaultAWSCredentialsProviderChain, EC2ContainerCredentialsProviderWrapper, EnvironmentVariableCredentialsProvider, STSAssumeRoleSessionCredentialsProvider, SystemPropertiesCredentialsProvider, WebIdentityTokenCredentialsProvider} import com.amazonaws.auth.profile.{ProfileCredentialsProvider, ProfilesConfigFile} import org.embulk.config.{Config, ConfigDefault, ConfigException} import org.embulk.output.s3_parquet.aws.AwsCredentials.Task @@ -60,6 +60,9 @@ object AwsCredentials @ConfigDefault("null") def getScopeDownPolicy: Optional[String] + @Config("web_identity_token_file") + @ConfigDefault("null") + def getWebIdentityTokenFile: Optional[String] } def apply(task: Task): AwsCredentials = @@ -119,6 +122,13 @@ class AwsCredentials(task: Task) builder.build() + case "web_identity_token" => + WebIdentityTokenCredentialsProvider.builder() + .roleArn(getRequiredOption(task.getRoleArn, "role_arn")) + .roleSessionName(getRequiredOption(task.getRoleSessionName, "role_session_name")) + .webIdentityTokenFile(getRequiredOption(task.getWebIdentityTokenFile, "web_identity_token_file")) + .build() + case "default" => new DefaultAWSCredentialsProviderChain From ceed1111c8322e59db26dff5e09a1c3f1ec10c40 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 17 Nov 2019 22:14:21 +0900 Subject: [PATCH 18/19] Use `org.scalatestplus.junit.JUnitRunner` instead of `org.scalatest.junit.JUnitRunner`. 
--- .../TestS3ParquetOutputPlugin.scala | 2 +- .../parquet/TestLogicalTypeHandler.scala | 2 +- .../parquet/TestLogicalTypeHandlerStore.scala | 63 +++++++++---------- 3 files changed, 33 insertions(+), 34 deletions(-) diff --git a/src/test/scala/org/embulk/output/s3_parquet/TestS3ParquetOutputPlugin.scala b/src/test/scala/org/embulk/output/s3_parquet/TestS3ParquetOutputPlugin.scala index 7f328e3..3d29532 100644 --- a/src/test/scala/org/embulk/output/s3_parquet/TestS3ParquetOutputPlugin.scala +++ b/src/test/scala/org/embulk/output/s3_parquet/TestS3ParquetOutputPlugin.scala @@ -18,7 +18,7 @@ import org.embulk.test.{EmbulkTests, TestingEmbulk} import org.junit.Rule import org.junit.runner.RunWith import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, DiagrammedAssertions, FunSuite} -import org.scalatest.junit.JUnitRunner +import org.scalatestplus.junit.JUnitRunner import scala.annotation.meta.getter import scala.jdk.CollectionConverters._ diff --git a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala index 9cde6d1..d8a4b73 100644 --- a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala +++ b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala @@ -5,7 +5,7 @@ import org.embulk.spi.DataException import org.embulk.spi.`type`.Types import org.junit.runner.RunWith import org.scalatest.FunSuite -import org.scalatest.junit.JUnitRunner +import org.scalatestplus.junit.JUnitRunner import scala.util.Try diff --git a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala index 9a35038..db0aa0d 100644 --- a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala +++ b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala @@ -6,11 +6,10 @@ import java.util.Optional import com.google.common.base.{Optional => GOptional} import org.embulk.config.{ConfigException, TaskSource} import org.embulk.output.s3_parquet.S3ParquetOutputPlugin.{ColumnOptionTask, TypeOptionTask} -import org.embulk.spi.`type`.{Type => EType} -import org.embulk.spi.`type`.Types +import org.embulk.spi.`type`.{Types, Type => EType} import org.junit.runner.RunWith import org.scalatest.FunSuite -import org.scalatest.junit.JUnitRunner +import org.scalatestplus.junit.JUnitRunner import scala.jdk.CollectionConverters._ import scala.util.Try @@ -120,44 +119,44 @@ class TestLogicalTypeHandlerStore private case class DummyTypeOptionTask(lt: Optional[String]) extends TypeOptionTask { - override def getLogicalType: Optional[String] = - { - lt - } + override def getLogicalType: Optional[String] = + { + lt + } - override def validate(): Unit = + override def validate(): Unit = {} - override def dump(): TaskSource = - { - null - } + override def dump(): TaskSource = + { + null + } } private case class DummyColumnOptionTask(lt: Optional[String]) extends ColumnOptionTask { - override def getTimeZoneId: GOptional[String] = - { - GOptional.absent[String] - } - - override def getFormat: GOptional[String] = - { - GOptional.absent[String] - } - - override def getLogicalType: Optional[String] = - { - lt - } - - override def validate(): Unit = + override def getTimeZoneId: GOptional[String] = + { + GOptional.absent[String] + } + + override def getFormat: GOptional[String] = + { + GOptional.absent[String] + } + + 
override def getLogicalType: Optional[String] = + { + lt + } + + override def validate(): Unit = {} - override def dump(): TaskSource = - { - null - } + override def dump(): TaskSource = + { + null + } } } From fa4f26824a7424b27bd3aeedf1e47ca6266d8137 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 17 Nov 2019 17:14:34 +0900 Subject: [PATCH 19/19] Ship v0.1.0 --- CHANGELOG.md | 9 +++++++++ README.md | 4 ++++ build.gradle | 2 +- 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a4fd65..d1018d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +0.1.0 (2019-11-17) +================== + +* [New Feature] Support Logical Types older representations(OriginalTypes) #12 +* [Enhancement] Add Github Actions CI settings #13 +* [Enhancement] Support LogicalTypes for Glue Data Catalog #14 +* [Enhancement] Update dependencies #15 +* [New Feature] Support `auth_method: web_identity_token` #15 + 0.0.3 (2019-07-17) ================== diff --git a/README.md b/README.md index 50c4c08..000ceaa 100644 --- a/README.md +++ b/README.md @@ -160,3 +160,7 @@ $ ./gradlew gemPush ## ChangeLog [CHANGELOG.md](./CHANGELOG.md) + +## Contributors + +- @syucream diff --git a/build.gradle b/build.gradle index 8cd8a32..8f33f8c 100644 --- a/build.gradle +++ b/build.gradle @@ -13,7 +13,7 @@ configurations { provided } -version = "0.0.4" +version = "0.1.0" sourceCompatibility = 1.8 targetCompatibility = 1.8
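
The logical-type lookup added in this series (`LogicalTypeHandlerStore.get(columnName, columnType)`) checks a `column_options` entry first and only falls back to a `type_options` entry when no per-column setting exists, which is the same precedence the example config comments describe. The standalone sketch below illustrates only that precedence; it is not the plugin's code, and the map contents, the `resolve` helper, and the plain-string handler names are illustrative stand-ins for the real `LogicalTypeHandler` objects.

```scala
// Minimal, self-contained sketch of the lookup order used by
// LogicalTypeHandlerStore: a per-column option (column_options) wins over a
// per-embulk-type option (type_options); otherwise no logical type is applied.
object LogicalTypeLookupSketch {

  // Hypothetical settings, loosely mirroring example/with_logicaltypes.yml.
  val fromColumnName: Map[String, String] = Map("t" -> "timestamp-micros")
  val fromEmbulkType: Map[String, String] = Map("timestamp" -> "timestamp-millis")

  // Column name first, then Embulk column type, then None
  // (i.e. fall back to the plain, non-logical-type representation).
  def resolve(columnName: String, embulkType: String): Option[String] =
    fromColumnName.get(columnName).orElse(fromEmbulkType.get(embulkType))

  def main(args: Array[String]): Unit = {
    println(resolve("t", "timestamp"))       // Some(timestamp-micros): column option wins
    println(resolve("created", "timestamp")) // Some(timestamp-millis): type option as fallback
    println(resolve("id", "long"))           // None: no logical type configured
  }
}
```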