How to test airflow dag in unittest?





I am trying to test a DAG with more than one task in a test environment. I was able to test a single task associated with the DAG, but I want to create several tasks in the DAG and kick off the first one.
To test one task in a DAG I am using



task1.run()





which executes. However, the same approach does not work when the DAG has several tasks chained downstream of one another.


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

# t1 and t2 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t2.set_upstream(t1)

t1.run()  # This executes only the first task.



To run the second task I have to call t2.run(), which I don't want to do since I am designing a DAG. How can I achieve this?



The current answer(s) are out-of-date and require revision given recent changes.



I am upgrading from 1.7 to 1.9 and trying to test the DAG run. It gives me a "dependencies not met" error for the downstream task.





I am trying to test the entire run of the DAG. If we place the above code inside a test, it does not work. The bottom line is that I want to test the entire DAG run, which consists of many dependent tasks, using pytest.
– mad_
Apr 24 at 21:14





Possible duplicate of Airflow Python Unit Test?
– tobi6
Apr 25 at 7:36





idownvotedbecau.se/noresearch
– tobi6
Apr 25 at 7:36





That link points to a unit test of a DAG that involves only one task. I want to test the entire DAG rather than just one task per DAG. Hope this clears up the confusion.
– mad_
Apr 25 at 12:58





What have you tried so far? What does the test code look like? What about dag.run()?
– tobi6
Apr 25 at 13:12






1 Answer



I'm not totally sure I understand your question yet, but I'll take a stab at starting an answer.



If your goal is just to run the DAG or a subset of its tasks manually, you can do this from the CLI, for example:


$ airflow run ...


$ airflow test ...


$ airflow trigger_dag ...



CLI docs -
https://airflow.apache.org/cli.html



I think the airflow run command is the one most relevant to your use case.



At runtime, scheduling the tasks in a DAG and running downstream dependencies once their requirements are met is all handled automatically by the scheduler and executor. You shouldn't need to call run() anywhere in your code.
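
To illustrate what "running downstream dependencies once their requirements are met" means, here is a minimal plain-Python sketch. It is not Airflow's implementation and uses no Airflow API; the function name run_in_dependency_order and the dict-based task representation are hypothetical, chosen only for illustration. The task ids mirror the DAG in the question.

```python
from collections import deque


def run_in_dependency_order(upstream, actions):
    """Run every task once, only after all of its upstream tasks have run.

    upstream: dict mapping task id -> list of task ids it depends on.
    actions:  dict mapping task id -> zero-argument callable.
    Returns the list of task ids in the order they were executed.
    """
    # Count unmet dependencies and build the reverse (downstream) mapping.
    pending = {task: len(deps) for task, deps in upstream.items()}
    downstream = {task: [] for task in upstream}
    for task, deps in upstream.items():
        for dep in deps:
            downstream[dep].append(task)

    # Start with tasks that have no upstream dependencies.
    ready = deque(sorted(t for t, n in pending.items() if n == 0))
    executed = []
    while ready:
        task = ready.popleft()
        actions[task]()
        executed.append(task)
        # A downstream task becomes ready once its last dependency has run.
        for child in downstream[task]:
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)

    if len(executed) != len(upstream):
        raise ValueError("dependency cycle detected")
    return executed


# Hypothetical stand-in for the question's DAG: 'sleep' is downstream of 'print_date'.
log = []
order = run_in_dependency_order(
    upstream={"print_date": [], "sleep": ["print_date"]},
    actions={
        "print_date": lambda: log.append("print_date ran"),
        "sleep": lambda: log.append("sleep ran"),
    },
)
print(order)
```

Calling a single task's action directly, as t1.run() does in the question, corresponds to skipping this loop entirely, which is why nothing downstream gets triggered.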



As far as the run method itself, the code is still there:








Thanks, Taylor, for taking the time to answer. I have to test a DAG with dependencies inside a unit test, so I do not want to store it in a file or have it show up in the GUI, since it is only a test DAG. I want to load the entire DAG within the unit test and trigger it from there. This used to work in 1.7, but I am not sure what I am missing to make it work in 1.9. Yes, it is related to my other question; because that one concerns a different version, I kept the two questions separate to avoid confusion. The problem with running the DAG as in the example above is that it does not respect the dependency.
– mad_
1 hour ago





