Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Airflow DAG processor horizontally scalable #33117

Closed
wants to merge 1 commit into from
Closed

Enable Airflow DAG processor horizontally scalable #33117

wants to merge 1 commit into from

Conversation

pegasas
Copy link
Contributor

@pegasas pegasas commented Aug 4, 2023


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added area:helm-chart Airflow Helm Chart area:Scheduler including HA (high availability) scheduler labels Aug 4, 2023
@pegasas
Copy link
Contributor Author

pegasas commented Aug 4, 2023

try to fix #32966

@pegasas pegasas marked this pull request as ready for review August 4, 2023 16:05
@potiuk
Copy link
Member

potiuk commented Aug 4, 2023

Not even close. I am not sure what you are trying to do and why but this is wrong.

@pegasas pegasas closed this Aug 4, 2023
@pegasas pegasas reopened this Aug 4, 2023
@pegasas pegasas closed this Aug 4, 2023
@pegasas
Copy link
Contributor Author

pegasas commented Aug 4, 2023

Not even close. I am not sure what you are trying to do and why but this is wrong.

Hi, @potiuk ,

I saw comments from you and @raphaelauv /@dirrao and read the source code

You can manually horizontally scale the DAG processor by using the subdir option https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#dag-processor.

I saw dag_processor_command.py , it passes sub_dir and DagFileProcessorManager runs DagFileProcessorProcess in multi-processing mode.
So I attended to make it a default 'True' option for option 'standalone_dag_processor'.

did you mean we should deploy the DAG processor in distributed nodes?

If then, I will think about how we hosts subdir DAG file distributed collaboration.

@pegasas pegasas reopened this Aug 4, 2023
@pegasas pegasas marked this pull request as draft August 4, 2023 16:28
@potiuk
Copy link
Member

potiuk commented Aug 4, 2023

No. The idea is to implement multiple Dag File Processors that work on the same --subdir and share the load - somehow synchronising between themselves so that they do not process the same files at the same time.

Closing, it, it's result of misunderstanding what the idea is about.

@potiuk potiuk closed this Aug 4, 2023
@dstandish
Copy link
Contributor

you might look at some of the "good first issue" issues here @pegasas https://github.com/apache/airflow/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22

@pegasas
Copy link
Contributor Author

pegasas commented Aug 4, 2023

Thanks @potiuk for clarification!

Actually, I have considered this problem, too, do we have to do multi-process on a single file?
cause as I see, each DagFileProcessorProcess read DAG from the file/zip and saves it to DB.

def _load_modules_from_file(self, filepath, safe_mode):

If we make it multi-process and use synchronising between themselves,
we have to increase their lock expropriation,
but I think the bottleneck may be the File IO.

Hi, @dstandish ,

Actually, I indeed have a good-first-issue PR which is requesting your review #32858,
sorry I am still in the learning stage and I'd like your suggestions in the future.

How do you think about this in your opinion?

@potiuk
Copy link
Member

potiuk commented Aug 4, 2023

Yes. We have to. This is for security and isolation. And quite the oposite - maybe I do not understand what "expropriation" means (difficult word) and what it means in the context of locking.

I think you need to explain your reasoning.

Which locks you talk about ? what contention ? with what?

From what I know how it works multiprocessing (with forking as used here) are the only reasonable way to achieve high parallelism and using all the cores for just parsing. each process has it's own file locks and have no contentions with the other parsers. Then they save to database as single operation when the complete parsing.

I am not sure what you are proposing instead - taking into account GIL limitations of Python, it would be completely unfeasible to use Threading in this case instead but maybe you have something else in mind.

Please elaborate what your proposal is.

@pegasas
Copy link
Contributor Author

pegasas commented Aug 4, 2023

sorry @potiuk , thanks for your patient and guidance at me .

From what I know:

  1. we have supported multi process on subdir, which each DagFileProcessorProcess handles one file.
  2. I was thinking if your last comments are talking about we needs multi process on a single file, each process has it's own file locks, performance may not better than one process on a single file.

BTW. Horizontal scaling of DAG file processor would really be needed if your Python Parsing gets a lot of CPU cycles. And mostly when you have big number of smal Python files.

do you means we should use single process for reading file, but use multi process for parsing after it is loaded?

@pegasas pegasas deleted the horizon branch August 4, 2023 19:36
@potiuk
Copy link
Member

potiuk commented Aug 4, 2023

do you means we should use single process for reading file, but use multi process for parsing after it is loaded?

