-
Notifications
You must be signed in to change notification settings - Fork 14.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added windows extensions #16110
Added windows extensions #16110
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
|
Looks like a weekend adventure to take a look (and possibly finally use the dual boot dusted Windows partition on my Linux) |
@potiuk please let me now how I can assist you (sadly not during the weekend since I have to book my hours ^^). I am aware that the changes so far are more of workarounds than actual platform support. |
Thanks @casra-developers . We can work on asynchronous fashion all right here. And I believe adding full windows support to get Airflow up and running on Windows will take quite some time (if it will be there at all). For now what we can do we can add partial support:
And both 1) and 2) are mostly about a series of hacks to make it works for those particular scenarios. So we are well aligned here I think :P |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments on the implementation style. I have a hunch the windows_extensions
submodule could have legal implications but that’s another topic…
airflow/operators/python.py
Outdated
# find python executable folder | ||
candidates = [os.path.join(tmp_dir, 'bin'), os.path.join(tmp_dir, 'scripts')] | ||
python_folder = None | ||
for candidate in candidates: | ||
if os.path.isdir(candidate): | ||
python_folder = candidate | ||
break | ||
|
||
if python_folder is None: | ||
raise AirflowException(f'Unable to find python executable in "{tempdir}"') | ||
|
||
execute_in_subprocess( | ||
cmd=[ | ||
f'{tmp_dir}/bin/python', | ||
os.path.join(python_folder, 'python'), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be simplified with shutil.which
. Instead of trying to find a parent directory to os.path.join
later:
candidate_dirs = [os.path.join(tmp_dir, "bin"), os.path.join(tmp_dir, "Scripts")]
python_executable = shutil.which("python", path=os.pathsep.join(candidate_dirs))
if not python_executable:
raise AirflowException(...)
execute_in_subprocess(cmd=[python_executable, ...])
Also, I feel the AirflowException
should show tmp_dir
instead. Showing tempdir
can be misleading because the Python executable isn’t directly in the temp directory, but subdirectory tmp_dir
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice, I didn't know about this function. Updated in the latest commit.
airflow/utils/platform_utils.py
Outdated
def is_windows() -> bool: | ||
""" | ||
Returns true if executing system is Windows | ||
""" | ||
return platform.system() == 'Windows' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be a good idea to functools.lru_cache
this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Obsolete now since we are using a constant in the latest commit but I will keep this in mind.
@@ -86,6 +87,10 @@ def __init__(self, local_task_job): | |||
|
|||
# pylint: disable=consider-using-with | |||
self._error_file = NamedTemporaryFile(delete=True) | |||
|
|||
# HOTFIX: When reporting exceptions, this file was usually locked because it was still opened by this process | |||
self._error_file.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks wrong. The documentation says this would destroy the file immediately, rendering this attribute useless.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On Windows we basically just create an empty text file. Not closing it immediately raised an exception while logging since the file was opened by another python process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok nevermind... I removed the line and it still works on our Dask-Worker.
if is_windows(): | ||
# pylint: disable=subprocess-popen-preexec-fn,consider-using-with | ||
proc = subprocess.Popen( | ||
full_cmd, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.STDOUT, | ||
universal_newlines=True, | ||
close_fds=True, | ||
env=os.environ.copy() | ||
) | ||
else: | ||
# pylint: disable=subprocess-popen-preexec-fn,consider-using-with | ||
proc = subprocess.Popen( | ||
full_cmd, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.STDOUT, | ||
universal_newlines=True, | ||
close_fds=True, | ||
env=os.environ.copy(), | ||
preexec_fn=os.setsid, # does not exist on Windows | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can use start_new_session=True
instead. This options is a shorthand to preexec_fn=os.setsid
if available, and silently ignored on Windows (if I’m understanding the documentation correctly).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to work, I will change it. Thanks!
'which are configured differently to not find the log files.' | ||
])) | ||
result_name = result_name.replace(':', '_') | ||
return result_name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should just always replace :
everywhere. It’s only there because of the datetime.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd also be for it. The warning is fine to keep like it, but we could simply change default configuration of log_filename_template to contain the replace filter. This will keep backwards compatibility for people who already have airflow but all new installations will use the ones with replace.
airflow/utils/process_utils.py
Outdated
if is_windows(): | ||
from airflow.windows_extensions import termios, tty, pty | ||
else: | ||
import pty | ||
import termios | ||
import tty |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel we should have a global shim (maybe a new airflow.platform
that works similarly to airflow.compat
) and use it in all places instead of doing if is_windows()
everywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will do that. If I understand you correctly you basically do import distinction in one file and than just import from there.
airflow/models/baseoperator.py
Outdated
@@ -955,7 +955,7 @@ def __deepcopy__(self, memo): | |||
|
|||
for k, v in self.__dict__.items(): | |||
if k not in shallow_copy: | |||
setattr(result, k, copy.deepcopy(v, memo)) # noqa | |||
setattr(result, k, v if type(v).__name__ == 'module' else copy.deepcopy(v, memo)) # modules cannot be pickled |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What module is being copied? We've already got the shallow_copy_attrs list -- so anything not deep-copyable should probably go in that list
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I am not sure if this comes from the different implementations of some dependencies between Windows and the other systems. This was basically one approach to get rid of the errors popping up without having to go through all the offending attributes. I will see if I can make this a bit less "hacky".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be great to get clarity here indeed .
airflow/utils/platform_utils.py
Outdated
@@ -0,0 +1,16 @@ | |||
import platform | |||
|
|||
def is_windows() -> bool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We've already got airflow.utils.platform
-- put all this in there instead of creating a new module.
airflow/utils/platform_utils.py
Outdated
def is_windows() -> bool: | ||
""" | ||
Returns true if executing system is Windows | ||
""" | ||
return platform.system() == 'Windows' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need a function for this either.
def is_windows() -> bool: | |
""" | |
Returns true if executing system is Windows | |
""" | |
return platform.system() == 'Windows' | |
IS_WINDOWS = platform.system() == 'Windows' | |
"""True if operating system is Windows"""``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, has been changed in the latest commit.
airflow/utils/platform_utils.py
Outdated
:param posix_option: Option for Windows system (e.g. path to executable) | ||
:returns: Choice based on platform | ||
""" | ||
return windows_option if is_windows() else posix_option |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New line missing at end of file.
@@ -0,0 +1,35 @@ | |||
import enum | |||
|
|||
class Signals(enum.Enum): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do need this on Windows? -- it doesn't support most of these signals does it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, this was basically just to satisfy some of the scripts in airflow.utils. We tried to approximate the behavior of the various signal method by using things like e.g. threads for timers. Certainly not ideal, I am very open to better options.
@@ -0,0 +1 @@ | |||
from airflow.windows_extensions.Signals import Signals |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from airflow.windows_extensions.Signals import Signals | |
from airflow.windows_extensions.signals import Signals |
Module names should be lower case by convention.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait -- we have Signals and signals.
Don't do that. Put them both in signals.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually it’s Signals
and signal
, but that’s still (more?) confusing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uups, mistake
airflow/windows_extensions/pty.py
Outdated
# Only tested on Linux. | ||
# See: W. Richard Stevens. 1992. Advanced Programming in the | ||
# UNIX Environment. Chapter 19. | ||
# Author: Steen Lumholt -- with additions by Guido. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where did this file come from? What is it licensed under?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is copied from the installation directory of Python. Under Windows this is "...\Python37\Lib\pty.py". The license should be this one https://docs.python.org/3/license.html. We put it there to have our own tty module injected instead of the one it uses normally.
airflow/windows_extensions/pty.py
Outdated
def _open_terminal(): | ||
"""Open pty master and return (master_fd, tty_name).""" | ||
for x in 'pqrstuvwxyzPQRST': | ||
for y in '0123456789abcdef': | ||
pty_name = '/dev/pty' + x + y | ||
try: | ||
fd = os.open(pty_name, os.O_RDWR) | ||
except OSError: | ||
continue | ||
return (fd, '/dev/tty' + x + y) | ||
raise OSError('out of pty devices') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this going to work on windows?!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a very good question. I would have assumed as much since this file comes with every python installation on Windows, but then again the reason why we pasted it in here was that the tty module did not work properly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice to see windows support coming in, but some refactoring to do on this PR as uranusjr has suggested
- To remove the
if_windows()
form the code base and instead just import the "right" thing formairflow.platform
etc. - I'm not sure that the
pty
module you have will work on windows?
Thank you so much for all the feedback. I will look into it and try address all the various points. |
When we made this module internally we did not really consider this but you are right. I'm not sure about the nomenclature to be honest but maybe "win_extensions" or "nt_extensions" would be less problematic. |
The module name should be fine, I think. I was actually referring to the code inside it; Apache has some pretty strict rules on what code can be used in a project, and that module seems like copied from other projects which would be problematic. |
So long as it is appropriately licensed, and you can attribute it then it's fine, and we'll put a section in the |
airflow/__main__.py
Outdated
@@ -34,6 +35,15 @@ def main(): | |||
os.environ['KRB5CCNAME'] = conf.get('kerberos', 'ccache') | |||
os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab') | |||
|
|||
# if dags folder has to be set to configured value, make sure it is set properly (needed on Dask-Workers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably not an ideal solution and I forgot to put this in from our repository in the first commit. On Windows the Dask-Worker tries to get the DAG at the location where it is stored on the Airflow server (e.g. /home/airflow/dags). So here we actually force it to use the DAG path in the config file... I don't know how this is handled on a setup involving only Linux machines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not the right fix for this.
There's a bug in a previous feature where the "dag folder" should be replaced with DAG_FOLDER
which is then automaticall replaced, but this isn't working.
We should fix that rather than adding a new config and new way of making this work.
See
Lines 160 to 167 in 9ba796e
def process_subdir(subdir: Optional[str]): | |
"""Expands path to absolute by replacing 'DAGS_FOLDER', '~', '.', etc.""" | |
if subdir: | |
if not settings.DAGS_FOLDER: | |
raise ValueError("DAGS_FOLDER variable in settings should be filled.") | |
subdir = subdir.replace('DAGS_FOLDER', settings.DAGS_FOLDER) | |
subdir = os.path.abspath(os.path.expanduser(subdir)) | |
return subdir |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. Also see #16423 - might be the right time to fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One more comment here. If we are talking about scheduler running on Linux/Unix and worker running on Windows, we also have problem with directory separator (/
-> \
) for dags that are in sub-folders.
I thought we could make airflow to accept the file URI instead in the command line: https://datatracker.ietf.org/doc/html/rfc8089 but it does not support relative paths (officially at least).
So probably the best approach is to make Airlfow replaces /
to \
on windows.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Windows API can handle both forward and backward slashes (except some very old legacy stuff, none of which is used by Python’s path libraries), so it should be fine if we use forward slashes everywhere. For rare cases where some third-party packages rely on legacy APIs, we can call os.path.normpath
to convert the slashes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah cool. Did not know that (not a Windows user :D)
I thought it's a Java-specfic behaviour only (it was part of the specification from the beginning). So no problem here.
How should we go about the licensing topic? Should I copy the license note from python.org somewhere? |
I feel your frustration in the question. So let me try to be helpful and explain a bit. I'd advise you to continue rebasing and ping us when you see tests succed. There are conflicts now as you can see. And Ping everyone here when it succeds. In a way when you are a PR "author" it's part of the author's "job" to drag attention of others when you see your PR is ready to merge (green or you only see unrelated issues). This is the best you can do. It becomes quite obvious if you try to put yourself in our shoes. For you this is only (or maybe one of a few) PRs that you care about. Yet we - as committers had 340 (!) PR merged overt the course of 1 month https://github.com/apache/airflow/pulse/monthly) . There are just a handful of commiters and this is ~20 PRs merged per working day. Some of them taking 20-30 comments on. As you can possibly imagine none of us is on a lookou to merge PRs at the moment they succeed. We have likely 100-200 completeed PR jobs a day. This is also a reason why sometimes main is broken as things slip through. So it's simply much easier for you to pay an attention and ping use when you see things are "good" for the PR you "care" about. I understand you would like to work in "fire and forget" mode, but simply this is difficult (but possibly merge queue feature which I wrote about earlier, will help with that). Just as a general note, there is also the old saying In fact if something is more painful, do it more often. if you rebase more often, it pains less overall becaue a) you do it in smaller chunks, b) you learn how to do it fast. I've learned how to rebase my commits and resolve conflicts quickly during working on Airlfow. So, apologies if it takes longer than you thought and that you have to do it several times, but this is how it works and best you can do is to "vet" your PR and be a little annoying (but not after some time - immediately when you see it is ready to merge). I hope that is helpful and provides you the right context on why things are like that . Thanks for your understanding. |
@potiuk thank you for your very helpful advice. In my case it is less about frustration but more about the time I am able to spend on this topic. Since I am part of an SD Dev team at CASRA, these are company resources which are invested. The company has decided that it wants to support Open Source projects such as Airflow but obviously we have to commit to a budget. With that said, it is true that the rebasing gets less painful every time I do it and therefore it is now something that I can do quite comfortably besides other tasks (including the testing on our Airflow-Linux-Windows test environment). Something which I probably should have asked earlier is if it is expected of me to investigate those 4 failing checks or if this is something that will be fixed by just rebasing to the current main branch? To be honest, I looked at the checks but I cannot really see how they would relate to my changes therefore I did not feel responsible to investigate further. Please let me know if I can assist in any way. Airflow is an awesome platform and I am happy to help within the scope available to me. |
Sure. Yeah. I understand the time constrints. If in doubt - just rebase :) |
If it does not help - ask here |
Ah .. closed/reopened by accident :) |
@potiuk Resolved all conflicts and tested within our Linux-Windows setup. |
🤞 |
You got unlucky on temporary docker unavailability :(. But also there was linux-only static check fail, I added a fixup to ignore it and committed it to re-run. 🤞 again. |
Did another rebase 👍 |
I told you it gets the easier, the more often you do it ;) |
Seems like the pwname not used is the last problem @casra-developers |
Added a fixup to fix the static-checks |
@potiuk thank you very much for adding the #noqa in base_task_runner.py. I've rebased the branch 👍 |
Awesome work, congrats on your first merged pull request! |
🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉 |
Awesome! Thanks everyone for the patience and support :-) |
Likewise! |
This PR was created after a discussion in this post. @potiuk asked to be mentioned here so he can work with us to integrate those gradual changes.
The aim of this PR is to gradually enable support for the Windows Platform. The changes in this PR allow Airflow to be used with on a Windows System as Dask-Worker. Other things like Web-Server and Task-Scheduling are not possible because of the way processes are handled in Airflow. Next steps would be to find suitable alternatives to those POSIX process management concepts that work on Windows.