Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for MIN-1312: Graceful exit of DataMover for invalid prefix #25

Merged
merged 3 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions dss_datamover/master_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Master(object):
def __init__(self, operation, config):
manager = Manager()
self.config = config
self.config_nfs = config["fs_config"]["nfs"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong way of getting nfs shares when the variable self.fs_config is already initialized at the bottom.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated usage in latest commit.

self.client_ip_list = config.get("clients_hosts_or_ip_addresses", [])
self.workers_count = config["master"]["workers"]
self.max_index_size = config["master"]["max_index_size"]
Expand Down Expand Up @@ -179,6 +180,27 @@ def start(self):
self.logger.info("Performing {} operation".format(self.operation))
self.load_prefix_index_data() # Load prefix metadata.
self.load_prefix_keys_for_resume_operation() # Check if resume operation required.

# Validate S3 prefix before starting the workers, to allow graceful exit of application for bad prefix
# (Fix for MIN-1312)
inv_prefix = 0
if self.prefix:
if not validate_s3_prefix(self.logger, self.prefix):
self.logger.fatal("Bad prefix specified, exit application.")
self.stop_logging()
sys.exit("Invalid prefix. Shutting down DataMover application")
cluster_ip = self.prefix.split('/')[0]
for nfs_share in self.config_nfs[cluster_ip]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the cluster_ip in prefix is invalid IP address, then you get an exception for trying to use that in self.config_nfs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move the whole code into validate_s3_prefix function so that it looks nice. You can pass necessary parameters like self.prefix, self.config_nfs etc as positional ones and do the whole validation inside that function.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated in latest commit.

nfs_share_prefix = cluster_ip + nfs_share
if not self.prefix.startswith(nfs_share_prefix):
inv_prefix += 1
if inv_prefix == len(self.config_nfs[cluster_ip]):
self.logger.fatal("Specified Prefix: {} does not match any entry in the Config file nfs_share list: "
"{}.".format(self.prefix, self.config_nfs[cluster_ip]))
self.stop_logging()
sys.exit("Specified prefix does not match Config entries. Please specify correct prefix or update "
"the Config file entries to continue!")

if not self.start_workers():
self.logger.info("Exit DataMover!")
self.stop_logging()
Expand Down Expand Up @@ -425,10 +447,15 @@ def start_indexing(self):
self.logger.info('start_indexing: Processing prefixes for indexing stopped')
break
self.logger.info("Processing prefix:{}".format(prefix))
(ip_address, nfs_share, ret) = self.nfs_cluster_obj.mount_based_on_prefix(prefix)
(ip_address, nfs_share, ret, out) = self.nfs_cluster_obj.mount_based_on_prefix(prefix)
if ret != 0:
self.nfs_cluster_obj.umount_all()
self.logger.error("NFS Mounting failed, Error: \n**{}**".format(out))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you adding newline \n. Not acceptable. Also remove *'s as well

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated in latest commit.

self.logger.fatal("Mounting failed, EXIT indexing!")
self.indexing_started_flag.value = -1
return
if ret == 0 and is_prefix_valid_for_nfs_share(self.logger, share=nfs_share, ip_address=ip_address,
prefix=prefix):

nfs_share_prefix_path = os.path.abspath("/" + prefix)
task = Task(operation="indexing",
data=nfs_share_prefix_path,
Expand Down
5 changes: 3 additions & 2 deletions dss_datamover/nfs_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,23 @@ def mount_based_on_prefix(self, prefix):
cluster_ip = prefix[0:first_delimiter_pos]
ret = -1
nfs_share = ""
console = ""
for nfs_share in self.config[cluster_ip]:
nfs_share_prefix = cluster_ip + nfs_share
if prefix.startswith(nfs_share_prefix):
if (cluster_ip in self.local_mounts
and nfs_share in self.local_mounts[cluster_ip]):
self.logger.info("Prefix -{} is already mounted to {}".format(
prefix, "/" + nfs_share_prefix))
return cluster_ip, nfs_share, 0
return cluster_ip, nfs_share, 0, console
else:
ret, console = self.mount(cluster_ip, nfs_share)
break
if ret == 0:
self.logger.info("Mounted NFS shares {}:{}".format(cluster_ip, nfs_share))
self.nfs_cluster.append(cluster_ip)

return cluster_ip, nfs_share, ret
return cluster_ip, nfs_share, ret, console

@exception
def mount(self, cluster_ip, nfs_share):
Expand Down