Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for MIN-1312: Graceful exit of DataMover for invalid prefix #25

Merged
merged 3 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions dss_datamover/master_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ def start(self):
self.logger.info("Performing {} operation".format(self.operation))
self.load_prefix_index_data() # Load prefix metadata.
self.load_prefix_keys_for_resume_operation() # Check if resume operation required.

# Validate S3 prefix before starting the workers, to allow graceful exit of application for bad prefix
# (Fix for MIN-1312)
if self.prefix:
if not validate_s3_prefix(self.logger, self.prefix, self.fs_config.get('nfs', {})):
self.stop_logging()
sys.exit("Invalid prefix. Shutting down DataMover application")

if not self.start_workers():
self.logger.info("Exit DataMover!")
self.stop_logging()
Expand Down Expand Up @@ -425,10 +433,15 @@ def start_indexing(self):
self.logger.info('start_indexing: Processing prefixes for indexing stopped')
break
self.logger.info("Processing prefix:{}".format(prefix))
(ip_address, nfs_share, ret) = self.nfs_cluster_obj.mount_based_on_prefix(prefix)
(ip_address, nfs_share, ret, out) = self.nfs_cluster_obj.mount_based_on_prefix(prefix)
if ret != 0:
self.nfs_cluster_obj.umount_all()
self.logger.error("NFS Mounting failed, Error: {}".format(out))
self.logger.fatal("Mounting failed, EXIT indexing!")
self.indexing_started_flag.value = -1
return
if ret == 0 and is_prefix_valid_for_nfs_share(self.logger, share=nfs_share, ip_address=ip_address,
prefix=prefix):

nfs_share_prefix_path = os.path.abspath("/" + prefix)
task = Task(operation="indexing",
data=nfs_share_prefix_path,
Expand Down
5 changes: 3 additions & 2 deletions dss_datamover/nfs_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,23 @@ def mount_based_on_prefix(self, prefix):
cluster_ip = prefix[0:first_delimiter_pos]
ret = -1
nfs_share = ""
console = ""
for nfs_share in self.config[cluster_ip]:
nfs_share_prefix = cluster_ip + nfs_share
if prefix.startswith(nfs_share_prefix):
if (cluster_ip in self.local_mounts
and nfs_share in self.local_mounts[cluster_ip]):
self.logger.info("Prefix -{} is already mounted to {}".format(
prefix, "/" + nfs_share_prefix))
return cluster_ip, nfs_share, 0
return cluster_ip, nfs_share, 0, console
else:
ret, console = self.mount(cluster_ip, nfs_share)
break
if ret == 0:
self.logger.info("Mounted NFS shares {}:{}".format(cluster_ip, nfs_share))
self.nfs_cluster.append(cluster_ip)

return cluster_ip, nfs_share, ret
return cluster_ip, nfs_share, ret, console

@exception
def mount(self, cluster_ip, nfs_share):
Expand Down
21 changes: 19 additions & 2 deletions dss_datamover/utils/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,34 @@ def get_s3_prefix(logger, nfs_cluster, prefix=None):
yield nfs_server_ip + "/"


def validate_s3_prefix(logger, prefix):
def validate_s3_prefix(logger, prefix, config_nfs=None):
"""
Validate a given prefix. A S3 prefix should start without "/" and end with "/".
<prefix string>/
:param logger: multiprocessing logger object
:param prefix: a string
:param(optional) config_nfs: nfs_share Config dict
:return: Success/Failure
"""
inv_prefix = 0
if prefix.startswith("/") or not prefix.endswith("/"):
logger.error("WRONG specification of prefix. Should be in the format of <nfs_server_ip>/<prefix>/ ")
logger.fatal("WRONG specification of prefix. Should be in the format of <nfs_server_ip>/<prefix>/ ")
return False
if config_nfs is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`if config_nfs:` is sufficient.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

cluster_ip = prefix.split('/')[0]
if config_nfs and cluster_ip in config_nfs:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You already checked `config_nfs` above. Why check it again?

Copy link
Author

@sohomb91 sohomb91 Mar 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was actually aiming for a double validation:
the first clause checked whether the caller passed the parameter at all, and the second clause checked whether the passed value was an empty dict.

Anyway, it did look cluttered, so I have updated the code in my latest commit.

for nfs_share in config_nfs[cluster_ip]:
nfs_share_prefix = cluster_ip + nfs_share
if not prefix.startswith(nfs_share_prefix):
inv_prefix += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be optimized further. Make `inv_prefix` a bool initialized to `False`. When it gets detected, set it to `True` and break, rather than looping over all the entries of `config_nfs[cluster_ip]`.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense; I have updated the code accordingly.

if inv_prefix == len(config_nfs[cluster_ip]):
logger.fatal("Specified Prefix: {} does not match any entry in the Config file nfs_share list: "
"{}.".format(prefix, config_nfs[cluster_ip]))
return False
else:
logger.fatal("Specified Prefix IP: {} does not match any entry in the Config file nfs_share IP list: {}."
.format(cluster_ip, config_nfs))
return False
return True


Expand Down