Skip to content

Commit 7df9251

Browse files
committed
Allow any ArrowReader implementation to be used for reading Arrow data
#627
1 parent 5e43ec9 commit 7df9251

File tree

4 files changed

+40
-5
lines changed

4 files changed

+40
-5
lines changed

dataframe-arrow/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ dependencies {
2424
testImplementation(libs.kotestAssertions) {
2525
exclude("org.jetbrains.kotlin", "kotlin-stdlib-jdk8")
2626
}
27+
testImplementation(libs.arrow.c.data)
28+
testImplementation(libs.duckdb.jdbc)
2729
}
2830

2931
kotlinPublications {

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ internal fun DataFrame.Companion.readArrowImpl(
317317
add(df)
318318
}
319319
}
320-
is ArrowStreamReader -> {
320+
else -> {
321321
val root = reader.vectorSchemaRoot
322322
val schema = root.schema
323323
while (reader.loadNextBatch()) {

dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import org.apache.arrow.vector.TimeStampSecVector
1111
import org.apache.arrow.vector.VectorSchemaRoot
1212
import org.apache.arrow.vector.ipc.ArrowFileReader
1313
import org.apache.arrow.vector.ipc.ArrowFileWriter
14+
import org.apache.arrow.vector.ipc.ArrowReader
1415
import org.apache.arrow.vector.ipc.ArrowStreamReader
1516
import org.apache.arrow.vector.ipc.ArrowStreamWriter
1617
import org.apache.arrow.vector.types.FloatingPointPrecision
@@ -21,6 +22,9 @@ import org.apache.arrow.vector.types.pojo.FieldType
2122
import org.apache.arrow.vector.types.pojo.Schema
2223
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
2324
import org.apache.arrow.vector.util.Text
25+
import org.duckdb.DuckDBConnection
26+
import org.duckdb.DuckDBResultSet
27+
import org.jetbrains.kotlinx.dataframe.AnyFrame
2428
import org.jetbrains.kotlinx.dataframe.DataColumn
2529
import org.jetbrains.kotlinx.dataframe.DataFrame
2630
import org.jetbrains.kotlinx.dataframe.api.NullabilityOptions
@@ -34,12 +38,14 @@ import org.jetbrains.kotlinx.dataframe.api.pathOf
3438
import org.jetbrains.kotlinx.dataframe.api.remove
3539
import org.jetbrains.kotlinx.dataframe.api.toColumn
3640
import org.jetbrains.kotlinx.dataframe.exceptions.TypeConverterNotFoundException
41+
import org.junit.Assert
3742
import org.junit.Test
3843
import java.io.ByteArrayInputStream
3944
import java.io.ByteArrayOutputStream
4045
import java.io.File
4146
import java.net.URL
4247
import java.nio.channels.Channels
48+
import java.sql.DriverManager
4349
import java.time.LocalDate
4450
import java.time.LocalDateTime
4551
import java.time.ZoneOffset
@@ -558,23 +564,26 @@ internal class ArrowKtTest {
558564
}
559565
}
560566

561-
@Test
562-
fun testArrowReaderExtension() {
567+
private fun expectedSimpleDataFrame(): AnyFrame {
563568
val dates = listOf(
564-
LocalDateTime.of(2023, 11, 23, 9, 30, 25),
569+
LocalDateTime.of(2020, 11, 23, 9, 30, 25),
565570
LocalDateTime.of(2015, 5, 25, 14, 20, 13),
566571
LocalDateTime.of(2013, 6, 19, 11, 20, 13),
567572
LocalDateTime.of(2000, 1, 1, 0, 0, 0)
568573
)
569574

570-
val expected = dataFrameOf(
575+
return dataFrameOf(
571576
"string" to listOf("a", "b", "c", "d"),
572577
"int" to listOf(1, 2, 3, 4),
573578
"float" to listOf(1.0f, 2.0f, 3.0f, 4.0f),
574579
"double" to listOf(1.0, 2.0, 3.0, 4.0),
575580
"datetime" to dates
576581
)
582+
}
577583

584+
@Test
585+
fun testArrowReaderExtension() {
586+
val expected = expectedSimpleDataFrame()
578587
val featherChannel = ByteArrayReadableSeekableByteChannel(expected.saveArrowFeatherToByteArray())
579588
val arrowFileReader = ArrowFileReader(featherChannel, RootAllocator())
580589
arrowFileReader.toDataFrame() shouldBe expected
@@ -583,4 +592,24 @@ internal class ArrowKtTest {
583592
val arrowStreamReader = ArrowStreamReader(ipcInputStream, RootAllocator())
584593
arrowStreamReader.toDataFrame() shouldBe expected
585594
}
595+
596+
@Test
597+
fun testDuckDBArrowIntegration() {
598+
val expected = expectedSimpleDataFrame()
599+
val query = """
600+
select 'a' as string, 1 as int, CAST(1.0 as FLOAT) as float, CAST(1.0 as DOUBLE) as double, TIMESTAMP '2020-11-23 09:30:25' as datetime
601+
UNION ALL SELECT 'b', 2, 2.0, 2.0, TIMESTAMP '2015-05-25 14:20:13'
602+
UNION ALL SELECT 'c', 3, 3.0, 3.0, TIMESTAMP '2013-06-19 11:20:13'
603+
UNION ALL SELECT 'd', 4, 4.0, 4.0, TIMESTAMP '2000-01-01 00:00:00'
604+
""".trimIndent()
605+
606+
Class.forName("org.duckdb.DuckDBDriver")
607+
val conn = DriverManager.getConnection("jdbc:duckdb:") as DuckDBConnection
608+
conn.use {
609+
val resultSet = it.createStatement().executeQuery(query) as DuckDBResultSet
610+
val dbArrowReader = resultSet.arrowExportStream(RootAllocator(), 256) as ArrowReader
611+
Assert.assertTrue(dbArrowReader.javaClass.name.equals("org.apache.arrow.c.ArrowArrayStreamReader"))
612+
DataFrame.readArrow(dbArrowReader) shouldBe expected
613+
}
614+
}
586615
}

gradle/libs.versions.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ shadow = "8.1.1"
5656
android-gradle-api = "7.3.1" # Can't be updated to 7.4.0+ due to Java 8 compatibility
5757
ktor-server-netty = "2.3.8"
5858
kotlin-compile-testing = "1.5.0"
59+
duckdb = "0.10.0"
5960

6061
[libraries]
6162
ksp-gradle = { group = "com.google.devtools.ksp", name = "symbol-processing-gradle-plugin", version.ref = "ksp" }
@@ -97,6 +98,8 @@ jsoup = { group = "org.jsoup", name = "jsoup", version.ref = "jsoup" }
9798
arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref = "arrow" }
9899
arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" }
99100
arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" }
101+
arrow-c-data = { group = "org.apache.arrow", name = "arrow-c-data", version.ref = "arrow" }
102+
100103

101104
kotlinpoet = { group = "com.squareup", name = "kotlinpoet", version.ref = "kotlinpoet" }
102105
swagger = { group = "io.swagger.parser.v3", name = "swagger-parser", version.ref = "openapi" }
@@ -120,6 +123,7 @@ kotlin-jupyter-test-kit = { group = "org.jetbrains.kotlinx", name = "kotlin-jupy
120123

121124
dataframe-symbol-processor = { group = "org.jetbrains.kotlinx.dataframe", name = "symbol-processor-all" }
122125

126+
duckdb-jdbc = { group = "org.duckdb", name = "duckdb_jdbc", version.ref= "duckdb"}
123127

124128
[plugins]
125129
jupyter-api = { id = "org.jetbrains.kotlin.jupyter.api", version.ref = "kotlinJupyter" }

0 commit comments

Comments
 (0)