Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for MIN-1312: Graceful exit of DataMover for invalid prefix #25

Merged
merged 3 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions dss_datamover/master_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ def start(self):
self.logger.info("Performing {} operation".format(self.operation))
self.load_prefix_index_data() # Load prefix metadata.
self.load_prefix_keys_for_resume_operation() # Check if resume operation required.

# Validate S3 prefix before starting the workers, to allow graceful exit of application for bad prefix
# (Fix for MIN-1312)
if self.prefix:
if not validate_s3_prefix(self.logger, self.prefix, self.fs_config.get('nfs', {})):
self.stop_logging()
sys.exit("Invalid prefix. Shutting down DataMover application")

if not self.start_workers():
self.logger.info("Exit DataMover!")
self.stop_logging()
Expand Down Expand Up @@ -425,10 +433,15 @@ def start_indexing(self):
self.logger.info('start_indexing: Processing prefixes for indexing stopped')
break
self.logger.info("Processing prefix:{}".format(prefix))
(ip_address, nfs_share, ret) = self.nfs_cluster_obj.mount_based_on_prefix(prefix)
(ip_address, nfs_share, ret, out) = self.nfs_cluster_obj.mount_based_on_prefix(prefix)
if ret != 0:
self.nfs_cluster_obj.umount_all()
self.logger.error("NFS Mounting failed, Error: {}".format(out))
self.logger.fatal("Mounting failed, EXIT indexing!")
self.indexing_started_flag.value = -1
return
if ret == 0 and is_prefix_valid_for_nfs_share(self.logger, share=nfs_share, ip_address=ip_address,
prefix=prefix):

nfs_share_prefix_path = os.path.abspath("/" + prefix)
task = Task(operation="indexing",
data=nfs_share_prefix_path,
Expand Down
5 changes: 3 additions & 2 deletions dss_datamover/nfs_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,23 @@ def mount_based_on_prefix(self, prefix):
cluster_ip = prefix[0:first_delimiter_pos]
ret = -1
nfs_share = ""
console = ""
for nfs_share in self.config[cluster_ip]:
nfs_share_prefix = cluster_ip + nfs_share
if prefix.startswith(nfs_share_prefix):
if (cluster_ip in self.local_mounts
and nfs_share in self.local_mounts[cluster_ip]):
self.logger.info("Prefix -{} is already mounted to {}".format(
prefix, "/" + nfs_share_prefix))
return cluster_ip, nfs_share, 0
return cluster_ip, nfs_share, 0, console
else:
ret, console = self.mount(cluster_ip, nfs_share)
break
if ret == 0:
self.logger.info("Mounted NFS shares {}:{}".format(cluster_ip, nfs_share))
self.nfs_cluster.append(cluster_ip)

return cluster_ip, nfs_share, ret
return cluster_ip, nfs_share, ret, console

@exception
def mount(self, cluster_ip, nfs_share):
Expand Down
22 changes: 20 additions & 2 deletions dss_datamover/utils/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,35 @@ def get_s3_prefix(logger, nfs_cluster, prefix=None):
yield nfs_server_ip + "/"


def validate_s3_prefix(logger, prefix):
def validate_s3_prefix(logger, prefix, config_nfs=None):
"""
Validate a given prefix. A S3 prefix should start without "/" and end with "/".
<prefix string>/
:param logger: multiprocessing logger object
:param prefix: a string
:param(optional) config_nfs: nfs_share Config dict
:return: Success/Failure
"""
inv_prefix = False
if prefix.startswith("/") or not prefix.endswith("/"):
logger.error("WRONG specification of prefix. Should be in the format of <nfs_server_ip>/<prefix>/ ")
logger.fatal("WRONG specification of prefix. Should be in the format of <nfs_server_ip>/<prefix>/ ")
return False
if config_nfs:
cluster_ip = prefix.split('/')[0]
if cluster_ip in config_nfs:
for nfs_share in config_nfs[cluster_ip]:
nfs_share_prefix = cluster_ip + nfs_share
if prefix.startswith(nfs_share_prefix):
inv_prefix = True
break
if not inv_prefix:
logger.fatal("Specified Prefix: {} does not match any entry in the Config file nfs_share list: "
"{}.".format(prefix, config_nfs[cluster_ip]))
return False
else:
logger.fatal("Specified Prefix IP: {} does not match any entry in the Config file nfs_share IP list: {}."
.format(cluster_ip, config_nfs))
return False
return True


Expand Down