Skip to content

Commit

Permalink
Updates dsql
Browse files Browse the repository at this point in the history
  - adds supoort for Instant
  - adds configuration zone-id, in order to correctly convert Instant, Timestam, LocalDateTime and LocalDate to Druid SQL TIMESTAMP (y-MM-dd HH:mm:dd)
  - adds unit test to verify the creation of parameterized sql query
  - updates documentation (for zone-id)
  - moves SQLQuerySpec from package ing.wbaa.druid.sql to ing.wbaa.druid

ing-bankGH-90
  • Loading branch information
anskarl committed May 8, 2020
1 parent 8c64cb1 commit 072de82
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 30 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ druid = {
response-parsing-timeout = 5 seconds
response-parsing-timeout = ${?DRUID_RESPONSE_PARSING_TIMEOUT}
zone-id = "UTC"
}
```

Expand Down
22 changes: 21 additions & 1 deletion docs/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ case class Result(count: Double)
val result: Future[Result] = response.map(_.list[Result])
```

Function `sql`, allows multiline queries:
Function `dsql`, allows multiline queries:

```scala
import ing.wbaa.druid.SQL._
Expand Down Expand Up @@ -104,8 +104,28 @@ instances of `LocalDateTime` and appear as types of SQL `TIMESTAMP` with values
| Boolean | BOOLEAN |
| java.time.LocalDate | DATE |
| java.time.LocalDateTime | TIMESTAMP |
| java.time.Instant | TIMESTAMP |
| java.sql.Timestamp | TIMESTAMP |

For all temporal types (`LocalDate`, `LocalDateTime`, `Instant` and `Timestamp`) the default zone id is the one that is
defined in the configuration.

```
zone-id = "UTC" // when null/not-set, defaults to ZoneId.systemDefault()
```

By default `zone-id` parameter is set to `UTC`. The parameter is optional and when is null or not-set,
Scruid defaults to `ZoneId.systemDefault()`.

Similar to any other configuration parameter of Scruid, `zone-id` can be specified either from `application.conf` or
overridden programmatically:

```scala

implicit val druidConf = DruidConfig(
zoneId = ZoneId.of("GMT+2")
)
```

### Context parameters

Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ druid = {

response-parsing-timeout = 5 seconds
response-parsing-timeout = ${?DRUID_RESPONSE_PARSING_TIMEOUT}

zone-id = "UTC" // when null/not-set, defaults to ZoneId.systemDefault()
}


Expand Down
28 changes: 27 additions & 1 deletion src/main/scala/ing/wbaa/druid/DruidConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package ing.wbaa.druid

import java.net.URI
import java.time.ZoneId
import java.time.format.DateTimeFormatter

import akka.actor.ActorSystem
import com.typesafe.config.{ Config, ConfigException, ConfigFactory }
Expand All @@ -40,6 +42,7 @@ class DruidConfig(val hosts: Seq[QueryHost],
val clientBackend: Class[_ <: DruidClient],
val clientConfig: Config,
val scanQueryLegacyMode: Boolean,
val zoneId: ZoneId,
val system: ActorSystem) {
def copy(
hosts: Seq[QueryHost] = this.hosts,
Expand All @@ -50,7 +53,8 @@ class DruidConfig(val hosts: Seq[QueryHost],
responseParsingTimeout: FiniteDuration = this.responseParsingTimeout,
clientBackend: Class[_ <: DruidClient] = this.clientBackend,
clientConfig: Config = this.clientConfig,
scanQueryLegacyMode: Boolean = this.scanQueryLegacyMode
scanQueryLegacyMode: Boolean = this.scanQueryLegacyMode,
zoneId: ZoneId = this.zoneId
): DruidConfig =
new DruidConfig(hosts,
secure,
Expand All @@ -61,6 +65,7 @@ class DruidConfig(val hosts: Seq[QueryHost],
clientBackend,
clientConfig,
scanQueryLegacyMode,
zoneId,
system)

lazy val client: DruidClient = {
Expand All @@ -76,11 +81,24 @@ class DruidConfig(val hosts: Seq[QueryHost],

clientConstructor(this)
}

lazy val FormatterDate: DateTimeFormatter = DateTimeFormatter
.ofPattern(DruidConfig.PatternDate)
.withZone(zoneId)

lazy val FormatterDateTime: DateTimeFormatter = DateTimeFormatter
.ofPattern(DruidConfig.PatternDateTime)
.withZone(zoneId)
}

case class QueryHost(host: String, port: Int)

object DruidConfig {

final val PatternDate = "y-MM-dd"

final val PatternDateTime = "y-MM-dd HH:mm:ss"

private final val URISchemeSepPattern = "://".r

private val config = ConfigFactory.load()
Expand All @@ -103,6 +121,7 @@ object DruidConfig {
Class.forName(druidConfig.getString("client-backend")).asInstanceOf[Class[DruidClient]],
clientConfig: Config = druidConfig.getConfig("client-config"),
scanQueryLegacyMode: Boolean = druidConfig.getBoolean("scan-query-legacy-mode"),
zoneId: ZoneId = extractZoneIdFromConfig,
system: ActorSystem = ActorSystem("scruid-actor-system")
): DruidConfig =
new DruidConfig(hosts,
Expand All @@ -114,8 +133,15 @@ object DruidConfig {
clientBackend,
clientConfig,
scanQueryLegacyMode,
zoneId,
system)

private def extractZoneIdFromConfig: ZoneId =
try ZoneId.of(druidConfig.getString("zone-id"))
catch {
case _: com.typesafe.config.ConfigException.Missing => ZoneId.systemDefault()
}

/**
* Extract query node hosts with their ports from the specified configuration
*
Expand Down
11 changes: 1 addition & 10 deletions src/main/scala/ing/wbaa/druid/DruidQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package ing.wbaa.druid

import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.{ ZoneId, ZonedDateTime }

import akka.NotUsed
import akka.stream.scaladsl.Source
Expand Down Expand Up @@ -330,14 +329,6 @@ object SQLQueryParameterType extends EnumCodec[SQLQueryParameterType] {

case class SQLQueryParameter(`type`: SQLQueryParameterType, value: String)

object SQLQueryParameter {
final val PatternDate = "y-MM-dd"
final val PatternDateTime = "y-MM-dd HH:mm:ss"

final val FormatterDate = DateTimeFormatter.ofPattern(PatternDate)
final val FormatterDateTime = DateTimeFormatter.ofPattern(PatternDateTime)
}

case class SQLQuery private[druid] (query: String,
context: Map[QueryContextParam, QueryContextValue] = Map.empty,
parameters: Seq[SQLQueryParameter] = Seq.empty)(
Expand Down
25 changes: 16 additions & 9 deletions src/main/scala/ing/wbaa/druid/SQL.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ing.wbaa.druid

import java.sql.Timestamp
import java.time.{ LocalDate, LocalDateTime }
import java.time.{ Instant, LocalDate, LocalDateTime }

import scala.language.implicitConversions

Expand Down Expand Up @@ -34,16 +34,23 @@ object SQL {
implicit def boolean2Param(v: Boolean): SQLQueryParameter =
SQLQueryParameter(SQLQueryParameterType.Boolean, v.toString)

implicit def localDate2Param(v: LocalDate): SQLQueryParameter =
SQLQueryParameter(SQLQueryParameterType.Date, v.format(SQLQueryParameter.FormatterDate))
implicit def localDate2Param(v: LocalDate)(implicit config: DruidConfig =
DruidConfig.DefaultConfig): SQLQueryParameter =
SQLQueryParameter(SQLQueryParameterType.Date, v.format(config.FormatterDate))

implicit def localDateTime2Param(v: LocalDateTime): SQLQueryParameter =
SQLQueryParameter(SQLQueryParameterType.Timestamp,
v.format(SQLQueryParameter.FormatterDateTime))
implicit def localDateTime2Param(
v: LocalDateTime
)(implicit config: DruidConfig = DruidConfig.DefaultConfig): SQLQueryParameter =
SQLQueryParameter(SQLQueryParameterType.Timestamp, v.format(config.FormatterDateTime))

implicit def timestamp2Param(v: Timestamp): SQLQueryParameter =
SQLQueryParameter(SQLQueryParameterType.Timestamp,
v.formatted(SQLQueryParameter.PatternDateTime))
implicit def timestamp2Param(v: Timestamp)(implicit config: DruidConfig =
DruidConfig.DefaultConfig): SQLQueryParameter =
SQLQueryParameter(SQLQueryParameterType.Timestamp, config.FormatterDateTime.format(v.toInstant))

implicit def instant2Param(
v: Instant
)(implicit config: DruidConfig = DruidConfig.DefaultConfig): SQLQueryParameter =
SQLQueryParameter(SQLQueryParameterType.Timestamp, config.FormatterDateTime.format(v))

implicit class StringToSQL(val sc: StringContext) extends AnyVal {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package ing.wbaa.druid.sql
package ing.wbaa.druid

import java.time.{ LocalDateTime, ZonedDateTime }

import akka.stream.scaladsl.Sink
import ing.wbaa.druid.{ DruidConfig, SQLQuery }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.{ Matchers, WordSpec }
import ing.wbaa.druid.SQL._
import ing.wbaa.druid.client.CirceDecoders
import io.circe.generic.auto._
import io.circe.syntax._

//noinspection SqlNoDataSourceInspection
class SQLQuerySpec extends WordSpec with Matchers with ScalaFutures with CirceDecoders {
Expand All @@ -26,22 +24,22 @@ class SQLQuerySpec extends WordSpec with Matchers with ScalaFutures with CirceDe

"SQL query" should {

val query: SQLQuery = dsql"""
val sqlQuery: SQLQuery = dsql"""
|SELECT FLOOR(__time to HOUR) AS hourTime, count(*) AS "count"
|FROM wikipedia
|WHERE "__time" BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00'
|GROUP BY 1
|""".stripMargin

"successfully be interpreted by Druid" in {
val resultsF = query.execute()
val resultsF = sqlQuery.execute()
whenReady(resultsF) { response =>
response.list[Result].map(_.count).sum shouldBe totalNumberOfEntries
}
}

"support streaming" in {
val resultsF = query.streamAs[Result]().runWith(Sink.seq)
val resultsF = sqlQuery.streamAs[Result]().runWith(Sink.seq)

whenReady(resultsF) { results =>
results.map(_.count).sum shouldBe totalNumberOfEntries
Expand All @@ -55,23 +53,34 @@ class SQLQuerySpec extends WordSpec with Matchers with ScalaFutures with CirceDe
val untilDateTime = fromDateTime.plusDays(1)
val countryIsoCode = "US"

val query: SQLQuery =
val sqlQuery: SQLQuery =
dsql"""
|SELECT FLOOR(__time to HOUR) AS hourTime, count(*) AS "count"
|FROM wikipedia
|WHERE "__time" BETWEEN ${fromDateTime} AND ${untilDateTime} AND countryIsoCode = ${countryIsoCode}
|GROUP BY 1
|""".stripMargin

"be expressed as a parameterized query with three parameters" in {
sqlQuery.query.count(_ == '?') shouldBe 3
sqlQuery.parameters.size shouldBe 3

sqlQuery.parameters(0) shouldBe SQLQueryParameter(SQLQueryParameterType.Timestamp,
"2015-09-12 00:00:00")
sqlQuery.parameters(1) shouldBe SQLQueryParameter(SQLQueryParameterType.Timestamp,
"2015-09-13 00:00:00")
sqlQuery.parameters(2) shouldBe SQLQueryParameter(SQLQueryParameterType.Varchar, "US")
}

"successfully be interpreted by Druid" in {
val resultsF = query.execute()
val resultsF = sqlQuery.execute()
whenReady(resultsF) { response =>
response.list[Result].map(_.count).sum shouldBe usOnlyNumberOfEntries
}
}

"support streaming" in {
val resultsF = query.streamAs[Result]().runWith(Sink.seq)
val resultsF = sqlQuery.streamAs[Result]().runWith(Sink.seq)

whenReady(resultsF) { results =>
results.map(_.count).sum shouldBe usOnlyNumberOfEntries
Expand Down

0 comments on commit 072de82

Please sign in to comment.