@@ -3,7 +3,16 @@ package org.jetbrains.kotlinx.dataframe.io
33import io.kotest.assertions.throwables.shouldThrow
44import io.kotest.matchers.collections.shouldContain
55import io.kotest.matchers.shouldBe
6+ import org.apache.arrow.memory.RootAllocator
7+ import org.apache.arrow.vector.TimeStampMicroVector
8+ import org.apache.arrow.vector.TimeStampMilliVector
9+ import org.apache.arrow.vector.TimeStampNanoVector
10+ import org.apache.arrow.vector.TimeStampSecVector
11+ import org.apache.arrow.vector.VectorSchemaRoot
12+ import org.apache.arrow.vector.ipc.ArrowFileWriter
13+ import org.apache.arrow.vector.ipc.ArrowStreamWriter
614import org.apache.arrow.vector.types.FloatingPointPrecision
15+ import org.apache.arrow.vector.types.TimeUnit
716import org.apache.arrow.vector.types.pojo.ArrowType
817import org.apache.arrow.vector.types.pojo.Field
918import org.apache.arrow.vector.types.pojo.FieldType
@@ -23,10 +32,13 @@ import org.jetbrains.kotlinx.dataframe.api.remove
2332import org.jetbrains.kotlinx.dataframe.api.toColumn
2433import org.jetbrains.kotlinx.dataframe.exceptions.TypeConverterNotFoundException
2534import org.junit.Test
35+ import java.io.ByteArrayOutputStream
2636import java.io.File
2737import java.net.URL
38+ import java.nio.channels.Channels
2839import java.time.LocalDate
2940import java.time.LocalDateTime
41+ import java.time.ZoneOffset
3042import java.util.Locale
3143import kotlin.reflect.typeOf
3244
@@ -273,4 +285,87 @@ internal class ArrowKtTest {
273285 val data = dataFrame.saveArrowFeatherToByteArray()
274286 DataFrame .readArrowFeather(data) shouldBe dataFrame
275287 }
288+
/**
 * Writes the same [LocalDateTime] values into timestamp columns of every Arrow time unit
 * (nano, micro, milli, sec) and checks that reading the bytes back, both through
 * `readArrowFeather` and through `readArrowIPC`, yields a data frame holding those values.
 */
@Test
fun testTimeStamp() {
    val expectedTimes = listOf(
        LocalDateTime.of(2023, 11, 23, 9, 30, 25),
        LocalDateTime.of(2015, 5, 25, 14, 20, 13),
        LocalDateTime.of(2013, 6, 19, 11, 20, 13),
    )

    // Column names and their order must mirror the schema built by writeArrowTimestamp.
    val expectedFrame = dataFrameOf(
        " ts_nano" to expectedTimes,
        " ts_micro" to expectedTimes,
        " ts_milli" to expectedTimes,
        " ts_sec" to expectedTimes,
    )

    // File (random-access) format.
    val featherBytes = writeArrowTimestamp(expectedTimes)
    DataFrame.readArrowFeather(featherBytes) shouldBe expectedFrame

    // Streaming format.
    val ipcBytes = writeArrowTimestamp(expectedTimes, streaming = true)
    DataFrame.readArrowIPC(ipcBytes) shouldBe expectedFrame
}
307+
/**
 * Serializes [dates] into an in-memory Arrow payload containing four nullable timestamp
 * columns without a timezone, one per Arrow time unit (nanosecond, microsecond,
 * millisecond, second). Every column receives the same values.
 *
 * Each [LocalDateTime] is converted to an epoch offset as if it were expressed in UTC.
 *
 * @param dates values to write into every timestamp column.
 * @param streaming when `true` the payload is produced with [ArrowStreamWriter],
 *   otherwise with [ArrowFileWriter].
 * @return the bytes produced by the selected writer.
 */
private fun writeArrowTimestamp(dates: List<LocalDateTime>, streaming: Boolean = false): ByteArray {
    // All four fields differ only in name and unit.
    fun timestampField(name: String, unit: TimeUnit): Field =
        Field(name, FieldType.nullable(ArrowType.Timestamp(unit, null)), null)

    RootAllocator().use { allocator ->
        val timeStampNano = timestampField(" ts_nano", TimeUnit.NANOSECOND)
        val timeStampMicro = timestampField(" ts_micro", TimeUnit.MICROSECOND)
        val timeStampMilli = timestampField(" ts_milli", TimeUnit.MILLISECOND)
        val timeStampSec = timestampField(" ts_sec", TimeUnit.SECOND)

        val schemaTimeStamp = Schema(
            listOf(timeStampNano, timeStampMicro, timeStampMilli, timeStampSec),
        )

        VectorSchemaRoot.create(schemaTimeStamp, allocator).use { vectorSchemaRoot ->
            val timeStampMilliVector = vectorSchemaRoot.getVector(" ts_milli") as TimeStampMilliVector
            val timeStampNanoVector = vectorSchemaRoot.getVector(" ts_nano") as TimeStampNanoVector
            val timeStampMicroVector = vectorSchemaRoot.getVector(" ts_micro") as TimeStampMicroVector
            val timeStampSecVector = vectorSchemaRoot.getVector(" ts_sec") as TimeStampSecVector
            timeStampMilliVector.allocateNew(dates.size)
            timeStampNanoVector.allocateNew(dates.size)
            timeStampMicroVector.allocateNew(dates.size)
            timeStampSecVector.allocateNew(dates.size)

            dates.forEachIndexed { index, localDateTime ->
                val instant = localDateTime.toInstant(ZoneOffset.UTC)
                // Build every unit from (epochSecond, nanoOfSecond): `instant.nano` is the full
                // nanosecond-of-second, so adding it to a millisecond-based value would count the
                // millisecond fraction twice, and scaling milliseconds up loses finer precision.
                // NOTE: the nanosecond value only fits in a Long for instants roughly between
                // the years 1678 and 2262.
                val epochSecond = instant.epochSecond
                val nanoOfSecond = instant.nano
                timeStampNanoVector[index] = epochSecond * 1_000_000_000L + nanoOfSecond
                timeStampMicroVector[index] = epochSecond * 1_000_000L + nanoOfSecond / 1_000
                timeStampMilliVector[index] = instant.toEpochMilli()
                // epochSecond is floored, unlike toEpochMilli() / 1000 which truncates toward zero.
                timeStampSecVector[index] = epochSecond
            }
            vectorSchemaRoot.setRowCount(dates.size)

            val bos = ByteArrayOutputStream()
            bos.use { out ->
                val channel = Channels.newChannel(out)
                val arrowWriter = if (streaming) {
                    ArrowStreamWriter(vectorSchemaRoot, null, channel)
                } else {
                    ArrowFileWriter(vectorSchemaRoot, null, channel)
                }
                // The writer is closed before the bytes are read back from the stream below.
                arrowWriter.use { writer ->
                    writer.start()
                    writer.writeBatch()
                }
            }
            return bos.toByteArray()
        }
    }
}
276371}
0 commit comments