-
Notifications
You must be signed in to change notification settings - Fork 77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add unsafe_dynamic_query to gcp_bigquery_select #190
base: main
Are you sure you want to change the base?
Conversation
…erpolation for dynamic queries Signed-off-by: Jem Davies <jemsot@gmail.com>
Signed-off-by: Jem Davies <jemsot@gmail.com>
Signed-off-by: Jem Davies <jemsot@gmail.com>
pipeline: | ||
processors: | ||
- gcp_bigquery_select: | ||
project: ${GGP_PROJECT} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
project: ${GGP_PROJECT} | |
project: ${GCP_PROJECT} |
return | ||
} | ||
} | ||
if inConf.Contains("columns_mapping") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably need to add a linter such that only 1 columns / columns_mapping field is provided
Description("A list of columns to query."). | ||
Optional()). | ||
Field(service.NewBloblangField("columns_mapping"). | ||
Description("An optional [Bloblang mapping](/docs/guides/bloblang/about) which should evaluate to an array of column names to query."). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add better description - think about how we need to set unsafe_dynamic_query
for this field to be evaluated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can't we just have a single column
field that either supports plain string or bloblang interpolation?
` | ||
An example to show the use of the unsafe_dynamic_query field:`, | ||
` | ||
# {"table": "test.people", "where": "city IN (?,?,?)", "columns": ["name", "age", "city"], "args": ["London", "Paris", "Dublin"]} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# {"table": "test.people", "where": "city IN (?,?,?)", "columns": ["name", "age", "city"], "args": ["London", "Paris", "Dublin"]} | |
# {"table": "test.people", "columns": ["name", "age", "city"], "args": ["London", "Paris", "Dublin"]} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice one here. Some questions/feedback.
Also, can we write a test for this? Just a unit test with this new interpolation functionality would be fine.
Stretch goal though: am thinking it could be nice to add an integration test using that bigquery emulator. Check our the setupBigQueryEmulator
function and the TestGCPBigQueryStorageOutputWriteOK
test for reference.
Description("A list of columns to query."). | ||
Optional()). | ||
Field(service.NewBloblangField("columns_mapping"). | ||
Description("An optional [Bloblang mapping](/docs/guides/bloblang/about) which should evaluate to an array of column names to query."). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can't we just have a single column
field that either supports plain string or bloblang interpolation?
Field(service.NewBoolField("unsafe_dynamic_query"). | ||
Description("Whether to enable [interpolation functions](/docs/configuration/interpolation/#bloblang-queries) in the columns & where fields. Great care should be made to ensure your queries are defended against injection attacks."). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should add an experimental note here or some admonition that specifies this approach will likely be re-worked in future releases.
|
||
proc.config.queryParts.columns, err = toStringSlice(cols) | ||
if err != nil { | ||
msg.SetError(fmt.Errorf("%w", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either wrap this with an error message or just pass the err
} | ||
|
||
for i, msg := range batch { | ||
outBatch = append(outBatch, msg) | ||
|
||
if proc.config.unsafeDyn { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we consider putting this into its own function? that way instead of this msg.SetError
we can just return an err
and set it once.
Thinking something like:
if proc.config.unsafeDyn {
result, err := proc.executeDynamicQuery(msg)
if err != nil {
msg.SetError(err)
}
}
Will enable things like:
So that there is greater flexibility with this processor.