Add link to scheduled pipeline #7536

Closed
wants to merge 11 commits
26 changes: 24 additions & 2 deletions docs/installation.rst
@@ -858,13 +858,19 @@ To allow scheduled queries, add the following to your `config.py`:
},
'start_date': {
'type': 'string',
'format': 'date-time',
'title': 'Start date',
# date-time is parsed using the chrono library, see
# https://www.npmjs.com/package/chrono-node#usage
'format': 'date-time',
'default': 'tomorrow at 9am',
},
'end_date': {
'type': 'string',
'format': 'date-time',
'title': 'End date',
# date-time is parsed using the chrono library, see
# https://www.npmjs.com/package/chrono-node#usage
'format': 'date-time',
'default': '9am in 30 days',
},
'schedule_interval': {
'type': 'string',
@@ -890,6 +896,22 @@ To allow scheduled queries, add the following to your `config.py`:
),
},
},
'VALIDATION': [
# ensure that start_date <= end_date
{
'name': 'less_equal',
'arguments': ['start_date', 'end_date'],
'message': 'End date cannot be before start date',
# this is where the error message is shown
'container': 'end_date',
},
],
# link to the scheduler; this example links to an Airflow pipeline
# that uses the query id and the output table as its name
'linkback': (
Contributor

Is this not a circular dependency? Superset should not know anything about the scheduler (e.g. Airflow), should it? The scheduler knows about Superset and grabs work from a known endpoint, and neither the user nor the Superset system itself should actually care who is doing that work.

I think we should consider letting any arbitrary scheduler PUT back information (like a URL) about how to view its pipelines, or whatever representation it uses for the work it is doing.
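
For illustration only, a rough sketch of that alternative, with a hypothetical endpoint and payload (nothing like this exists in this PR), written in JavaScript for consistency with the rest of the diff:

```js
// Hypothetical: after picking up a scheduled query, the scheduler reports
// back the URL where its pipeline can be viewed. The endpoint and payload
// shape are invented for illustration; no such API exists in this PR.
const queryId = 42;

fetch(`https://superset.example.com/api/saved_query/${queryId}/schedule_info`, {
  method: 'PUT',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    pipeline_url: `https://airflow.example.com/admin/airflow/tree?dag_id=query_${queryId}`,
  }),
}).then(response => console.log('linkback registered:', response.status));
```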

Member Author

I agree it establishes a bi-directional connection, but Superset still doesn't know anything about Airflow with this (it's just an example config). The user is simply saying "when you show the scheduled information, put a link to this URL", and Superset does.
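
For illustration, a minimal sketch of how such a linkback template could be resolved on the client with the `interpolate` helper added in this PR (the saved-query object below is made up; the import path depends on where this runs):

```js
import { interpolate } from '../showSavedQuery/utils';

// The linkback template from the config.py example above. The ${...} tokens
// are plain text, not JS template-literal syntax; interpolate() substitutes them.
const linkback =
  'https://airflow.example.com/admin/airflow/tree?dag_id=query_${id}_${extra_json.schedule_info.output_table}';

// Illustrative saved-query shape; field names follow the linkback template.
const savedQuery = {
  id: 42,
  extra_json: { schedule_info: { output_table: 'my_table' } },
};

// => 'https://airflow.example.com/admin/airflow/tree?dag_id=query_42_my_table'
const url = interpolate(linkback, savedQuery);
```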

Contributor

But it does require the running instance of Superset to have internal Airflow details (via its configuration). This has a "correctness" problem IMO which could manifest as actual issues: it requires the configurator of Superset to know (at deployment time?) who will be servicing these queries and what their URLs look like.

With this approach the coupling prevents multiple systems from servicing these scheduled queries, and if the owners of those services decide to migrate them to a new system it will break the feature in Superset. Imagine the load being partially migrated to another internal system like Flyte, for example: this would unnecessarily force significant eng work to accommodate it (if it can even be accommodated at all), whereas if the servicer itself PUTs the URL to Superset, we have no concerns or opinions about that at all; it will just work.

Member Author

> But it does require the running instance of Superset to have internal Airflow details (via its configuration). This has a "correctness" problem IMO which could manifest as actual issues: it requires the configurator of Superset to know (at deployment time?) who will be servicing these queries and what their URLs look like.

The SCHEDULED_QUERIES feature flag config is a way of informing Superset of the internals of a scheduler: it basically tells Superset what information a given scheduler needs. I don't see how the linkback is different from the rest of the information stored in the configuration, since that configuration is already scheduler-specific.

> With this approach the coupling prevents multiple systems from servicing these scheduled queries, and if the owners of those services decide to migrate them to a new system it will break the feature in Superset. Imagine the load being partially migrated to another internal system like Flyte, for example: this would unnecessarily force significant eng work to accommodate it (if it can even be accommodated at all), whereas if the servicer itself PUTs the URL to Superset, we have no concerns or opinions about that at all; it will just work.

Migrating to a new scheduler would most probably require updating the extra_json in all the existing queries, in addition to updating the SCHEDULED_QUERIES config, so the significant engineering work would already be expected.

And while I agree that having the consumers PUT the URL would be nice because it could support multiple schedulers (and we would get the information from the system that knows most about it), I don't think it's a scenario likely to happen in practice.

I'm also worried about PUTting the URL because, in order for the consumer to update the scheduled query with the pipeline URL, it would need to impersonate the user, opening a backdoor for running arbitrary queries in the user's name. And technically it could also result in race conditions, but I think that's an unlikely scenario.

'https://airflow.example.com/admin/airflow/tree?'
'dag_id=query_${id}_${extra_json.schedule_info.output_table}'
),
},
}
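
The `date-time` defaults above are natural-language strings; on the client, ScheduleQueryButton parses them with chrono-node before rendering the form (see getJSONSchema further down in this diff). A minimal sketch of what that parsing does:

```js
import chrono from 'chrono-node';

// 'tomorrow at 9am' and '9am in 30 days' come from the config.py example above.
// chrono.parseDate returns a JS Date relative to "now".
const startDate = chrono.parseDate('tomorrow at 9am');
const endDate = chrono.parseDate('9am in 30 days');

// ISO-8601 timestamps for tomorrow 09:00 and 09:00 in 30 days
// (local time, expressed in UTC), which is what the form receives as defaults.
console.log(startDate.toISOString());
console.log(endDate.toISOString());
```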

4 changes: 3 additions & 1 deletion superset/__init__.py
@@ -230,7 +230,9 @@ def is_feature_enabled(feature):
if conf.get('ENABLE_FLASK_COMPRESS'):
Compress(app)

Talisman(app, content_security_policy=None)
if app.config['TALISMAN_ENABLED']:
talisman_config = app.config.get('TALISMAN_CONFIG')
Talisman(app, **talisman_config)

# Hook that provides administrators a handle on the Flask APP
# after initialization
15 changes: 15 additions & 0 deletions superset/assets/package-lock.json

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions superset/assets/package.json
@@ -84,6 +84,7 @@
"bootstrap": "^3.3.6",
"bootstrap-slider": "^10.0.0",
"brace": "^0.11.1",
"chrono-node": "^1.3.11",
"classnames": "^2.2.5",
"d3-array": "^1.2.4",
"d3-color": "^1.2.0",
65 changes: 65 additions & 0 deletions superset/assets/spec/javascripts/showSavedQuery/utils_spec.jsx
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { getNestedValue, interpolate } from '../../../src/showSavedQuery/utils';

describe('getNestedValue', () => {
it('is a function', () => {
expect(typeof getNestedValue).toBe('function');
});

it('works with simple ids', () => {
const obj = { a: '1' };
const id = 'a';
expect(getNestedValue(obj, id)).toEqual('1');
});

it('works with complex ids', () => {
const obj = { a: { b: '1' } };
const id = 'a.b';
expect(getNestedValue(obj, id)).toEqual('1');
});

it('works with other separators', () => {
const obj = { a: { b: { c: '1' } } };
const id = 'a__b__c';
const separator = '__';
expect(getNestedValue(obj, id, separator)).toEqual('1');
});
});


describe('interpolate', () => {
it('is a function', () => {
expect(typeof interpolate).toBe('function');
});

it('works with simple ids', () => {
const obj = { a: '1' };
// eslint-disable-next-line no-template-curly-in-string
const str = 'value: ${a}';
expect(interpolate(str, obj)).toEqual('value: 1');
});

it('works with complex ids', () => {
const obj = { a: { b: '1' } };
// eslint-disable-next-line no-template-curly-in-string
const str = 'value: ${a.b}';
expect(interpolate(str, obj)).toEqual('value: 1');
});
});
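
The utils module under test isn't part of this diff; as a rough sketch, an implementation consistent with these tests could look like the following (not necessarily the actual src/showSavedQuery/utils.js):

```js
// Sketch only: one way to satisfy the tests above.
export function getNestedValue(obj, id, separator = '.') {
  // 'a.b.c' (or 'a__b__c' with a custom separator) walks nested keys.
  return id.split(separator).reduce((acc, key) => acc && acc[key], obj);
}

export function interpolate(str, obj) {
  // Replaces each ${path} occurrence with the corresponding nested value.
  return str.replace(/\$\{(.+?)\}/g, (match, id) => getNestedValue(obj, id));
}
```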
54 changes: 54 additions & 0 deletions superset/assets/src/SqlLab/actions/sqlLab.js
@@ -65,6 +65,10 @@ export const CLEAR_QUERY_RESULTS = 'CLEAR_QUERY_RESULTS';
export const REMOVE_DATA_PREVIEW = 'REMOVE_DATA_PREVIEW';
export const CHANGE_DATA_PREVIEW_ID = 'CHANGE_DATA_PREVIEW_ID';

export const START_QUERY_VALIDATION = 'START_QUERY_VALIDATION';
export const QUERY_VALIDATION_RETURNED = 'QUERY_VALIDATION_RETURNED';
export const QUERY_VALIDATION_FAILED = 'QUERY_VALIDATION_FAILED';

export const CREATE_DATASOURCE_STARTED = 'CREATE_DATASOURCE_STARTED';
export const CREATE_DATASOURCE_SUCCESS = 'CREATE_DATASOURCE_SUCCESS';
export const CREATE_DATASOURCE_FAILED = 'CREATE_DATASOURCE_FAILED';
@@ -77,6 +81,21 @@ export function resetState() {
return { type: RESET_STATE };
}

export function startQueryValidation(query) {
Object.assign(query, {
id: query.id ? query.id : shortid.generate(),
});
return { type: START_QUERY_VALIDATION, query };
}

export function queryValidationReturned(query, results) {
return { type: QUERY_VALIDATION_RETURNED, query, results };
}

export function queryValidationFailed(query, message, error) {
return { type: QUERY_VALIDATION_FAILED, query, message, error };
}

export function saveQuery(query) {
return dispatch =>
SupersetClient.post({
@@ -187,6 +206,41 @@ export function runQuery(query) {
};
}

export function validateQuery(query) {
return function (dispatch) {
dispatch(startQueryValidation(query));

const postPayload = {
client_id: query.id,
database_id: query.dbId,
json: true,
schema: query.schema,
sql: query.sql,
sql_editor_id: query.sqlEditorId,
templateParams: query.templateParams,
validate_only: true,
};

return SupersetClient.post({
endpoint: `/superset/validate_sql_json/${window.location.search}`,
postPayload,
stringify: false,
})
.then(({ json }) => {
dispatch(queryValidationReturned(query, json));
})
.catch(response =>
getClientErrorObject(response).then((error) => {
let message = error.error || error.statusText || t('Unknown error');
if (message.includes('CSRF token')) {
message = t(COMMON_ERR_MESSAGES.SESSION_TIMED_OUT);
}
dispatch(queryValidationFailed(query, message, error));
}),
);
};
}

export function postStopQuery(query) {
return function (dispatch) {
return SupersetClient.post({
15 changes: 15 additions & 0 deletions superset/assets/src/SqlLab/components/AceEditorWrapper.jsx
@@ -157,6 +157,20 @@ class AceEditorWrapper extends React.PureComponent {
}
});
}
getAceAnnotations() {
const validationResult = this.props.queryEditor.validationResult;
const resultIsReady = (validationResult && validationResult.completed);
if (resultIsReady && validationResult.errors.length > 0) {
const errors = validationResult.errors.map(err => ({
type: 'error',
row: err.line_number - 1,
column: err.start_column - 1,
text: err.message,
}));
return errors;
}
return [];
}
render() {
return (
<AceEditor
@@ -170,6 +184,7 @@
editorProps={{ $blockScrolling: true }}
enableLiveAutocompletion
value={this.state.sql}
annotations={this.getAceAnnotations()}
/>
);
}
3 changes: 1 addition & 2 deletions superset/assets/src/SqlLab/components/QueryAutoRefresh.jsx
@@ -38,7 +38,7 @@ class QueryAutoRefresh extends React.PureComponent {
}
shouldCheckForQueries() {
// if there are started or running queries, this method should return true
const { queries, queriesLastUpdate } = this.props;
const { queries } = this.props;
const now = new Date().getTime();

// due to a race condition, queries can be marked as successful before the
@@ -50,7 +50,6 @@
);

return (
queriesLastUpdate > 0 &&
Object.values(queries).some(
q => isQueryRunning(q) &&
now - q.startDttm < MAX_QUERY_AGE_TO_POLL,
6 changes: 3 additions & 3 deletions superset/assets/src/SqlLab/components/QueryTable.jsx
@@ -41,8 +41,8 @@ const propTypes = {
const defaultProps = {
columns: ['started', 'duration', 'rows'],
queries: [],
onUserClicked: () => {},
onDbClicked: () => {},
onUserClicked: () => { },
onDbClicked: () => { },
};

class QueryTable extends React.PureComponent {
@@ -169,7 +169,7 @@ class QueryTable extends React.PureComponent {
style={{ width: '75px' }}
striped
now={q.progress}
label={`${q.progress}%`}
label={`${q.progress.toFixed(0)}%`}
/>
);
let errorTooltip;
2 changes: 1 addition & 1 deletion superset/assets/src/SqlLab/components/ResultSet.jsx
@@ -241,7 +241,7 @@ export default class ResultSet extends React.PureComponent {
<ProgressBar
striped
now={query.progress}
label={`${query.progress}%`}
label={`${query.progress.toFixed(0)}%`}
/>);
}
if (query.trackingUrl) {
50 changes: 48 additions & 2 deletions superset/assets/src/SqlLab/components/ScheduleQueryButton.jsx
@@ -19,11 +19,56 @@
import React from 'react';
import PropTypes from 'prop-types';
import Form from 'react-jsonschema-form';
import chrono from 'chrono-node';
import { t } from '@superset-ui/translation';

import Button from '../../components/Button';
import ModalTrigger from '../../components/ModalTrigger';

const validators = {
greater: (a, b) => a > b,
greater_equal: (a, b) => a >= b,
less: (a, b) => a < b,
less_equal: (a, b) => a <= b,
};

function getJSONSchema() {
const jsonSchema = window.featureFlags.SCHEDULED_QUERIES.JSONSCHEMA;
// parse date-time into usable value (eg, 'today' => `new Date()`)
Object.entries(jsonSchema.properties).forEach(([key, properties]) => {
if (properties.default && properties.format === 'date-time') {
jsonSchema.properties[key] = {
...properties,
default: chrono.parseDate(properties.default).toISOString(),
};
}
});
return jsonSchema;
}

function getUISchema() {
return window.featureFlags.SCHEDULED_QUERIES.UISCHEMA;
}

function getValidationRules() {
return window.featureFlags.SCHEDULED_QUERIES.VALIDATION || [];
}

function getValidator() {
const rules = getValidationRules();
return (formData, errors) => {
rules.forEach((rule) => {
const test = validators[rule.name];
const args = rule.arguments.map(name => formData[name]);
const container = rule.container || rule.arguments.slice(-1)[0];
if (!test(...args)) {
errors[container].addError(rule.message);
}
});
return errors;
};
}

const propTypes = {
defaultLabel: PropTypes.string,
sql: PropTypes.string.isRequired,
@@ -79,9 +124,10 @@ renderModalBody() {
renderModalBody() {
return (
<Form
schema={window.featureFlags.SCHEDULED_QUERIES.JSONSCHEMA}
uiSchema={window.featureFlags.SCHEDULED_QUERIES.UISCHEMA}
schema={getJSONSchema()}
uiSchema={getUISchema()}
onSubmit={this.onSchedule}
validate={getValidator()}
/>
);
}
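
getValidator() above applies the VALIDATION rules from the feature-flag config. As a standalone sketch of what one rule evaluation amounts to (the `errors` object mimics the shape react-jsonschema-form passes to `validate`; the form data is made up):

```js
// Sketch: evaluating one VALIDATION rule from the config.py example.
const validators = { less_equal: (a, b) => a <= b };

const rule = {
  name: 'less_equal',
  arguments: ['start_date', 'end_date'],
  message: 'End date cannot be before start date',
  container: 'end_date',
};

// Hypothetical form data where the rule fails; the dates are ISO strings
// (as produced by chrono.parseDate(...).toISOString()), so lexicographic
// comparison matches chronological order.
const formData = { start_date: '2019-06-01T09:00:00Z', end_date: '2019-05-01T09:00:00Z' };
const errors = { end_date: { addError: msg => console.log('end_date:', msg) } };

const args = rule.arguments.map(name => formData[name]);
if (!validators[rule.name](...args)) {
  // logs: 'end_date: End date cannot be before start date'
  errors[rule.container].addError(rule.message);
}
```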