Skip to content

Commit

Permalink
Merge pull request #55 from sekikn/replace-parquet-tools
Browse files Browse the repository at this point in the history
Replace parquet-tools with parquet-avro
  • Loading branch information
civitaspo authored Feb 14, 2023
2 parents b2f7f0c + 53ced7f commit 55c60de
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 18 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ dependencies {
}
testImplementation "org.embulk:embulk-core:0.9.23:tests"
testImplementation "org.scalatest:scalatest_2.13:3.1.1"
testImplementation 'org.apache.parquet:parquet-tools:1.11.0'
testImplementation 'org.apache.parquet:parquet-avro:1.11.0'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-avro:2.14.0'
testImplementation 'org.apache.hadoop:hadoop-client:2.9.2'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import com.amazonaws.services.s3.transfer.{
TransferManagerBuilder
}
import com.google.inject.{Binder, Guice, Module, Stage}
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path => HadoopPath}
import org.apache.parquet.avro.AvroReadSupport
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetReader}
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.schema.MessageType
import org.apache.parquet.tools.read.{SimpleReadSupport, SimpleRecord}
import org.embulk.{TestPluginSourceModule, TestUtilityModule}
import org.embulk.config.{
ConfigLoader,
Expand Down Expand Up @@ -229,27 +230,16 @@ abstract class EmbulkPluginTestHelper
)
) { reader => messageTypeTest(reader.getFileMetaData.getSchema) }

val reader: ParquetReader[SimpleRecord] = ParquetReader
val reader: ParquetReader[GenericRecord] = ParquetReader
.builder(
new SimpleReadSupport(),
new AvroReadSupport[GenericRecord](),
new HadoopPath(pathString)
)
.build()

def read(
reader: ParquetReader[SimpleRecord],
records: Seq[Seq[AnyRef]] = Seq()
): Seq[Seq[AnyRef]] = {
val simpleRecord: SimpleRecord = reader.read()
if (simpleRecord != null) {
val r: Seq[AnyRef] = simpleRecord.getValues
.map(_.getValue)
return read(reader, records :+ r)
}
records
}
try read(reader)
finally reader.close()
Iterator.continually(reader.read()).takeWhile(_ != null).map(
record => record.getSchema.getFields.map(f => record.get(f.name()))
).toSeq
}

def loadConfigSourceFromYamlString(yaml: String): ConfigSource =
Expand Down

0 comments on commit 55c60de

Please sign in to comment.