Description
Apache Airflow version
2.10.5
What happened?
The logic for connections export in 2.10
airflow/airflow-core/src/airflow/cli/utils.py
Lines 39 to 49 in ea61ff7
def is_stdout(fileio: IOBase) -> bool:
    """
    Check whether a file IO is stdout.

    The intended use case for this helper is to check whether an argument parsed
    with argparse.FileType points to stdout (by setting the path to ``-``). This
    is why there is no equivalent for stderr; argparse does not allow using it.

    .. warning:: *fileio* must be open for this check to be successful.
    """
    return fileio.fileno() == sys.stdout.fileno()
fails on my server because sys.stdout does not support fileno():
    connections_export(args)
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/cli/commands/connection_command.py", line 176, in connections_export
    if is_stdout(f):
       ^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/cli/utils.py", line 44, in is_stdout
    return fileio.fileno() == sys.stdout.fileno()
                              ^^^^^^^^^^^^^^^^^^^
io.UnsupportedOperation: fileno
I was using 2.7.3 before and the implementation was
airflow/airflow/cli/commands/connection_command.py
Lines 144 to 145 in f124353
def _is_stdout(fileio: io.TextIOWrapper) -> bool:
    return fileio.name == "<stdout>"
I am relying on this public method to export connections the same way the CLI would, without having to spawn a process to actually run the export.
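As an illustration of what a more tolerant check could look like, here is a minimal sketch. It is not Airflow's implementation: `is_stdout_safe` is a hypothetical name, and the fallback to the `"<stdout>"` name comparison is an assumption based on the 2.7.3 helper quoted above.

```python
import io
import sys
from typing import IO, Any


def is_stdout_safe(fileio: IO[Any]) -> bool:
    """Hypothetical helper: report whether *fileio* refers to stdout.

    Unlike the 2.10 check, this does not raise when either stream has no
    file descriptor.
    """
    # Identity check covers the common case without touching fileno().
    if fileio is sys.stdout:
        return True
    try:
        return fileio.fileno() == sys.stdout.fileno()
    except (OSError, ValueError, AttributeError):
        # io.UnsupportedOperation subclasses both OSError and ValueError.
        # Assumption: fall back to the name comparison used in 2.7.3.
        return getattr(fileio, "name", None) == "<stdout>"
```

With this sketch, an `io.StringIO` passed as the export target is reported as not being stdout instead of raising, even when `sys.stdout` itself has no descriptor.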
What you think should happen instead?
I'd like a way to use a public Airflow method to export connections in the same format that the connections export CLI command produces. My goal is to migrate them programmatically to another Airflow instance and import them there with the connections import command.
How to reproduce
I see the issue when I run `from airflow.cli.commands.connection_command import connections_export` and call `connections_export` from a Kubernetes container.
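The failure can also be shown without Airflow installed. The sketch below copies the one-line comparison from the 2.10 helper and simulates the container's stdout with `contextlib.redirect_stdout`. That simulation is an assumption: the report only establishes that `sys.stdout.fileno()` is unsupported on the server, not what object stdout actually is. `FakeFile` and `reproduce` are illustrative names.

```python
import contextlib
import io
import sys


def is_stdout(fileio) -> bool:
    # Same comparison as the 2.10 helper quoted earlier in this report.
    return fileio.fileno() == sys.stdout.fileno()


class FakeFile(io.StringIO):
    # Stand-in for the patched StringIO from this report.
    def fileno(self):
        return None


def reproduce() -> str:
    """Return the name of the exception raised by is_stdout, or "no error"."""
    # Assumption: sys.stdout in the container behaves like an in-memory
    # stream that has no underlying file descriptor.
    with contextlib.redirect_stdout(io.StringIO()):
        try:
            is_stdout(FakeFile())
        except io.UnsupportedOperation as exc:
            return type(exc).__name__
    return "no error"
```

Calling `reproduce()` exercises the same code path as the traceback: the patched `fileno()` on the target succeeds, and the call on the replaced `sys.stdout` is the one that raises.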
Complete snippet to export connections to dict:
import io
import json
from typing import Any, Dict

from airflow.cli.commands.connection_command import connections_export


def _get_airflow_connections() -> Dict[str, Any]:
    """
    Retrieve airflow connections using
    the same code as `airflow connections export`
    """

    class FakeFileStringIO(io.StringIO):
        """
        Airflow expects a real file. StringIO has the same interface
        but lacks the `name` attribute, so we patch it
        """

        def __init__(self):
            super().__init__()
            self.content = ""

        def write(self, s: str) -> int:
            # Accumulate everything the exporter writes.
            self.content += s
            return len(s)

        def fileno(self):
            return None

        name = "FakeFileStringIO"

    class Args:
        """
        This class builds an object expected by `connections_export`
        with a fake file io that writes to a string stream.
        """

        def __init__(self, f):
            self.file = f
            self.format = None
            self.file_format = "json"
            self.serialization_format = None

    with FakeFileStringIO() as file:
        args = Args(file)
        connections_export(args)
        return json.loads(args.file.content)

Operating System
Google Cloud Composer 2
Versions of Apache Airflow Providers
No response
Deployment
Google Cloud Composer
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct