Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Impala impersonation by passing cursor configuration. #4699

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions superset/db_engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,18 @@ def get_configuration_for_impersonation(cls, uri, impersonate_user, username):
"""
return {}

@classmethod
def get_cursor_configuration_for_impersonation(cls, uri, impersonate_user, username):
"""
Return a cursor configuration dictionary that can be merged with other configs
that can set the correct properties for impersonating users
:param uri: URI string
:param impersonate_user: Bool indicating if impersonation is enabled
:param username: Effective username
:return: Dictionary with configs required for impersonation
"""
return {}


class PostgresBaseEngineSpec(BaseEngineSpec):
""" Abstract class for Postgres 'like' databases """
Expand Down Expand Up @@ -1046,7 +1058,7 @@ def get_configuration_for_impersonation(cls, uri, impersonate_user, username):
if (backend_name == 'hive' and 'auth' in url.query.keys() and
impersonate_user is True and username is not None):
configuration['hive.server2.proxy.user'] = username
return configuration
return { 'connect_args': {'configuration': configuration} }


class MssqlEngineSpec(BaseEngineSpec):
Expand Down Expand Up @@ -1203,6 +1215,8 @@ class ImpalaEngineSpec(BaseEngineSpec):
time_grains = (
Grain('Time Column', _('Time Column'), '{col}'),
Grain('minute', _('minute'), "TRUNC({col}, 'MI')"),
Grain('5 minute', _('5 minute'), "CAST(FLOOR(CAST(`{col}` as int)/300)*300 as TIMESTAMP)"),
Grain('10 minute', _('10 minute'), "CAST(FLOOR(CAST(`{col}` as int)/600)*600 as TIMESTAMP)"),
Grain('hour', _('hour'), "TRUNC({col}, 'HH')"),
Grain('day', _('day'), "TRUNC({col}, 'DD')"),
Grain('week', _('week'), "TRUNC({col}, 'WW')"),
Expand All @@ -1219,10 +1233,18 @@ def convert_dttm(cls, target_type, dttm):
return "'{}'".format(dttm.strftime('%Y-%m-%d %H:%M:%S'))

@classmethod
def get_schema_names(cls, inspector):
schemas = [row[0] for row in inspector.engine.execute('SHOW SCHEMAS')
if not row[0].startswith('_')]
return schemas
def get_configuration_for_impersonation(cls, uri, impersonate_user, username):
logging.debug('Passing Impala execution_options.cursor_configuration for impersonation')
return { 'execution_options': { 'cursor_configuration': {'impala.doas.user': username } } }

@classmethod
def get_cursor_configuration_for_impersonation(cls, uri, impersonate_user, username):
logging.debug('Passing Impala cursor configuration for impersonation')
return { 'configuration': {'impala.doas.user': username } }

@classmethod
def modify_url_for_impersonation(cls, url, impersonate_user, username):
pass


class DruidEngineSpec(BaseEngineSpec):
Expand Down
3 changes: 2 additions & 1 deletion superset/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ def get_sqla_engine(self, schema=None, nullpool=True, user_name=None):
logging.info('Database.get_sqla_engine(). Masked URL: {0}'.format(masked_url))

params = extra.get('engine_params', {})
self.cursor_kwargs = self.db_engine_spec.get_cursor_configuration_for_impersonation(str(url), self.impersonate_user, effective_username)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stamping into self.cursor_kwargs here is brittle as it assumes method call ordering which isn't an explicit assumption. Either make this happen on the __init__ method if the context is available at that point, or use a @property.

if nullpool:
params['poolclass'] = NullPool

Expand All @@ -676,7 +677,7 @@ def get_sqla_engine(self, schema=None, nullpool=True, user_name=None):
self.impersonate_user,
effective_username))
if configuration:
params['connect_args'] = {'configuration': configuration}
params.update(configuration)

DB_CONNECTION_MUTATOR = config.get('DB_CONNECTION_MUTATOR')
if DB_CONNECTION_MUTATOR:
Expand Down
2 changes: 1 addition & 1 deletion superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def handle_error(msg):
user_name=user_name,
)
conn = engine.raw_connection()
cursor = conn.cursor()
cursor = conn.cursor(**database.cursor_kwargs)
logging.info('Running query: \n{}'.format(executed_sql))
logging.info(query.executed_sql)
cursor.execute(query.executed_sql,
Expand Down
5 changes: 3 additions & 2 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1684,10 +1684,11 @@ def testconn(self):
.get('engine_params', {})
.get('connect_args', {}))

engine_params = {}
if configuration:
connect_args['configuration'] = configuration
engine_params.update(configuration) # impersonation configuration have to include parent keys, like "connect_args" and "configuration"

engine = create_engine(uri, connect_args=connect_args)
engine = create_engine(uri, **engine_params)
engine.connect()
return json_success(json.dumps(engine.table_names(), indent=4))
except Exception as e:
Expand Down