really no idea what you propose. If you can be clear on that, then we can have a conversation, but I do not understand what your proposal is.

@pegasas
Copy link
Contributor Author

pegasas commented Aug 5, 2023

Hi, @potiuk ,

I'm not trying to argue, and I'm 100% agreeing with using multi process instead of multi thread.

my understanding:

  • DagFileProcessorManager runs multiple processes DagFileProcessorProcess, now single DagFileProcessorProcess reading and parsing single Python file.

my confusing part:

No. The idea is to implement multiple Dag File Processors that work on the same --subdir and share the load - somehow synchronising between themselves so that they do not process the same files at the same time.

I thought we have already implemented multi process in reading python file, I am not sure of the way you mentioned here.

From what I know how it works multiprocessing (with forking as used here) are the only reasonable way to achieve high parallelism and using all the cores for just parsing. each process has it's own file locks and have no contentions with the other parsers. Then they save to database as single operation when the complete parsing.

My question #1: Do you mean that we should multi process in reading & parsing single file?

avatar

My question #2: If the answer of #1 is NO, which part is multi process used for?

My question #3: If the answer of #1 is YES, like you said, each process has it's own file locks and have no contentions with the other parsers. Would multi process be helpful in this IO bound scenario?

@potiuk
Copy link
Member

potiuk commented Aug 5, 2023

We already do multiprocessing for parsing the files. Each file is processed in a separately forked process (that communicates with the main process via multi-processing). You seem to insist on doing something that we already have if I understand correctly.

Have you looked at the source code? I strongly recommend to, before proposing something.

@pegasas
Copy link
Contributor Author

pegasas commented Aug 5, 2023

Sorry @potiuk,

So the answer to #32966 is that there is nothing we should do for this issue because current design have fixed it?

I know the code you mentioned in

def _load_modules_from_file(self, filepath, safe_mode):
from airflow.models.dag import DagContext
if not might_contain_dag(filepath, safe_mode):
# Don't want to spam user with skip messages
if not self.has_logged:
self.has_logged = True
self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
return []
self.log.debug("Importing %s", filepath)
org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
path_hash = hashlib.sha1(filepath.encode("utf-8")).hexdigest()
mod_name = f"unusual_prefix_{path_hash}_{org_mod_name}"
if mod_name in sys.modules:
del sys.modules[mod_name]
DagContext.current_autoregister_module_name = mod_name
def parse(mod_name, filepath):
try:
loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
spec = importlib.util.spec_from_loader(mod_name, loader)
new_module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = new_module
loader.exec_module(new_module)
return [new_module]
except Exception as e:
DagContext.autoregistered_dags.clear()
self.log.exception("Failed to import: %s", filepath)
if self.dagbag_import_error_tracebacks:
self.import_errors[filepath] = traceback.format_exc(
limit=-self.dagbag_import_error_traceback_depth
)
else:
self.import_errors[filepath] = str(e)
return []
, and I do read it, so I feel weird about that.

you are my first expect of my open source road, I am appreciated.
I am not insisting on doing anything
I just want to make this issue clear and close it.
I am truly sorry if any of my words makes you uncomfortable.

@potiuk
Copy link
Member

potiuk commented Aug 5, 2023

It's not about making people uncomfortable, but about trying to make your proposal clear so that people do not need to loose time on trying to understand what you want. All this should be backed not on assumptions but on looking at the code and if you do base it on assumptions, clearly state it.

Your proposals were (and continue to be) confusing. After those iterations of your proposals, I am still not sure what you are proposing and whether your read the code, because you make some assumptions without checking them and without stating that you are making assumptions.

From your coments it looks lke you know what you are talking about, but in fact you don't. It almost looks like you are using chatbot to generate your questions. They look vaguely familiar to those "hallucitanating" chatbots that seem to know what they are talking about but what they say is not based on facts.

Check your assumptions, if you don't know - state your assumptions.

And no. There is STILL somethig to do here. you might want to utilise not multi-processing but multi-node setup, similarly like scheduler is horizontally scalable now. You can run mutlple schedulers and they will scale - the more nodes you have, the more CPU on multiple machines they will be able to use. Similarly the issue is about being able to run multiple dag file processors on several different machines, parsing the same shared folder in order to scale horizontally.

@potiuk
Copy link
Member

potiuk commented Aug 5, 2023

