Skip to content

Commit

Permalink
feat: introduce fetchType for Query and Trigger (#471)
Browse files Browse the repository at this point in the history
close #449
  • Loading branch information
mgabelle authored Dec 23, 2024
1 parent 4347f9b commit 1891c1c
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 22 deletions.
27 changes: 17 additions & 10 deletions src/main/java/io/kestra/plugin/gcp/bigquery/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.cloud.bigquery.*;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.common.FetchType;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
Expand Down Expand Up @@ -127,6 +128,9 @@ public class Query extends AbstractJob implements RunnableTask<Query.Output>, Qu
@Builder.Default
private boolean fetchOne = false;

@Builder.Default
private Property<FetchType> fetchType = Property.of(FetchType.NONE);

// private List<String> positionalParameters;

// private Map<String, String> namedParameters;
Expand Down Expand Up @@ -271,18 +275,21 @@ public Query.Output run(RunContext runContext) throws Exception {
logger.debug("Query loaded in: {}", tableIdentity.getDataset() + "." + tableIdentity.getTable());
}

this.metrics(runContext, queryJobStatistics, queryJob);
FetchType fetchTypeRendered = this.computeFetchType(runContext);

this.metrics(runContext, queryJobStatistics, queryJob, fetchTypeRendered);

Output.OutputBuilder output = Output.builder()
.jobId(queryJob.getJobId().getJob());

if (this.fetch || this.fetchOne || this.store) {

if (!FetchType.NONE.equals(fetchTypeRendered)) {
TableResult result = queryJob.getQueryResults();
String[] tags = this.tags(queryJobStatistics, queryJob);
String[] tags = this.tags(queryJobStatistics, queryJob, fetchTypeRendered);

runContext.metric(Counter.of("total.rows", result.getTotalRows(), tags));

if (this.store) {
if (FetchType.STORE.equals(fetchTypeRendered)) {
Map.Entry<URI, Long> store = this.storeResult(result, runContext);

runContext.metric(Counter.of("fetch.rows", store.getValue(), tags));
Expand All @@ -300,7 +307,7 @@ public Query.Output run(RunContext runContext) throws Exception {
runContext.metric(Counter.of("fetch.rows", fetch.size(), tags));
output.size((long) fetch.size());

if (this.fetch) {
if (FetchType.FETCH.equals(fetchTypeRendered)) {
output.rows(fetch);
} else {
output.row(fetch.size() > 0 ? fetch.get(0) : ImmutableMap.of());
Expand Down Expand Up @@ -453,11 +460,11 @@ public static class Output implements io.kestra.core.models.tasks.Output {
private DestinationTable destinationTable;
}

private String[] tags(JobStatistics.QueryStatistics stats, Job queryJob) {
private String[] tags(JobStatistics.QueryStatistics stats, Job queryJob, FetchType fetchType) {
return new String[]{
"statement_type", stats.getStatementType().name(),
"fetch", this.fetch || this.fetchOne ? "true" : "false",
"store", this.store ? "true" : "false",
"fetch", FetchType.FETCH.equals(fetchType) || FetchType.FETCH_ONE.equals(fetchType) ? "true" : "false",
"store", FetchType.STORE.equals(fetchType) ? "true" : "false",
"project_id", queryJob.getJobId().getProject(),
"location", queryJob.getJobId().getLocation(),
};
Expand Down Expand Up @@ -497,8 +504,8 @@ public String getTable() {
}
}

private void metrics(RunContext runContext, JobStatistics.QueryStatistics stats, Job queryJob) throws IllegalVariableEvaluationException {
String[] tags = this.tags(stats, queryJob);
private void metrics(RunContext runContext, JobStatistics.QueryStatistics stats, Job queryJob, FetchType fetchTypeRendered) throws IllegalVariableEvaluationException {
String[] tags = this.tags(stats, queryJob, fetchTypeRendered);

if (stats.getEstimatedBytesProcessed() != null) {
runContext.metric(Counter.of("estimated.bytes.processed", stats.getEstimatedBytesProcessed(), tags));
Expand Down
32 changes: 29 additions & 3 deletions src/main/java/io/kestra/plugin/gcp/bigquery/QueryInterface.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.kestra.plugin.gcp.bigquery;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.common.FetchType;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import io.kestra.core.models.annotations.PluginProperty;

Expand All @@ -17,20 +20,43 @@ public interface QueryInterface {
Property<Boolean> getLegacySql();

@Schema(
title = "Whether to Fetch the data from the query result to the task output"
title = "Whether to Fetch the data from the query result to the task output. This is deprecated, use fetchType: FETCH instead"
)
@PluginProperty
@Deprecated
boolean isFetch();

@Schema(
title = "Whether to store the data from the query result into an ion serialized data file"
title = "Whether to store the data from the query result into an ion serialized data file. This is deprecated, use fetchType: STORE instead"
)
@PluginProperty
@Deprecated
boolean isStore();

@Schema(
title = "Whether to Fetch only one data row from the query result to the task output"
title = "Whether to Fetch only one data row from the query result to the task output. This is deprecated, use fetchType: FETCH_ONE instead"
)
@PluginProperty
@Deprecated
boolean isFetchOne();

@Schema(
title = "Fetch type",
description = """
The way you want to store data :
- FETCH_ONE - output the first row
- FETCH - output all rows as output variable
- STORE - store all rows to a file
- NONE - do nothing
"""
)
Property<FetchType> getFetchType();

default FetchType computeFetchType(RunContext runContext) throws IllegalVariableEvaluationException {
if (this.isFetch()) return FetchType.FETCH;
if (this.isStore()) return FetchType.STORE;
if (this.isFetchOne()) return FetchType.FETCH_ONE;

return runContext.render(this.getFetchType()).as(FetchType.class).orElseThrow();
}
}
5 changes: 5 additions & 0 deletions src/main/java/io/kestra/plugin/gcp/bigquery/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.common.FetchType;
import io.kestra.core.models.triggers.*;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -76,6 +77,9 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,
@Builder.Default
private boolean fetchOne = false;

@Builder.Default
private Property<FetchType> fetchType = Property.of(FetchType.NONE);

@Override
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {
RunContext runContext = conditionContext.getRunContext();
Expand All @@ -91,6 +95,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
.legacySql(this.legacySql)
.fetch(this.fetch)
.store(this.store)
.fetchType(this.fetchType)
.fetchOne(this.fetchOne)
.build();
Query.Output run = task.run(runContext);
Expand Down
43 changes: 34 additions & 9 deletions src/test/java/io/kestra/plugin/gcp/bigquery/QueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.common.io.CharStreams;
import dev.failsafe.FailsafeException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.common.FetchType;
import io.micronaut.context.annotation.Value;
import io.kestra.core.junit.annotations.KestraTest;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -24,8 +25,12 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import jakarta.inject.Inject;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
Expand Down Expand Up @@ -70,9 +75,17 @@ static String sql() {

}

@Test
private static Stream<Arguments> provideFetchOrFetchType() {
return Stream.of(
Arguments.of(true, null),
Arguments.of(false, Property.of(FetchType.FETCH))
);
}

@ParameterizedTest
@MethodSource("provideFetchOrFetchType")
@SuppressWarnings("unchecked")
void fetch() throws Exception {
void fetch(boolean fetch, Property<FetchType> fetchType) throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of(
"sql", sql(),
"flow", ImmutableMap.of("id", FriendlyId.createFriendlyId(), "namespace", "io.kestra.tests"),
Expand All @@ -83,7 +96,8 @@ void fetch() throws Exception {
Query task = Query.builder()
.sql(new Property<>("{{sql}}"))
.location(Property.of("EU"))
.fetch(true)
.fetch(fetch)
.fetchType(fetchType)
.build();

Query.Output run = task.run(runContext);
Expand Down Expand Up @@ -112,13 +126,22 @@ void fetch() throws Exception {
assertThat(((Map<String, Object>) rows.get(0).get("json")).get("age"), is(30));
}

@Test
void store() throws Exception {
private static Stream<Arguments> provideStoreOrFetchType() {
return Stream.of(
Arguments.of(true, null),
Arguments.of(false, Property.of(FetchType.STORE))
);
}

@ParameterizedTest
@MethodSource("provideStoreOrFetchType")
void store(boolean store, Property<FetchType> fetchType) throws Exception {
Query task = Query.builder()
.id(QueryTest.class.getSimpleName())
.type(Query.class.getName())
.sql(Property.of(sql() + "\n UNION ALL \n " + sql()))
.store(true)
.store(store)
.fetchType(fetchType)
.build();

Query.Output run = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()));
Expand All @@ -130,13 +153,15 @@ void store() throws Exception {
assertThat(ionResult, containsString("interval:\"1-0 0 0:0:0\""));
}

@Test
void fetchLongPage() throws Exception {
@ParameterizedTest
@MethodSource("provideFetchOrFetchType")
void fetchLongPage(boolean fetch, Property<FetchType> fetchType) throws Exception {
Query task = Query.builder()
.id(QueryTest.class.getSimpleName())
.type(Query.class.getName())
.sql(Property.of("SELECT repository_forks FROM `bigquery-public-data.samples.github_timeline` LIMIT 100000"))
.fetch(true)
.fetch(fetch)
.fetchType(fetchType)
.build();

Query.Output run = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()));
Expand Down

0 comments on commit 1891c1c

Please sign in to comment.