From 072de827623e7ad588ad8ab839823ed2dd67f24e Mon Sep 17 00:00:00 2001 From: Anastasios Skarlatidis Date: Fri, 8 May 2020 17:14:37 +0300 Subject: [PATCH] Updates dsql - adds supoort for Instant - adds configuration zone-id, in order to correctly convert Instant, Timestam, LocalDateTime and LocalDate to Druid SQL TIMESTAMP (y-MM-dd HH:mm:dd) - adds unit test to verify the creation of parameterized sql query - updates documentation (for zone-id) - moves SQLQuerySpec from package ing.wbaa.druid.sql to ing.wbaa.druid GH-90 --- README.md | 2 ++ docs/sql.md | 22 ++++++++++++++- src/main/resources/reference.conf | 2 ++ .../scala/ing/wbaa/druid/DruidConfig.scala | 28 ++++++++++++++++++- .../scala/ing/wbaa/druid/DruidQuery.scala | 11 +------- src/main/scala/ing/wbaa/druid/SQL.scala | 25 +++++++++++------ .../wbaa/druid/{sql => }/SQLQuerySpec.scala | 27 ++++++++++++------ 7 files changed, 87 insertions(+), 30 deletions(-) rename src/test/scala/ing/wbaa/druid/{sql => }/SQLQuerySpec.scala (70%) diff --git a/README.md b/README.md index 6a88c63..1576d48 100644 --- a/README.md +++ b/README.md @@ -275,6 +275,8 @@ druid = { response-parsing-timeout = 5 seconds response-parsing-timeout = ${?DRUID_RESPONSE_PARSING_TIMEOUT} + + zone-id = "UTC" } ``` diff --git a/docs/sql.md b/docs/sql.md index 56a3e79..94358c4 100644 --- a/docs/sql.md +++ b/docs/sql.md @@ -18,7 +18,7 @@ case class Result(count: Double) val result: Future[Result] = response.map(_.list[Result]) ``` -Function `sql`, allows multiline queries: +Function `dsql`, allows multiline queries: ```scala import ing.wbaa.druid.SQL._ @@ -104,8 +104,28 @@ instances of `LocalDateTime` and appear as types of SQL `TIMESTAMP` with values | Boolean | BOOLEAN | | java.time.LocalDate | DATE | | java.time.LocalDateTime | TIMESTAMP | +| java.time.Instant | TIMESTAMP | | java.sql.Timestamp | TIMESTAMP | +For all temporal types (`LocalDate`, `LocalDateTime`, `Instant` and `Timestamp`) the default zone id is the one that is +defined in the configuration. + +``` +zone-id = "UTC" // when null/not-set, defaults to ZoneId.systemDefault() +``` + +By default `zone-id` parameter is set to `UTC`. The parameter is optional and when is null or not-set, +Scruid defaults to `ZoneId.systemDefault()`. + +Similar to any other configuration parameter of Scruid, `zone-id` can be specified either from `application.conf` or +overridden programmatically: + +```scala + +implicit val druidConf = DruidConfig( + zoneId = ZoneId.of("GMT+2") +) +``` ### Context parameters diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 483a3ae..35f9e29 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -53,6 +53,8 @@ druid = { response-parsing-timeout = 5 seconds response-parsing-timeout = ${?DRUID_RESPONSE_PARSING_TIMEOUT} + + zone-id = "UTC" // when null/not-set, defaults to ZoneId.systemDefault() } diff --git a/src/main/scala/ing/wbaa/druid/DruidConfig.scala b/src/main/scala/ing/wbaa/druid/DruidConfig.scala index dc722e6..43d98fd 100644 --- a/src/main/scala/ing/wbaa/druid/DruidConfig.scala +++ b/src/main/scala/ing/wbaa/druid/DruidConfig.scala @@ -18,6 +18,8 @@ package ing.wbaa.druid import java.net.URI +import java.time.ZoneId +import java.time.format.DateTimeFormatter import akka.actor.ActorSystem import com.typesafe.config.{ Config, ConfigException, ConfigFactory } @@ -40,6 +42,7 @@ class DruidConfig(val hosts: Seq[QueryHost], val clientBackend: Class[_ <: DruidClient], val clientConfig: Config, val scanQueryLegacyMode: Boolean, + val zoneId: ZoneId, val system: ActorSystem) { def copy( hosts: Seq[QueryHost] = this.hosts, @@ -50,7 +53,8 @@ class DruidConfig(val hosts: Seq[QueryHost], responseParsingTimeout: FiniteDuration = this.responseParsingTimeout, clientBackend: Class[_ <: DruidClient] = this.clientBackend, clientConfig: Config = this.clientConfig, - scanQueryLegacyMode: Boolean = this.scanQueryLegacyMode + scanQueryLegacyMode: Boolean = this.scanQueryLegacyMode, + zoneId: ZoneId = this.zoneId ): DruidConfig = new DruidConfig(hosts, secure, @@ -61,6 +65,7 @@ class DruidConfig(val hosts: Seq[QueryHost], clientBackend, clientConfig, scanQueryLegacyMode, + zoneId, system) lazy val client: DruidClient = { @@ -76,11 +81,24 @@ class DruidConfig(val hosts: Seq[QueryHost], clientConstructor(this) } + + lazy val FormatterDate: DateTimeFormatter = DateTimeFormatter + .ofPattern(DruidConfig.PatternDate) + .withZone(zoneId) + + lazy val FormatterDateTime: DateTimeFormatter = DateTimeFormatter + .ofPattern(DruidConfig.PatternDateTime) + .withZone(zoneId) } case class QueryHost(host: String, port: Int) object DruidConfig { + + final val PatternDate = "y-MM-dd" + + final val PatternDateTime = "y-MM-dd HH:mm:ss" + private final val URISchemeSepPattern = "://".r private val config = ConfigFactory.load() @@ -103,6 +121,7 @@ object DruidConfig { Class.forName(druidConfig.getString("client-backend")).asInstanceOf[Class[DruidClient]], clientConfig: Config = druidConfig.getConfig("client-config"), scanQueryLegacyMode: Boolean = druidConfig.getBoolean("scan-query-legacy-mode"), + zoneId: ZoneId = extractZoneIdFromConfig, system: ActorSystem = ActorSystem("scruid-actor-system") ): DruidConfig = new DruidConfig(hosts, @@ -114,8 +133,15 @@ object DruidConfig { clientBackend, clientConfig, scanQueryLegacyMode, + zoneId, system) + private def extractZoneIdFromConfig: ZoneId = + try ZoneId.of(druidConfig.getString("zone-id")) + catch { + case _: com.typesafe.config.ConfigException.Missing => ZoneId.systemDefault() + } + /** * Extract query node hosts with their ports from the specified configuration * diff --git a/src/main/scala/ing/wbaa/druid/DruidQuery.scala b/src/main/scala/ing/wbaa/druid/DruidQuery.scala index c6cdaba..3a71486 100644 --- a/src/main/scala/ing/wbaa/druid/DruidQuery.scala +++ b/src/main/scala/ing/wbaa/druid/DruidQuery.scala @@ -17,8 +17,7 @@ package ing.wbaa.druid -import java.time.ZonedDateTime -import java.time.format.DateTimeFormatter +import java.time.{ ZoneId, ZonedDateTime } import akka.NotUsed import akka.stream.scaladsl.Source @@ -330,14 +329,6 @@ object SQLQueryParameterType extends EnumCodec[SQLQueryParameterType] { case class SQLQueryParameter(`type`: SQLQueryParameterType, value: String) -object SQLQueryParameter { - final val PatternDate = "y-MM-dd" - final val PatternDateTime = "y-MM-dd HH:mm:ss" - - final val FormatterDate = DateTimeFormatter.ofPattern(PatternDate) - final val FormatterDateTime = DateTimeFormatter.ofPattern(PatternDateTime) -} - case class SQLQuery private[druid] (query: String, context: Map[QueryContextParam, QueryContextValue] = Map.empty, parameters: Seq[SQLQueryParameter] = Seq.empty)( diff --git a/src/main/scala/ing/wbaa/druid/SQL.scala b/src/main/scala/ing/wbaa/druid/SQL.scala index 885d8b7..9ce09d1 100644 --- a/src/main/scala/ing/wbaa/druid/SQL.scala +++ b/src/main/scala/ing/wbaa/druid/SQL.scala @@ -1,7 +1,7 @@ package ing.wbaa.druid import java.sql.Timestamp -import java.time.{ LocalDate, LocalDateTime } +import java.time.{ Instant, LocalDate, LocalDateTime } import scala.language.implicitConversions @@ -34,16 +34,23 @@ object SQL { implicit def boolean2Param(v: Boolean): SQLQueryParameter = SQLQueryParameter(SQLQueryParameterType.Boolean, v.toString) - implicit def localDate2Param(v: LocalDate): SQLQueryParameter = - SQLQueryParameter(SQLQueryParameterType.Date, v.format(SQLQueryParameter.FormatterDate)) + implicit def localDate2Param(v: LocalDate)(implicit config: DruidConfig = + DruidConfig.DefaultConfig): SQLQueryParameter = + SQLQueryParameter(SQLQueryParameterType.Date, v.format(config.FormatterDate)) - implicit def localDateTime2Param(v: LocalDateTime): SQLQueryParameter = - SQLQueryParameter(SQLQueryParameterType.Timestamp, - v.format(SQLQueryParameter.FormatterDateTime)) + implicit def localDateTime2Param( + v: LocalDateTime + )(implicit config: DruidConfig = DruidConfig.DefaultConfig): SQLQueryParameter = + SQLQueryParameter(SQLQueryParameterType.Timestamp, v.format(config.FormatterDateTime)) - implicit def timestamp2Param(v: Timestamp): SQLQueryParameter = - SQLQueryParameter(SQLQueryParameterType.Timestamp, - v.formatted(SQLQueryParameter.PatternDateTime)) + implicit def timestamp2Param(v: Timestamp)(implicit config: DruidConfig = + DruidConfig.DefaultConfig): SQLQueryParameter = + SQLQueryParameter(SQLQueryParameterType.Timestamp, config.FormatterDateTime.format(v.toInstant)) + + implicit def instant2Param( + v: Instant + )(implicit config: DruidConfig = DruidConfig.DefaultConfig): SQLQueryParameter = + SQLQueryParameter(SQLQueryParameterType.Timestamp, config.FormatterDateTime.format(v)) implicit class StringToSQL(val sc: StringContext) extends AnyVal { diff --git a/src/test/scala/ing/wbaa/druid/sql/SQLQuerySpec.scala b/src/test/scala/ing/wbaa/druid/SQLQuerySpec.scala similarity index 70% rename from src/test/scala/ing/wbaa/druid/sql/SQLQuerySpec.scala rename to src/test/scala/ing/wbaa/druid/SQLQuerySpec.scala index 24598d0..9fa28c8 100644 --- a/src/test/scala/ing/wbaa/druid/sql/SQLQuerySpec.scala +++ b/src/test/scala/ing/wbaa/druid/SQLQuerySpec.scala @@ -1,16 +1,14 @@ -package ing.wbaa.druid.sql +package ing.wbaa.druid import java.time.{ LocalDateTime, ZonedDateTime } import akka.stream.scaladsl.Sink -import ing.wbaa.druid.{ DruidConfig, SQLQuery } import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{ Millis, Seconds, Span } import org.scalatest.{ Matchers, WordSpec } import ing.wbaa.druid.SQL._ import ing.wbaa.druid.client.CirceDecoders import io.circe.generic.auto._ -import io.circe.syntax._ //noinspection SqlNoDataSourceInspection class SQLQuerySpec extends WordSpec with Matchers with ScalaFutures with CirceDecoders { @@ -26,7 +24,7 @@ class SQLQuerySpec extends WordSpec with Matchers with ScalaFutures with CirceDe "SQL query" should { - val query: SQLQuery = dsql""" + val sqlQuery: SQLQuery = dsql""" |SELECT FLOOR(__time to HOUR) AS hourTime, count(*) AS "count" |FROM wikipedia |WHERE "__time" BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' @@ -34,14 +32,14 @@ class SQLQuerySpec extends WordSpec with Matchers with ScalaFutures with CirceDe |""".stripMargin "successfully be interpreted by Druid" in { - val resultsF = query.execute() + val resultsF = sqlQuery.execute() whenReady(resultsF) { response => response.list[Result].map(_.count).sum shouldBe totalNumberOfEntries } } "support streaming" in { - val resultsF = query.streamAs[Result]().runWith(Sink.seq) + val resultsF = sqlQuery.streamAs[Result]().runWith(Sink.seq) whenReady(resultsF) { results => results.map(_.count).sum shouldBe totalNumberOfEntries @@ -55,7 +53,7 @@ class SQLQuerySpec extends WordSpec with Matchers with ScalaFutures with CirceDe val untilDateTime = fromDateTime.plusDays(1) val countryIsoCode = "US" - val query: SQLQuery = + val sqlQuery: SQLQuery = dsql""" |SELECT FLOOR(__time to HOUR) AS hourTime, count(*) AS "count" |FROM wikipedia @@ -63,15 +61,26 @@ class SQLQuerySpec extends WordSpec with Matchers with ScalaFutures with CirceDe |GROUP BY 1 |""".stripMargin + "be expressed as a parameterized query with three parameters" in { + sqlQuery.query.count(_ == '?') shouldBe 3 + sqlQuery.parameters.size shouldBe 3 + + sqlQuery.parameters(0) shouldBe SQLQueryParameter(SQLQueryParameterType.Timestamp, + "2015-09-12 00:00:00") + sqlQuery.parameters(1) shouldBe SQLQueryParameter(SQLQueryParameterType.Timestamp, + "2015-09-13 00:00:00") + sqlQuery.parameters(2) shouldBe SQLQueryParameter(SQLQueryParameterType.Varchar, "US") + } + "successfully be interpreted by Druid" in { - val resultsF = query.execute() + val resultsF = sqlQuery.execute() whenReady(resultsF) { response => response.list[Result].map(_.count).sum shouldBe usOnlyNumberOfEntries } } "support streaming" in { - val resultsF = query.streamAs[Result]().runWith(Sink.seq) + val resultsF = sqlQuery.streamAs[Result]().runWith(Sink.seq) whenReady(resultsF) { results => results.map(_.count).sum shouldBe usOnlyNumberOfEntries