Skip to content

Commit

Permalink
feat(datasource): remove deleted columns and update column type on metadata refresh (apache#10619)
Browse files Browse the repository at this point in the history

* fix: remove missing columns on metadata refresh

* add tests

* lint and simplify

* lint

* reduce locals

* fix label style
  • Loading branch information
villebro authored and Ofeknielsen committed Oct 5, 2020
1 parent c74dd73 commit f3a4cc2
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,6 @@ const props = {
onChange: () => {},
};

const extraColumn = {
column_name: 'new_column',
type: 'VARCHAR(10)',
description: null,
filterable: true,
verbose_name: null,
is_dttm: false,
expression: '',
groupby: true,
};

const DATASOURCE_ENDPOINT = 'glob:*/datasource/external_metadata/*';

describe('DatasourceEditor', () => {
Expand Down Expand Up @@ -85,11 +74,65 @@ describe('DatasourceEditor', () => {
});
});

it('merges columns', () => {
it('to add, remove and modify columns accordingly', () => {
const columns = [
{
name: 'ds',
type: 'DATETIME',
nullable: true,
default: '',
primary_key: false,
},
{
name: 'gender',
type: 'VARCHAR(32)',
nullable: true,
default: '',
primary_key: false,
},
{
name: 'new_column',
type: 'VARCHAR(10)',
nullable: true,
default: '',
primary_key: false,
},
];

const numCols = props.datasource.columns.length;
expect(inst.state.databaseColumns).toHaveLength(numCols);
inst.mergeColumns([extraColumn]);
expect(inst.state.databaseColumns).toHaveLength(numCols + 1);
inst.updateColumns(columns);
expect(inst.state.databaseColumns).toEqual(
expect.arrayContaining([
{
type: 'DATETIME',
description: null,
filterable: false,
verbose_name: null,
is_dttm: true,
expression: '',
groupby: false,
column_name: 'ds',
},
{
type: 'VARCHAR(32)',
description: null,
filterable: true,
verbose_name: null,
is_dttm: false,
expression: '',
groupby: true,
column_name: 'gender',
},
expect.objectContaining({
column_name: 'new_column',
type: 'VARCHAR(10)',
}),
]),
);
expect(inst.state.databaseColumns).not.toEqual(
expect.arrayContaining([expect.objectContaining({ name: 'name' })]),
);
});

it('renders isSqla fields', () => {
Expand Down
81 changes: 61 additions & 20 deletions superset-frontend/src/datasource/DatasourceEditor.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ function ColumnCollectionTable({
) : (
v
),
type: d => <Label bsStyle="s">{d}</Label>,
type: d => <Label>{d}</Label>,
is_dttm: checkboxGenerator,
filterable: checkboxGenerator,
groupby: checkboxGenerator,
Expand Down Expand Up @@ -289,29 +289,58 @@ export class DatasourceEditor extends React.PureComponent {
this.validate(this.onChange);
}

mergeColumns(cols) {
let { databaseColumns } = this.state;
let hasChanged;
const currentColNames = databaseColumns.map(col => col.column_name);
updateColumns(cols) {
const { databaseColumns } = this.state;
const databaseColumnNames = cols.map(col => col.name);
const currentCols = databaseColumns.reduce(
(agg, col) => ({
...agg,
[col.column_name]: col,
}),
{},
);
const finalColumns = [];
const results = {
added: [],
modified: [],
removed: databaseColumns
.map(col => col.column_name)
.filter(col => !databaseColumnNames.includes(col)),
};
cols.forEach(col => {
if (currentColNames.indexOf(col.name) < 0) {
// Adding columns
databaseColumns = databaseColumns.concat([
{
id: shortid.generate(),
column_name: col.name,
type: col.type,
groupby: true,
filterable: true,
},
]);
hasChanged = true;
const currentCol = currentCols[col.name];
if (!currentCol) {
// new column
finalColumns.push({
id: shortid.generate(),
column_name: col.name,
type: col.type,
groupby: true,
filterable: true,
});
results.added.push(col.name);
} else if (currentCol.type !== col.type) {
// modified column
finalColumns.push({
...currentCol,
type: col.type,
});
results.modified.push(col.name);
} else {
// unchanged
finalColumns.push(currentCol);
}
});
if (hasChanged) {
this.setColumns({ databaseColumns });
if (
results.added.length ||
results.modified.length ||
results.removed.length
) {
this.setColumns({ databaseColumns: finalColumns });
}
return results;
}

syncMetadata() {
const { datasource } = this.state;
// Handle carefully when the schema is empty
Expand All @@ -326,7 +355,19 @@ export class DatasourceEditor extends React.PureComponent {

SupersetClient.get({ endpoint })
.then(({ json }) => {
this.mergeColumns(json);
const results = this.updateColumns(json);
if (results.modified.length)
this.props.addSuccessToast(
t('Modified columns: %s', results.modified.join(', ')),
);
if (results.removed.length)
this.props.addSuccessToast(
t('Removed columns: %s', results.removed.join(', ')),
);
if (results.added.length)
this.props.addSuccessToast(
t('New columns added: %s', results.added.join(', ')),
);
this.props.addSuccessToast(t('Metadata has been synced'));
this.setState({ metadataLoading: false });
})
Expand Down
67 changes: 46 additions & 21 deletions superset/connectors/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
import logging
from collections import OrderedDict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, Hashable, List, NamedTuple, Optional, Tuple, Union

Expand Down Expand Up @@ -82,6 +83,13 @@ class QueryStringExtended(NamedTuple):
sql: str


@dataclass
class MetadataResult:
    """Names of physical columns that were added, removed or modified
    during a table metadata refresh (the return value of
    ``SqlaTable.fetch_metadata``)."""

    # field(default_factory=list) is required for mutable defaults:
    # a bare `= []` would be evaluated once and shared by all instances.
    added: List[str] = field(default_factory=list)
    removed: List[str] = field(default_factory=list)
    modified: List[str] = field(default_factory=list)


class AnnotationDatasource(BaseDatasource):
""" Dummy object so we can query annotations using 'Viz' objects just like
regular datasources.
Expand Down Expand Up @@ -1243,10 +1251,15 @@ def mutator(df: pd.DataFrame) -> None:
def get_sqla_table_object(self) -> Table:
return self.database.get_table(self.table_name, schema=self.schema)

def fetch_metadata(self, commit: bool = True) -> None:
"""Fetches the metadata for the table and merges it in"""
def fetch_metadata(self, commit: bool = True) -> MetadataResult:
"""
Fetches the metadata for the table and merges it in
:param commit: should the changes be committed or not.
:return: MetadataResult with lists of added, removed and modified column names.
"""
try:
table_ = self.get_sqla_table_object()
new_table = self.get_sqla_table_object()
except SQLAlchemyError:
raise QueryObjectValidationError(
_(
Expand All @@ -1260,35 +1273,46 @@ def fetch_metadata(self, commit: bool = True) -> None:
any_date_col = None
db_engine_spec = self.database.db_engine_spec
db_dialect = self.database.get_dialect()
dbcols = (
db.session.query(TableColumn)
.filter(TableColumn.table == self)
.filter(or_(TableColumn.column_name == col.name for col in table_.columns))
old_columns = db.session.query(TableColumn).filter(TableColumn.table == self)

old_columns_by_name = {col.column_name: col for col in old_columns}
results = MetadataResult(
removed=[
col
for col in old_columns_by_name
if col not in {col.name for col in new_table.columns}
]
)
dbcols = {dbcol.column_name: dbcol for dbcol in dbcols}

for col in table_.columns:
# clear old columns before adding modified columns back
self.columns = []
for col in new_table.columns:
try:
datatype = db_engine_spec.column_datatype_to_string(
col.type, db_dialect
)
except Exception as ex: # pylint: disable=broad-except
datatype = "UNKNOWN"
logger.error("Unrecognized data type in %s.%s", table_, col.name)
logger.error("Unrecognized data type in %s.%s", new_table, col.name)
logger.exception(ex)
dbcol = dbcols.get(col.name, None)
if not dbcol:
dbcol = TableColumn(column_name=col.name, type=datatype, table=self)
dbcol.is_dttm = dbcol.is_temporal
db_engine_spec.alter_new_orm_column(dbcol)
old_column = old_columns_by_name.get(col.name, None)
if not old_column:
results.added.append(col.name)
new_column = TableColumn(
column_name=col.name, type=datatype, table=self
)
new_column.is_dttm = new_column.is_temporal
db_engine_spec.alter_new_orm_column(new_column)
else:
dbcol.type = datatype
dbcol.groupby = True
dbcol.filterable = True
self.columns.append(dbcol)
if not any_date_col and dbcol.is_temporal:
new_column = old_column
if new_column.type != datatype:
results.modified.append(col.name)
new_column.type = datatype
new_column.groupby = True
new_column.filterable = True
self.columns.append(new_column)
if not any_date_col and new_column.is_temporal:
any_date_col = col.name

metrics.append(
SqlMetric(
metric_name="count",
Expand All @@ -1307,6 +1331,7 @@ def fetch_metadata(self, commit: bool = True) -> None:
db.session.merge(self)
if commit:
db.session.commit()
return results

@classmethod
def import_obj(
Expand Down
Loading

0 comments on commit f3a4cc2

Please sign in to comment.