Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a fd leak issue #575

Merged
merged 1 commit into from
Jun 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions compiler/ast_to_ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -863,15 +863,15 @@ def preprocess_node_case(ast_node, irFileGen, config, last_object=False):
## If we need to disable parallel pipelines, e.g., if we are in the context of an if,
## or if we are in the end of a script, then we set a variable.
def replace_df_region(asts, irFileGen, config, disable_parallel_pipelines=False, ast_text=None):
_, ir_filename = ptempfile()
ir_filename = ptempfile()

## Serialize the node in a file
with open(ir_filename, "wb") as ir_file:
pickle.dump(asts, ir_file)

## Serialize the candidate df_region asts back to shell
## so that the sequential script can be run in parallel to the compilation.
_, sequential_script_file_name = ptempfile()
sequential_script_file_name = ptempfile()
## If we don't have the original ast text, we need to unparse the ast
if (ast_text is None):
kv_asts = [ast_node_to_untyped_deep(ast) for ast in asts]
Expand Down
4 changes: 2 additions & 2 deletions compiler/dspash/ir_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def save_configs(graph:IR, dfs_configs_paths: Dict[HDFSFileConfig, str]):
resource : DFSSplitResource = edge.get_resource()
config: HDFSFileConfig = resource.config
if config not in dfs_configs_paths:
_, config_path = ptempfile()
config_path = ptempfile()
with open(config_path, "w") as f:
f.write(config)
dfs_configs_paths[config] = config_path
Expand All @@ -57,7 +57,7 @@ def save_configs(graph:IR, dfs_configs_paths: Dict[HDFSFileConfig, str]):
resource.set_config_path(config_path)

def to_shell_file(graph: IR, args) -> str:
_, filename = ptempfile()
filename = ptempfile()

dirs = set()
for edge in graph.all_fids():
Expand Down
4 changes: 2 additions & 2 deletions compiler/pash.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def preprocess_and_execute_asts(ast_objects, args, input_script_arguments, shell
preprocessed_shell_script = preprocess_ast(ast_objects, args)

## Write the new shell script to a file to execute
_, fname = ptempfile()
fname = ptempfile()
log("Preprocessed script stored in:", fname)
with open(fname, 'w') as new_shell_file:
new_shell_file.write(preprocessed_shell_script)
Expand Down Expand Up @@ -190,7 +190,7 @@ def parse_args():
shell_name = "pash"

if args.command is not None:
_, fname = ptempfile()
fname = ptempfile()
with open(fname, 'w') as f:
f.write(args.command)
## If the shell is invoked with -c and arguments after it, then these arguments
Expand Down
4 changes: 2 additions & 2 deletions compiler/pash_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def compile_optimize_output_script(ir_filename, compiled_script_file, args, comp
## which should be translated to a parallel script.
if(isinstance(optimized_ast_or_ir, IR)):
if args.distributed_exec:
_, ir_filename = ptempfile()
ir_filename = ptempfile()
script_to_execute = f"$PASH_TOP/compiler/dspash/remote_exec_graph.sh {ir_filename}\n"
## This might not be needed anymore (since the output script is output anyway)
## TODO: This is probably useless, remove
Expand Down Expand Up @@ -381,7 +381,7 @@ def split_hdfs_cat_input(hdfs_cat, next_node, graph, fileIdGen):

# Create a cat command per file block
file_config = hdfs_utils.get_file_config(hdfs_filepath)
_, dummy_config_path = ptempfile() # Dummy config file, should be updated by workers
dummy_config_path = ptempfile() # Dummy config file, should be updated by workers
for split_num, block in enumerate(file_config.blocks):
resource = DFSSplitResource(file_config.dumps(), dummy_config_path, split_num, block.hosts)
block_fid = fileIdGen.next_file_id()
Expand Down
2 changes: 1 addition & 1 deletion compiler/pash_runtime_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def init_bash_mirror_subprocess():
echo=False)
## If we are in debug mode also log the bash's output
if (config.pash_args.debug >= 1):
_, file_to_save_output = ptempfile()
file_to_save_output = ptempfile()
log("bash mirror log saved in:", file_to_save_output)
fout = open(file_to_save_output, "w")
p.logfile = fout
Expand Down
6 changes: 5 additions & 1 deletion compiler/util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import timedelta
import os
import sys
import config
import tempfile
Expand Down Expand Up @@ -40,4 +41,7 @@ def log(*args, end='\n', level=1):
print(config.LOGGING_PREFIX, *args, file=f, end=end, flush=True)

def ptempfile():
    """Create a temporary file under the PaSh temp directory and return its path.

    ``tempfile.mkstemp`` both creates the file and opens it, returning
    ``(fd, name)``.  Callers of this helper only need the path, so the
    descriptor is closed immediately — otherwise every call would leak
    one open fd (the bug this fixes).

    Returns:
        str: absolute path of the newly created temporary file.
    """
    fd, name = tempfile.mkstemp(dir=config.PASH_TMP_PREFIX)
    ## TODO: Get a name without opening the fd too if possible
    os.close(fd)
    return name