
[Feature] Dbt retry as airflow retry mechanism #1402

Open
tuantran0910 opened this issue Dec 18, 2024 · 3 comments
Labels
area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc enhancement New feature or request triage-needed Items need to be reviewed / assigned to milestone

Comments

@tuantran0910

Description

dbt-core version 1.9.0 was released a few months ago. One of its new features is the microbatch incremental strategy, which splits the data to process into multiple batches and processes them sequentially or in parallel. If some of those batches fail, we can use the dbt retry command to process only the failed batches.

The dbt retry command re-executes the last dbt command from the point of failure. If the previously executed dbt command was successful, dbt retry finishes as a no-op. For more details, visit here.
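As a minimal command-line illustration of that behavior (the model selector below is made up):

```shell
# First invocation: with the microbatch strategy, some batches may fail
# while others succeed (the model name here is hypothetical).
dbt run --select my_incremental_model

# Re-execute only what failed in the previous invocation, starting from
# the point of failure; if the previous command succeeded, this is a no-op.
dbt retry
```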

I wonder whether we can replace the Airflow retry mechanism with dbt retry? I think it would be useful for the microbatch incremental strategy (available in dbt-core from 1.9.0), as it only needs to retry the failed batches.

Use case/motivation

No response

Related issues

No response

Are you willing to submit a PR?

  • Yes, I am willing to submit a PR!
@tuantran0910 tuantran0910 added enhancement New feature or request triage-needed Items need to be reviewed / assigned to milestone labels Dec 18, 2024
@dosubot dosubot bot added the area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc label Dec 18, 2024
@pankajkoti
Contributor

pankajkoti commented Dec 19, 2024

Hi @tuantran0910, this seems like a great idea.

I am unsure whether it would be trivial to replace Airflow's retry mechanism directly with dbt retry, given that those handlers are defined at Airflow's task-instance level. However, how about we introduce and expose a retry mechanism as part of the Cosmos operators and allow users to specify whether they would like dbt retry to be attempted -- by introducing the needed logic here

def handle_exception(self) -> Callable[..., None]:

Your interest in contributing this is highly appreciated; please let us know if you have more thoughts or would like to discuss anything further here.
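One possible shape for such an opt-in flag, as a minimal sketch (the `enable_dbt_retry` flag and `build_cmd` helper are hypothetical names for illustration, not existing Cosmos API): swap the original dbt command for `dbt retry` on later attempts only when the user has opted in.

```python
# Hypothetical sketch: an opt-in flag on a dbt operator that swaps the
# original dbt command for `dbt retry` on Airflow retry attempts.
# `enable_dbt_retry` and `build_cmd` are illustrative names, not Cosmos API.
from typing import List


def build_cmd(base_cmd: List[str], try_number: int, enable_dbt_retry: bool) -> List[str]:
    """Return the dbt command to run for this attempt.

    On the first attempt (try_number == 1) always run the original command;
    on later attempts, run `dbt retry` if the user opted in, so only the
    failed nodes/batches are re-executed.
    """
    if enable_dbt_retry and try_number > 1:
        return ["dbt", "retry"]
    return base_cmd


# Second attempt with the flag enabled picks `dbt retry`.
print(build_cmd(["dbt", "run", "--select", "my_model"], 2, True))   # ['dbt', 'retry']
# First attempt always runs the original command.
print(build_cmd(["dbt", "run", "--select", "my_model"], 1, True))   # ['dbt', 'run', '--select', 'my_model']
```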

@tuantran0910
Author

tuantran0910 commented Dec 19, 2024

Thanks @pankajkoti for the reply. I have read the code in the file local.py, and I think it would be a good approach to expose the ability to use dbt retry in the method handle_exception_dbt_runner.

I also have one more proposal:

  • As far as I remember, we can determine whether a task is in a retry state by using the TaskInstance object's attributes, such as try_number (please correct me if this is wrong). So, based on the task's pre-defined retries parameter, we can detect that a retry is in progress when try_number > 1 (attempts start at 1), up to the configured number of retries.
  • During a retry, we can then make use of dbt retry in the task, instead of re-running the same command as the first attempt that failed.

For example:

import logging

from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def check_retry_state(**kwargs):
    # Get the TaskInstance from the Airflow context
    task_instance = kwargs["ti"]

    # Compare the current attempt number against the configured retries
    current_try_number = task_instance.try_number
    max_retries = task_instance.task.retries

    logging.info(f"Task is on attempt {current_try_number}/{max_retries + 1}")

    if current_try_number > 1:
        logging.info("Task has entered retry progress.")

        # This is where dbt retry happens

    else:
        logging.info("This is the first attempt.")


@dag(schedule_interval=None, start_date=days_ago(1), catchup=False)
def retry_check_dag():
    PythonOperator(
        task_id="check_retry_state",
        python_callable=check_retry_state,
        # provide_context=True is no longer needed in Airflow 2;
        # the context is passed to python_callable automatically.
    )


retry_check_dag()

These are my thoughts about dbt retry. Looking forward to hearing your reply.

@pankajkoti
Contributor

Hi @tuantran0910, yes, you're right. However, I am still not sure how trivial it would be to intercept try_number from the TaskInstance, and even if we do intercept it, there are successive steps that need to be handled for updating the Airflow metadata database with the retries. If you try something that could work, please definitely open a PR and we would be happy to seek further reviews (maybe from Airflow devs too, if needed).
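For reference, the try_number check itself is small; here is a sketch using a stand-in for TaskInstance (the class is illustrative only, and it assumes Airflow 2 semantics where try_number is 1 on the first attempt):

```python
# Minimal sketch: decide from the Airflow task context whether this
# execution is a retry. Assumes Airflow 2 semantics, where
# TaskInstance.try_number is 1 on the first attempt.
class FakeTaskInstance:
    """Stand-in for airflow.models.TaskInstance, for illustration only."""

    def __init__(self, try_number: int):
        self.try_number = try_number


def is_retry(context: dict) -> bool:
    # Any attempt beyond the first one is a retry.
    return context["ti"].try_number > 1


print(is_retry({"ti": FakeTaskInstance(1)}))  # False
print(is_retry({"ti": FakeTaskInstance(3)}))  # True
```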
