Skip to content

Commit

Permalink
Add support for Timestamp type fields
Browse files Browse the repository at this point in the history
Update dependencies as well to the newest possible versions.
The java.time.Instant converter was added only to the driver version
0.12.0 and works only with Tarantool versions 2.11.0+, so using the
Timestamp values will lead to errors on previous Tarantool versions.
The converter does not execute any checks for that for simplicity and to
avoid extra performance impact.
  • Loading branch information
akudiyar committed Aug 1, 2023
1 parent 8db3472 commit d5f45bd
Show file tree
Hide file tree
Showing 18 changed files with 100 additions and 84 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/scala.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ jobs:
matrix:
include:
- scala: "2.11.12"
tarantool: "2.8"
tarantool: "2.11"
router_port: "3301"
router_api_port: "8081"
- scala: "2.12.16"
tarantool: "2.8"
tarantool: "2.11"
router_port: "3331"
router_api_port: "8381"
- scala: "2.13.10"
tarantool: "2.8"
tarantool: "2.11"
router_port: "3361"
router_api_port: "8681"
steps:
Expand Down
20 changes: 11 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ ThisBuild / developers := List(
ThisBuild / scalaVersion := scala211

val commonDependencies = Seq(
"io.tarantool" % "cartridge-driver" % "0.10.1",
"junit" % "junit" % "4.12" % Test,
"com.github.sbt" % "junit-interface" % "0.12" % Test,
"org.scalatest" %% "scalatest" % "3.2.14" % Test,
"io.tarantool" % "cartridge-driver" % "0.12.0",
"junit" % "junit" % "4.13.2" % Test,
"com.github.sbt" % "junit-interface" % "0.13.3" % Test,
"org.scalatest" %% "scalatest" % "3.2.16" % Test,
"org.scalamock" %% "scalamock" % "5.2.0" % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % "0.40.12" % Test,
"ch.qos.logback" % "logback-core" % "1.2.5" % Test,
"ch.qos.logback" % "logback-classic" % "1.2.5" % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % "0.40.17" % Test,
"ch.qos.logback" % "logback-core" % "1.2.12" % Test,
"ch.qos.logback" % "logback-classic" % "1.2.12" % Test,
"org.apache.derby" % "derby" % "10.11.1.1" % Test,
"io.tarantool" % "testcontainers-java-tarantool" % "0.5.3" % Test
"io.tarantool" % "testcontainers-java-tarantool" % "1.0.0" % Test,
"org.msgpack" % "msgpack-core" % "0.9.0" % Test
).map(
_.exclude("io.netty", "netty-all")
.exclude("io.netty", "netty-transport")
Expand Down Expand Up @@ -116,7 +117,8 @@ lazy val root = (project in file("."))
// Test frameworks options
testOptions ++= Seq(
Tests.Argument(TestFrameworks.JUnit, "-v"),
Tests.Setup(() => System.setSecurityManager(null)) // SPARK-22918
Tests.Setup(() => System.setSecurityManager(null)), // SPARK-22918
Tests.Argument("-oF")
),
// Publishing settings
publishTo := {
Expand Down
22 changes: 13 additions & 9 deletions src/main/scala/org/apache/spark/sql/tarantool/MapFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import java.lang.{
Short => JShort
}
import java.util.{ArrayList => JList, HashMap => JMap}
import java.sql.Timestamp
import java.time.Instant
import scala.collection.JavaConverters.{mapAsJavaMapConverter, seqAsJavaListConverter}

/**
Expand Down Expand Up @@ -75,15 +77,16 @@ object MapFunctions {

def dataTypeToJavaClass(dataType: DataType): Class[_] =
dataType match {
case StringType => classOf[java.lang.String]
case LongType => classOf[java.lang.Long]
case IntegerType => classOf[java.lang.Integer]
case ShortType => classOf[java.lang.Integer]
case ByteType => classOf[java.lang.Integer]
case BooleanType => classOf[java.lang.Boolean]
case DoubleType => classOf[java.lang.Double]
case FloatType => classOf[java.lang.Float]
case _: DecimalType => classOf[java.math.BigDecimal]
case StringType => classOf[java.lang.String]
case LongType => classOf[java.lang.Long]
case IntegerType => classOf[java.lang.Integer]
case ShortType => classOf[java.lang.Integer]
case ByteType => classOf[java.lang.Integer]
case BooleanType => classOf[java.lang.Boolean]
case DoubleType => classOf[java.lang.Double]
case FloatType => classOf[java.lang.Float]
case _: DecimalType => classOf[java.math.BigDecimal]
case _: TimestampType => classOf[java.time.Instant]
case mapType: MapType =>
val keyClass = dataTypeToJavaClass(mapType.keyType)
val valueClass = dataTypeToJavaClass(mapType.valueType)
Expand Down Expand Up @@ -170,6 +173,7 @@ object MapFunctions {
case value: Long => value.asInstanceOf[JLong]
case value: Float => value.asInstanceOf[JFloat]
case value: Double => value.asInstanceOf[JDouble]
case value: Timestamp => value.toInstant().asInstanceOf[Instant]
case value: Any => identity(value)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ object TarantoolFieldTypes extends Enumeration {
val BOOLEAN: TarantoolFieldType = TarantoolFieldType("boolean", DataTypes.BooleanType)
val DECIMAL: TarantoolFieldType = TarantoolFieldType("decimal", createDecimalType())
val UUID: TarantoolFieldType = TarantoolFieldType("uuid", DataTypes.StringType)
val DATETIME: TarantoolFieldType = TarantoolFieldType("datetime", DataTypes.TimestampType)

val ARRAY: TarantoolFieldType =
TarantoolFieldType("array", DataTypes.createArrayType(DataTypes.StringType, true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class JavaSparkContextFunctionsTest extends SharedJavaSparkContext {
@Before
public void beforeEach() {
try {
container.executeScript("test_setup.lua").get();
container.executeScript("test_setup.lua");
} catch (Exception e) {
throw new RuntimeException("Failed to set up test: ", e);
}
Expand All @@ -32,7 +32,7 @@ public void beforeEach() {
@After
public void afterEach() {
try {
container.executeScript("test_teardown.lua").get();
container.executeScript("test_teardown.lua");
} catch (Exception e) {
throw new RuntimeException("Failed to set up test: ", e);
}
Expand Down
34 changes: 20 additions & 14 deletions src/test/resources/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
FROM tgagor/centos:stream8 AS tarantool-base
ARG TARANTOOL_VERSION=2.8
ARG TARANTOOL_VERSION=2.11.0
FROM tarantool/tarantool:${TARANTOOL_VERSION}-centos7 AS cartridge-base

ARG TARANTOOL_SERVER_USER="tarantool"
ARG TARANTOOL_SERVER_UID=1000
ARG TARANTOOL_SERVER_GROUP="tarantool"
ARG TARANTOOL_SERVER_GID=1000
ARG TARANTOOL_WORKDIR="/app"
ARG TARANTOOL_RUNDIR="/tmp/run"
ARG TARANTOOL_DATADIR="/tmp/data"
ARG TARANTOOL_INSTANCES_FILE="./instances.yml"
ENV TARANTOOL_WORKDIR=$TARANTOOL_WORKDIR
ENV TARANTOOL_RUNDIR=$TARANTOOL_RUNDIR
ENV TARANTOOL_DATADIR=$TARANTOOL_DATADIR
ENV TARANTOOL_INSTANCES_FILE=$TARANTOOL_INSTANCES_FILE
RUN curl -L https://tarantool.io/installer.sh | VER=$TARANTOOL_VERSION /bin/bash -s -- --repo-only && \
yum -y install cmake make gcc gcc-c++ git unzip tarantool tarantool-devel cartridge-cli && \
# a yum bug requires setting ulimit, see https://bugzilla.redhat.com/show_bug.cgi?id=1537564
RUN ulimit -n 1024 &&\
yum -y install cmake make gcc gcc-c++ git unzip cartridge-cli && \
yum clean all
RUN groupadd -g $TARANTOOL_SERVER_GID $TARANTOOL_SERVER_GROUP && \
useradd -u $TARANTOOL_SERVER_UID -g $TARANTOOL_SERVER_GID -m -s /bin/bash $TARANTOOL_SERVER_USER \
|| true
USER $TARANTOOL_SERVER_USER:$TARANTOOL_SERVER_GROUP
RUN cartridge version

FROM tarantool-base AS cartridge-base
FROM cartridge-base AS cartridge-app
ARG TARANTOOL_WORKDIR="/app"
ARG TARANTOOL_RUNDIR="/tmp/run"
ARG TARANTOOL_DATADIR="/tmp/data"
ARG TARANTOOL_LOGDIR="/tmp/log"
ARG TARANTOOL_INSTANCES_FILE="./instances.yml"
ARG TARANTOOL_CLUSTER_COOKIE="testapp-cluster-cookie"
ENV TARANTOOL_WORKDIR=$TARANTOOL_WORKDIR
ENV TARANTOOL_RUNDIR=$TARANTOOL_RUNDIR
ENV TARANTOOL_DATADIR=$TARANTOOL_DATADIR
ENV TARANTOOL_LOGDIR=$TARANTOOL_LOGDIR
ENV TARANTOOL_INSTANCES_FILE=$TARANTOOL_INSTANCES_FILE
ENV TARANTOOL_CLUSTER_COOKIE=$TARANTOOL_CLUSTER_COOKIE
WORKDIR $TARANTOOL_WORKDIR
CMD cartridge build && cartridge start --run-dir=$TARANTOOL_RUNDIR --data-dir=$TARANTOOL_DATADIR --cfg=$TARANTOOL_INSTANCES_FILE
CMD cartridge build && \
cartridge start --run-dir=$TARANTOOL_RUNDIR --data-dir=$TARANTOOL_DATADIR \
--log-dir=$TARANTOOL_LOGDIR --cfg=$TARANTOOL_INSTANCES_FILE
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ object TestSpaceMetadata {
"discount" -> TestFieldMetadata("discount", "number", 6),
"favourite_constant" -> TestFieldMetadata("favourite_constant", "double", 7),
"married" -> TestFieldMetadata("married", "boolean", 8),
"updated" -> TestFieldMetadata("updated", "unsigned", 9)
"updated" -> TestFieldMetadata("updated", "datetime", 9)
)

def apply(): TarantoolSpaceMetadata =
Expand All @@ -103,7 +103,7 @@ object TestSpaceWithArrayMetadata {
private val spaceFieldMetadata = Map(
"order_id" -> TestFieldMetadata("order_id", "string", 0),
"order_items" -> TestFieldMetadata("order_items", "array", 1),
"updated" -> TestFieldMetadata("updated", "integer", 2)
"updated" -> TestFieldMetadata("updated", "datetime", 2)
)

def apply(): TarantoolSpaceMetadata =
Expand All @@ -115,7 +115,7 @@ object TestSpaceWithMapMetadata {
private val spaceFieldMetadata = Map(
"id" -> TestFieldMetadata("order_id", "string", 0),
"settings" -> TestFieldMetadata("order_items", "map", 1),
"updated" -> TestFieldMetadata("updated", "integer", 2)
"updated" -> TestFieldMetadata("updated", "datetime", 2)
)

def apply(): TarantoolSpaceMetadata =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.slf4j.{Logger, LoggerFactory}
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.containers.{TarantoolCartridgeContainer => JavaTarantoolCartridgeContainer}
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.containers.Container

import scala.collection.JavaConverters.mapAsJavaMapConverter

Expand Down Expand Up @@ -51,8 +52,12 @@ case class TarantoolCartridgeContainer(
def getRouterPort: Int = container.getRouterPort
def getAPIPort: Int = container.getAPIPort

def executeScript(scriptResourcePath: String): CompletableFuture[util.List[_]] =
container.executeScript(scriptResourcePath)
def executeScript(scriptResourcePath: String): Unit = {
val result = container.executeScript(scriptResourcePath)
if (result.getExitCode != 0) {
throw new RuntimeException("Failed to execute script. Details:\n" + result.getStderr)
}
}
}

object TarantoolCartridgeContainer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ class TarantoolSparkReadClusterTest extends AnyFunSuite with Matchers with Taran

override protected def beforeEach(): Unit = {
super.beforeEach()
SharedSparkContext.container.executeScript("test_setup.lua").get
SharedSparkContext.container.executeScript("test_setup.lua")
}

override protected def afterEach(): Unit = {
SharedSparkContext.container.executeScript("test_teardown.lua").get
SharedSparkContext.container.executeScript("test_teardown.lua")
super.afterEach()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class TarantoolSparkWriteClusterTest extends AnyFunSuite with Matchers with Tara
thrownException.getMessage should include("already exists in Tarantool")

// Clear the data and check that they are written in ErrorIfExists mode
SharedSparkContext.container.executeScript("test_teardown.lua").get()
SharedSparkContext.container.executeScript("test_teardown.lua")

df = SharedSparkContext.spark.createDataFrame(
SharedSparkContext.spark.sparkContext.parallelize(
Expand Down Expand Up @@ -175,7 +175,7 @@ class TarantoolSparkWriteClusterTest extends AnyFunSuite with Matchers with Tara
actual.foreach(item => item.getString("order_type") should endWith("444"))

// Clear the data and check if they are written in Ignore mode
SharedSparkContext.container.executeScript("test_teardown.lua").get()
SharedSparkContext.container.executeScript("test_teardown.lua")

df.write
.format("org.apache.spark.sql.tarantool")
Expand Down
Loading

0 comments on commit d5f45bd

Please sign in to comment.