From 2565d13251d23e5f88602c013af295828e6c0297 Mon Sep 17 00:00:00 2001
From: whelan
Date: Wed, 18 Dec 2019 09:41:51 +0800
Subject: [PATCH 1/4] Provide more information about the sql_json and csv actions and send it to App Insights

---
 superset/utils/log.py  | 17 ++++++++++++++++-
 superset/views/core.py | 11 +++++++++--
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/superset/utils/log.py b/superset/utils/log.py
index 8532bcc7b2565..a142b63267389 100644
--- a/superset/utils/log.py
+++ b/superset/utils/log.py
@@ -61,6 +61,10 @@ def wrapper(*args, **kwargs):
             self.stats_logger.incr(f.__name__)
             start_dttm = datetime.now()
             value = f(*args, **kwargs)
+            extra_info = {}  # populated when the view returns a (response, extra_info) tuple
+            if isinstance(value, tuple):
+                extra_info = value[1]
+                value = value[0]
             duration_ms = (datetime.now() - start_dttm).total_seconds() * 1000

             # bulk insert
@@ -80,6 +84,9 @@ def wrapper(*args, **kwargs):
                 slice_id=slice_id,
                 duration_ms=duration_ms,
                 referrer=referrer,
+                database=extra_info.get("database"),
+                schema=extra_info.get("schema"),
+                sql=extra_info.get("sql"),
             )
             return value
@@ -144,6 +151,9 @@ def log(self, user_id, action, *args, **kwargs):
         slice_id = kwargs.get("slice_id")
         duration_ms = kwargs.get("duration_ms")
         referrer = kwargs.get("referrer")
+        database = kwargs.get("database")
+        schema = kwargs.get("schema")
+        sql = kwargs.get("sql")

         logs = list()
         for record in records:
@@ -161,7 +171,12 @@ def log(self, user_id, action, *args, **kwargs):
                 user_id=user_id,
             )
             logs.append(log)
-        self.appinsights({'level': 'info', 'success': 'true', 'state':'finish', 'function': action, 'json': json_string, 'duration': duration_ms, 'referrer': referrer, 'user_id': user_id})
+        self.appinsights(
+            {'level': 'info', 'success': 'true', 'state': 'finish',
+             'function': action, 'json': json_string, 'duration': duration_ms,
+             'referrer': referrer, 'user_id': user_id,
+             'database': database, 'schema': schema, 'sql': sql
+             })

         sesh = current_app.appbuilder.get_session
         sesh.bulk_save_objects(logs)
diff --git a/superset/views/core.py b/superset/views/core.py
index fa044d9dafd46..621e516295521 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -2724,8 +2724,11 @@ def sql_json(self):
         # Async request.
         if async_flag:
             return self._sql_json_async(session, rendered_query, query)
+
+        # Extra log info for App Insights
+        extra_info = {'database': query.database.name, 'schema': query.schema, 'sql': query.sql}
         # Sync request.
-        return self._sql_json_sync(session, rendered_query, query)
+        return self._sql_json_sync(session, rendered_query, query), extra_info

     @has_access
     @expose("/csv/<client_id>")
@@ -2779,6 +2782,9 @@ def csv(self, client_id):
             f"CSV exported: {repr(event_info)}", extra={"superset_event": event_info}
         )

+        # Extra log info for App Insights
+        extra_info = {'database': query.database.name, 'schema': query.schema, 'sql': query.sql}
+
         # Fetch Clickhouse secrets from ENVs
         CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST')
         CLICKHOUSE_UNAME = os.environ.get('CLICKHOUSE_UNAME')
         CLICKHOUSE_PWD = os.environ.get('CLICKHOUSE_PWD')
         client = Client(host=CLICKHOUSE_HOST, database=query.schema, user=CLICKHOUSE_UNAME, password=CLICKHOUSE_PWD)
         rows_gen = client.execute_iter(sql, settings={'max_block_size': 10000})
         # Utilize the generator pattern to stream CSV contents
         # ref: https://flask.palletsprojects.com/en/1.1.x/patterns/streaming/
         # ref: https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#streaming-results
         def generate():
             cnt = 0
             for row in rows_gen:
                 s = ''
                 for item in row:
                     item = str(item).replace('\n', '')
                     s += item + ','
                 s = s[:-1] + '\n'
                 yield s
-        return Response(generate(), mimetype='text/csv')
+
+        return Response(generate(), mimetype='text/csv'), extra_info

     @api
     @handle_api_exception

From f699710159ba34ae1b35006bb3cd855e1ae69720 Mon Sep 17 00:00:00 2001
From: whelan
Date: Sun, 22 Dec 2019 15:51:15 +0800
Subject: [PATCH 2/4] Fix the asynchronous worker timeout issue

Gunicorn kills and restarts workers that stay silent for longer than the
configured timeout. "Silent" means silent from the perspective of the
arbiter process, which communicates with the workers through a temporary
heartbeat file. A worker that is busy sending data never updates that
file, so from the arbiter's perspective it is missing heartbeats. We add
sleep(0) between generator iterations to give the worker a chance to
update the heartbeat file, keeping the connection and the worker process
alive.
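As a minimal standalone illustration of the pattern (the endpoint and the
row source below are placeholders, not code from this patch): a streaming
response whose generator never yields control starves the heartbeat, while
a sleep(0) per chunk lets the worker be rescheduled in between:

    # Hypothetical minimal example: streaming a large result set from Flask
    # behind gunicorn without starving the worker heartbeat.
    from time import sleep

    from flask import Flask, Response

    app = Flask(__name__)

    def fetch_rows():
        # Stand-in for a slow row source such as a database cursor.
        for i in range(1_000_000):
            yield (i, f"row-{i}")

    @app.route("/stream")
    def stream():
        def generate():
            for row in fetch_rows():
                # Give the scheduler a chance to run between chunks so the
                # worker can report a heartbeat to the gunicorn arbiter.
                sleep(0)
                yield ",".join(str(item) for item in row) + "\n"
        return Response(generate(), mimetype="text/csv")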
---
 requirements.txt       | 2 +-
 setup.py               | 2 +-
 superset/views/core.py | 4 ++++
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 5dcb507cdc3fc..159271aee2db4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,7 +24,7 @@ croniter==0.3.30
 cryptography==2.7
 decorator==4.4.0  # via retry
 defusedxml==0.6.0  # via python3-openid
-flask-appbuilder==2.1.13
+flask-appbuilder==2.2.1
 flask-babel==0.12.2  # via flask-appbuilder
 flask-caching==1.7.2
 flask-compress==1.4.0
diff --git a/setup.py b/setup.py
index 4e9b33656a8d9..323d3c1cc0d3c 100644
--- a/setup.py
+++ b/setup.py
@@ -78,7 +78,7 @@ def get_git_sha():
         "croniter>=0.3.28",
         "cryptography>=2.4.2",
         "flask>=1.1.0, <2.0.0",
-        "flask-appbuilder>=2.1.13, <2.3.0",
+        "flask-appbuilder>=2.2.0, <2.3.0",
         "flask-caching",
         "flask-compress",
         "flask-talisman",
diff --git a/superset/views/core.py b/superset/views/core.py
index 621e516295521..7340245274a03 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -111,6 +111,7 @@
 from clickhouse_driver import Client
 import os
+from time import sleep

 config = app.config
 CACHE_DEFAULT_TIMEOUT = config.get("CACHE_DEFAULT_TIMEOUT", 0)
@@ -2797,6 +2798,9 @@ def csv(self, client_id):
         def generate():
             cnt = 0
             for row in rows_gen:
+                # sleep(0) yields control between iterations so the worker can update
+                # its heartbeat file, keeping the connection and worker process alive.
+                sleep(0)
                 s = ''
                 for item in row:
                     item = str(item).replace('\n', '')

From 0ebcffc3ac79d50014953a85b2709a811e5596ce Mon Sep 17 00:00:00 2001
From: whelan
Date: Mon, 23 Dec 2019 18:52:41 +0800
Subject: [PATCH 3/4] Replace newlines and commas with spaces to avoid fusing adjacent words

---
 superset/views/core.py | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/superset/views/core.py b/superset/views/core.py
index 7340245274a03..3bb6a215c8089 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -2803,7 +2803,20 @@ def generate():
                 sleep(0)
                 s = ''
                 for item in row:
-                    item = str(item).replace('\n', '')
+                    # Replace commas with spaces so field values cannot break the column layout
+                    item = str(item).replace(',', ' ')
+                    # Replace Windows line endings
+                    if '\r\n' in item:
+                        item = item.replace('\r\n', ' ')
+                    # Replace Linux and modern macOS line endings
+                    if '\n' in item:
+                        item = item.replace('\n', ' ')
+                    # Replace classic Mac OS line endings
+                    if '\r' in item:
+                        item = item.replace('\r', ' ')
+                    # Escape double quotes by doubling them
+                    if '"' in item:
+                        item = item.replace('"', '""')
                     s += item + ','
                 s = s[:-1] + '\n'
                 yield s
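A note on the escaping strategy above: Python's standard csv module already
implements CSV quoting (fields containing commas, quotes, or newlines are
wrapped in double quotes, and embedded quotes are doubled), so the manual
replacements could be delegated to it at the cost of quoted output. A
minimal sketch of that alternative, assuming the same rows_gen generator of
tuples:

    # Hypothetical alternative: let csv.writer quote commas, quotes and
    # newlines instead of replacing them by hand.
    import csv
    import io

    def to_csv_lines(rows_gen):
        buffer = io.StringIO()
        writer = csv.writer(buffer)
        for row in rows_gen:
            writer.writerow(row)  # quotes and escapes fields as needed
            yield buffer.getvalue()
            buffer.seek(0)
            buffer.truncate(0)

The hand-rolled version keeps the output free of quoting but alters field
values; the csv.writer version preserves values exactly.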
From f8c58d4a7dff8fca439cf5058d4aec092597c7cf Mon Sep 17 00:00:00 2001
From: whelan
Date: Tue, 24 Dec 2019 13:13:31 +0800
Subject: [PATCH 4/4] Fix the server error on queries without a schema and add the CSV header

---
 superset/views/core.py | 23 ++++++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)

diff --git a/superset/views/core.py b/superset/views/core.py
index 3bb6a215c8089..e30258f5a93c9 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -112,6 +112,7 @@
 from clickhouse_driver import Client
 import os
 from time import sleep
+from urllib.parse import urlparse

 config = app.config
 CACHE_DEFAULT_TIMEOUT = config.get("CACHE_DEFAULT_TIMEOUT", 0)
@@ -2739,6 +2740,12 @@ def csv(self, client_id):
         logging.info("Exporting CSV file [{}]".format(client_id))
         query = db.session.query(Query).filter_by(client_id=client_id).one()

+        # Handle the case where the schema is empty
+        if query.schema is None:
+            # Fall back to the database name, i.e. the path component of the SQLAlchemy URI
+            query.schema = urlparse(query.database.sqlalchemy_uri).path.strip('/')
+            logging.info(f'Empty query schema. Falling back to the sqlalchemy_uri path: {query.schema}')
+
         rejected_tables = security_manager.rejected_tables(
             query.sql, query.database, query.schema
         )
@@ -2791,17 +2798,22 @@ def csv(self, client_id):
         CLICKHOUSE_UNAME = os.environ.get('CLICKHOUSE_UNAME')
         CLICKHOUSE_PWD = os.environ.get('CLICKHOUSE_PWD')
         client = Client(host=CLICKHOUSE_HOST, database=query.schema, user=CLICKHOUSE_UNAME, password=CLICKHOUSE_PWD)
-        rows_gen = client.execute_iter(sql, settings={'max_block_size': 10000})
+        rows_gen = client.execute_iter(sql, with_column_types=True, settings={'max_block_size': 10000})
         # Utilize the generator pattern to stream CSV contents
         # ref: https://flask.palletsprojects.com/en/1.1.x/patterns/streaming/
         # ref: https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#streaming-results
         def generate():
-            cnt = 0
+            # execute_iter(..., with_column_types=True) yields the column list first
+            is_header = True
             for row in rows_gen:
                 # sleep(0) yields control between iterations so the worker can update
                 # its heartbeat file, keeping the connection and worker process alive.
                 sleep(0)
                 s = ''
+                if is_header:
+                    is_header = False
+                    yield ','.join([col[0] for col in row]) + '\n'  # emit the header row
+                    continue
                 for item in row:
                     # Replace commas with spaces so field values cannot break the column layout
                     item = str(item).replace(',', ' ')
@@ -2821,7 +2833,12 @@ def generate():
                 s = s[:-1] + '\n'
                 yield s

-        return Response(generate(), mimetype='text/csv'), extra_info
+        response = Response(generate(), mimetype='text/csv')
+        # Set the download filename and extension via the Content-Disposition header
+        response.headers[
+            "Content-Disposition"
+        ] = f"attachment; filename={query.name}.csv"
+        return response, extra_info

     @api
     @handle_api_exception
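For reference, the stream shape that the header logic above relies on: with
with_column_types=True, the first item yielded by execute_iter is the list
of (column_name, column_type) tuples, followed by the data rows. A sketch
with placeholder connection details and query:

    # Sketch (placeholder host and query): the first item from execute_iter
    # called with with_column_types=True is the column metadata, not a row.
    from clickhouse_driver import Client

    client = Client(host='localhost')  # placeholder connection
    rows_gen = client.execute_iter(
        'SELECT number FROM system.numbers LIMIT 3',
        with_column_types=True,
        settings={'max_block_size': 10000},
    )

    columns = next(rows_gen)  # e.g. [('number', 'UInt64')]
    print(','.join(name for name, _ in columns))
    for row in rows_gen:  # the remaining items are data tuples
        print(','.join(str(item) for item in row))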