diff --git a/docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md b/docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md
index 3f88d4e13d305..1d50a050c5098 100644
--- a/docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md
+++ b/docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md
@@ -61,7 +61,7 @@ With this connector, you can
 
 |Spark Version | Scala Version | Java Version | TsFile |
 |------------- | ------------- | ------------ |------------ |
-| `2.0.1` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
+| `2.4.3` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
 
 > Note: For more information about how to download and use TsFile, please see the following link: https://github.com/apache/incubator-iotdb/tree/master/tsfile.
 
diff --git a/pom.xml b/pom.xml
index d8544b9f0ed98..9c77be200b57d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
         1.7.12
         1.1.11
         2.9.9
-        2.0.1
+        2.4.3
         2.5
         4.0
         0.9.3
diff --git a/spark/README.md b/spark/README.md
index 3f88d4e13d305..1d50a050c5098 100644
--- a/spark/README.md
+++ b/spark/README.md
@@ -61,7 +61,7 @@ With this connector, you can
 
 |Spark Version | Scala Version | Java Version | TsFile |
 |------------- | ------------- | ------------ |------------ |
-| `2.0.1` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
+| `2.4.3` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
 
 > Note: For more information about how to download and use TsFile, please see the following link: https://github.com/apache/incubator-iotdb/tree/master/tsfile.
 
diff --git a/spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala b/spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala
index 09156f7f68ceb..4ad3c663d3c21 100755
--- a/spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala
+++ b/spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala
@@ -36,7 +36,7 @@ import org.apache.iotdb.tsfile.utils.Binary
 import org.apache.iotdb.tsfile.write.record.TSRecord
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint
 import org.apache.iotdb.tsfile.write.schema.{FileSchema, MeasurementSchema, SchemaBuilder}
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
@@ -297,12 +297,12 @@ object Converter {
    * @param row given spark sql row
    * @return TSRecord
    */
-  def toTsRecord(row: Row): List[TSRecord] = {
-    val schema = row.schema
-    val time = row.getAs[Long](QueryConstant.RESERVED_TIME)
+  def toTsRecord(row: InternalRow, dataSchema: StructType): List[TSRecord] = {
+    val time = row.getLong(0)
     val deviceToRecord = scala.collection.mutable.Map[String, TSRecord]()
+    var index = 1
 
-    schema.fields.filter(f => {
+    dataSchema.fields.filter(f => {
       !QueryConstant.RESERVED_TIME.equals(f.name)
     }).foreach(f => {
       val name = f.name
@@ -315,20 +315,20 @@
       val tsRecord: TSRecord = deviceToRecord.getOrElse(device, new TSRecord(time, device))
 
       val dataType = getTsDataType(f.dataType)
-      val index = row.fieldIndex(name)
       if (!row.isNullAt(index)) {
         val value = f.dataType match {
-          case BooleanType => row.getAs[Boolean](name)
-          case IntegerType => row.getAs[Int](name)
-          case LongType => row.getAs[Long](name)
-          case FloatType => row.getAs[Float](name)
-          case DoubleType => row.getAs[Double](name)
-          case StringType => row.getAs[String](name)
+          case BooleanType => row.getBoolean(index)
+          case IntegerType => row.getInt(index)
+          case LongType => row.getLong(index)
+          case FloatType => row.getFloat(index)
+          case DoubleType => row.getDouble(index)
+          case StringType => row.getString(index)
           case other => throw new UnsupportedOperationException(s"Unsupported type $other")
         }
         val dataPoint = DataPoint.getDataPoint(dataType, measurement, value.toString)
         tsRecord.addTuple(dataPoint)
       }
+      index += 1
     })
     deviceToRecord.values.toList
   }
diff --git a/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala b/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala
index 0ced194d0b2d5..88a6755082a40 100644
--- a/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala
+++ b/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala
@@ -22,9 +22,9 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
 import org.apache.iotdb.tsfile.io.TsFileOutputFormat
 import org.apache.iotdb.tsfile.write.record.TSRecord
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.InternalRow
 
 private[tsfile] class TsFileOutputWriter(
     pathStr: String,
@@ -37,9 +37,9 @@
     new TsFileOutputFormat(fileSchema).getRecordWriter(context)
   }
 
-  override def write(row: Row): Unit = {
+  override def write(row: InternalRow): Unit = {
     if (row != null) {
-      val tsRecord = Converter.toTsRecord(row)
+      val tsRecord = Converter.toTsRecord(row, dataSchema)
       tsRecord.foreach(r => {
         recordWriter.write(NullWritable.get(), r)
       })
diff --git a/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala b/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala
index d78d093f5d1b1..8b0079a7dafd3 100644
--- a/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala
+++ b/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala
@@ -27,9 +27,12 @@ private[tsfile] class TsFileWriterFactory(options: Map[String, String]) extends
 
   override def newInstance(
       path: String,
-      bucketId: Option[Int],
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
     new TsFileOutputWriter(path, dataSchema, options, context)
   }
+
+  override def getFileExtension(context: TaskAttemptContext): String = {
+    null
+  }
 }
diff --git a/spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala b/spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala
index 9f2f36d94fced..86bbd82fbcd3d 100644
--- a/spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala
+++ b/spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala
@@ -32,7 +32,8 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader
 import org.apache.iotdb.tsfile.read.common.Field
 import org.apache.iotdb.tsfile.utils.Binary
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericRowWithSchema}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.junit.Assert
@@ -223,8 +224,8 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
     fields.add(StructField("device_2.sensor_2", IntegerType, true))
     val schema = StructType(fields)
 
-    var row: GenericRowWithSchema = new GenericRowWithSchema(Array(1L, null, 1.2f, 20, 19, 2.3f, 11), schema)
-    val records = Converter.toTsRecord(row)
+    val row: InternalRow = new GenericInternalRow(Array(1L, null, 1.2f, 20, 19, 2.3f, 11))
+    val records = Converter.toTsRecord(row, schema)
 
     Assert.assertEquals(2, records.size)
     Assert.assertEquals(1, records(0).time)
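
Not part of the patch: a minimal sketch of how the reworked `Converter.toTsRecord` is exercised after this change, mirroring the updated `ConverterTest`. The schema, column names, and values below are illustrative assumptions; in particular it assumes the reserved time column (`QueryConstant.RESERVED_TIME`, "time") sits at position 0, since `InternalRow` carries no schema and the new code reads the timestamp via `row.getLong(0)` and takes the `StructType` as an explicit argument.

```scala
import org.apache.iotdb.tsfile.Converter
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types._

// Illustrative schema: the reserved time column first (assumed to be named "time",
// i.e. QueryConstant.RESERVED_TIME), then one "<device>.<sensor>" column per measurement.
val schema = StructType(Seq(
  StructField("time", LongType, nullable = false),
  StructField("device_1.sensor_1", FloatType, nullable = true),
  StructField("device_2.sensor_1", IntegerType, nullable = true)))

// InternalRow is positional, so values must follow the schema order above.
val row: InternalRow = new GenericInternalRow(Array[Any](1L, 1.2f, 20))

// The caller now supplies the StructType; measurements are grouped per device,
// so two devices yield two TSRecords sharing the same timestamp.
val records = Converter.toTsRecord(row, schema)
assert(records.size == 2)
assert(records.forall(_.time == 1L))
```

The move to `InternalRow` follows the Spark 2.4 datasource API, where `OutputWriter.write` takes an `InternalRow` instead of a `Row`; that is why the converter keeps a running column index and receives `dataSchema` explicitly rather than using `row.schema` and `row.fieldIndex`, and why `TsFileWriterFactory` drops the `bucketId` parameter and implements `getFileExtension`.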