Skip to content

Commit

Permalink
Merge pull request #25 from OpenMPDK/invalid_prefix
Browse files Browse the repository at this point in the history
Fix for MIN-1312: Graceful exit of DataMover for invalid prefix
  • Loading branch information
grandsuri authored Mar 1, 2023
2 parents 951c0ed + 6b76ab6 commit 4224e7d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 6 deletions.
17 changes: 15 additions & 2 deletions dss_datamover/master_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,14 @@ def start(self):
self.logger.info("Performing {} operation".format(self.operation))
self.load_prefix_index_data() # Load prefix metadata.
self.load_prefix_keys_for_resume_operation() # Check if resume operation required.

# Validate S3 prefix before starting the workers, to allow graceful exit of application for bad prefix
# (Fix for MIN-1312)
if self.prefix:
if not validate_s3_prefix(self.logger, self.prefix, self.fs_config.get('nfs', {})):
self.stop_logging()
sys.exit("Invalid prefix. Shutting down DataMover application")

if not self.start_workers():
self.logger.info("Exit DataMover!")
self.stop_logging()
Expand Down Expand Up @@ -422,10 +430,15 @@ def start_indexing(self):
self.logger.info('start_indexing: Processing prefixes for indexing stopped')
break
self.logger.info("Processing prefix:{}".format(prefix))
(ip_address, nfs_share, ret) = self.nfs_cluster_obj.mount_based_on_prefix(prefix)
(ip_address, nfs_share, ret, out) = self.nfs_cluster_obj.mount_based_on_prefix(prefix)
if ret != 0:
self.nfs_cluster_obj.umount_all()
self.logger.error("NFS Mounting failed, Error: {}".format(out))
self.logger.fatal("Mounting failed, EXIT indexing!")
self.indexing_started_flag.value = -1
return
if ret == 0 and is_prefix_valid_for_nfs_share(self.logger, share=nfs_share, ip_address=ip_address,
prefix=prefix):

nfs_share_prefix_path = os.path.abspath("/" + prefix)
task = Task(operation="indexing",
data=nfs_share_prefix_path,
Expand Down
5 changes: 3 additions & 2 deletions dss_datamover/nfs_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,23 @@ def mount_based_on_prefix(self, prefix):
cluster_ip = prefix[0:first_delimiter_pos]
ret = -1
nfs_share = ""
console = ""
for nfs_share in self.config[cluster_ip]:
nfs_share_prefix = cluster_ip + nfs_share
if prefix.startswith(nfs_share_prefix):
if (cluster_ip in self.local_mounts
and nfs_share in self.local_mounts[cluster_ip]):
self.logger.info("Prefix -{} is already mounted to {}".format(
prefix, "/" + nfs_share_prefix))
return cluster_ip, nfs_share, 0
return cluster_ip, nfs_share, 0, console
else:
ret, console = self.mount(cluster_ip, nfs_share)
break
if ret == 0:
self.logger.info("Mounted NFS shares {}:{}".format(cluster_ip, nfs_share))
self.nfs_cluster.append(cluster_ip)

return cluster_ip, nfs_share, ret
return cluster_ip, nfs_share, ret, console

@exception
def mount(self, cluster_ip, nfs_share):
Expand Down
22 changes: 20 additions & 2 deletions dss_datamover/utils/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,35 @@ def get_s3_prefix(logger, nfs_cluster, prefix=None):
yield nfs_server_ip + "/"


def validate_s3_prefix(logger, prefix):
def validate_s3_prefix(logger, prefix, config_nfs=None):
"""
Validate a given prefix. A S3 prefix should start without "/" and end with "/".
<prefix string>/
:param logger: multiprocessing logger object
:param prefix: a string
:param(optional) config_nfs: nfs_share Config dict
:return: Success/Failure
"""
inv_prefix = False
if prefix.startswith("/") or not prefix.endswith("/"):
logger.error("WRONG specification of prefix. Should be in the format of <nfs_server_ip>/<prefix>/ ")
logger.fatal("WRONG specification of prefix. Should be in the format of <nfs_server_ip>/<prefix>/ ")
return False
if config_nfs:
cluster_ip = prefix.split('/')[0]
if cluster_ip in config_nfs:
for nfs_share in config_nfs[cluster_ip]:
nfs_share_prefix = cluster_ip + nfs_share
if prefix.startswith(nfs_share_prefix):
inv_prefix = True
break
if not inv_prefix:
logger.fatal("Specified Prefix: {} does not match any entry in the Config file nfs_share list: "
"{}.".format(prefix, config_nfs[cluster_ip]))
return False
else:
logger.fatal("Specified Prefix IP: {} does not match any entry in the Config file nfs_share IP list: {}."
.format(cluster_ip, config_nfs))
return False
return True


Expand Down

0 comments on commit 4224e7d

Please sign in to comment.