add unsafe_dynamic_query to gcp_bigquery_select #190

Status: Open. Wants to merge 3 commits into base: main
14 changes: 9 additions & 5 deletions internal/impl/gcp/bigquery.go
@@ -8,6 +8,7 @@ import (
"github.com/Masterminds/squirrel"
"go.uber.org/multierr"

"github.com/warpstreamlabs/bento/public/bloblang"
"github.com/warpstreamlabs/bento/public/service"
)

@@ -64,11 +65,14 @@ func (client *wrappedBQClient) Close() error {
}

type bqQueryParts struct {
table string
columns []string
where string
prefix string
suffix string
table string
tableDyn *service.InterpolatedString
columns []string
columnsMapping *bloblang.Executor
where string
whereDyn *service.InterpolatedString
prefix string
suffix string
}

type bqQueryBuilderOptions struct {
136 changes: 121 additions & 15 deletions internal/impl/gcp/processor_bigquery_select.go
@@ -20,6 +20,7 @@ type bigQuerySelectProcessorConfig struct {
queryParts *bqQueryParts
jobLabels map[string]string
argsMapping *bloblang.Executor
unsafeDyn bool
}

func bigQuerySelectProcessorConfigFromParsed(inConf *service.ParsedConfig) (conf bigQuerySelectProcessorConfig, err error) {
@@ -30,6 +31,11 @@ func bigQuerySelectProcessorConfigFromParsed(inConf *service.ParsedConfig) (conf
return
}

conf.unsafeDyn, err = inConf.FieldBool("unsafe_dynamic_query")
if err != nil {
return
}

if inConf.Contains("args_mapping") {
if conf.argsMapping, err = inConf.FieldBloblang("args_mapping"); err != nil {
return
@@ -40,16 +46,33 @@ func bigQuerySelectProcessorConfigFromParsed(inConf *service.ParsedConfig) (conf
return
}

if queryParts.table, err = inConf.FieldString("table"); err != nil {
return
}

if queryParts.columns, err = inConf.FieldStringList("columns"); err != nil {
return
if conf.unsafeDyn {
if queryParts.tableDyn, err = inConf.FieldInterpolatedString("table"); err != nil {
return
}
if inConf.Contains("where") {
if queryParts.whereDyn, err = inConf.FieldInterpolatedString("where"); err != nil {
return
}
}
if inConf.Contains("columns_mapping") {
Review comment (Collaborator, Author): We probably need to add a lint rule so that only one of the `columns` / `columns_mapping` fields is provided.
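
A minimal sketch of the check this comment asks for, written as a plain Go helper rather than against Bento's config API. `validateColumnFields` and its parameters are hypothetical names; in the processor the two flags would presumably come from `inConf.Contains("columns")` and `inConf.Contains("columns_mapping")`. Rejecting the case where neither field is set is an assumption, not something the PR states.

```go
package main

import (
	"errors"
	"fmt"
)

// validateColumnFields is a hypothetical helper: it rejects configs that set
// both `columns` and `columns_mapping`, and (as an assumption) configs that
// set neither, since a SELECT needs at least one source of column names.
func validateColumnFields(hasColumns, hasColumnsMapping bool) error {
	switch {
	case hasColumns && hasColumnsMapping:
		return errors.New("only one of columns or columns_mapping may be set")
	case !hasColumns && !hasColumnsMapping:
		return errors.New("one of columns or columns_mapping is required")
	default:
		return nil
	}
}

func main() {
	// Both fields set: reported as a config error.
	fmt.Println(validateColumnFields(true, true))
	// Only `columns` set: accepted.
	fmt.Println(validateColumnFields(true, false))
}
```

Running a check like this at config-parse time would surface the conflict before any message is processed.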

if conf.queryParts.columnsMapping, err = inConf.FieldBloblang("columns_mapping"); err != nil {
return
}
}
} else {
if queryParts.table, err = inConf.FieldString("table"); err != nil {
return
}
if inConf.Contains("where") {
if queryParts.where, err = inConf.FieldString("where"); err != nil {
return
}
}
}

if inConf.Contains("where") {
if queryParts.where, err = inConf.FieldString("where"); err != nil {
if inConf.Contains("columns") {
if queryParts.columns, err = inConf.FieldStringList("columns"); err != nil {
return
}
}
@@ -77,8 +100,15 @@ func newBigQuerySelectProcessorConfig() *service.ConfigSpec {
Categories("Integration").
Summary("Executes a `SELECT` query against BigQuery and replaces messages with the rows returned.").
Field(service.NewStringField("project").Description("GCP project where the query job will execute.")).
Field(service.NewStringField("table").Description("Fully-qualified BigQuery table name to query.").Example("bigquery-public-data.samples.shakespeare")).
Field(service.NewStringListField("columns").Description("A list of columns to query.")).
Field(service.NewInterpolatedStringField("table").Description("Fully-qualified BigQuery table name to query.").Example("bigquery-public-data.samples.shakespeare")).
Field(service.NewStringListField("columns").
Description("A list of columns to query.").
Optional()).
Field(service.NewBloblangField("columns_mapping").
Description("An optional [Bloblang mapping](/docs/guides/bloblang/about) which should evaluate to an array of column names to query.").
Review comment (Collaborator, Author): Add a better description; think about how to convey that `unsafe_dynamic_query` must be set for this field to be evaluated.

Review comment (Collaborator): Why can't we just have a single column field that supports either a plain string or Bloblang interpolation?

Optional().
Version("1.5.0").
Advanced()).
Field(service.NewStringField("where").
Description("An optional where clause to add. Placeholder arguments are populated with the `args_mapping` field. Placeholders should always be question marks (`?`).").
Example("type = ? and created_at > ?").
@@ -96,6 +126,12 @@ func newBigQuerySelectProcessorConfig() *service.ConfigSpec {
Field(service.NewStringField("suffix").
Description("An optional suffix to append to the select query.").
Optional()).
Field(service.NewBoolField("unsafe_dynamic_query").
Description("Whether to enable [interpolation functions](/docs/configuration/interpolation/#bloblang-queries) in the columns & where fields. Great care should be taken to ensure your queries are defended against injection attacks.").
Review comment (Collaborator, on lines +129 to +130): I think we should add an experimental note here, or some admonition stating that this approach will likely be reworked in future releases.

Advanced().
Default(false).
Version("1.5.0").
Optional()).
Example("Word count",
`
Given a stream of English terms, enrich the messages with the word count from Shakespeare's public works:`,
@@ -118,6 +154,22 @@ pipeline:
args_mapping: root = [ this.term ]
result_map: |
root.count = this.get("0.total_count")
`,
).
Example("Unsafe Dynamic Query",
`
An example to show the use of the unsafe_dynamic_query field:`,
`
# {"table": "test.people", "where": "city IN (?,?,?)", "columns": ["name", "age", "city"], "args": ["London", "Paris", "Dublin"]}
Review comment (Collaborator, Author), suggested change:
- # {"table": "test.people", "where": "city IN (?,?,?)", "columns": ["name", "age", "city"], "args": ["London", "Paris", "Dublin"]}
+ # {"table": "test.people", "columns": ["name", "age", "city"], "args": ["London", "Paris", "Dublin"]}

pipeline:
processors:
- gcp_bigquery_select:
project: ${GGP_PROJECT}
Review comment (Collaborator, Author), suggested change:
- project: ${GGP_PROJECT}
+ project: ${GCP_PROJECT}

table: ${! this.table } # test.people
columns_mapping: root = this.columns #["name", "age", "city"]
where: ${! "city IN ("+this.args.join(",").re_replace_all("\\b\\w+\\b","?")+")" } # city IN (?,?,?)
args_mapping: root = this.args # ["London", "Paris", "Dublin"]
unsafe_dynamic_query: true
`,
)
}
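
The `where` expression in the example above derives its placeholders by replacing every word in the joined arguments with `?`, which appears to yield more than one placeholder for a multi-word value such as `New York`. As a hedged alternative, the sketch below builds the clause from the argument count alone. `inClause` is a hypothetical helper, not part of this PR or of Bento.

```go
package main

import (
	"fmt"
	"strings"
)

// inClause is a hypothetical helper that builds "<column> IN (?,?,...)" with
// exactly one placeholder per argument, independent of the argument values.
func inClause(column string, nArgs int) (string, error) {
	if nArgs < 1 {
		return "", fmt.Errorf("IN clause for %q needs at least one argument", column)
	}
	// "?," repeated nArgs times, minus the trailing comma.
	placeholders := strings.TrimSuffix(strings.Repeat("?,", nArgs), ",")
	return column + " IN (" + placeholders + ")", nil
}

func main() {
	args := []any{"London", "Paris", "New York"}
	clause, err := inClause("city", len(args))
	if err != nil {
		panic(err)
	}
	fmt.Println(clause) // city IN (?,?,?)
}
```

Counting arguments keeps the values themselves out of the query text, so only `args_mapping` carries user data.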
@@ -165,34 +217,77 @@ func newBigQuerySelectProcessor(inConf *service.ParsedConfig, options *bigQueryP

func (proc *bigQuerySelectProcessor) ProcessBatch(ctx context.Context, batch service.MessageBatch) ([]service.MessageBatch, error) {
argsMapping := proc.config.argsMapping
columnsMapping := proc.config.queryParts.columnsMapping

outBatch := make(service.MessageBatch, 0, len(batch))

var executor *service.MessageBatchBloblangExecutor
var argsMappingExecutor *service.MessageBatchBloblangExecutor
if argsMapping != nil {
executor = batch.BloblangExecutor(argsMapping)
argsMappingExecutor = batch.BloblangExecutor(argsMapping)
}

var columnsMappingExecutor *service.MessageBatchBloblangExecutor
if columnsMapping != nil {
columnsMappingExecutor = batch.BloblangExecutor(columnsMapping)
}

for i, msg := range batch {
outBatch = append(outBatch, msg)

if proc.config.unsafeDyn {
Review comment (Collaborator): Can we consider putting this into its own function? That way, instead of calling `msg.SetError` at every step, we can just return an `err` and set it once.

Thinking something like:

if proc.config.unsafeDyn {
  result, err := proc.executeDynamicQuery(msg)
  if err != nil {
    msg.SetError(err)
  }
}
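
One way the suggestion above could be fleshed out, as a self-contained sketch rather than the PR's actual code. The `resolver` type, `queryParts`, `resolveDynamicParts` and `field` are hypothetical stand-ins: the real processor resolves values with `TryString` on interpolated strings and works with `*service.Message`, neither of which is modelled here.

```go
package main

import (
	"errors"
	"fmt"
)

// resolver stands in for per-message interpolation (hypothetical; the PR
// itself calls TryString on an interpolated string).
type resolver func(msg map[string]any) (string, error)

// queryParts mirrors only the fields needed for this sketch.
type queryParts struct {
	table string
	where string
}

// resolveDynamicParts resolves every dynamic part for one message and returns
// the first failure, so the caller can set the message error in one place.
func resolveDynamicParts(msg map[string]any, table, where resolver) (queryParts, error) {
	var parts queryParts
	var err error
	if parts.table, err = table(msg); err != nil {
		return queryParts{}, fmt.Errorf("failed to resolve table mapping: %w", err)
	}
	if where != nil { // assumption: `where` is optional, so it may be unset
		if parts.where, err = where(msg); err != nil {
			return queryParts{}, fmt.Errorf("failed to resolve where mapping: %w", err)
		}
	}
	return parts, nil
}

// field builds a resolver that reads a string field from the message.
func field(name string) resolver {
	return func(msg map[string]any) (string, error) {
		s, ok := msg[name].(string)
		if !ok {
			return "", errors.New("field " + name + " is missing or not a string")
		}
		return s, nil
	}
}

func main() {
	msg := map[string]any{"table": "test.people"}

	parts, err := resolveDynamicParts(msg, field("table"), nil)
	fmt.Println(parts.table, err)

	// The message has no "where" field, so this call reports a single error.
	_, err = resolveDynamicParts(msg, field("table"), field("where"))
	fmt.Println(err)
}
```

Returning the resolved parts as a value, instead of writing them into shared processor config, is a design choice of this sketch; it keeps per-message state local to the call.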

var err error
proc.config.queryParts.table, err = proc.config.queryParts.tableDyn.TryString(msg)
if err != nil {
msg.SetError(fmt.Errorf("failed to resolve table mapping: %w", err))
continue
}
proc.config.queryParts.where, err = proc.config.queryParts.whereDyn.TryString(msg)
if err != nil {
msg.SetError(fmt.Errorf("failed to resolve where mapping: %w", err))
continue
}

resMsg, err := columnsMappingExecutor.Query(i)
if err != nil {
msg.SetError(fmt.Errorf("failed to resolve columns mapping: %w", err))
continue
}

icols, err := resMsg.AsStructured()
if err != nil {
msg.SetError(fmt.Errorf("mapping returned non-structured result: %w", err))
continue
}
cols, ok := icols.([]any)
if !ok {
msg.SetError(fmt.Errorf("col mapping returned non-array result: %T", icols))
continue
}

proc.config.queryParts.columns, err = toStringSlice(cols)
if err != nil {
msg.SetError(fmt.Errorf("%w", err))
Review comment (Collaborator): Either wrap this with an error message or just pass the `err`.

continue
}
}

var args []any
if argsMapping != nil {
resMsg, err := executor.Query(i)
resMsg, err := argsMappingExecutor.Query(i)
if err != nil {
msg.SetError(fmt.Errorf("failed to resolve args mapping: %w", err))
continue
}

iargs, err := resMsg.AsStructured()
if err != nil {
msg.SetError(fmt.Errorf("mapping returned non-structured result: %w", err))
msg.SetError(fmt.Errorf("args mapping returned non-structured result: %w", err))
continue
}

var ok bool
if args, ok = iargs.([]any); !ok {
msg.SetError(fmt.Errorf("mapping returned non-array result: %T", iargs))
msg.SetError(fmt.Errorf("args mapping returned non-array result: %T", iargs))
continue
}
}
@@ -261,3 +356,14 @@ func init() {
panic(err)
}
}

func toStringSlice(in []any) (out []string, err error) {
if in == nil {
return nil, errors.New("column mapping returned nil")
}
out = make([]string, len(in))
for i, v := range in {
out[i] = fmt.Sprint(v)
}
return out, nil
}
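
A short sketch of how `toStringSlice` behaves on mixed input, next to a stricter variant. `toStringSlice` is copied from the diff above; `toStrictStringSlice` is hypothetical and only illustrates one possible tightening, on the assumption that column names should always arrive as non-empty strings.

```go
package main

import (
	"errors"
	"fmt"
)

// toStringSlice is copied from the diff above: every element is formatted
// with fmt.Sprint, so non-string values are converted rather than rejected.
func toStringSlice(in []any) (out []string, err error) {
	if in == nil {
		return nil, errors.New("column mapping returned nil")
	}
	out = make([]string, len(in))
	for i, v := range in {
		out[i] = fmt.Sprint(v)
	}
	return out, nil
}

// toStrictStringSlice is a hypothetical stricter variant: it fails on empty
// input and on any element that is not a non-empty string.
func toStrictStringSlice(in []any) ([]string, error) {
	if len(in) == 0 {
		return nil, errors.New("column mapping returned no columns")
	}
	out := make([]string, 0, len(in))
	for i, v := range in {
		s, ok := v.(string)
		if !ok || s == "" {
			return nil, fmt.Errorf("column %d is not a non-empty string: %T", i, v)
		}
		out = append(out, s)
	}
	return out, nil
}

func main() {
	mixed := []any{"name", 42, nil}

	loose, _ := toStringSlice(mixed)
	fmt.Printf("%q\n", loose) // ["name" "42" "<nil>"]

	_, err := toStrictStringSlice(mixed)
	fmt.Println(err)
}
```

With the loose version, a `null` in the mapping result becomes a column literally named `<nil>`, which would only fail later inside the query.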
72 changes: 70 additions & 2 deletions website/docs/components/processors/gcp_bigquery_select.md
@@ -22,24 +22,55 @@ Executes a `SELECT` query against BigQuery and replaces messages with the rows r

Introduced in version 1.0.0.


<Tabs defaultValue="common" values={[
{ label: 'Common', value: 'common', },
{ label: 'Advanced', value: 'advanced', },
]}>

<TabItem value="common">

```yml
# Common config fields, showing default values
label: ""
gcp_bigquery_select:
project: "" # No default (required)
table: bigquery-public-data.samples.shakespeare # No default (required)
columns: [] # No default (optional)
where: type = ? and created_at > ? # No default (optional)
job_labels: {}
args_mapping: root = [ "article", now().ts_format("2006-01-02") ] # No default (optional)
prefix: "" # No default (optional)
suffix: "" # No default (optional)
```

</TabItem>
<TabItem value="advanced">

```yml
# Config fields, showing default values
# All config fields, showing default values
label: ""
gcp_bigquery_select:
project: "" # No default (required)
table: bigquery-public-data.samples.shakespeare # No default (required)
columns: [] # No default (required)
columns: [] # No default (optional)
columns_mapping: "" # No default (optional)
where: type = ? and created_at > ? # No default (optional)
job_labels: {}
args_mapping: root = [ "article", now().ts_format("2006-01-02") ] # No default (optional)
prefix: "" # No default (optional)
suffix: "" # No default (optional)
unsafe_dynamic_query: false
```

</TabItem>
</Tabs>

## Examples

<Tabs defaultValue="Word count" values={[
{ label: 'Word count', value: 'Word count', },
{ label: 'Unsafe Dynamic Query', value: 'Unsafe Dynamic Query', },
]}>

<TabItem value="Word count">
@@ -68,6 +99,25 @@ pipeline:
root.count = this.get("0.total_count")
```

</TabItem>
<TabItem value="Unsafe Dynamic Query">


An example to show the use of the unsafe_dynamic_query field:

```yaml
# {"table": "test.people", "where": "city IN (?,?,?)", "columns": ["name", "age", "city"], "args": ["London", "Paris", "Dublin"]}
pipeline:
processors:
- gcp_bigquery_select:
project: ${GGP_PROJECT}
table: ${! this.table } # test.people
columns_mapping: root = this.columns #["name", "age", "city"]
where: ${! "city IN ("+this.args.join(",").re_replace_all("\\b\\w+\\b","?")+")" } # city IN (?,?,?)
args_mapping: root = this.args # ["London", "Paris", "Dublin"]
unsafe_dynamic_query: true
```

</TabItem>
</Tabs>

@@ -83,6 +133,7 @@ Type: `string`
### `table`

Fully-qualified BigQuery table name to query.
This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries).


Type: `string`
@@ -100,6 +151,14 @@ A list of columns to query.

Type: `array`

### `columns_mapping`

An optional [Bloblang mapping](/docs/guides/bloblang/about) which should evaluate to an array of column names to query.


Type: `string`
Requires version 1.5.0 or newer

### `where`

An optional where clause to add. Placeholder arguments are populated with the `args_mapping` field. Placeholders should always be question marks (`?`).
@@ -150,4 +209,13 @@ An optional suffix to append to the select query.

Type: `string`

### `unsafe_dynamic_query`

Whether to enable [interpolation functions](/docs/configuration/interpolation/#bloblang-queries) in the columns & where fields. Great care should be taken to ensure your queries are defended against injection attacks.


Type: `bool`
Default: `false`
Requires version 1.5.0 or newer
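
Because table and column names cannot be passed as `?` placeholders, one possible defence is to validate them before they reach the query. The sketch below is an assumption-laden illustration, not part of this PR: `validateIdentifier` is a hypothetical helper, and `identPattern` is a deliberately conservative pattern rather than BigQuery's full identifier grammar.

```go
package main

import (
	"fmt"
	"regexp"
)

// identPattern is an assumed, conservative pattern: dot-separated segments of
// letters, digits, underscores and hyphens. It is not BigQuery's full grammar.
var identPattern = regexp.MustCompile(`^[A-Za-z0-9_-]+(\.[A-Za-z0-9_-]+)*$`)

// validateIdentifier is a hypothetical helper that rejects any table or
// column name containing characters outside the allowed set.
func validateIdentifier(kind, value string) error {
	if !identPattern.MatchString(value) {
		return fmt.Errorf("%s %q contains characters outside the allowed set", kind, value)
	}
	return nil
}

func main() {
	// A fully-qualified table name from the docs passes.
	fmt.Println(validateIdentifier("table", "bigquery-public-data.samples.shakespeare"))
	// A value that tries to smuggle in extra SQL is rejected.
	fmt.Println(validateIdentifier("column", "name; DROP TABLE people"))
}
```

An allowlist of known table and column names would be stricter still, at the cost of needing to be kept up to date.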