And BTW. The code you are referring to is wrong. What you are referring to is generic DAGBag code. But you should look what DagFileProcessor does. It spawns forks for every file it finds and the forked process got the FILE as input not folder. You missed the fact that DAGBAg can be given a file not only a folder.

@pegasas
Copy link
Contributor Author

pegasas commented Aug 5, 2023

Your proposals were (and continue to be) confusing. After those iterations of your proposals, I am still not sure what you are proposing and whether your read the code, because you make some assumptions without checking them and without stating that you are making assumptions.

I get your point. I will refer to other AIP/issues to see how to make a suggestion clear. I tried to solve this in this next proposal.

Similarly the issue is about being able to run multiple dag file processors on several different machines, parsing the same shared folder in order to scale horizontally.

These solved my doubt. I will dig into this and tries to come to a clear suggestion.

The code you are referring to is wrong. What you are referring to is a generic DAGBag code. But you should look at what DagFileProcessor does. It spawns forks for every file it finds and the forked process got the FILE as input, not the folder. You missed the fact that DAGBAg can be given a file not only a folder.

you are right. next time I will refer to the right place to make my suggestion clear. make my proposal more clear.

@pegasas
Copy link
Contributor Author

pegasas commented Aug 14, 2023

Hi, @potiuk , Here's my new proposal.

To implement multi DAG processor embedded in the scheduler on different nodes, we should:

https://github.com/apache/airflow/blob/main/airflow/dag_processing/manager.py#L1075-L1101

  1. Saving DAG files in DFS, I will start from S3, and leverage S3Hook. https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/hooks/s3.py
  2. Leverage AMQP-support MQ from Celery for the scheduler on different nodes for picking the DFS file path. After parsing the DAG and saving it to DB, the message will be confirmed & deleted.

What do you think about it?

@potiuk
Copy link
Member

potiuk commented Aug 14, 2023

Can you perform some calculations and realistic simulations showing how this would help to achieve better performance? I think there is a big risks that your idea of horizontal scalability is there to implement the idea of horizontal scalability. But whether it solves any problem and allows to achieve some use cases and getting things better and more performant? With S3 and celery as broker of the messages, I am not sure if the overhead connected with it would justify potential gains.

Only some realistic case and performance benchmarks could justify it.

Also if you want to propose it, you have to consider alternatives and see how they compare - both performance wise and complexity-wise.

And it's not an abstract ask.

This is precisely what we've done when we've implemented Horizontal scalability for Scheduler. If you read that post https://www.astronomer.io/blog/airflow-2-scheduler/ which summarizes all the effort done there. It's just an icing on the cake that is a result of a numerous discussion we had on the devlist, discussing draft Airflow Improvement Proposals, and when we gone through prototype code walkthroughs with Ash where he explained us how the idea works.

When you look at the blog post - it explains why we implemented it and why we made the decisions - it was based on some observations and benchmarks that we have a real bottleneck on some realistic cases, then prototyping was done and and benchmarking on several approaches we could take, finally the currrent "Database locking" mechanism with SKIP_LOCKED was chosen based on those benchmarks and analysis as a good balance between simplicity and achieved gains.

HINT: we've chosen the DB because it did not require to add any component, messaging communication queues, zookeper and many other thing that we could choose if there that would awfully complicate Airflow deployments.

So, by the sheer look of it, your proposal goes into opposite direction comparing to what we decided for Scheduler. That does not mean it is wrong, but it's a hint, that likely it's not preferred one. But if you perform analysis of performance/gains/risks/complexity of this and compare with alternative solutions that you considered, discuss it at the devlist, turn it into Airflow Improvement Proposal, pass it throuhg voting and implement, then of course, there is a possibility this might be something that the community will be willing to accept.

@pegasas
Copy link
Contributor Author

pegasas commented Aug 14, 2023

This is precisely what we've done when we've implemented Horizontal scalability for Scheduler. If you read that post https://www.astronomer.io/blog/airflow-2-scheduler/ which summarizes all the effort done there. It's just an icing on the cake that is a result of a numerous discussion we had on the devlist, discussing draft Airflow Improvement Proposals, and when we gone through prototype code walkthroughs with Ash where he explained us how the idea works.

Great Work! I indeed only read the discussion in AIP-15 previously, it mentioned a lot of thinking about HA scheduler and some debates about consensus algorithms.

I will continue to follow this issue, thanks for guidelines!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:helm-chart Airflow Helm Chart area:Scheduler including HA (high availability) scheduler
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants