Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BigQuery] Add extra params to the QueryRequest #2679

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.alpakka.googlecloud.bigquery.model.QueryRequest.unapply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.googlecloud.bigquery.model.QueryRequest.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.googlecloud.bigquery.model.QueryRequest.this")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.alpakka.googlecloud.bigquery.model.QueryRequest.unapply")
kovstas marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@ import scala.concurrent.duration.FiniteDuration
* @param timeout specifies the maximum amount of time that the client is willing to wait for the query to complete
* @param dryRun if set to `true` BigQuery doesn't run the job and instead returns statistics about the job such as how many bytes would be processed
* @param useLegacySql specifies whether to use BigQuery's legacy SQL dialect for this query
* @param requestId a unique user provided identifier to ensure idempotent behavior for queries
* @param location the geographic location where the job should run
* @param labels the labels associated with this query
* @param maximumBytesBilled limits the number of bytes billed for this query
* @param requestId a unique user provided identifier to ensure idempotent behavior for queries
*/
final case class QueryRequest private (query: String,
maxResults: Option[Int],
defaultDataset: Option[DatasetReference],
timeout: Option[FiniteDuration],
dryRun: Option[Boolean],
useLegacySql: Option[Boolean],
requestId: Option[String],
location: Option[String],
maximumBytesBilled: Option[Long]) {
labels: Map[String, String],
maximumBytesBilled: Option[Long],
requestId: Option[String]) {

def getQuery = query
def getMaxResults = maxResults.asPrimitive
Expand All @@ -53,6 +55,7 @@ final case class QueryRequest private (query: String,
def getRequestId = requestId.asJava
def getLocation = location.asJava
def getMaximumBytesBilled = maximumBytesBilled.asJava
def getLabels = labels.asJava

def withQuery(query: String) =
copy(query = query)
Expand Down Expand Up @@ -96,10 +99,24 @@ final case class QueryRequest private (query: String,
copy(maximumBytesBilled = maximumBytesBilled)
def withMaximumBytesBilled(maximumBytesBilled: util.OptionalLong) =
copy(maximumBytesBilled = maximumBytesBilled.asScala)

def withLabels(labels: Map[String, String]) =
copy(labels = labels)
def withLabels(labels: util.Map[String, String]) =
copy(labels = labels.asScala.toMap)
}

object QueryRequest {

def apply(query: String,
maxResults: Option[Int],
defaultDataset: Option[DatasetReference],
timeout: Option[FiniteDuration],
dryRun: Option[Boolean],
useLegacySql: Option[Boolean],
requestId: Option[String]): QueryRequest =
QueryRequest(query, maxResults, defaultDataset, timeout, dryRun, useLegacySql, None, Map.empty, None, requestId)

/**
* Java API: QueryRequest model
* @see [[https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#queryrequest BigQuery reference]]
Expand Down Expand Up @@ -127,9 +144,10 @@ object QueryRequest {
timeout.asScala.map(_.asScala),
dryRun.asScala.map(_.booleanValue),
useLegacySql.asScala.map(_.booleanValue),
requestId.asScala,
None,
None
Map.empty,
None,
requestId.asScala
)

implicit val format: RootJsonFormat[QueryRequest] = jsonFormat(
Expand All @@ -142,7 +160,8 @@ object QueryRequest {
"useLegacySql",
"requestId",
"location",
"maximumBytesBilled"
"maximumBytesBilled",
"labels"
kovstas marked this conversation as resolved.
Show resolved Hide resolved
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private[scaladsl] trait BigQueryQueries { this: BigQueryRest =>
def query[Out](query: String, dryRun: Boolean = false, useLegacySql: Boolean = true)(
implicit um: FromEntityUnmarshaller[QueryResponse[Out]]
): Source[Out, Future[QueryResponse[Out]]] = {
val request = QueryRequest(query, None, None, None, Some(dryRun), Some(useLegacySql), None, None, None)
val request = QueryRequest(query, None, None, None, Some(dryRun), Some(useLegacySql), None)
this.query(request).mapMaterializedValue(_._2)
}

Expand Down