2 changes: 1 addition & 1 deletion docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md
@@ -61,7 +61,7 @@ With this connector, you can
 
 |Spark Version | Scala Version | Java Version | TsFile |
 |------------- | ------------- | ------------ |------------ |
-| `2.0.1` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
+| `2.4.3` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
 
 > Note: For more information about how to download and use TsFile, please see the following link: https://github.com/apache/incubator-iotdb/tree/master/tsfile.
 
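For reviewers trying the bump locally, here is a minimal smoke-test sketch of round-tripping a DataFrame through the connector under Spark 2.4.3. It assumes the data source is addressed by its package name `org.apache.iotdb.tsfile` (the package the connector's sources live in) and uses placeholder file paths; neither is taken from this patch.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, not part of the patch: exercise the read and write
// paths this PR touches. Format name and paths are assumptions.
val spark = SparkSession.builder()
  .appName("tsfile-connector-smoke-test")
  .master("local[*]")
  .getOrCreate()

// Read an existing TsFile into a DataFrame.
val df = spark.read.format("org.apache.iotdb.tsfile").load("test.tsfile")
df.printSchema()

// Write it back out, which drives TsFileOutputWriter.write(InternalRow).
df.write.format("org.apache.iotdb.tsfile").save("out.tsfile")
```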
2 changes: 1 addition & 1 deletion pom.xml
@@ -55,7 +55,7 @@
     <slf4j.version>1.7.12</slf4j.version>
     <logback.version>1.1.11</logback.version>
     <joda.version>2.9.9</joda.version>
-    <spark.version>2.0.1</spark.version>
+    <spark.version>2.4.3</spark.version>
     <common.io.version>2.5</common.io.version>
     <commons.collections4>4.0</commons.collections4>
     <thrift.version>0.9.3</thrift.version>
2 changes: 1 addition & 1 deletion spark/README.md
@@ -61,7 +61,7 @@ With this connector, you can
 
 |Spark Version | Scala Version | Java Version | TsFile |
 |------------- | ------------- | ------------ |------------ |
-| `2.0.1` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
+| `2.4.3` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
 
 > Note: For more information about how to download and use TsFile, please see the following link: https://github.com/apache/incubator-iotdb/tree/master/tsfile.
 
24 changes: 12 additions & 12 deletions spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala
@@ -36,7 +36,7 @@ import org.apache.iotdb.tsfile.utils.Binary
 import org.apache.iotdb.tsfile.write.record.TSRecord
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint
 import org.apache.iotdb.tsfile.write.schema.{FileSchema, MeasurementSchema, SchemaBuilder}
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
@@ -297,12 +297,12 @@ object Converter {
    * @param row given spark sql row
    * @return TSRecord
    */
-  def toTsRecord(row: Row): List[TSRecord] = {
-    val schema = row.schema
-    val time = row.getAs[Long](QueryConstant.RESERVED_TIME)
+  def toTsRecord(row: InternalRow, dataSchema: StructType): List[TSRecord] = {
+    val time = row.getLong(0)
     val deviceToRecord = scala.collection.mutable.Map[String, TSRecord]()
+    var index = 1
 
-    schema.fields.filter(f => {
+    dataSchema.fields.filter(f => {
       !QueryConstant.RESERVED_TIME.equals(f.name)
     }).foreach(f => {
       val name = f.name
@@ -315,20 +315,20 @@ object Converter {
       val tsRecord: TSRecord = deviceToRecord.getOrElse(device, new TSRecord(time, device))
 
       val dataType = getTsDataType(f.dataType)
-      val index = row.fieldIndex(name)
       if (!row.isNullAt(index)) {
         val value = f.dataType match {
-          case BooleanType => row.getAs[Boolean](name)
-          case IntegerType => row.getAs[Int](name)
-          case LongType => row.getAs[Long](name)
-          case FloatType => row.getAs[Float](name)
-          case DoubleType => row.getAs[Double](name)
-          case StringType => row.getAs[String](name)
+          case BooleanType => row.getBoolean(index)
+          case IntegerType => row.getInt(index)
+          case LongType => row.getLong(index)
+          case FloatType => row.getFloat(index)
+          case DoubleType => row.getDouble(index)
+          case StringType => row.getString(index)
           case other => throw new UnsupportedOperationException(s"Unsupported type $other")
         }
         val dataPoint = DataPoint.getDataPoint(dataType, measurement, value.toString)
         tsRecord.addTuple(dataPoint)
       }
+      index += 1
     })
     deviceToRecord.values.toList
   }
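The core of this change: `Row` carries its own schema (`row.schema`, `row.fieldIndex(name)`), while `InternalRow` does not, so the schema must now be passed in explicitly and fields read by position, with a running `index` kept in step with `dataSchema.fields`. A standalone sketch of the access pattern (illustrative only; the schema and values here are made up, not from the patch):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types._

// Illustrative schema: column 0 is the reserved time column.
val schema = StructType(Seq(
  StructField("time", LongType, nullable = false),
  StructField("device_1.sensor_1", FloatType, nullable = true),
  StructField("device_1.sensor_2", IntegerType, nullable = true)))

val row: InternalRow = new GenericInternalRow(Array[Any](1L, 1.2f, 20))

val time = row.getLong(0) // positional access; no lookup by name
schema.fields.zipWithIndex.drop(1).foreach { case (f, i) =>
  if (!row.isNullAt(i)) {
    val value = f.dataType match {
      case FloatType   => row.getFloat(i)
      case IntegerType => row.getInt(i)
      case LongType    => row.getLong(i)
      case other       => throw new UnsupportedOperationException(s"$other")
    }
    println(s"${f.name} @ t=$time -> $value")
  }
}
```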
@@ -22,9 +22,9 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
 import org.apache.iotdb.tsfile.io.TsFileOutputFormat
 import org.apache.iotdb.tsfile.write.record.TSRecord
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.InternalRow
 
 private[tsfile] class TsFileOutputWriter(
     pathStr: String,
@@ -37,9 +37,9 @@ private[tsfile] class TsFileOutputWriter(
     new TsFileOutputFormat(fileSchema).getRecordWriter(context)
   }
 
-  override def write(row: Row): Unit = {
+  override def write(row: InternalRow): Unit = {
     if (row != null) {
-      val tsRecord = Converter.toTsRecord(row)
+      val tsRecord = Converter.toTsRecord(row, dataSchema)
       tsRecord.foreach(r => {
         recordWriter.write(NullWritable.get(), r)
       })
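For context: in Spark 2.4 the internal `OutputWriter` contract hands the writer an `InternalRow` rather than a `Row`, which is what forces the signature change above. Roughly, trimmed to the relevant members (paraphrased, not part of this patch):

```scala
// Paraphrased shape of Spark 2.4's
// org.apache.spark.sql.execution.datasources.OutputWriter.
abstract class OutputWriter {
  def write(row: InternalRow): Unit
  def close(): Unit
}
```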
@@ -27,9 +27,12 @@ private[tsfile] class TsFileWriterFactory(options: Map[String, String]) extends
 
   override def newInstance(
       path: String,
-      bucketId: Option[Int],
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
     new TsFileOutputWriter(path, dataSchema, options, context)
   }
+
+  override def getFileExtension(context: TaskAttemptContext): String = {
+    null
+  }
 }
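Likewise, the factory change tracks Spark's `OutputWriterFactory`, which between 2.0 and 2.4 dropped the `bucketId` parameter from `newInstance` and made `getFileExtension` abstract; returning `null` satisfies the contract, though a concrete extension such as `.tsfile` might be friendlier. Paraphrased shape (not part of this patch):

```scala
// Paraphrased shape of Spark 2.4's
// org.apache.spark.sql.execution.datasources.OutputWriterFactory.
abstract class OutputWriterFactory extends Serializable {
  def getFileExtension(context: TaskAttemptContext): String
  def newInstance(
      path: String,
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter
}
```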
@@ -32,7 +32,8 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader
 import org.apache.iotdb.tsfile.read.common.Field
 import org.apache.iotdb.tsfile.utils.Binary
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericRowWithSchema}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.junit.Assert
@@ -223,8 +224,8 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
     fields.add(StructField("device_2.sensor_2", IntegerType, true))
     val schema = StructType(fields)
 
-    var row: GenericRowWithSchema = new GenericRowWithSchema(Array(1L, null, 1.2f, 20, 19, 2.3f, 11), schema)
-    val records = Converter.toTsRecord(row)
+    val row: InternalRow = new GenericInternalRow(Array(1L, null, 1.2f, 20, 19, 2.3f, 11))
+    val records = Converter.toTsRecord(row, schema)
 
     Assert.assertEquals(2, records.size)
     Assert.assertEquals(1, records(0).time)
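One caveat the updated test does not exercise (its sample row contains no strings): `InternalRow.getString` delegates to `getUTF8String`, so a `GenericInternalRow` must hold `UTF8String` values, not `java.lang.String`, for `StringType` columns. A sketch of what a string-bearing test row would need (illustrative, not from the patch):

```scala
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

// StringType values must be stored as UTF8String; a plain java.lang.String
// here would fail with a ClassCastException on getString.
val rowWithString = new GenericInternalRow(Array[Any](1L, UTF8String.fromString("v1")))
assert(rowWithString.getString(1) == "v1")
```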