From 1220a92315b38b110217c1614395f978c6733603 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Sat, 18 Jun 2022 13:46:19 -0400 Subject: [PATCH] Fix a fd leak issue Signed-off-by: Konstantinos Kallas --- compiler/ast_to_ir.py | 4 ++-- compiler/dspash/ir_helper.py | 4 ++-- compiler/pash.py | 4 ++-- compiler/pash_runtime.py | 4 ++-- compiler/pash_runtime_daemon.py | 2 +- compiler/util.py | 6 +++++- 6 files changed, 14 insertions(+), 10 deletions(-) diff --git a/compiler/ast_to_ir.py b/compiler/ast_to_ir.py index 48676520c..edd8abc5f 100644 --- a/compiler/ast_to_ir.py +++ b/compiler/ast_to_ir.py @@ -863,7 +863,7 @@ def preprocess_node_case(ast_node, irFileGen, config, last_object=False): ## If we are need to disable parallel pipelines, e.g., if we are in the context of an if, ## or if we are in the end of a script, then we set a variable. def replace_df_region(asts, irFileGen, config, disable_parallel_pipelines=False, ast_text=None): - _, ir_filename = ptempfile() + ir_filename = ptempfile() ## Serialize the node in a file with open(ir_filename, "wb") as ir_file: @@ -871,7 +871,7 @@ def replace_df_region(asts, irFileGen, config, disable_parallel_pipelines=False, ## Serialize the candidate df_region asts back to shell ## so that the sequential script can be run in parallel to the compilation. - _, sequential_script_file_name = ptempfile() + sequential_script_file_name = ptempfile() ## If we don't have the original ast text, we need to unparse the ast if (ast_text is None): kv_asts = [ast_node_to_untyped_deep(ast) for ast in asts] diff --git a/compiler/dspash/ir_helper.py b/compiler/dspash/ir_helper.py index 9cb1602f5..e75f959ed 100644 --- a/compiler/dspash/ir_helper.py +++ b/compiler/dspash/ir_helper.py @@ -47,7 +47,7 @@ def save_configs(graph:IR, dfs_configs_paths: Dict[HDFSFileConfig, str]): resource : DFSSplitResource = edge.get_resource() config: HDFSFileConfig = resource.config if config not in dfs_configs_paths: - _, config_path = ptempfile() + config_path = ptempfile() with open(config_path, "w") as f: f.write(config) dfs_configs_paths[config] = config_path @@ -57,7 +57,7 @@ def save_configs(graph:IR, dfs_configs_paths: Dict[HDFSFileConfig, str]): resource.set_config_path(config_path) def to_shell_file(graph: IR, args) -> str: - _, filename = ptempfile() + filename = ptempfile() dirs = set() for edge in graph.all_fids(): diff --git a/compiler/pash.py b/compiler/pash.py index adcf1dd02..ac7500e1a 100755 --- a/compiler/pash.py +++ b/compiler/pash.py @@ -63,7 +63,7 @@ def preprocess_and_execute_asts(ast_objects, args, input_script_arguments, shell preprocessed_shell_script = preprocess_ast(ast_objects, args) ## Write the new shell script to a file to execute - _, fname = ptempfile() + fname = ptempfile() log("Preprocessed script stored in:", fname) with open(fname, 'w') as new_shell_file: new_shell_file.write(preprocessed_shell_script) @@ -190,7 +190,7 @@ def parse_args(): shell_name = "pash" if args.command is not None: - _, fname = ptempfile() + fname = ptempfile() with open(fname, 'w') as f: f.write(args.command) ## If the shell is invoked with -c and arguments after it, then these arguments diff --git a/compiler/pash_runtime.py b/compiler/pash_runtime.py index e2155e6ee..b5f881119 100644 --- a/compiler/pash_runtime.py +++ b/compiler/pash_runtime.py @@ -116,7 +116,7 @@ def compile_optimize_output_script(ir_filename, compiled_script_file, args, comp ## which should be translated to a parallel script. if(isinstance(optimized_ast_or_ir, IR)): if args.distributed_exec: - _, ir_filename = ptempfile() + ir_filename = ptempfile() script_to_execute = f"$PASH_TOP/compiler/dspash/remote_exec_graph.sh {ir_filename}\n" ## This might not be needed anymore (since the output script is output anyway) ## TODO: This is probably useless, remove @@ -381,7 +381,7 @@ def split_hdfs_cat_input(hdfs_cat, next_node, graph, fileIdGen): # Create a cat command per file block file_config = hdfs_utils.get_file_config(hdfs_filepath) - _, dummy_config_path = ptempfile() # Dummy config file, should be updated by workers + dummy_config_path = ptempfile() # Dummy config file, should be updated by workers for split_num, block in enumerate(file_config.blocks): resource = DFSSplitResource(file_config.dumps(), dummy_config_path, split_num, block.hosts) block_fid = fileIdGen.next_file_id() diff --git a/compiler/pash_runtime_daemon.py b/compiler/pash_runtime_daemon.py index 54733b68c..5c241eeb1 100644 --- a/compiler/pash_runtime_daemon.py +++ b/compiler/pash_runtime_daemon.py @@ -82,7 +82,7 @@ def init_bash_mirror_subprocess(): echo=False) ## If we are in debug mode also log the bash's output if (config.pash_args.debug >= 1): - _, file_to_save_output = ptempfile() + file_to_save_output = ptempfile() log("bash mirror log saved in:", file_to_save_output) fout = open(file_to_save_output, "w") p.logfile = fout diff --git a/compiler/util.py b/compiler/util.py index 3dd63b9de..a6b3857a9 100644 --- a/compiler/util.py +++ b/compiler/util.py @@ -1,4 +1,5 @@ from datetime import timedelta +import os import sys import config import tempfile @@ -40,4 +41,7 @@ def log(*args, end='\n', level=1): print(config.LOGGING_PREFIX, *args, file=f, end=end, flush=True) def ptempfile(): - return tempfile.mkstemp(dir=config.PASH_TMP_PREFIX) + fd, name = tempfile.mkstemp(dir=config.PASH_TMP_PREFIX) + ## TODO: Get a name without opening the fd too if possible + os.close(fd) + return name