From f8b947eeb5e3e84a395c64c39b682e81f3c8e111 Mon Sep 17 00:00:00 2001 From: "D. Ferruzzi" Date: Sat, 14 Oct 2023 13:22:33 -0700 Subject: [PATCH] D401 Support - WWW (#34933) --- airflow/www/api/experimental/endpoints.py | 14 ++--- airflow/www/app.py | 2 +- airflow/www/auth.py | 10 ++-- airflow/www/decorators.py | 4 +- airflow/www/extensions/init_auth_manager.py | 8 ++- airflow/www/extensions/init_manifest_files.py | 2 +- airflow/www/extensions/init_security.py | 2 +- airflow/www/fab_security/manager.py | 47 ++++++++-------- airflow/www/forms.py | 4 +- airflow/www/security_manager.py | 30 +++++----- airflow/www/utils.py | 24 ++++---- airflow/www/views.py | 56 +++++++++---------- 12 files changed, 103 insertions(+), 100 deletions(-) diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py index 0b7d9a91348f6..22a955610f544 100644 --- a/airflow/www/api/experimental/endpoints.py +++ b/airflow/www/api/experimental/endpoints.py @@ -46,7 +46,7 @@ def requires_authentication(function: T): - """Decorator for functions that require authentication.""" + """Mark a function as requiring authentication.""" @wraps(function) def decorated(*args, **kwargs): @@ -158,7 +158,7 @@ def delete_dag(dag_id): @requires_authentication def dag_runs(dag_id): """ - Returns a list of Dag Runs for a specific DAG ID. + Return a list of Dag Runs for a specific DAG ID. :query param state: a query string parameter '?state=queued|running|success...' @@ -209,7 +209,7 @@ def get_dag_code(dag_id): @api_experimental.route("/dags//tasks/", methods=["GET"]) @requires_authentication def task_info(dag_id, task_id): - """Returns a JSON with a task's public instance variables.""" + """Return a JSON with a task's public instance variables.""" try: t_info = get_task(dag_id, task_id) except AirflowException as err: @@ -227,7 +227,7 @@ def task_info(dag_id, task_id): @api_experimental.route("/dags//paused/", methods=["GET"]) @requires_authentication def dag_paused(dag_id, paused): - """(Un)pauses a dag.""" + """(Un)pause a dag.""" is_paused = bool(paused == "true") models.DagModel.get_dagmodel(dag_id).set_is_paused( @@ -252,7 +252,7 @@ def dag_is_paused(dag_id): @requires_authentication def task_instance_info(dag_id, execution_date, task_id): """ - Returns a JSON with a task instance's public instance variables. + Return a JSON with a task instance's public instance variables. The format for the exec_date is expected to be "YYYY-mm-DDTHH:MM:SS", for example: "2016-11-16T11:34:15". This will @@ -289,7 +289,7 @@ def task_instance_info(dag_id, execution_date, task_id): @requires_authentication def dag_run_status(dag_id, execution_date): """ - Returns a JSON with a dag_run's public instance variables. + Return a JSON with a dag_run's public instance variables. The format for the exec_date is expected to be "YYYY-mm-DDTHH:MM:SS", for example: "2016-11-16T11:34:15". This will @@ -323,7 +323,7 @@ def dag_run_status(dag_id, execution_date): @api_experimental.route("/latest_runs", methods=["GET"]) @requires_authentication def latest_dag_runs(): - """Returns the latest DagRun for each DAG formatted for the UI.""" + """Return the latest DagRun for each DAG formatted for the UI.""" from airflow.models import DagRun dagruns = DagRun.get_latest_runs() diff --git a/airflow/www/app.py b/airflow/www/app.py index 9ac01ef5b5d2c..e8da104d2237e 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -188,6 +188,6 @@ def cached_app(config=None, testing=False): def purge_cached_app(): - """Removes the cached version of the app in global state.""" + """Remove the cached version of the app in global state.""" global app app = None diff --git a/airflow/www/auth.py b/airflow/www/auth.py index 93ee8196bd575..ffd80a117c763 100644 --- a/airflow/www/auth.py +++ b/airflow/www/auth.py @@ -48,7 +48,7 @@ def get_access_denied_message(): def has_access(permissions: Sequence[tuple[str, str]] | None = None) -> Callable[[T], T]: """ - Factory for decorator that checks current user's permissions against required permissions. + Check current user's permissions against required permissions. Deprecated. Do not use this decorator, use one of the decorator `has_access_*` defined in airflow/www/auth.py instead. @@ -70,7 +70,7 @@ def has_access(permissions: Sequence[tuple[str, str]] | None = None) -> Callable def _has_access_no_details(is_authorized_callback: Callable[[], bool]) -> Callable[[T], T]: """ - Generic Decorator that checks current user's permissions against required permissions. + Check current user's permissions against required permissions. This works only for resources with no details. This function is used in some ``has_access_`` functions below. @@ -208,15 +208,15 @@ def decorated(*args, **kwargs): def has_access_dataset(method: ResourceMethod) -> Callable[[T], T]: - """Decorator that checks current user's permissions against required permissions for datasets.""" + """Check current user's permissions against required permissions for datasets.""" return _has_access_no_details(lambda: get_auth_manager().is_authorized_dataset(method=method)) def has_access_variable(method: ResourceMethod) -> Callable[[T], T]: - """Decorator that checks current user's permissions against required permissions for variables.""" + """Check current user's permissions against required permissions for variables.""" return _has_access_no_details(lambda: get_auth_manager().is_authorized_variable(method=method)) def has_access_website() -> Callable[[T], T]: - """Decorator that checks current user's permissions to access the website.""" + """Check current user's permissions to access the website.""" return _has_access_no_details(lambda: get_auth_manager().is_authorized_website()) diff --git a/airflow/www/decorators.py b/airflow/www/decorators.py index b82731e43ef92..a4b613170cfe3 100644 --- a/airflow/www/decorators.py +++ b/airflow/www/decorators.py @@ -78,7 +78,7 @@ def _mask_connection_fields(extra_fields): def action_logging(func: Callable | None = None, event: str | None = None) -> Callable[[T], T]: - """Decorator to log user actions.""" + """Log user actions.""" def log_action(f: T) -> T: @functools.wraps(f) @@ -137,7 +137,7 @@ def wrapper(*args, **kwargs): def gzipped(f: T) -> T: - """Decorator to make a view compressed.""" + """Make a view compressed.""" @functools.wraps(f) def view_func(*args, **kwargs): diff --git a/airflow/www/extensions/init_auth_manager.py b/airflow/www/extensions/init_auth_manager.py index 32db0f2cdc907..b84e45ae7fe9a 100644 --- a/airflow/www/extensions/init_auth_manager.py +++ b/airflow/www/extensions/init_auth_manager.py @@ -30,7 +30,8 @@ def get_auth_manager_cls() -> type[BaseAuthManager]: - """Returns just the auth manager class without initializing it. + """ + Return just the auth manager class without initializing it. Useful to save execution time if only static methods need to be called. """ @@ -46,7 +47,8 @@ def get_auth_manager_cls() -> type[BaseAuthManager]: def init_auth_manager(app: Flask) -> BaseAuthManager: - """Initialize the auth manager with the given flask app object. + """ + Initialize the auth manager with the given flask app object. Import the user manager class and instantiate it. """ @@ -57,7 +59,7 @@ def init_auth_manager(app: Flask) -> BaseAuthManager: def get_auth_manager() -> BaseAuthManager: - """Returns the auth manager, provided it's been initialized before.""" + """Return the auth manager, provided it's been initialized before.""" if auth_manager is None: raise Exception( "Auth Manager has not been initialized yet. " diff --git a/airflow/www/extensions/init_manifest_files.py b/airflow/www/extensions/init_manifest_files.py index 4d491786a81fc..a487f7475c94f 100644 --- a/airflow/www/extensions/init_manifest_files.py +++ b/airflow/www/extensions/init_manifest_files.py @@ -24,7 +24,7 @@ def configure_manifest_files(app): - """Loads the manifest file and register the `url_for_asset_` template tag. + """Load the manifest file and register the `url_for_asset_` template tag. :param app: """ diff --git a/airflow/www/extensions/init_security.py b/airflow/www/extensions/init_security.py index 390c7bc2baf75..b74c61e2e63b5 100644 --- a/airflow/www/extensions/init_security.py +++ b/airflow/www/extensions/init_security.py @@ -48,7 +48,7 @@ def apply_caching(response): def init_api_experimental_auth(app): - """Loads authentication backends.""" + """Load authentication backends.""" auth_backends = "airflow.api.auth.backend.default" try: auth_backends = conf.get("api", "auth_backends") diff --git a/airflow/www/fab_security/manager.py b/airflow/www/fab_security/manager.py index 2015a572ed2bf..21ca6a511aee8 100644 --- a/airflow/www/fab_security/manager.py +++ b/airflow/www/fab_security/manager.py @@ -78,7 +78,7 @@ def _oauth_tokengetter(token=None): - """Default function to return the current user oauth token from session cookie.""" + """Return the current user oauth token from session cookie.""" token = session.get("oauth") log.debug("Token Get: %s", token) return token @@ -378,7 +378,7 @@ def current_user(self): def oauth_user_info_getter(self, f): """ - Decorator function to be the OAuth user info getter for all the providers. + Get OAuth user info; used by all providers. Receives provider and response return a dict with the information returned from the provider. The returned user info dict should have its keys with the same name as the User Model. @@ -407,7 +407,7 @@ def wraps(provider, response=None): def get_oauth_token_key_name(self, provider): """ - Returns the token_key name for the oauth provider. + Return the token_key name for the oauth provider. If none is configured defaults to oauth_token this is configured using OAUTH_PROVIDERS and token_key key. @@ -474,7 +474,7 @@ def _rotate_session_id(self): def auth_user_db(self, username, password): """ - Method for authenticating user, auth db style. + Authenticate user, auth db style. :param username: The username or registered email address @@ -506,7 +506,7 @@ def auth_user_db(self, username, password): def _search_ldap(self, ldap, con, username): """ - Searches LDAP for user. + Search LDAP for user. :param ldap: The ldap module reference :param con: The ldap connection @@ -628,7 +628,7 @@ def ldap_extract_list(ldap_dict: dict[str, list[bytes]], field_name: str) -> lis def auth_user_ldap(self, username, password): """ - Method for authenticating user with LDAP. + Authenticate user with LDAP. NOTE: this depends on python-ldap module. @@ -885,7 +885,7 @@ def _oauth_calculate_user_roles(self, userinfo) -> list[str]: def auth_user_oauth(self, userinfo): """ - Method for authenticating user with OAuth. + Authenticate user with OAuth. :userinfo: dict with user information (keys are the same as User model columns) @@ -944,7 +944,7 @@ def auth_user_oauth(self, userinfo): return None def _has_access_builtin_roles(self, role, action_name: str, resource_name: str) -> bool: - """Checks permission on builtin role.""" + """Check permission on builtin role.""" perms = self.builtin_roles.get(role.name, []) for _resource_name, _action_name in perms: if re2.match(_resource_name, resource_name) and re2.match(_action_name, action_name): @@ -954,7 +954,8 @@ def _has_access_builtin_roles(self, role, action_name: str, resource_name: str) def _get_user_permission_resources( self, user: User | None, action_name: str, resource_names: list[str] | None = None ) -> set[str]: - """Get resource names with a certain action name that a user has access to. + """ + Get resource names with a certain action name that a user has access to. Mainly used to fetch all menu permissions on a single db call, will also check public permissions and builtin roles @@ -1018,7 +1019,7 @@ def add_limit_view(self, baseview): def add_permissions_view(self, base_action_names, resource_name): # Keep name for compatibility with FAB. """ - Adds an action on a resource to the backend. + Add an action on a resource to the backend. :param base_action_names: list of permissions from view (all exposed methods): @@ -1064,7 +1065,7 @@ def add_permissions_view(self, base_action_names, resource_name): # Keep name f def add_permissions_menu(self, resource_name): """ - Adds menu_access to resource on permission_resource. + Add menu_access to resource on permission_resource. :param resource_name: The resource name @@ -1085,7 +1086,7 @@ def get_action(self, name: str) -> Action: def security_cleanup(self, baseviews, menus): """ - Will cleanup all unused permissions from the database. + Cleanup all unused permissions from the database. :param baseviews: A list of BaseViews class :param menus: Menu class @@ -1109,7 +1110,7 @@ def security_cleanup(self, baseviews, menus): self.delete_resource(resource.name) def find_user(self, username=None, email=None): - """Generic function find a user by its username or email.""" + """Find a user by its username or email.""" raise NotImplementedError def get_role_permissions_from_db(self, role_id: int) -> list[Permission]: @@ -1117,12 +1118,12 @@ def get_role_permissions_from_db(self, role_id: int) -> list[Permission]: raise NotImplementedError def add_user(self, username, first_name, last_name, email, role, password=""): - """Generic function to create user.""" + """Create user.""" raise NotImplementedError def update_user(self, user): """ - Generic function to update user. + Update user. :param user: User model to update to database """ @@ -1135,7 +1136,7 @@ def get_all_roles(self): raise NotImplementedError def get_public_role(self): - """Returns all permissions from public role.""" + """Return all permissions from public role.""" raise NotImplementedError def filter_roles_by_perm_with_action(self, permission_name: str, role_ids: list[int]): @@ -1144,7 +1145,7 @@ def filter_roles_by_perm_with_action(self, permission_name: str, role_ids: list[ def permission_exists_in_one_or_more_roles( self, resource_name: str, action_name: str, role_ids: list[int] ) -> bool: - """Finds and returns permission views for a group of roles.""" + """Find and returns permission views for a group of roles.""" raise NotImplementedError """ @@ -1155,7 +1156,7 @@ def permission_exists_in_one_or_more_roles( def get_all_resources(self) -> list[Resource]: """ - Gets all existing resource records. + Get all existing resource records. :return: List of all resources """ @@ -1171,7 +1172,7 @@ def create_resource(self, name): def delete_resource(self, name): """ - Deletes a Resource from the backend. + Delete a Resource from the backend. :param name: name of the Resource @@ -1186,7 +1187,7 @@ def delete_resource(self, name): def get_permission(self, action_name: str, resource_name: str) -> Permission | None: """ - Gets a permission made with the given action->resource pair, if the permission already exists. + Get a permission made with the given action->resource pair, if the permission already exists. :param action_name: Name of action :param resource_name: Name of resource @@ -1205,7 +1206,7 @@ def get_resource_permissions(self, resource) -> Permission: def create_permission(self, action_name: str, resource_name: str) -> Permission | None: """ - Creates a permission linking an action and resource. + Create a permission linking an action and resource. :param action_name: Name of existing action :param resource_name: Name of existing resource @@ -1215,7 +1216,7 @@ def create_permission(self, action_name: str, resource_name: str) -> Permission def delete_permission(self, action_name: str, resource_name: str) -> None: """ - Deletes the permission linking an action->resource pair. + Delete the permission linking an action->resource pair. Doesn't delete the underlying action or resource. @@ -1249,5 +1250,5 @@ def remove_permission_from_role(self, role, permission) -> None: @staticmethod def before_request(): - """Hook runs before request.""" + """Run hook before request.""" g.user = get_auth_manager().get_user() diff --git a/airflow/www/forms.py b/airflow/www/forms.py index 9b31290451ac9..8a8f69cf4455c 100644 --- a/airflow/www/forms.py +++ b/airflow/www/forms.py @@ -142,7 +142,7 @@ class DagRunEditForm(DynamicForm): note = TextAreaField(lazy_gettext("User Note"), widget=BS3TextAreaFieldWidget()) def populate_obj(self, item): - """Populates the attributes of the passed obj with data from the form's not-read-only fields.""" + """Populate the attributes of the passed obj with data from the form's not-read-only fields.""" for name, field in self._fields.items(): if not field.flags.readonly: field.populate_obj(item, name) @@ -188,7 +188,7 @@ class TaskInstanceEditForm(DynamicForm): note = TextAreaField(lazy_gettext("User Note"), widget=BS3TextAreaFieldWidget()) def populate_obj(self, item): - """Populates the attributes of the passed obj with data from the form's not-read-only fields.""" + """Populate the attributes of the passed obj with data from the form's not-read-only fields.""" for name, field in self._fields.items(): if not field.flags.readonly: field.populate_obj(item, name) diff --git a/airflow/www/security_manager.py b/airflow/www/security_manager.py index f1a0a92919c43..580191a9cb5bf 100644 --- a/airflow/www/security_manager.py +++ b/airflow/www/security_manager.py @@ -270,7 +270,7 @@ def get_user_roles(user=None): return user.roles def get_readable_dags(self, user) -> Iterable[DagModel]: - """Gets the DAGs readable by authenticated user.""" + """Get the DAGs readable by authenticated user.""" warnings.warn( "`get_readable_dags` has been deprecated. Please use `get_readable_dag_ids` instead.", RemovedInAirflow3Warning, @@ -281,7 +281,7 @@ def get_readable_dags(self, user) -> Iterable[DagModel]: return self.get_accessible_dags([permissions.ACTION_CAN_READ], user) def get_editable_dags(self, user) -> Iterable[DagModel]: - """Gets the DAGs editable by authenticated user.""" + """Get the DAGs editable by authenticated user.""" warnings.warn( "`get_editable_dags` has been deprecated. Please use `get_editable_dag_ids` instead.", RemovedInAirflow3Warning, @@ -307,11 +307,11 @@ def get_accessible_dags( return session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids))) def get_readable_dag_ids(self, user) -> set[str]: - """Gets the DAG IDs readable by authenticated user.""" + """Get the DAG IDs readable by authenticated user.""" return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ]) def get_editable_dag_ids(self, user) -> set[str]: - """Gets the DAG IDs editable by authenticated user.""" + """Get the DAG IDs editable by authenticated user.""" return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT]) @provide_session @@ -321,7 +321,7 @@ def get_accessible_dag_ids( user_actions: Container[str] | None = None, session: Session = NEW_SESSION, ) -> set[str]: - """Generic function to get readable or writable DAGs for user.""" + """Get readable or writable DAGs for user.""" if not user_actions: user_actions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ] @@ -361,7 +361,7 @@ def get_accessible_dag_ids( } def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool: - """Checks if user has read or write access to some dags.""" + """Check if user has read or write access to some dags.""" if dag_id and dag_id != "~": root_dag_id = self._get_root_dag_id(dag_id) return self.has_access(action, permissions.resource_name_for_dag(root_dag_id)) @@ -372,25 +372,25 @@ def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool: return any(self.get_editable_dag_ids(user)) def can_read_dag(self, dag_id: str, user=None) -> bool: - """Determines whether a user has DAG read access.""" + """Determine whether a user has DAG read access.""" root_dag_id = self._get_root_dag_id(dag_id) dag_resource_name = permissions.resource_name_for_dag(root_dag_id) return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user) def can_edit_dag(self, dag_id: str, user=None) -> bool: - """Determines whether a user has DAG edit access.""" + """Determine whether a user has DAG edit access.""" root_dag_id = self._get_root_dag_id(dag_id) dag_resource_name = permissions.resource_name_for_dag(root_dag_id) return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user) def can_delete_dag(self, dag_id: str, user=None) -> bool: - """Determines whether a user has DAG delete access.""" + """Determine whether a user has DAG delete access.""" root_dag_id = self._get_root_dag_id(dag_id) dag_resource_name = permissions.resource_name_for_dag(root_dag_id) return self.has_access(permissions.ACTION_CAN_DELETE, dag_resource_name, user=user) def prefixed_dag_id(self, dag_id: str) -> str: - """Returns the permission name for a DAG id.""" + """Return the permission name for a DAG id.""" warnings.warn( "`prefixed_dag_id` has been deprecated. " "Please use `airflow.security.permissions.resource_name_for_dag` instead.", @@ -401,7 +401,7 @@ def prefixed_dag_id(self, dag_id: str) -> str: return permissions.resource_name_for_dag(root_dag_id) def is_dag_resource(self, resource_name: str) -> bool: - """Determines if a resource belongs to a DAG or all DAGs.""" + """Determine if a resource belongs to a DAG or all DAGs.""" if resource_name == permissions.RESOURCE_DAG: return True return resource_name.startswith(permissions.RESOURCE_DAG_PREFIX) @@ -516,7 +516,7 @@ def add_homepage_access_to_custom_roles(self) -> None: self.appbuilder.get_session.commit() def get_all_permissions(self) -> set[tuple[str, str]]: - """Returns all permissions as a set of tuples with the action and resource names.""" + """Return all permissions as a set of tuples with the action and resource names.""" return set( self.appbuilder.get_session.execute( select(self.action_model.name, self.resource_model.name) @@ -545,7 +545,7 @@ def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], Permission]: } def _get_all_roles_with_permissions(self) -> dict[str, Role]: - """Returns a dict with a key of role name and value of role with early loaded permissions.""" + """Return a dict with a key of role name and value of role with early loaded permissions.""" return { r.name: r for r in self.appbuilder.get_session.scalars( @@ -633,7 +633,7 @@ def sync_roles(self) -> None: self.clean_perms() def sync_resource_permissions(self, perms: Iterable[tuple[str, str]] | None = None) -> None: - """Populates resource-based permissions.""" + """Populate resource-based permissions.""" if not perms: return @@ -740,7 +740,7 @@ def check_authorization( perms: Sequence[tuple[str, str]] | None = None, dag_id: str | None = None, ) -> bool: - """Checks that the logged in user has the specified permissions.""" + """Check that the logged in user has the specified permissions.""" if not perms: return True diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 321e343fa4180..9d7728798e7bb 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -251,7 +251,7 @@ def generate_pages( sorting_direction=None, ): """ - Generates the HTML for a paging component. + Generate the HTML for a paging component. Uses a similar logic to the paging auto-generated by Flask managed views. The paging component defines a number of pages visible in the pager (window) and once the user @@ -404,19 +404,19 @@ def is_current(current, page): def epoch(dttm): - """Returns an epoch-type date (tuple with no timezone).""" + """Return an epoch-type date (tuple with no timezone).""" return (int(time.mktime(dttm.timetuple())) * 1000,) def make_cache_key(*args, **kwargs): - """Used by cache to get a unique key per URL.""" + """Get a unique key per URL; used by cache.""" path = request.path args = str(hash(frozenset(request.args.items()))) return (path + args).encode("ascii", "ignore") def task_instance_link(attr): - """Generates a URL to the Graph view for a TaskInstance.""" + """Generate a URL to the Graph view for a TaskInstance.""" dag_id = attr.get("dag_id") task_id = attr.get("task_id") execution_date = attr.get("dag_run.execution_date") or attr.get("execution_date") or timezone.utcnow() @@ -444,7 +444,7 @@ def task_instance_link(attr): def state_token(state): - """Returns a formatted string with HTML for a given State.""" + """Return a formatted string with HTML for a given State.""" color = State.color(state) fg_color = State.color_fg(state) return Markup( @@ -456,13 +456,13 @@ def state_token(state): def state_f(attr): - """Gets 'state' & returns a formatted string with HTML for a given State.""" + """Get 'state' & return a formatted string with HTML for a given State.""" state = attr.get("state") return state_token(state) def nobr_f(attr_name): - """Returns a formatted string with HTML with a Non-breaking Text element.""" + """Return a formatted string with HTML with a Non-breaking Text element.""" def nobr(attr): f = attr.get(attr_name) @@ -472,7 +472,7 @@ def nobr(attr): def datetime_f(attr_name): - """Returns a formatted string with HTML for given DataTime.""" + """Return a formatted string with HTML for given DataTime.""" def dt(attr): f = attr.get(attr_name) @@ -494,7 +494,7 @@ def datetime_html(dttm: DateTime | None) -> str: def json_f(attr_name): - """Returns a formatted string with HTML for given JSON serializable.""" + """Return a formatted string with HTML for given JSON serializable.""" def json_(attr): f = attr.get(attr_name) @@ -505,7 +505,7 @@ def json_(attr): def dag_link(attr): - """Generates a URL to the Graph view for a Dag.""" + """Generate a URL to the Graph view for a Dag.""" dag_id = attr.get("dag_id") execution_date = attr.get("execution_date") if not dag_id: @@ -515,7 +515,7 @@ def dag_link(attr): def dag_run_link(attr): - """Generates a URL to the Graph view for a DagRun.""" + """Generate a URL to the Graph view for a DagRun.""" dag_id = attr.get("dag_id") run_id = attr.get("run_id") execution_date = attr.get("dag_run.execution_date") or attr.get("execution_date") @@ -827,7 +827,7 @@ def is_utcdatetime(self, col_name): return False def is_extendedjson(self, col_name): - """Checks if it is a special extended JSON type.""" + """Check if it is a special extended JSON type.""" from airflow.utils.sqlalchemy import ExtendedJSON if col_name in self.list_columns: diff --git a/airflow/www/views.py b/airflow/www/views.py index 8495f15d07a90..5126b1ff6a11a 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -600,7 +600,7 @@ def get_task_stats_from_query(qry): def redirect_or_json(origin, msg, status="", status_code=200): """ - Returning json will allow us to more elegantly handle side effects in-page. + Return json which allows us to more elegantly handle side effects in-page. This is useful because some endpoints are called by javascript. """ @@ -713,7 +713,7 @@ class Airflow(AirflowBaseView): @expose("/health") def health(self): """ - An endpoint helping check the health status of the Airflow instance. + Check the health status of the Airflow instance. Includes metadatabase, scheduler and triggerer. """ @@ -1897,7 +1897,7 @@ def xcom(self, session: Session = NEW_SESSION): @auth.has_access_dag("DELETE") @action_logging def delete(self): - """Deletes DAG.""" + """Delete DAG.""" from airflow.api.common import delete_dag from airflow.exceptions import DagNotFound @@ -2217,7 +2217,7 @@ def _clear_dag_tis( @action_logging @provide_session def clear(self, *, session: Session = NEW_SESSION): - """Clears DAG tasks.""" + """Clear DAG tasks.""" dag_id = request.form.get("dag_id") task_id = request.form.get("task_id") origin = get_safe_url(request.form.get("origin")) @@ -2307,7 +2307,7 @@ def clear(self, *, session: Session = NEW_SESSION): @action_logging @provide_session def dagrun_clear(self, *, session: Session = NEW_SESSION): - """Clears the DagRun.""" + """Clear the DagRun.""" dag_id = request.form.get("dag_id") dag_run_id = request.form.get("dag_run_id") confirmed = request.form.get("confirmed") == "true" @@ -3135,7 +3135,7 @@ def legacy_tries(self): @action_logging @provide_session def tries(self, dag_id: str, session: Session = NEW_SESSION): - """Shows all tries.""" + """Show all tries.""" dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session) dag_model = DagModel.get_dagmodel(dag_id, session=session) if not dag: @@ -3218,7 +3218,7 @@ def legacy_landing_times(self): @action_logging @provide_session def landing_times(self, dag_id: str, session: Session = NEW_SESSION): - """Shows landing times.""" + """Show landing times.""" dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session) dag_model = DagModel.get_dagmodel(dag_id, session=session) if not dag: @@ -3341,7 +3341,7 @@ def gantt(self, dag_id: str, session: Session = NEW_SESSION): @provide_session def extra_links(self, *, session: Session = NEW_SESSION): """ - A restful endpoint that returns external links for a given Operator. + Return external links for a given Operator. It queries the operator that sent the request for the links it wishes to provide for a given external link name. @@ -3425,7 +3425,7 @@ def graph_data(self, session: Session = NEW_SESSION): @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE) @action_logging def task_instances(self): - """Shows task instances.""" + """Show task instances.""" dag_id = request.args.get("dag_id") dag = get_airflow_app().dag_bag.get_dag(dag_id) @@ -3446,7 +3446,7 @@ def task_instances(self): @expose("/object/grid_data") @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE) def grid_data(self): - """Returns grid data.""" + """Return grid data.""" dag_id = request.args.get("dag_id") dag = get_airflow_app().dag_bag.get_dag(dag_id) @@ -3499,7 +3499,7 @@ def grid_data(self): @expose("/object/historical_metrics_data") @auth.has_access_cluster_activity("GET") def historical_metrics_data(self): - """Returns cluster activity historical metrics.""" + """Return cluster activity historical metrics.""" start_date = _safe_parse_datetime(request.args.get("start_date")) end_date = _safe_parse_datetime(request.args.get("end_date")) @@ -3558,7 +3558,7 @@ def historical_metrics_data(self): @expose("/object/next_run_datasets/") @auth.has_access_dag("GET", DagAccessEntity.DATASET) def next_run_datasets(self, dag_id): - """Returns datasets necessary, and their status, for the next dag run.""" + """Return datasets necessary, and their status, for the next dag run.""" dag = get_airflow_app().dag_bag.get_dag(dag_id) if not dag: return {"error": f"can't find dag {dag_id}"}, 404 @@ -3601,7 +3601,7 @@ def next_run_datasets(self, dag_id): @expose("/object/dataset_dependencies") @auth.has_access_dag("GET", DagAccessEntity.DEPENDENCIES) def dataset_dependencies(self): - """Returns dataset dependencies graph.""" + """Return dataset dependencies graph.""" nodes_dict: dict[str, Any] = {} edge_tuples: set[dict[str, str]] = set() @@ -3733,7 +3733,7 @@ def datasets_summary(self): @action_logging def robots(self): """ - Returns a robots.txt file for blocking certain search engine crawlers. + Return a robots.txt file for blocking certain search engine crawlers. This mitigates some of the risk associated with exposing Airflow to the public internet, however it does not address the real security risks associated with @@ -3822,7 +3822,7 @@ class ConfigurationView(AirflowBaseView): @expose("/configuration") @auth.has_access_configuration("GET") def conf(self): - """Shows configuration.""" + """Show configuration.""" raw = request.args.get("raw") == "true" title = "Airflow Configuration" expose_config = conf.get("webserver", "expose_config").lower() @@ -3919,7 +3919,7 @@ class AirflowModelView(ModelView): def __getattribute__(self, attr): """ - Wraps action REST methods with `action_logging` wrapper. + Wrap action REST methods with `action_logging` wrapper. Overriding enables differentiating resource and generation of event name at the decorator level. @@ -3950,7 +3950,7 @@ class AirflowPrivilegeVerifierModelView(AirflowModelView): @staticmethod def validate_dag_edit_access(item: DagRun | TaskInstance): - """Validates whether the user has 'can_edit' access for this specific DAG.""" + """Validate whether the user has 'can_edit' access for this specific DAG.""" if not get_airflow_app().appbuilder.sm.can_edit_dag(item.dag_id): raise AirflowException(f"Access denied for dag_id {item.dag_id}") @@ -3974,7 +3974,7 @@ def post_delete_redirect(self): # Required to prevent redirect loop def action_has_dag_edit_access(action_func: Callable) -> Callable: - """Decorator for actions which verifies you have DAG edit access on the given tis/drs.""" + """Verify you have DAG edit access on the given tis/drs.""" @wraps(action_func) def check_dag_edit_acl_for_actions( @@ -4674,7 +4674,7 @@ def pool_link(self): return Markup('Invalid') def frunning_slots(self): - """Running slots rendering.""" + """Format running slots rendering.""" pool_id = self.get("pool") running_slots = self.get("running_slots") if pool_id is not None and running_slots is not None: @@ -4774,7 +4774,7 @@ class VariableModelView(AirflowModelView): base_order = ("key", "asc") def hidden_field_formatter(self): - """Formats hidden fields.""" + """Format hidden fields.""" key = self.get("key") val = self.get("val") if secrets_masker.should_hide_value_for_key(key): @@ -5087,7 +5087,7 @@ def _set_dag_runs_to_active_state( state: DagRunState, session: Session = NEW_SESSION, ): - """This routine only supports Running and Queued state.""" + """Set dag run to active state; this routine only supports Running and Queued state.""" try: count = 0 for dr in session.scalars(select(DagRun).where(DagRun.id.in_(dagrun.id for dagrun in drs))): @@ -5163,7 +5163,7 @@ def action_set_success(self, drs: list[DagRun], session: Session = NEW_SESSION): @provide_session @action_logging def action_clear(self, drs: list[DagRun], session: Session = NEW_SESSION): - """Clears the state.""" + """Clear the state.""" try: count = 0 cleared_ti_count = 0 @@ -5467,14 +5467,14 @@ class TaskInstanceModelView(AirflowPrivilegeVerifierModelView): base_filters = [["dag_id", DagFilter, list]] def log_url_formatter(self): - """Formats log URL.""" + """Format log URL.""" log_url = self.get("log_url") return Markup( '' ).format(log_url=log_url) def duration_f(self): - """Formats duration.""" + """Format duration.""" end_date = self.get("end_date") duration = self.get("duration") if end_date and duration: @@ -5500,7 +5500,7 @@ def _clear_task_instances( self, task_instances: list[TaskInstance], session: Session, clear_downstream: bool = False ) -> tuple[int, int]: """ - Clears task instances, optionally including their downstream dependencies. + Clear task instances, optionally including their downstream dependencies. :param task_instances: list of TIs to clear :param clear_downstream: should downstream task instances be cleared as well? @@ -5571,7 +5571,7 @@ def _clear_task_instances( @provide_session @action_logging def action_clear(self, task_instances, session: Session = NEW_SESSION): - """Clears an arbitrary number of task instances.""" + """Clear an arbitrary number of task instances.""" try: count, _ = self._clear_task_instances( task_instances=task_instances, session=session, clear_downstream=False @@ -5597,7 +5597,7 @@ def action_clear(self, task_instances, session: Session = NEW_SESSION): @provide_session @action_logging def action_clear_downstream(self, task_instances, session: Session = NEW_SESSION): - """Clears an arbitrary number of task instances, including downstream dependencies.""" + """Clear an arbitrary number of task instances, including downstream dependencies.""" try: selected_ti_count, downstream_ti_count = self._clear_task_instances( task_instances=task_instances, session=session, clear_downstream=True @@ -5795,7 +5795,7 @@ def _calculate_graph(self): def add_user_permissions_to_dag(sender, template, context, **extra): """ - Adds `.can_edit`, `.can_trigger`, and `.can_delete` properties to DAG based on current user's permissions. + Add `.can_edit`, `.can_trigger`, and `.can_delete` properties to DAG based on current user's permissions. Located in `views.py` rather than the DAG model to keep permissions logic out of the Airflow core. """