feat(datasource): remove deleted columns and update column type on metadata refresh #10619

Merged

Changes from 2 commits

@@ -34,17 +34,6 @@ const props = {
   onChange: () => {},
 };
 
-const extraColumn = {
-  column_name: 'new_column',
-  type: 'VARCHAR(10)',
-  description: null,
-  filterable: true,
-  verbose_name: null,
-  is_dttm: false,
-  expression: '',
-  groupby: true,
-};
-
 const DATASOURCE_ENDPOINT = 'glob:*/datasource/external_metadata/*';
 
 describe('DatasourceEditor', () => {
@@ -85,11 +74,65 @@ describe('DatasourceEditor', () => {
     });
   });
 
-  it('merges columns', () => {
+  it('to add, remove and modify columns accordingly', () => {
+    const columns = [
+      {
+        name: 'ds',
+        type: 'DATETIME',
+        nullable: true,
+        default: '',
+        primary_key: false,
+      },
+      {
+        name: 'gender',
+        type: 'VARCHAR(32)',
+        nullable: true,
+        default: '',
+        primary_key: false,
+      },
+      {
+        name: 'new_column',
+        type: 'VARCHAR(10)',
+        nullable: true,
+        default: '',
+        primary_key: false,
+      },
+    ];

villebro (Member, Author) commented on lines +78 to +100: This is the correct schema coming back from the API (compare with extraColumn above).

     const numCols = props.datasource.columns.length;
     expect(inst.state.databaseColumns).toHaveLength(numCols);
-    inst.mergeColumns([extraColumn]);
-    expect(inst.state.databaseColumns).toHaveLength(numCols + 1);
+    inst.updateColumns(columns);
+    expect(inst.state.databaseColumns).toEqual(
+      expect.arrayContaining([
+        {
+          type: 'DATETIME',
+          description: null,
+          filterable: false,
+          verbose_name: null,
+          is_dttm: true,
+          expression: '',
+          groupby: false,
+          column_name: 'ds',
+        },

villebro (Member, Author) commented on lines +107 to +116: This column is unchanged.

+        {
+          type: 'VARCHAR(32)',
+          description: null,
+          filterable: true,
+          verbose_name: null,
+          is_dttm: false,
+          expression: '',
+          groupby: true,
+          column_name: 'gender',
+        },

villebro (Member, Author) commented on lines +117 to +126: This type is updated from VARCHAR(16) to VARCHAR(32).

+        expect.objectContaining({
+          column_name: 'new_column',
+          type: 'VARCHAR(10)',
+        }),

villebro (Member, Author) commented on lines +127 to +130: This is a new column.

+      ]),
+    );
+    expect(inst.state.databaseColumns).not.toEqual(
+      expect.arrayContaining([expect.objectContaining({ name: 'name' })]),
+    );

villebro (Member, Author) commented on lines +133 to +135: This is one of the removed columns in the original metadata.

   });
 
   it('renders isSqla fields', () => {
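
The rewritten test pins down a three-way partition: columns that only appear in the fresh metadata are added, columns whose type changed are modified, and columns missing from the fresh metadata are removed. For reference, a minimal standalone sketch of that partition in Python (the helper name and the `name` column's old type are assumptions for illustration; the real logic is `updateColumns` in `DatasourceEditor.jsx` below):

```python
def partition_columns(current, incoming):
    """Split fresh DB metadata into added/removed/modified column names.

    current:  existing editor state, dicts with 'column_name' and 'type'
    incoming: fresh metadata from the API, dicts with 'name' and 'type'
    """
    current_by_name = {col["column_name"]: col for col in current}
    incoming_names = {col["name"] for col in incoming}

    added = [c["name"] for c in incoming if c["name"] not in current_by_name]
    modified = [
        c["name"]
        for c in incoming
        if c["name"] in current_by_name
        and current_by_name[c["name"]]["type"] != c["type"]
    ]
    removed = [name for name in current_by_name if name not in incoming_names]
    return added, removed, modified


# Mirrors the fixture above: 'ds' is unchanged, 'gender' is retyped,
# 'name' is dropped, 'new_column' is new. (VARCHAR(255) for 'name' is a
# made-up type; the spec only says the column is removed.)
current = [
    {"column_name": "ds", "type": "DATETIME"},
    {"column_name": "gender", "type": "VARCHAR(16)"},
    {"column_name": "name", "type": "VARCHAR(255)"},
]
incoming = [
    {"name": "ds", "type": "DATETIME"},
    {"name": "gender", "type": "VARCHAR(32)"},
    {"name": "new_column", "type": "VARCHAR(10)"},
]
assert partition_columns(current, incoming) == (["new_column"], ["name"], ["gender"])
```
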
superset-frontend/src/datasource/DatasourceEditor.jsx (73 changes: 54 additions & 19 deletions)

@@ -289,29 +289,52 @@ export class DatasourceEditor extends React.PureComponent {
     this.validate(this.onChange);
   }
 
-  mergeColumns(cols) {
-    let { databaseColumns } = this.state;
-    let hasChanged;
-    const currentColNames = databaseColumns.map(col => col.column_name);
+  updateColumns(cols) {

villebro (Member, Author) commented on Aug 17, 2020: Method was renamed to reflect that metadata is updated, not only merged.

+    const { databaseColumns } = this.state;
+    const databaseColumnNames = cols.map(col => col.name);
+    const currentCols = databaseColumns.reduce(
+      (agg, col) => ({
+        ...agg,
+        [col.column_name]: col,
+      }),
+      {},
+    );
+    const finalColumns = [];
+    const addedColumns = [];
+    const modifiedColumns = [];
+    const removedColumns = databaseColumns
+      .map(col => col.column_name)
+      .filter(col => !databaseColumnNames.includes(col));
     cols.forEach(col => {
-      if (currentColNames.indexOf(col.name) < 0) {
-        // Adding columns
-        databaseColumns = databaseColumns.concat([
-          {
-            id: shortid.generate(),
-            column_name: col.name,
-            type: col.type,
-            groupby: true,
-            filterable: true,
-          },
-        ]);
-        hasChanged = true;
+      const currentCol = currentCols[col.name];
+      if (!currentCol) {
+        // new column
+        finalColumns.push({
+          id: shortid.generate(),
+          column_name: col.name,
+          type: col.type,
+          groupby: true,
+          filterable: true,
+        });
+        addedColumns.push(col.name);
+      } else if (currentCol.type !== col.type) {
+        // modified column
+        finalColumns.push({
+          ...currentCol,
+          type: col.type,
+        });
+        modifiedColumns.push(col.name);
+      } else {
+        // unchanged
+        finalColumns.push(currentCol);
       }
     });
-    if (hasChanged) {
-      this.setColumns({ databaseColumns });
+    if (addedColumns || modifiedColumns || removedColumns) {
+      this.setColumns({ databaseColumns: finalColumns });
     }
+    return [addedColumns, removedColumns, modifiedColumns];
   }
 
   syncMetadata() {
     const { datasource } = this.state;
     // Handle carefully when the schema is empty
@@ -326,7 +349,19 @@ export class DatasourceEditor extends React.PureComponent {
 
     SupersetClient.get({ endpoint })
       .then(({ json }) => {
-        this.mergeColumns(json);
+        const [addedCols, removedCols, modifiedCols] = this.updateColumns(json);
+        if (modifiedCols.length)
+          this.props.addSuccessToast(
+            t('Modified columns: %s', modifiedCols.join(', ')),
+          );
+        if (removedCols.length)
+          this.props.addSuccessToast(
+            t('Removed columns: %s', removedCols.join(', ')),
+          );
+        if (addedCols.length)
+          this.props.addSuccessToast(
+            t('New columns added: %s', addedCols.join(', ')),
+          );
         this.props.addSuccessToast(t('Metadata has been synced'));
         this.setState({ metadataLoading: false });
       })
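
One detail worth flagging in `updateColumns` above: empty arrays are truthy in JavaScript, so `if (addedColumns || modifiedColumns || removedColumns)` always passes and `setColumns` runs on every sync, even when nothing changed; a `.length` check would be needed to skip the no-op case. The analogous Python guard behaves as intended, since empty lists are falsy:

```python
added, removed, modified = [], [], []

# Empty lists are falsy in Python, so the update is skipped when nothing
# changed. The JavaScript counterpart `if (added || removed || modified)`
# would still run, because an empty Array is truthy there.
if added or removed or modified:
    print("state update needed")
else:
    print("no changes, skipping update")  # this branch runs
```
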
superset/connectors/sqla/models.py (64 changes: 41 additions & 23 deletions)

@@ -22,7 +22,7 @@
 import pandas as pd
 import sqlalchemy as sa
 import sqlparse
-from flask import escape, Markup
+from flask import escape, flash, Markup
 from flask_appbuilder import Model
 from flask_babel import lazy_gettext as _
 from jinja2.exceptions import TemplateError
@@ -1230,10 +1230,17 @@ def mutator(df: pd.DataFrame) -> None:
     def get_sqla_table_object(self) -> Table:
         return self.database.get_table(self.table_name, schema=self.schema)
 
-    def fetch_metadata(self, commit: bool = True) -> None:
-        """Fetches the metadata for the table and merges it in"""
+    def fetch_metadata(
+        self, commit: bool = True
+    ) -> Tuple[List[str], List[str], List[str]]:
+        """
+        Fetches the metadata for the table and merges it in
+
+        :param commit: should the changes be committed or not.
+        :return: Tuple with lists of added, removed and modified column names.
+        """
         try:
-            table_ = self.get_sqla_table_object()
+            new_table = self.get_sqla_table_object()
         except SQLAlchemyError:
             raise QueryObjectValidationError(
                 _(
@@ -1247,35 +1254,45 @@ def fetch_metadata(self, commit: bool = True) -> None:
         any_date_col = None
         db_engine_spec = self.database.db_engine_spec
         db_dialect = self.database.get_dialect()
-        dbcols = (
-            db.session.query(TableColumn)
-            .filter(TableColumn.table == self)
-            .filter(or_(TableColumn.column_name == col.name for col in table_.columns))

villebro (Member, Author) commented: I'm not sure what the intent here was, but I don't see any scenario where we want to bring in columns from another table with the same column names.

-        )
-        dbcols = {dbcol.column_name: dbcol for dbcol in dbcols}
+        old_columns = db.session.query(TableColumn).filter(TableColumn.table == self)
+
+        old_columns_by_name = {col.column_name: col for col in old_columns}
+        new_columns = {col.name for col in new_table.columns}
+        modified_columns: List[str] = []
+        added_columns: List[str] = []
+        removed_columns: List[str] = [
+            col for col in old_columns_by_name if col not in new_columns
+        ]
 
-        for col in table_.columns:
+        # clear old columns before adding modified columns back
+        self.columns = []
+        for col in new_table.columns:
             try:
                 datatype = db_engine_spec.column_datatype_to_string(
                     col.type, db_dialect
                 )
             except Exception as ex:  # pylint: disable=broad-except
                 datatype = "UNKNOWN"
-                logger.error("Unrecognized data type in %s.%s", table_, col.name)
+                logger.error("Unrecognized data type in %s.%s", new_table, col.name)
                 logger.exception(ex)
-            dbcol = dbcols.get(col.name, None)
-            if not dbcol:
-                dbcol = TableColumn(column_name=col.name, type=datatype, table=self)
-                dbcol.is_dttm = dbcol.is_temporal
-                db_engine_spec.alter_new_orm_column(dbcol)
+            old_column = old_columns_by_name.get(col.name, None)
+            if not old_column:
+                added_columns.append(col.name)
+                new_column = TableColumn(
+                    column_name=col.name, type=datatype, table=self
+                )
+                new_column.is_dttm = new_column.is_temporal
+                db_engine_spec.alter_new_orm_column(new_column)
             else:
-                dbcol.type = datatype
-                dbcol.groupby = True
-                dbcol.filterable = True
-            self.columns.append(dbcol)
-            if not any_date_col and dbcol.is_temporal:
+                new_column = old_column
+                if new_column.type != datatype:
+                    modified_columns.append(col.name)
+                new_column.type = datatype
+                new_column.groupby = True
+                new_column.filterable = True
+            self.columns.append(new_column)
+            if not any_date_col and new_column.is_temporal:
                 any_date_col = col.name
 
         metrics.append(
             SqlMetric(
                 metric_name="count",
@@ -1294,6 +1311,7 @@ def fetch_metadata(self, commit: bool = True) -> None:
         db.session.merge(self)
         if commit:
             db.session.commit()
+        return added_columns, removed_columns, modified_columns
 
     @classmethod
     def import_obj(
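
The key behavioral change in `fetch_metadata` is the clear-and-rebuild step: `self.columns` is emptied and repopulated only from the live table, which is how dropped columns disappear, while reused `TableColumn` rows keep their user-edited attributes. A simplified sketch of that loop, with plain dicts standing in for the ORM objects (all names here are illustrative):

```python
from typing import Dict, List, Tuple


def rebuild_columns(
    old_by_name: Dict[str, dict], fresh: List[dict]
) -> Tuple[List[dict], List[str], List[str], List[str]]:
    """Rebuild the column list from fresh table metadata.

    Columns absent from `fresh` vanish implicitly because the result is
    built from `fresh` alone; existing entries are reused (and retyped if
    the database type changed) so their other attributes survive.
    """
    fresh_names = {col["name"] for col in fresh}
    removed = [name for name in old_by_name if name not in fresh_names]
    added: List[str] = []
    modified: List[str] = []
    rebuilt: List[dict] = []  # plays the role of `self.columns = []` + append
    for col in fresh:
        old = old_by_name.get(col["name"])
        if old is None:
            added.append(col["name"])
            rebuilt.append({"column_name": col["name"], "type": col["type"]})
        else:
            if old["type"] != col["type"]:
                modified.append(col["name"])
            old["type"] = col["type"]
            rebuilt.append(old)
    return rebuilt, added, removed, modified
```
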
superset/connectors/sqla/views.py (48 changes: 45 additions & 3 deletions)

@@ -17,7 +17,7 @@
 """Views used by the SqlAlchemy connector"""
 import logging
 import re
-from typing import List, Union
+from typing import Dict, List, Union
 
 from flask import flash, Markup, redirect
 from flask_appbuilder import CompactCRUDMixin, expose
@@ -428,9 +428,18 @@ def refresh(  # pylint: disable=no-self-use
             tables = [tables]
         successes = []
         failures = []
+        added_cols: Dict[str, List[str]] = {}
+        removed_cols: Dict[str, List[str]] = {}
+        modified_cols: Dict[str, List[str]] = {}
         for table_ in tables:
             try:
-                table_.fetch_metadata()
+                added, removed, modified = table_.fetch_metadata()
+                if added:
+                    added_cols[table_.table_name] = added
+                if removed:
+                    removed_cols[table_.table_name] = removed
+                if modified:
+                    modified_cols[table_.table_name] = modified
                 successes.append(table_)
             except Exception:  # pylint: disable=broad-except
                 failures.append(table_)
@@ -441,9 +450,42 @@ def refresh(  # pylint: disable=no-self-use
                 tables=", ".join([t.table_name for t in successes]),
             )
             flash(success_msg, "info")
+        if added_cols:
+            added_tables = []
+            for table, cols in added_cols.items():
+                added_tables.append(f"{table} ({', '.join(cols)})")
+            flash(
+                _(
+                    "The following tables added new columns: %(tables)s",
+                    tables=", ".join(added_tables),
+                ),
+                "info",
+            )
+        if removed_cols:
+            removed_tables = []
+            for table, cols in removed_cols.items():
+                removed_tables.append(f"{table} ({', '.join(cols)})")
+            flash(
+                _(
+                    "The following tables removed columns: %(tables)s",
+                    tables=", ".join(removed_tables),
+                ),
+                "info",
+            )
+        if modified_cols:
+            modified_tables = []
+            for table, cols in modified_cols.items():
+                modified_tables.append(f"{table} ({', '.join(cols)})")
+            flash(
+                _(
+                    "The following tables updated column metadata: %(tables)s",
+                    tables=", ".join(modified_tables),
+                ),
+                "info",
+            )
         if len(failures) > 0:
             failure_msg = _(
-                "Unable to retrieve metadata for the following table(s): %(tables)s",
+                "Unable to refresh metadata for the following table(s): %(tables)s",
                 tables=", ".join([t.table_name for t in failures]),
             )
             flash(failure_msg, "danger")
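
The three flash blocks repeat the same `table (col1, col2)` formatting. Were that ever factored out, the shared piece would look something like this hypothetical helper (not part of this PR):

```python
from typing import Dict, List


def format_table_columns(cols_by_table: Dict[str, List[str]]) -> str:
    """Render {'t1': ['a', 'b'], 't2': ['c']} as 't1 (a, b), t2 (c)'."""
    return ", ".join(
        f"{table} ({', '.join(cols)})" for table, cols in cols_by_table.items()
    )


# format_table_columns({"sales": ["region"], "users": ["dob"]})
# -> 'sales (region), users (dob)'
```
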