
Commit 0afdab3

Zesong Sun authored and Jialin Qiao committed
[IOTDB-91] Improve tsfile-spark-connector to support spark 2.4.3 (#227)
* update tsfile-spark-connector to support spark 2.4.3
1 parent fbd0117 · commit 0afdab3

7 files changed · +26 −22 lines changed


docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ With this connector, you can

 |Spark Version | Scala Version | Java Version | TsFile |
 |------------- | ------------- | ------------ |------------ |
-| `2.0.1` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
+| `2.4.3` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|

 > Note: For more information about how to download and use TsFile, please see the following link: https://github.com/apache/incubator-iotdb/tree/master/tsfile.

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@
   <slf4j.version>1.7.12</slf4j.version>
   <logback.version>1.1.11</logback.version>
   <joda.version>2.9.9</joda.version>
-  <spark.version>2.0.1</spark.version>
+  <spark.version>2.4.3</spark.version>
   <common.io.version>2.5</common.io.version>
   <commons.collections4>4.0</commons.collections4>
   <thrift.version>0.9.3</thrift.version>

spark/README.md

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ With this connector, you can

 |Spark Version | Scala Version | Java Version | TsFile |
 |------------- | ------------- | ------------ |------------ |
-| `2.0.1` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
+| `2.4.3` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|

 > Note: For more information about how to download and use TsFile, please see the following link: https://github.com/apache/incubator-iotdb/tree/master/tsfile.
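
Both copies of the compatibility table now advertise Spark 2.4.3. For orientation, a minimal read/write sketch under that version; the format name "org.apache.iotdb.tsfile" and the HDFS paths are assumptions for illustration, not part of this commit:

import org.apache.spark.sql.SparkSession

object TsFileSparkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("tsfile-spark-connector example")
      .getOrCreate()

    // Load a TsFile as a DataFrame: one time column plus one column
    // per "device.sensor" series.
    val df = spark.read
      .format("org.apache.iotdb.tsfile")          // assumed format name
      .load("hdfs://localhost:9000/test.tsfile")  // hypothetical path

    df.show()

    // Write the DataFrame back out as a TsFile.
    df.write
      .format("org.apache.iotdb.tsfile")
      .save("hdfs://localhost:9000/out.tsfile")   // hypothetical path

    spark.stop()
  }
}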

spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala

Lines changed: 12 additions & 12 deletions
@@ -36,7 +36,7 @@ import org.apache.iotdb.tsfile.utils.Binary
 import org.apache.iotdb.tsfile.write.record.TSRecord
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint
 import org.apache.iotdb.tsfile.write.schema.{FileSchema, MeasurementSchema, SchemaBuilder}
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._

@@ -297,12 +297,12 @@ object Converter {
    * @param row given spark sql row
    * @return TSRecord
    */
-  def toTsRecord(row: Row): List[TSRecord] = {
-    val schema = row.schema
-    val time = row.getAs[Long](QueryConstant.RESERVED_TIME)
+  def toTsRecord(row: InternalRow, dataSchema: StructType): List[TSRecord] = {
+    val time = row.getLong(0)
     val deviceToRecord = scala.collection.mutable.Map[String, TSRecord]()
+    var index = 1

-    schema.fields.filter(f => {
+    dataSchema.fields.filter(f => {
       !QueryConstant.RESERVED_TIME.equals(f.name)
     }).foreach(f => {
       val name = f.name
@@ -315,20 +315,20 @@
       val tsRecord: TSRecord = deviceToRecord.getOrElse(device, new TSRecord(time, device))

       val dataType = getTsDataType(f.dataType)
-      val index = row.fieldIndex(name)
       if (!row.isNullAt(index)) {
         val value = f.dataType match {
-          case BooleanType => row.getAs[Boolean](name)
-          case IntegerType => row.getAs[Int](name)
-          case LongType => row.getAs[Long](name)
-          case FloatType => row.getAs[Float](name)
-          case DoubleType => row.getAs[Double](name)
-          case StringType => row.getAs[String](name)
+          case BooleanType => row.getBoolean(index)
+          case IntegerType => row.getInt(index)
+          case LongType => row.getLong(index)
+          case FloatType => row.getFloat(index)
+          case DoubleType => row.getDouble(index)
+          case StringType => row.getString(index)
           case other => throw new UnsupportedOperationException(s"Unsupported type $other")
         }
         val dataPoint = DataPoint.getDataPoint(dataType, measurement, value.toString)
         tsRecord.addTuple(dataPoint)
       }
+      index += 1
     })
     deviceToRecord.values.toList
   }
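
This is the substantive change of the commit: toTsRecord now takes a catalyst InternalRow plus the StructType explicitly, because InternalRow, unlike the external Row, carries no schema and has no fieldIndex(name) lookup. A running index therefore walks the fields positionally, with ordinal 0 reserved for time. A standalone sketch of that access pattern (illustrative schema and values, not the connector's own code):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types._

object PositionalAccessSketch {
  def main(args: Array[String]): Unit = {
    val dataSchema = StructType(Seq(
      StructField("time", LongType, nullable = false),
      StructField("device_1.sensor_1", FloatType, nullable = true),
      StructField("device_1.sensor_2", IntegerType, nullable = true)))

    val row: InternalRow = new GenericInternalRow(Array[Any](1L, null, 20))

    val time = row.getLong(0) // ordinal 0 is always the time column
    var index = 1             // walk the remaining fields by position
    dataSchema.fields.drop(1).foreach { f =>
      if (!row.isNullAt(index)) { // nulls are skipped, as in toTsRecord
        val value = f.dataType match {
          case FloatType   => row.getFloat(index)
          case IntegerType => row.getInt(index)
          case other       => throw new UnsupportedOperationException(s"$other")
        }
        println(s"t=$time ${f.name} -> $value")
      }
      index += 1
    }
  }
}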

spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala

Lines changed: 3 additions & 3 deletions
@@ -22,9 +22,9 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
 import org.apache.iotdb.tsfile.io.TsFileOutputFormat
 import org.apache.iotdb.tsfile.write.record.TSRecord
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.InternalRow

 private[tsfile] class TsFileOutputWriter(
     pathStr: String,
@@ -37,9 +37,9 @@ private[tsfile] class TsFileOutputWriter(
     new TsFileOutputFormat(fileSchema).getRecordWriter(context)
   }

-  override def write(row: Row): Unit = {
+  override def write(row: InternalRow): Unit = {
     if (row != null) {
-      val tsRecord = Converter.toTsRecord(row)
+      val tsRecord = Converter.toTsRecord(row, dataSchema)
       tsRecord.foreach(r => {
         recordWriter.write(NullWritable.get(), r)
       })
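
Context for the signature change: from Spark 2.2 onward, the internal OutputWriter contract hands write() an InternalRow rather than a Row, which is why the schema must now be threaded into Converter.toTsRecord. A stripped-down sketch of that contract as it looks in Spark 2.4 (simplified, not the connector's class):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter

// Minimal no-op writer showing the Spark 2.4 OutputWriter surface:
// write() receives a schemaless InternalRow, so any schema-dependent
// conversion must capture the StructType at construction time.
class NoOpOutputWriter extends OutputWriter {
  override def write(row: InternalRow): Unit = () // per-row hook
  override def close(): Unit = ()                 // release resources
}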

spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala

Lines changed: 4 additions & 1 deletion
@@ -27,9 +27,12 @@ private[tsfile] class TsFileWriterFactory(options: Map[String, String]) extends

   override def newInstance(
       path: String,
-      bucketId: Option[Int],
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
     new TsFileOutputWriter(path, dataSchema, options, context)
   }
+
+  override def getFileExtension(context: TaskAttemptContext): String = {
+    null
+  }
 }
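
Two API shifts are visible here: newInstance lost its bucketId parameter, and OutputWriterFactory gained an abstract getFileExtension, which this commit satisfies by returning null. A hypothetical variant that tags output parts with a suffix instead (an assumption for illustration, not this commit's behavior):

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical factory sketch: same Spark 2.4 surface, but returning
// a concrete ".tsfile" extension rather than null.
class SuffixedWriterFactory extends OutputWriterFactory {
  override def newInstance(
      path: String,
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter =
    new OutputWriter {
      override def write(row: InternalRow): Unit = () // placeholder body
      override def close(): Unit = ()
    }

  override def getFileExtension(context: TaskAttemptContext): String = ".tsfile"
}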

spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala

Lines changed: 4 additions & 3 deletions
@@ -32,7 +32,8 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader
 import org.apache.iotdb.tsfile.read.common.Field
 import org.apache.iotdb.tsfile.utils.Binary
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericRowWithSchema}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.junit.Assert
@@ -223,8 +224,8 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
     fields.add(StructField("device_2.sensor_2", IntegerType, true))
     val schema = StructType(fields)

-    var row: GenericRowWithSchema = new GenericRowWithSchema(Array(1L, null, 1.2f, 20, 19, 2.3f, 11), schema)
-    val records = Converter.toTsRecord(row)
+    val row: InternalRow = new GenericInternalRow(Array(1L, null, 1.2f, 20, 19, 2.3f, 11))
+    val records = Converter.toTsRecord(row, schema)

     Assert.assertEquals(2, records.size)
     Assert.assertEquals(1, records(0).time)
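
The test now builds a GenericInternalRow, which stores raw catalyst values with no attached schema, so the schema travels separately into toTsRecord. Two points worth noting when extending this test (observations, not part of the commit): null handling is positional, and string-typed columns would need UTF8String values for getString to work. A small self-contained check:

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

object InternalRowChecks {
  def main(args: Array[String]): Unit = {
    val row = new GenericInternalRow(
      Array[Any](1L, null, 1.2f, UTF8String.fromString("on")))
    assert(row.getLong(0) == 1L)     // ordinal 0 is the reserved time column
    assert(row.isNullAt(1))          // nulls are detected by position, not name
    assert(row.getFloat(2) == 1.2f)
    assert(row.getString(3) == "on") // getString requires a stored UTF8String
  }
}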
