From e5881bf3a2ac72e6382f3b60fb6e5f47d682b131 Mon Sep 17 00:00:00 2001 From: Annpurna Shahani <30636132+Annu149@users.noreply.github.com> Date: Fri, 28 Oct 2022 16:02:28 +0530 Subject: [PATCH 1/9] Replace scp with rsync (#14145) Security vulnerability was found in the scp program shipped with the openssh-clients package and CVSS score for this security vulnerability is 7.8 (https://access.redhat.com/security/cve/cve-2020-15778). Recommended action was to replace scp with rsync so, * Replaced scp with rsync in gpdb cm utilities and utility files * Renamed gpscp utility to gpsync after replacing scp with rsync in utility code --- gpMgmt/bin/Makefile | 23 +- gpMgmt/bin/README.md | 28 +- gpMgmt/bin/gpcheckperf | 8 +- gpMgmt/bin/gpexpand | 10 +- gpMgmt/bin/gpmemwatcher | 2 +- gpMgmt/bin/gppylib/commands/gp.py | 4 +- gpMgmt/bin/gppylib/commands/unix.py | 6 +- .../gppylib/operations/buildMirrorSegments.py | 2 +- gpMgmt/bin/gppylib/operations/package.py | 32 +- .../test/regress/test_package/__init__.py | 4 +- .../test_regress_muck_with_internals.py | 14 +- ..._regress_muck_with_internals_on_standby.py | 16 +- .../gppylib/operations/test/test_package.py | 6 +- .../gppylib/test/unit/test_unit_package.py | 6 +- gpMgmt/bin/gpssh-exkeys | 4 +- gpMgmt/bin/{gpscp => gpsync} | 17 +- gpMgmt/bin/lib/gp_bash_functions.sh | 4 +- gpMgmt/test/behave/mgmt_utils/gppkg.feature | 2 +- .../mgmt_utils/steps/gpconfig_mgmt_utils.py | 4 +- .../steps/gpssh_exkeys_mgmt_utils.py | 2 +- .../behave/mgmt_utils/steps/mgmt_utils.py | 485 +++++++++++++++--- 21 files changed, 507 insertions(+), 172 deletions(-) rename gpMgmt/bin/{gpscp => gpsync} (86%) diff --git a/gpMgmt/bin/Makefile b/gpMgmt/bin/Makefile index 24e70491184..70d650fc772 100644 --- a/gpMgmt/bin/Makefile +++ b/gpMgmt/bin/Makefile @@ -13,27 +13,19 @@ SUBDIRS += ifaddrs $(recurse) PROGRAMS= analyzedb gpactivatestandby gpaddmirrors gpcheckcat gpcheckperf \ - gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpshrink gpinitstandby \ + 
gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpinitstandby \ gpinitsystem gpload gpload.py gplogfilter gpmovemirrors \ - gppkg gprecoverseg gpreload gpscp gpsd gpssh gpssh-exkeys gpstart \ - gpstate gpstop minirepro gpmemwatcher gpmemreport gpdemo gpdirtableload - -GPDEMO_LIBS = gpdemo-defaults.sh lalshell generate_certs.sh demo_cluster.sh \ - probe_config.sh README + gppkg gprecoverseg gpreload gpsync gpsd gpssh gpssh-exkeys gpstart \ + gpstate gpstop minirepro gpmemwatcher gpmemreport installdirs: $(MKDIR_P) '$(DESTDIR)$(bindir)/lib' - $(MKDIR_P) '$(DESTDIR)$(bindir)/lib/gpdemo' installprograms: installdirs for file in $(PROGRAMS); do \ $(INSTALL_SCRIPT) $$file '$(DESTDIR)$(bindir)/'$$file ; \ $(PERL) $(top_builddir)/putversion '$(DESTDIR)$(bindir)/'$$file ; \ done - # install dependencies of gpdemo - for file in $(GPDEMO_LIBS); do \ - $(INSTALL_SCRIPT) $(top_builddir)/gpAux/gpdemo/$$file '$(DESTDIR)$(bindir)/lib/gpdemo/'$$file ; \ - done # Symlink gpcheckcat from bin to bin/lib to maintain backward compatibility if [ ! 
-L $(DESTDIR)$(bindir)/lib/gpcheckcat ]; then \ cd $(DESTDIR)$(bindir)/lib/ && $(LN_S) ../gpcheckcat gpcheckcat; \ @@ -44,9 +36,6 @@ uninstall: for file in $(PROGRAMS); do \ rm -f '$(DESTDIR)$(bindir)/'$$file ; \ done - for file in $(GPDEMO_LIBS); do \ - rm -f '$(DESTDIR)$(bindir)/lib/gpdemo/'$$file ; \ - done rm -f '$(DESTDIR)$(bindir)/gpload.bat' # @@ -115,7 +104,7 @@ pyyaml: @echo "--- pyyaml" cd $(PYLIB_SRC_EXT)/ && $(TAR) xzf $(PYYAML_DIR).tar.gz cd $(PYLIB_SRC_EXT)/$(PYYAML_DIR)/ && env -u CC python3 setup.py build - cp -r $(PYLIB_SRC_EXT)/$(PYYAML_DIR)/build/lib*-3*/* $(PYLIB_DIR) + cp -r $(PYLIB_SRC_EXT)/$(PYYAML_DIR)/build/lib*-3.*/* $(PYLIB_DIR) # # PYLINT @@ -194,7 +183,7 @@ clean distclean: rm -rf *.pyc rm -f analyzedbc gpactivatestandbyc gpaddmirrorsc gpcheckcatc \ gpcheckperfc gpcheckresgroupimplc gpchecksubnetcfgc gpconfigc \ - gpdeletesystemc gpexpandc gpshrinkc gpinitstandbyc gplogfilterc gpmovemirrorsc \ - gppkgc gprecoversegc gpreloadc gpscpc gpsdc gpssh-exkeysc gpsshc \ + gpdeletesystemc gpexpandc gpinitstandbyc gplogfilterc gpmovemirrorsc \ + gppkgc gprecoversegc gpreloadc gpscpc gpsyncc gpsdc gpssh-exkeysc gpsshc \ gpstartc gpstatec gpstopc minireproc rm -f gpconfig_modules/gucs_disallowed_in_file.txt diff --git a/gpMgmt/bin/README.md b/gpMgmt/bin/README.md index 674a209e0f0..b003592b16c 100644 --- a/gpMgmt/bin/README.md +++ b/gpMgmt/bin/README.md @@ -28,21 +28,21 @@ Where Things Go List of Management Scripts Written in Bash ------------------------------------------ -bin/gpinitsystem - Creates a new Cloudberry Database +bin/gpinitsystem - Creates a new Greenplum Database bin/gpload - Sets env variables and calls gpload.py List of Management Scripts Written in Python (no libraries) ----------------------------------------------------------- -bin/gpload.py - Loads data into a Cloudberry Database +bin/gpload.py - Loads data into a Greenplum Database List of Management Scripts Written in Python (gpmlib - old libraries) 
--------------------------------------------------------------------- bin/gpaddmirrors - Adds mirrors to an array (needs rewrite) bin/gprecoverseg - Recovers a failed segment (needs rewrite) -bin/gpcheckperf - Checks the hardware for Cloudberry Database -bin/gpscp - Copies files to many hosts +bin/gpcheckperf - Checks the hardware for Greenplum Database +bin/gpsync - Copies files to many hosts bin/gpssh - Remote shell to many hosts bin/gpssh-exkeys - Exchange ssh keys between many hosts @@ -51,12 +51,12 @@ List of Management Scripts Written in Python (gppylib - current libraries) -------------------------------------------------------------------------- bin/gpactivatestandby - Activates the Standby Coordinator bin/gpconfig_helper - Edits postgresql.conf file for all segments -bin/gpdeletesystem - Deletes a Cloudberry Database -bin/gpexpand - Adds additional segments to a Cloudberry Database +bin/gpdeletesystem - Deletes a Greenplum Database +bin/gpexpand - Adds additional segments to a Greenplum Database bin/gpinitstandby - Initializes standby coordinator bin/gplogfilter - Filters log files -bin/gpstart - Start a Cloudberry Database -bin/gpstop - Stop a Cloudberry Database +bin/gpstart - Start a Greenplum Database +bin/gpstop - Stop a Greenplum Database sbin/gpconfig_helper.py - Helper script for gpconfig sbin/gpsegcopy - Helper script for gpexpand @@ -76,10 +76,10 @@ gparray.py +- SegmentPair - Configuration information for a single content id | \- Contains multiple Segment objects | - +- GpArray - Configuration information for a Cloudberry Database + +- GpArray - Configuration information for a Greenplum Database \- Contains multiple SegmentPair objects -gplog.py - Utility functions to assist in Cloudberry standard logging +gplog.py - Utility functions to assist in Greenplum standard logging gpparseopts.py - Wrapper around optparse library to aid in locating help files @@ -143,9 +143,9 @@ db/dbconn.py - Connections to the database | +- Should have a wrapper class 
around a pygresql connection object! -util/gp_utils.py - Cloudberry related utility functions that are not Commands -util/ssh_session.py - SSH and SCP related utility functions brought in from gpmlib.py/gplib.py - that are used by gpssh, gpscp and gpssh-exkeys +util/gp_utils.py - Greenplum related utility functions that are not Commands +util/ssh_session.py - SSH and RSYNC related utility functions brought in from gpmlib.py/gplib.py + that are used by gpssh, gpsync and gpssh-exkeys ## Testing Management Scripts (unit tests) @@ -175,7 +175,7 @@ tests that do not require a running cluster. ## Testing Management Scripts (behave tests) -Behave tests require a running Cloudberry cluster, and additional python libraries for testing, available to gpadmin. +Behave tests require a running Greenplum cluster, and additional python libraries for testing, available to gpadmin. Thus, you can install these additional python libraries using any of the following methods: diff --git a/gpMgmt/bin/gpcheckperf b/gpMgmt/bin/gpcheckperf index 79ca3ec2792..09bb496a669 100755 --- a/gpMgmt/bin/gpcheckperf +++ b/gpMgmt/bin/gpcheckperf @@ -103,8 +103,8 @@ def gpssh(cmd): return not rc, out -def gpscp(src, dst): - c = ['%s/bin/gpscp' % GPHOME] +def gpsync(src, dst): + c = ['%s/bin/gpsync' % GPHOME] if GV.opt['-V']: c.append('-v') if GV.opt['-f']: @@ -391,9 +391,9 @@ def copyExecOver(fname): if not os.access(path, os.X_OK): sys.exit('[Exit] file not executable: ' + path) - (ok, out) = gpscp(path, '=:%s' % target) + (ok, out) = gpsync(path, '=:%s' % target) if not ok: - sys.exit('[Error] command failed: gpscp %s =:%s with output: %s' % (path, target, out)) + sys.exit('[Error] command failed: gpsync %s =:%s with output: %s' % (path, target, out)) # chmod +x file (ok, out) = gpssh('chmod a+rx %s' % target) diff --git a/gpMgmt/bin/gpexpand b/gpMgmt/bin/gpexpand index f9d0fe36b2b..3825095829a 100755 --- a/gpMgmt/bin/gpexpand +++ b/gpMgmt/bin/gpexpand @@ -442,7 +442,7 @@ class GpExpandStatus(): def 
_sync_status_file(self): """Syncs the gpexpand status file with the coordinator mirror""" - cpCmd = Scp('gpexpand copying status file to coordinator mirror', + cpCmd = Rsync('gpexpand copying status file to coordinator mirror', srcFile=self._status_filename, dstFile=self._status_standby_filename, dstHost=self._coordinator_mirror.getSegmentHostName()) @@ -512,7 +512,7 @@ class GpExpandStatus(): """ Sync the segment configuration backup file to standby """ if self._coordinator_mirror: self.logger.debug("Sync segment configuration backup file") - cpCmd = Scp('gpexpand copying segment configuration backup file to coordinator mirror', + cpCmd = Rsync('gpexpand copying segment configuration backup file to coordinator mirror', srcFile=self._gp_segment_configuration_backup, dstFile=self._segment_configuration_standby_filename, dstHost=self._coordinator_mirror.getSegmentHostName()) @@ -757,7 +757,7 @@ class SegmentTemplate: """Distributes template tar file to hosts""" for host in self.hosts: logger.debug('Copying tar file to %s' % host) - cpCmd = Scp(name='gpexpand distribute tar file to new hosts', + cpCmd = Rsync(name='gpexpand distribute tar file to new hosts', srcFile=self.schema_tar_file, dstFile=self.segTarDir, dstHost=host) @@ -846,7 +846,7 @@ class SegmentTemplate: localHostname = self.gparray.coordinator.getSegmentHostName() cmdName = 'gpexpand copying postgresql.conf to %s:%s/postgresql.conf' \ % (self.srcSegHostname, self.srcSegDataDir) - cpCmd = Scp(name=cmdName, srcFile=self.srcSegDataDir + '/postgresql.conf', + cpCmd = Rsync(name=cmdName, srcFile=self.srcSegDataDir + '/postgresql.conf', dstFile=self.tempDir, dstHost=localHostname, ctxt=REMOTE, remoteHost=self.srcSegHostname) cpCmd.run(validateAfter=True) @@ -854,7 +854,7 @@ class SegmentTemplate: self.logger.info('Copying pg_hba.conf from existing segment into template') cmdName = 'gpexpand copy pg_hba.conf to %s:%s/pg_hba.conf' \ % (self.srcSegHostname, self.srcSegDataDir) - cpCmd = Scp(name=cmdName, 
srcFile=self.srcSegDataDir + '/pg_hba.conf', + cpCmd = Rsync(name=cmdName, srcFile=self.srcSegDataDir + '/pg_hba.conf', dstFile=self.tempDir, dstHost=localHostname,ctxt=REMOTE, remoteHost=self.srcSegHostname) cpCmd.run(validateAfter=True) diff --git a/gpMgmt/bin/gpmemwatcher b/gpMgmt/bin/gpmemwatcher index 29895cbe75c..6569015bc09 100755 --- a/gpMgmt/bin/gpmemwatcher +++ b/gpMgmt/bin/gpmemwatcher @@ -171,7 +171,7 @@ def stopProcesses(host, workdir): return try: - subprocess.check_call('scp -q %s:%s/%s ./%s.%s' % (host, dest_dir, ps_file, host, ps_file), shell=True) + subprocess.check_call('rsync -q %s:%s/%s ./%s.%s' % (host, dest_dir, ps_file, host, ps_file), shell=True) except subprocess.CalledProcessError as e: print('Error retrieving data from host: ' + host, file=sys.stderr) print(e) diff --git a/gpMgmt/bin/gppylib/commands/gp.py b/gpMgmt/bin/gppylib/commands/gp.py index 46d2f636b7d..8aa208ba576 100644 --- a/gpMgmt/bin/gppylib/commands/gp.py +++ b/gpMgmt/bin/gppylib/commands/gp.py @@ -1169,8 +1169,8 @@ def distribute_tarball(queue,list,tarball): hostname = db.getSegmentHostName() datadir = db.getSegmentDataDirectory() (head,tail)=os.path.split(datadir) - scp_cmd=Scp(name="copy coordinator",srcFile=tarball,dstHost=hostname,dstFile=head) - queue.addCommand(scp_cmd) + rsync_cmd=Rsync(name="copy coordinator",srcFile=tarball,dstHost=hostname,dstFile=head) + queue.addCommand(rsync_cmd) queue.join() queue.check_results() logger.debug("distributeTarBall finished") diff --git a/gpMgmt/bin/gppylib/commands/unix.py b/gpMgmt/bin/gppylib/commands/unix.py index 361bac0f69b..3d0b0582d14 100644 --- a/gpMgmt/bin/gppylib/commands/unix.py +++ b/gpMgmt/bin/gppylib/commands/unix.py @@ -501,7 +501,7 @@ def filedir_exists(self): return (not self.results.rc) -# -------------scp------------------ +# -------------rsync------------------ # MPP-13617 def canonicalize(addr): @@ -510,10 +510,10 @@ def canonicalize(addr): return '[' + addr + ']' -class Scp(Command): +class Rsync(Command): 
def __init__(self, name, srcFile, dstFile, srcHost=None, dstHost=None, recursive=False, ctxt=LOCAL, remoteHost=None): - cmdStr = findCmdInPath('scp') + " " + cmdStr = findCmdInPath('rsync') + " " if recursive: cmdStr = cmdStr + "-r " diff --git a/gpMgmt/bin/gppylib/operations/buildMirrorSegments.py b/gpMgmt/bin/gppylib/operations/buildMirrorSegments.py index 91c537e4ffa..4bfe87ecd13 100644 --- a/gpMgmt/bin/gppylib/operations/buildMirrorSegments.py +++ b/gpMgmt/bin/gppylib/operations/buildMirrorSegments.py @@ -18,7 +18,7 @@ from gppylib.operations.utils import ParallelOperation, RemoteOperation from gppylib.system import configurationInterface as configInterface from gppylib.commands.gp import is_pid_postmaster, get_pid_from_remotehost -from gppylib.commands.unix import check_pid_on_remotehost, Scp +from gppylib.commands.unix import check_pid_on_remotehost from gppylib.programs.clsRecoverSegment_triples import RecoveryTriplet diff --git a/gpMgmt/bin/gppylib/operations/package.py b/gpMgmt/bin/gppylib/operations/package.py index 7ba8d34affa..f1e73563846 100644 --- a/gpMgmt/bin/gppylib/operations/package.py +++ b/gpMgmt/bin/gppylib/operations/package.py @@ -11,7 +11,7 @@ from gppylib import gplog from gppylib.commands import gp from gppylib.commands.base import Command, REMOTE, WorkerPool, ExecutionError - from gppylib.commands.unix import Scp + from gppylib.commands.unix import Rsync from gppylib.gpversion import GpVersion from gppylib.mainUtils import ExceptionNoStackTraceNeeded from gppylib.operations import Operation @@ -302,7 +302,7 @@ class RemoteCommand(Operation): """ DEPRECATED - TODO: AK: Rename as GpSsh, like GpScp below. + TODO: AK: Rename as GpSsh, like GpRsync below. 
""" def __init__(self, cmd_str, host_list): @@ -1019,7 +1019,7 @@ def execute(self): for package in install_package_set: logger.debug('copying %s to %s' % (package, self.host)) dstFile = os.path.join(GPHOME, package) - Scp(name='copying %s to %s' % (package, self.host), + Rsync(name='copying %s to %s' % (package, self.host), srcFile=os.path.join(GPPKG_ARCHIVE_PATH, package), dstFile=dstFile, dstHost=self.host).run(validateAfter=True) @@ -1072,12 +1072,12 @@ def execute(self): if linux_distribution_id() == 'ubuntu': # install package on segments if self.segment_host_list: - GpScp(srcFile, dstFile, self.segment_host_list).run() + GpRsync(srcFile, dstFile, self.segment_host_list).run() HostOperation(InstallDebPackageLocally(dstFile), self.segment_host_list).run() # install package on standby if self.standby_host: - Scp(name='copying %s to %s' % (srcFile, self.standby_host), + Rsync(name='copying %s to %s' % (srcFile, self.standby_host), srcFile=srcFile, dstFile=dstFile, dstHost=self.standby_host).run(validateAfter=True) @@ -1088,12 +1088,12 @@ def execute(self): else: # install package on segments if self.segment_host_list: - GpScp(srcFile, dstFile, self.segment_host_list).run() + GpRsync(srcFile, dstFile, self.segment_host_list).run() HostOperation(InstallPackageLocally(dstFile), self.segment_host_list).run() # install package on standby if self.standby_host: - Scp(name='copying %s to %s' % (srcFile, self.standby_host), + Rsync(name='copying %s to %s' % (srcFile, self.standby_host), srcFile=srcFile, dstFile=dstFile, dstHost=self.standby_host).run(validateAfter=True) @@ -1408,14 +1408,14 @@ def execute(self): # distribute package to segments srcFile = self.gppkg.abspath dstFile = os.path.join(GPHOME, self.gppkg.pkg) - GpScp(srcFile, dstFile, self.segment_host_list).run() + GpRsync(srcFile, dstFile, self.segment_host_list).run() # update package on segments HostOperation(UpdatePackageLocally(dstFile), self.segment_host_list).run() # update package on standby if 
self.standby_host: - Scp(name='copying %s to %s' % (srcFile, self.standby_host), + Rsync(name='copying %s to %s' % (srcFile, self.standby_host), srcFile=srcFile, dstFile=dstFile, dstHost=self.standby_host).run(validateAfter=True) @@ -1552,7 +1552,7 @@ def execute(self): logger.info('The package migration has completed.') -class GpScp(Operation): +class GpRsync(Operation): """ TODO: AK: This obviously does not belong here. My preference would be that it remain here until the following problem is solved. @@ -1562,14 +1562,14 @@ class GpScp(Operation): I suggest: We consume an extra parameter 'fanout'. We partition the host_list into a number of buckets - given by 'fanout'. For each bucket, we scp the artifact to the first host in the bucket, and then - we recursively invoke GpScp on that machine for the remaining hosts in its bucket. + given by 'fanout'. For each bucket, we rsync the artifact to the first host in the bucket, and then + we recursively invoke GpRsync on that machine for the remaining hosts in its bucket. 
- GpScp := ParallelOperation([ A(i) for i in range(0, n) ]) + GpRsync := ParallelOperation([ A(i) for i in range(0, n) ]) A := SerialOperation(B, C) - B := scp source_path target_path @ host_i + B := rsync source_path target_path @ host_i where host_i := the first host in the ith bucket - C := RemoteOperation(GpScp(target_path, target_path, host_list_i)) + C := RemoteOperation(GpRsync(target_path, target_path, host_list_i)) where host_list_i := the remaining hosts in the ith bucket """ @@ -1582,7 +1582,7 @@ def __init__(self, source_path, target_path, host_list): def execute(self): self.pool = WorkerPool() for host in self.host_list: - self.pool.addCommand(Scp(name='copying %s to %s' % (self.source_path, host), + self.pool.addCommand(Rsync(name='copying %s to %s' % (self.source_path, host), srcFile=self.source_path, dstFile=self.target_path, dstHost=host)) diff --git a/gpMgmt/bin/gppylib/operations/test/regress/test_package/__init__.py b/gpMgmt/bin/gppylib/operations/test/regress/test_package/__init__.py index 692ca12d6e0..e5c8c0d8f80 100644 --- a/gpMgmt/bin/gppylib/operations/test/regress/test_package/__init__.py +++ b/gpMgmt/bin/gppylib/operations/test/regress/test_package/__init__.py @@ -10,11 +10,11 @@ from gppylib.gpversion import MAIN_VERSION from contextlib import closing from gppylib.commands import gp -from gppylib.commands.unix import Scp +from gppylib.commands.unix import Rsync from gppylib.commands.base import Command, ExecutionError, REMOTE from gppylib.operations import Operation from gppylib.operations.unix import CheckFile, CheckRemoteFile, RemoveRemoteFile -from gppylib.operations.package import dereference_symlink, GpScp, linux_distribution_id, linux_distribution_version +from gppylib.operations.package import dereference_symlink, GpRsync, linux_distribution_id, linux_distribution_version from gppylib.commands.base import Command, REMOTE def get_os(): diff --git 
a/gpMgmt/bin/gppylib/operations/test/regress/test_package/test_regress_muck_with_internals.py b/gpMgmt/bin/gppylib/operations/test/regress/test_package/test_regress_muck_with_internals.py index 011e4fd996a..38ceb55cda7 100755 --- a/gpMgmt/bin/gppylib/operations/test/regress/test_package/test_regress_muck_with_internals.py +++ b/gpMgmt/bin/gppylib/operations/test/regress/test_package/test_regress_muck_with_internals.py @@ -5,9 +5,9 @@ import shutil from contextlib import closing -from gppylib.commands.unix import Scp +from gppylib.commands.unix import Rsync from gppylib.commands.base import ExecutionError -from gppylib.operations.package import GpScp +from gppylib.operations.package import GpRsync from gppylib.operations.unix import RemoveRemoteFile from gppylib.operations.test.regress.test_package import GppkgTestCase, unittest, get_host_list, ARCHIVE_PATH, RPM_DATABASE, run_command, skipIfSingleNode @@ -132,7 +132,7 @@ def test06_delete_package_from_archive_on_segment_and_install(self): try: self.install(gppkg_file) except ExecutionError as e: - Scp(name = "copy gppkg to segment", + Rsync(name = "copy gppkg to segment", srcFile = gppkg_file, dstFile = archive_file, srcHost = None, @@ -159,7 +159,7 @@ def test07_delete_package_from_archive_on_segment_and_uninstall(self): try: self.remove(gppkg_file) except ExecutionError as e: - GpScp(source_path = gppkg_file, + GpRsync(source_path = gppkg_file, target_path = archive_file, host_list = segment_host_list).run() self.fail("ExecutionError %s" % str(e)) @@ -187,7 +187,7 @@ def test08_uninstall_rpm_on_segments_and_install(self): #Install the rpm with closing(tarfile.open(self.alpha_spec.get_filename())) as tf: tf.extract(self.A_spec.get_filename()) - Scp(name = "copy rpm to segment", + Rsync(name = "copy rpm to segment", srcFile = self.A_spec.get_filename(), dstFile = self.A_spec.get_filename(), srcHost = None, @@ -229,7 +229,7 @@ def test10_install_rpm_on_segments_and_install(self): #Install the rpm with 
closing(tarfile.open(self.alpha_spec.get_filename())) as tf: tf.extract(self.A_spec.get_filename()) - Scp(name = "copy rpm to segment", + Rsync(name = "copy rpm to segment", srcFile = self.A_spec.get_filename(), dstFile = self.A_spec.get_filename(), srcHost = None, @@ -255,7 +255,7 @@ def test11_install_rpm_on_segments_and_uninstall(self): #Install the rpm with closing(tarfile.open(self.alpha_spec.get_filename())) as tf: tf.extract(self.A_spec.get_filename()) - Scp(name = "copy rpm to segment", + Rsync(name = "copy rpm to segment", srcFile = self.A_spec.get_filename(), dstFile = self.A_spec.get_filename(), srcHost = None, diff --git a/gpMgmt/bin/gppylib/operations/test/regress/test_package/test_regress_muck_with_internals_on_standby.py b/gpMgmt/bin/gppylib/operations/test/regress/test_package/test_regress_muck_with_internals_on_standby.py index 1e8bd93a73f..e7fcf823688 100755 --- a/gpMgmt/bin/gppylib/operations/test/regress/test_package/test_regress_muck_with_internals_on_standby.py +++ b/gpMgmt/bin/gppylib/operations/test/regress/test_package/test_regress_muck_with_internals_on_standby.py @@ -5,8 +5,8 @@ from contextlib import closing from gppylib.commands.base import ExecutionError -from gppylib.commands.unix import Scp -from gppylib.operations.package import GpScp +from gppylib.commands.unix import Rsync +from gppylib.operations.package import GpRsync from gppylib.operations.test.regress.test_package import GppkgTestCase, unittest, skipIfNoStandby, get_host_list, ARCHIVE_PATH, run_command from gppylib.operations.unix import RemoveRemoteFile @@ -25,7 +25,7 @@ def test00_delete_package_from_archive_on_standby_and_install(self): try: self.install(gppkg_file) except ExecutionError as e: - Scp(name = "copy gppkg to standby", + Rsync(name = "copy gppkg to standby", srcFile = gppkg_file, dstFile = archive_file, srcHost = None, @@ -48,10 +48,10 @@ def test01_delete_package_from_archive_on_standby_and_uninstall(self): try: self.remove(gppkg_file) except ExecutionError as 
e: - GpScp(source_path = gppkg_file, + GpRsync(source_path = gppkg_file, target_path = archive_file, host_list = get_host_list()[1]).run() - Scp(name = "copy gppkg to standby", + Rsync(name = "copy gppkg to standby", srcFile = gppkg_file, dstFile = archive_file, srcHost = None, @@ -73,7 +73,7 @@ def test02_uninstall_rpm_on_standby_and_install(self): #Install the rpm with closing(tarfile.open(self.alpha_spec.get_filename())) as tf: tf.extract(self.A_spec.get_filename()) - Scp(name = "copy rpm to standby", + Rsync(name = "copy rpm to standby", srcFile = self.A_spec.get_filename(), dstFile = self.A_spec.get_filename(), srcHost = None, @@ -101,7 +101,7 @@ def test04_install_rpm_on_standby_and_install(self): #Install the rpm with closing(tarfile.open(self.alpha_spec.get_filename())) as tf: tf.extract(self.A_spec.get_filename()) - Scp(name = "copy the rpm to standby", + Rsync(name = "copy the rpm to standby", srcFile = self.A_spec.get_filename(), dstFile = self.A_spec.get_filename(), srcHost = None, @@ -119,7 +119,7 @@ def test05_install_rpm_on_standby_and_uninstall(self): #Install the rpm with closing(tarfile.open(self.alpha_spec.get_filename())) as tf: tf.extract(self.A_spec.get_filename()) - Scp(name = "copy rpm to standby", + Rsync(name = "copy rpm to standby", srcFile = self.A_spec.get_filename(), dstFile = self.A_spec.get_filename(), srcHost = None, diff --git a/gpMgmt/bin/gppylib/operations/test/test_package.py b/gpMgmt/bin/gppylib/operations/test/test_package.py index d4f65648d12..202a53db17c 100755 --- a/gpMgmt/bin/gppylib/operations/test/test_package.py +++ b/gpMgmt/bin/gppylib/operations/test/test_package.py @@ -12,7 +12,7 @@ from gppylib.gparray import GpArray from contextlib import closing from gppylib.commands import gp -from gppylib.commands.unix import Scp +from gppylib.commands.unix import Rsync from gppylib.commands.base import Command, ExecutionError from gppylib.operations import Operation from gppylib.operations.unix import CheckFile, 
RemoveRemoteFile @@ -632,7 +632,7 @@ def test04_delete_package_from_archive_on_segment(self): try: self.install(gppkg_file) except ExecutionError as e: - Scp(name="copy to segment", srcFile=gppkg_file, dstFile=archive_file, srcHost=None, + Rsync(name="copy to segment", srcFile=gppkg_file, dstFile=archive_file, srcHost=None, dstHost=segment_host_list[0]).run(validateAfter=True) self.fail("ExecutionError %s" % e) @@ -653,7 +653,7 @@ def test05_delete_package_from_archive_on_standby(self): try: self.install(gppkg_file) except ExecutionError as e: - Scp(name="copy to segment", srcFile=gppkg_file, dstFile=archive_file, srcHost=None, dstHost=standby).run( + Rsync(name="copy to segment", srcFile=gppkg_file, dstFile=archive_file, srcHost=None, dstHost=standby).run( validateAfter=True) self.fail("ExecutionError %s" % e) diff --git a/gpMgmt/bin/gppylib/test/unit/test_unit_package.py b/gpMgmt/bin/gppylib/test/unit/test_unit_package.py index 8ea90face04..a4d3288e37b 100644 --- a/gpMgmt/bin/gppylib/test/unit/test_unit_package.py +++ b/gpMgmt/bin/gppylib/test/unit/test_unit_package.py @@ -183,7 +183,7 @@ def setUp(self): patch('gppylib.operations.package.MakeDir'), patch('gppylib.operations.package.CheckRemoteDir'), patch('gppylib.operations.package.MakeRemoteDir'), - patch('gppylib.operations.package.Scp'), + patch('gppylib.operations.package.Rsync'), patch('gppylib.operations.package.RemoteOperation'), patch('gppylib.operations.package.RemoveRemoteFile'), patch('gppylib.operations.package.InstallPackageLocally'), @@ -199,7 +199,7 @@ def setUp(self): self.mock_listdir = self.get_mock_from_apply_patch('listdir') self.mock_command = self.get_mock_from_apply_patch('Command') self.mock_logger = self.get_mock_from_apply_patch('logger') - self.mock_scp = self.get_mock_from_apply_patch('Scp') + self.mock_rsync = self.get_mock_from_apply_patch('Rsync') self.mock_install_packages_locally = self.get_mock_from_apply_patch('InstallPackageLocally') @@ -228,7 +228,7 @@ def 
test__execute_install_on_segments_when_package_are_missing(self): hostname = 'localhost' subject = SyncPackages(hostname) subject.execute() - self.assertEqual(self.mock_scp.call_count, 2) + self.assertEqual(self.mock_rsync.call_count, 2) self.assertEqual(self.make_dir_mock.call_count, 1) self.assertEqual(self.make_remote_dir_mock.call_count, 1) diff --git a/gpMgmt/bin/gpssh-exkeys b/gpMgmt/bin/gpssh-exkeys index 1a1b2ec91dd..cf9853360fd 100755 --- a/gpMgmt/bin/gpssh-exkeys +++ b/gpMgmt/bin/gpssh-exkeys @@ -751,7 +751,7 @@ try: for h in GV.newHosts: - cmd = ('scp -q -o "BatchMode yes" -o "NumberOfPasswordPrompts 0" ' + + cmd = ('rsync -q -e "ssh -o BatchMode=yes -o NumberOfPasswordPrompts=0" ' + '%s %s %s %s %s:.ssh/ 2>&1' % (GV.authorized_keys_fname, GV.known_hosts_fname, @@ -792,7 +792,7 @@ try: remoteIdentity = GV.id_rsa_fname remoteIdentityPub = GV.id_rsa_pub_fname - cmd = ('scp -q -o "BatchMode yes" -o "NumberOfPasswordPrompts 0" ' + + cmd = ('rsync -q -e "ssh -o BatchMode=yes -o NumberOfPasswordPrompts=0" ' + '%s %s %s %s %s:.ssh/ 2>&1' % (remoteAuthKeysFile, remoteKnownHostsFile, diff --git a/gpMgmt/bin/gpscp b/gpMgmt/bin/gpsync similarity index 86% rename from gpMgmt/bin/gpscp rename to gpMgmt/bin/gpsync index 21be83e90d6..e656ca97a88 100755 --- a/gpMgmt/bin/gpscp +++ b/gpMgmt/bin/gpsync @@ -1,15 +1,16 @@ #!/usr/bin/env python3 ''' -gpscp -- scp to multiple hosts at once +gpsync -- rsync to multiple hosts at once -Usage: gpscp [--version] [-?v] [-r] [-p port] [-u user] +Usage: gpsync [--version] [-?v] [-r] [-a] [-p port] [-u user] [-h host] [-f hostfile] [-J host_substitution_character] [[user@]host1:]file1 [...] [[user@]hosts2:]file2 --version : print version information -? 
: print this help screen -v : verbose mode -r : recursively copy entire directories + -a : archive mode; equals -rlptgoD (no -H,-A,-X) -h host : ssh host to connect to (multiple -h is okay) -f file : a file listing all hosts to connect to -J character : character to be substitute as hostname [default='='] @@ -36,6 +37,7 @@ class Global: opt['-f'] = None opt['-J'] = '=:' opt['-r'] = False + opt['-a'] = False filePath = [] @@ -61,7 +63,7 @@ def print_version(): ############# def parseCommandLine(): try: - (options, args) = getopt.getopt(sys.argv[1:], '?vrJ:p:u:h:f:', ['version']) + (options, args) = getopt.getopt(sys.argv[1:], '?vraJ:p:u:h:f:', ['version']) except Exception as e: usage('Error: ' + str(e)) @@ -78,6 +80,8 @@ def parseCommandLine(): GV.opt[switch] = val + ':' elif (switch == '-r'): GV.opt[switch] = True + elif (switch == '-a'): + GV.opt[switch] = True elif (switch == '--version'): print_version() @@ -109,13 +113,14 @@ try: if len(GV.opt['-h']) == 0: usage('Error: missing hosts in -h and/or -f arguments') - scp = 'scp -o "BatchMode yes" -o "StrictHostKeyChecking no"' - if GV.opt['-r']: scp += ' -r' + rsync = 'rsync -e "ssh -o BatchMode=yes -o StrictHostKeyChecking=no"' + if GV.opt['-r']: rsync += ' -r' + if GV.opt['-a']: rsync += ' -a' proc = [] for peer in GV.opt['-h']: peer = canonicalize_address(peer) # MPP-13617 - cmd = scp + ' ' + cmd = rsync + ' ' for f in GV.filePath: cmd += f.replace(GV.opt['-J'], '%s:' % peer) + ' ' if GV.opt['-v']: print('[INFO]', cmd) diff --git a/gpMgmt/bin/lib/gp_bash_functions.sh b/gpMgmt/bin/lib/gp_bash_functions.sh index dc996cb6411..e23f7d9f5ad 100755 --- a/gpMgmt/bin/lib/gp_bash_functions.sh +++ b/gpMgmt/bin/lib/gp_bash_functions.sh @@ -87,7 +87,7 @@ MV=`findCmdInPath mv` MKDIR=`findCmdInPath mkdir` PING=`findCmdInPath ping` RM=`findCmdInPath rm` -SCP=`findCmdInPath scp` +RSYNC=`findCmdInPath rsync` SED=`findCmdInPath sed` SLEEP=`findCmdInPath sleep` SORT=`findCmdInPath sort` @@ -158,7 +158,7 @@ PG_CONF=postgresql.conf 
PG_INTERNAL_CONF=internal.auto.conf PG_HBA=pg_hba.conf if [ x"$TRUSTED_SHELL" = x"" ]; then TRUSTED_SHELL="$SSH"; fi -if [ x"$TRUSTED_COPY" = x"" ]; then TRUSTED_COPY="$SCP"; fi +if [ x"$TRUSTED_COPY" = x"" ]; then TRUSTED_COPY="$RSYNC "; fi PG_CONF_ADD_FILE=$WORKDIR/postgresql_conf_gp_additions DEFAULTDB=template1 ETCD_CONFIG_TMP_FILE=/tmp/cbdb_etcd.conf diff --git a/gpMgmt/test/behave/mgmt_utils/gppkg.feature b/gpMgmt/test/behave/mgmt_utils/gppkg.feature index 68adcd247ea..b6a070e901d 100644 --- a/gpMgmt/test/behave/mgmt_utils/gppkg.feature +++ b/gpMgmt/test/behave/mgmt_utils/gppkg.feature @@ -120,7 +120,7 @@ Feature: gppkg tests Scenario: gppkg --migrate copies all packages from coordinator to all segment hosts Given the database is running And the user runs "gppkg -r sample" - And a gphome copy is created at /tmp/gppkg_migrate on all hosts + And a gphome copy is created at /tmp/gppkg_migrate/ on all hosts When a user runs "COORDINATOR_DATA_DIRECTORY=$COORDINATOR_DATA_DIRECTORY gppkg -r sample" with gphome "/tmp/gppkg_migrate" And "sample" gppkg files do not exist on any hosts When a user runs "COORDINATOR_DATA_DIRECTORY=$COORDINATOR_DATA_DIRECTORY gppkg --install $(pwd)/test/behave/mgmt_utils/steps/data/sample.gppkg" with gphome "/tmp/gppkg_migrate" diff --git a/gpMgmt/test/behave/mgmt_utils/steps/gpconfig_mgmt_utils.py b/gpMgmt/test/behave/mgmt_utils/steps/gpconfig_mgmt_utils.py index 9913ae5b937..c8e2dfc07fb 100644 --- a/gpMgmt/test/behave/mgmt_utils/steps/gpconfig_mgmt_utils.py +++ b/gpMgmt/test/behave/mgmt_utils/steps/gpconfig_mgmt_utils.py @@ -29,8 +29,8 @@ def impl(context): os.mkdir(segment_tmp_directory) backup_path = path.join(segment_tmp_directory, 'postgresql.conf') original_path = path.join(segment.datadir, 'postgresql.conf') - copy_command = ('scp %s:%s %s' % (segment.hostname, original_path, backup_path)).split(' ') - restore_command = ('scp %s %s:%s' % (backup_path, segment.hostname, original_path)).split(' ') + copy_command = ('rsync %s:%s %s' % 
(segment.hostname, original_path, backup_path)).split(' ') + restore_command = ('rsync %s %s:%s' % (backup_path, segment.hostname, original_path)).split(' ') restore_commands.append(restore_command) subprocess.check_call(copy_command) diff --git a/gpMgmt/test/behave/mgmt_utils/steps/gpssh_exkeys_mgmt_utils.py b/gpMgmt/test/behave/mgmt_utils/steps/gpssh_exkeys_mgmt_utils.py index 5add9165f57..9e8d57d8a65 100644 --- a/gpMgmt/test/behave/mgmt_utils/steps/gpssh_exkeys_mgmt_utils.py +++ b/gpMgmt/test/behave/mgmt_utils/steps/gpssh_exkeys_mgmt_utils.py @@ -295,7 +295,7 @@ def impl(context): # This blows away any existing authorized_keys file on the segments. subprocess.check_call([ - 'gpscp', + 'gpsync', '-v', ] + host_opts + [ '~/.ssh/id_rsa.pub', diff --git a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py index 9b455177079..24049c068b5 100644 --- a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py +++ b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py @@ -23,8 +23,9 @@ from gppylib.gparray import GpArray, ROLE_PRIMARY, ROLE_MIRROR from gppylib.commands.gp import SegmentStart, GpStandbyStart, CoordinatorStop from gppylib.commands import gp -from gppylib.commands.unix import findCmdInPath, Scp +from gppylib.commands.pg import PgBaseBackup from gppylib.operations.startSegments import MIRROR_MODE_MIRRORLESS +from gppylib.operations.buildMirrorSegments import get_recovery_progress_pattern from gppylib.operations.unix import ListRemoteFilesByPattern, CheckRemoteFile from test.behave_utils.gpfdist_utils.gpfdist_mgmt import Gpfdist from test.behave_utils.utils import * @@ -34,7 +35,7 @@ from gppylib.commands.base import Command, REMOTE from gppylib import pgconf from gppylib.operations.package import linux_distribution_id, linux_distribution_version - +from gppylib.commands.gp import get_coordinatordatadir coordinator_data_dir = gp.get_coordinatordatadir() if coordinator_data_dir is None: @@ -340,7 +341,7 @@ def 
impl(context, dbname): drop_database(context, dbname) -@given('{env_var} environment variable is not set') +@given('"{env_var}" environment variable is not set') def impl(context, env_var): if not hasattr(context, 'orig_env'): context.orig_env = dict() @@ -349,7 +350,15 @@ def impl(context, env_var): if env_var in os.environ: del os.environ[env_var] -@then('{env_var} environment variable should be restored') +@given('the environment variable "{env_var}" is set to "{val}"') +def impl(context, env_var, val): + if not hasattr(context, 'orig_env'): + context.orig_env = dict() + + context.orig_env[env_var] = os.environ.get(env_var) + os.environ[env_var] = val + +@then('"{env_var}" environment variable should be restored') def impl(context, env_var): if not hasattr(context, 'orig_env'): raise Exception('%s can not be reset' % env_var) @@ -357,11 +366,28 @@ def impl(context, env_var): if env_var not in context.orig_env: raise Exception('%s can not be reset.' % env_var) - os.environ[env_var] = context.orig_env[env_var] + if context.orig_env[env_var] is None: + del os.environ[env_var] + else: + os.environ[env_var] = context.orig_env[env_var] del context.orig_env[env_var] + +@given('all files in pg_wal directory are deleted from data directory of preferred primary of content {content_ids}') +def impl(context, content_ids): + all_segments = GpArray.initFromCatalog(dbconn.DbURL()).getDbList() + segments = filter(lambda seg: seg.getSegmentPreferredRole() == ROLE_PRIMARY and + seg.getSegmentContentId() in [int(c) for c in content_ids.split(',')], all_segments) + for seg in segments: + cmd = Command(name="Remove pg_wal files", + cmdStr='rm -rf {}'.format(os.path.join(seg.getSegmentDataDirectory(), 'pg_wal')), + remoteHost=seg.getSegmentHostName(), ctxt=REMOTE) + cmd.run(validateAfter=True) + + @given('the user {action} the walsender on the {segment} on content {content}') +@when('the user {action} the walsender on the {segment} on content {content}') @then('the user {action} the 
walsender on the {segment} on content {content}') def impl(context, action, segment, content): if segment == 'mirror': @@ -401,11 +427,84 @@ def impl(context, segment, content): @then('the user waits until all bytes are sent to mirror on content {content}') def impl(context, content): host, port = get_primary_segment_host_port_for_content(content) - query = "SELECT CASE WHEN sync_state='sync' THEN (pg_current_wal_lsn() - sent_lsn) ELSE 1 END FROM pg_stat_replication;" + query = "SELECT pg_current_wal_lsn() - sent_lsn FROM pg_stat_replication;" desired_result = 0 - wait_for_desired_query_result_on_segment(host, port, query, desired_result) + dburl = dbconn.DbURL(hostname=host, port=port, dbname='template1') + wait_for_desired_query_result(dburl, query, desired_result, utility=True) + + # Wait for replication state to be in 'sync' + query = "SELECT sync_state FROM pg_stat_replication;" + desired_result = 'sync' + dburl = dbconn.DbURL(hostname=host, port=port, dbname='template1') + wait_for_desired_query_result(dburl, query, desired_result, utility=True) + +@given('the user waits until recovery_progress.file is created in {logdir} and verifies its format') +@when('the user waits until recovery_progress.file is created in {logdir} and verifies its format') +@then('the user waits until recovery_progress.file is created in {logdir} and verifies its format') +def impl(context, logdir): + attempt = 0 + num_retries = 60000 + log_dir = _get_gpAdminLogs_directory() if logdir == 'gpAdminLogs' else logdir + recovery_progress_file = '{}/recovery_progress.file'.format(log_dir) + while attempt < num_retries: + attempt += 1 + if os.path.exists(recovery_progress_file): + with open(recovery_progress_file, 'r') as fp: + context.recovery_lines = fp.readlines() + for line in context.recovery_lines: + recovery_type, dbid, progress = line.strip().split(':', 2) + progress_pattern = re.compile(get_recovery_progress_pattern()) + # TODO: assert progress line in the actual hosts bb/rewind 
progress file + if re.search(progress_pattern, progress) and dbid.isdigit() and recovery_type in ['full', 'incremental']: + return + else: + raise Exception('File present but incorrect format line "{}"'.format(line)) + time.sleep(0.01) + if attempt == num_retries: + raise Exception('Timed out after {} retries'.format(num_retries)) + + +@then( 'verify if the gprecoverseg.lock directory is present in coordinator_data_directory') +def impl(context): + gprecoverseg_lock_file = "%s/gprecoverseg.lock" % gp.get_coordinatordatadir() + if not os.path.exists(gprecoverseg_lock_file): + raise Exception('gprecoverseg.lock directory does not exist') + else: + return +@then('verify that lines from recovery_progress.file are present in segment progress files in {logdir}') +def impl(context, logdir): + all_progress_lines_by_dbid = {} + for line in context.recovery_lines: + recovery_type, dbid, line_from_combined_progress_file = line.strip().split(':', 2) + all_progress_lines_by_dbid[int(dbid)] = [recovery_type, line_from_combined_progress_file] + + all_segments = GpArray.initFromCatalog(dbconn.DbURL()).getDbList() + + log_dir = _get_gpAdminLogs_directory() if logdir == 'gpAdminLogs' else logdir + for seg in all_segments: + seg_dbid = seg.getSegmentDbId() + if seg_dbid in all_progress_lines_by_dbid: + recovery_type, line_from_combined_progress_file = all_progress_lines_by_dbid[seg_dbid] + process_name = 'pg_basebackup' if recovery_type == 'full' else 'pg_rewind' + seg_progress_file = '{}/{}.*.dbid{}.out'.format(log_dir, process_name, seg_dbid) + check_cmd_str = 'grep "{}" {}'.format(line_from_combined_progress_file, seg_progress_file) + check_cmd = Command(name='check line in segment progress file', + cmdStr=check_cmd_str, + ctxt=REMOTE, + remoteHost=seg.getSegmentHostName()) + check_cmd.run() + if check_cmd.get_return_code() != 0: + raise Exception('Expected line {} in segment progress file {} on host {} but not found.' 
+ .format(line_from_combined_progress_file, seg_progress_file, seg.getSegmentHostName())) + + +@then('recovery_progress.file should not exist in {logdir}') +def impl(context, logdir): + log_dir = _get_gpAdminLogs_directory() if logdir == 'gpAdminLogs' else logdir + if os.path.exists('{}/recovery_progress.file'.format(log_dir)): + raise Exception('recovery_progress.file is still present under {}'.format(log_dir)) def backup_bashrc(): home_dir = os.environ.get('HOME') @@ -439,6 +538,7 @@ def restore_bashrc(): def impl(context, command): run_gpcommand(context, command) + @when('the user sets banner on host') def impl(context): file = '~/.bashrc' @@ -518,14 +618,43 @@ def impl(context, process_name, secs): run_async_command(context, command) -@when('the user asynchronously sets up to end gpinitsystem process when {log_msg} is printed in the logs') -def impl(context, log_msg): +@when('the user asynchronously sets up to end {process_name} process when {log_msg} is printed in the logs') +def impl(context, process_name, log_msg): command = "while sleep 0.1; " \ - "do if egrep --quiet %s ~/gpAdminLogs/gpinitsystem*log ; " \ - "then ps ux | grep bin/gpinitsystem |awk '{print $2}' | xargs kill ;break 2; " \ - "fi; done" % (log_msg) + "do if egrep --quiet %s ~/gpAdminLogs/%s*log ; " \ + "then ps ux | grep bin/%s |awk '{print $2}' | xargs kill ;break 2; " \ + "fi; done" % (log_msg, process_name, process_name) run_async_command(context, command) +@then('the user asynchronously sets up to end {kill_process_name} process when {log_msg} is printed in the {logfile_name} logs') +def impl(context, kill_process_name, log_msg, logfile_name): + command = "while sleep 0.1; " \ + "do if egrep --quiet %s ~/gpAdminLogs/%s*log ; " \ + "then ps ux | grep bin/%s |awk '{print $2}' | xargs kill -2 ;break 2; " \ + "fi; done" % (log_msg, logfile_name, kill_process_name) + run_async_command(context, command) + +@given('the user asynchronously sets up to end {process_name} process with SIGINT') 
+@when('the user asynchronously sets up to end {process_name} process with SIGINT') +@then('the user asynchronously sets up to end {process_name} process with SIGINT') +def impl(context, process_name): + command = "ps ux | grep bin/%s | awk '{print $2}' | xargs kill -2" % (process_name) + run_async_command(context, command) + + +@given('the user asynchronously sets up to end {process_name} process with SIGHUP') +@when('the user asynchronously sets up to end {process_name} process with SIGHUP') +@then('the user asynchronously sets up to end {process_name} process with SIGHUP') +def impl(context, process_name): + command = "ps ux | grep bin/%s | awk '{print $2}' | xargs kill -9" % (process_name) + run_async_command(context, command) + +@given('the user asynchronously ends {process_name} process with SIGHUP') +@when('the user asynchronously ends {process_name} process with SIGHUP') +@then('the user asynchronously ends {process_name} process with SIGHUP') +def impl(context, process_name): + command = "ps ux | grep %s | awk '{print $2}' | xargs kill -9" % (process_name) + run_async_command(context, command) @when('the user asynchronously sets up to end gpcreateseg process when it starts') def impl(context): @@ -559,6 +688,7 @@ def impl(context, ret_code): @when('the user waits until saved async process is completed') +@then('the user waits until saved async process is completed') def impl(context): context.asyncproc.communicate2() @@ -698,9 +828,45 @@ def impl(context, command, out_msg, num): raise Exception("Expected %s to occur %s times. Found %d. 
stdout: %s" % (out_msg, num, count, msg_list)) +@given('the user records the current timestamp in log_timestamp table') +@when('the user records the current timestamp in log_timestamp table') +@then('the user records the current timestamp in log_timestamp table') +def impl(context): + sql = "CREATE TABLE log_timestamp AS SELECT CURRENT_TIMESTAMP;" + rc, output, error = run_cmd("psql -d template1 -c \'%s\'" %sql) + if rc: + raise Exception(error) + + +@then('the user drops log_timestamp table') +def impl(context): + rc, output, error = run_cmd("psql -d template1 -c \"DROP TABLE log_timestamp;\"") + if rc: + raise Exception(error) + + +@then('the pg_log files on primary segments should not contain "{msg}"') +def impl(context, msg): + + gparray = GpArray.initFromCatalog(dbconn.DbURL()) + segments = gparray.getDbList() + conn = dbconn.connect(dbconn.DbURL(dbname='template1'), unsetSearchPath=False) + + for seg in segments: + if seg.isSegmentPrimary(): + segname = "seg"+str(seg.content) + sql = "select * from gp_toolkit.__gp_log_segment_ext where logsegment='%s' and logtime > (select * from log_timestamp) and logmessage like '%s'" %(segname, msg) + try: + cursor = dbconn.query(conn, sql) + if cursor.fetchone(): + raise Exception("Fatal message exists in pg_log file on primary segment %s" %segname) + finally: + pass + conn.close() + def lines_matching_both(in_str, str_1, str_2): lines = [x.strip() for x in in_str.split('\n')] - return [x for x in lines if x.count(str_1) and x.count(str_2)] + return [line for line in lines if line.count(str_1) and line.count(str_2)] @then('check if {command} ran "{called_command}" {num} times with args "{args}"') @@ -748,6 +914,21 @@ def impl(context): raise Exception('segments are not in sync after %d seconds' % (times * sleeptime)) +@then('the segments are synchronized for content {content_ids}') +def impl(context, content_ids): + if content_ids == 'None': + return + times = 60 + sleeptime = 10 + content_ids_to_check = [int(c) for c in 
content_ids.split(',')] + for i in range(times): + if are_segments_synchronized_for_content_ids(content_ids_to_check): + return + time.sleep(sleeptime) + + raise Exception('segments are not in sync after %d seconds' % (times * sleeptime)) + + @then('verify that there is no table "{tablename}" in "{dbname}"') def impl(context, tablename, dbname): dbname = replace_special_char_env(dbname) @@ -1203,6 +1384,8 @@ def stop_all_primary_or_mirror_segments_on_hosts(context, segment_type, hosts): @given('the {role} on content {contentID} is stopped') +@when('the {role} on content {contentID} is stopped') +@then('the {role} on content {contentID} is stopped') def stop_segments_on_contentID(context, role, contentID): if role not in ("primary", "mirror"): raise Exception("Expected segment_type to be 'primary' or 'mirror', but found '%s'." % role) @@ -1210,6 +1393,18 @@ def stop_segments_on_contentID(context, role, contentID): role = ROLE_PRIMARY if role == 'primary' else ROLE_MIRROR stop_segments(context, lambda seg: seg.getSegmentRole() == role and seg.content == int(contentID)) +@given('the {role} on content {contents} is stopped with the immediate flag') +@when('the {role} on content {contents} is stopped with the immediate flag') +@then('the {role} on content {contents} is stopped with the immediate flag') +def stop_segments_on_contentID(context, role, contents): + if role not in ("primary", "mirror"): + raise Exception("Expected segment_type to be 'primary' or 'mirror', but found '%s'." 
% role) + content_ids = [int(i) for i in contents.split(',')] + + role = ROLE_PRIMARY if role == 'primary' else ROLE_MIRROR + stop_segments_immediate(context, lambda seg: seg.getSegmentRole() == role and seg.content in content_ids) + + # where_clause is a lambda that takes a segment to select what segments to stop def stop_segments(context, where_clause): @@ -1224,6 +1419,17 @@ def stop_segments(context, where_clause): pipes.quote(os.environ.get("GPHOME")), pipes.quote(seg.getSegmentDataDirectory())) ]) + +@given('user immediately stops all {segment_type} processes for content {content}') +@then('user immediately stops all {segment_type} processes for content {content}') +def stop_all_primary_or_mirror_segments(context, segment_type, content): + if segment_type not in ("primary", "mirror"): + raise Exception("Expected segment_type to be 'primary' or 'mirror', but found '%s'." % segment_type) + content_ids = [int(i) for i in content.split(',')] + role = ROLE_PRIMARY if segment_type == 'primary' else ROLE_MIRROR + stop_segments_immediate(context, lambda seg: seg.getSegmentRole() == role and seg.content in content_ids) + + @given('user immediately stops all {segment_type} processes') @when('user immediately stops all {segment_type} processes') @then('user immediately stops all {segment_type} processes') @@ -1253,13 +1459,6 @@ def stop_segments_immediate(context, where_clause): def impl(context): wait_for_unblocked_transactions(context) - -@given('the environment variable "{var}" is set to "{val}"') -def impl(context, var, val): - context.env_var = os.environ.get(var) - os.environ[var] = val - - @given('below sql is executed in "{dbname}" db') @when('below sql is executed in "{dbname}" db') def impl(context, dbname): @@ -1342,24 +1541,22 @@ def impl(context): return -@given('the "{seg}" segment information is saved') -@when('the "{seg}" segment information is saved') -@then('the "{seg}" segment information is saved') -def impl(context, seg): - gparray = 
GpArray.initFromCatalog(dbconn.DbURL()) +@given('verify that mirror on content {content_ids} is {expected_status}') +@when('verify that mirror on content {content_ids} is {expected_status}') +@then('verify that mirror on content {content_ids} is {expected_status}') +def impl(context, content_ids, expected_status): + if content_ids == 'None': + return + if expected_status not in ('up', 'down'): + raise Exception("expected_status can only be 'up' or 'down'") - if seg == "primary": - primary_segs = [seg for seg in gparray.getDbList() if seg.isSegmentPrimary()] - context.pseg = primary_segs[0] - context.pseg_data_dir = context.pseg.getSegmentDataDirectory() - context.pseg_hostname = context.pseg.getSegmentHostName() - context.pseg_dbid = context.pseg.getSegmentDbId() - elif seg == "mirror": - mirror_segs = [seg for seg in gparray.getDbList() if seg.isSegmentMirror()] - context.mseg = mirror_segs[0] - context.mseg_hostname = context.mseg.getSegmentHostName() - context.mseg_dbid = context.mseg.getSegmentDbId() - context.mseg_data_dir = context.mseg.getSegmentDataDirectory() + for content in content_ids.split(','): + if expected_status == 'up' and not is_segment_running(ROLE_MIRROR, int(content)): + raise Exception("mirror for content {} is not up".format(content)) + elif expected_status == 'down' and is_segment_running(ROLE_MIRROR, int(content)): + raise Exception("mirror for content {} is not down".format(content)) + + return @given('the cluster configuration has no segments where "{filter}"') @@ -1413,7 +1610,7 @@ def impl(context, seg): remoteHost=hostname, ctxt=REMOTE) cmd.run(validateAfter=True) - cmd = Command(name="Copy background script to remote host", cmdStr='scp %s %s:/tmp' % (filename, hostname)) + cmd = Command(name="Copy background script to remote host", cmdStr='rsync %s %s:/tmp' % (filename, hostname)) cmd.run(validateAfter=True) cmd = Command(name="Run Bg process to save pid", @@ -1463,23 +1660,20 @@ def impl(context, seg): pid_file = 
os.path.join(data_dir, 'postmaster.pid') pid_file_orig = pid_file + '.orig' - cmd = Command(name="Copy pid file", cmdStr='cp %s %s' % (pid_file_orig, pid_file), remoteHost=hostname, ctxt=REMOTE) - cmd.run(validateAfter=True) - - cpCmd = Command(name='copy pid file to coordinator for editing', cmdStr='scp %s:%s /tmp' % (hostname, pid_file)) + cpCmd = Command(name='copy pid file to coordinator for editing', cmdStr='rsync %s:%s /tmp' % (hostname, pid_file_orig)) cpCmd.run(validateAfter=True) - with open('/tmp/postmaster.pid', 'r') as fr: + with open('/tmp/postmaster.pid.orig', 'r') as fr: lines = fr.readlines() lines[0] = "%s\n" % context.bg_pid - with open('/tmp/postmaster.pid', 'w') as fw: + with open('/tmp/postmaster.pid.orig', 'w') as fw: fw.writelines(lines) cpCmd = Command(name='copy pid file to segment after editing', - cmdStr='scp /tmp/postmaster.pid %s:%s' % (hostname, pid_file)) + cmdStr='rsync /tmp/postmaster.pid.orig %s:%s' % (hostname, pid_file)) cpCmd.run(validateAfter=True) @@ -1503,7 +1697,7 @@ def impl(context, seg): cmd = Command(name="Copy pid file", cmdStr='cp %s %s' % (pid_file_orig, pid_file), remoteHost=hostname, ctxt=REMOTE) cmd.run(validateAfter=True) - cpCmd = Command(name='copy pid file to coordinator for editing', cmdStr='scp %s:%s /tmp' % (hostname, pid_file)) + cpCmd = Command(name='copy pid file to coordinator for editing', cmdStr='rsync %s:%s /tmp' % (hostname, pid_file)) cpCmd.run(validateAfter=True) @@ -1526,7 +1720,7 @@ def impl(context, seg): fw.writelines(lines) cpCmd = Command(name='copy pid file to segment after editing', - cmdStr='scp /tmp/postmaster.pid %s:%s' % (hostname, pid_file)) + cmdStr='rsync /tmp/postmaster.pid %s:%s' % (hostname, pid_file)) cpCmd.run(validateAfter=True) @@ -2388,10 +2582,10 @@ def impl(context, location): host_opts.extend(['-h', host]) subprocess.check_call([ - 'gpscp', - '-rv', + 'gpsync', + '-av', ] + host_opts + [ - os.getenv('GPHOME'), + os.getenv('GPHOME')+'/', '=:{}'.format(location), ]) @@ 
-2402,6 +2596,7 @@ def impl(context, location): ]) @given('all files in gpAdminLogs directory are deleted') +@when('all files in gpAdminLogs directory are deleted') @then('all files in gpAdminLogs directory are deleted') def impl(context): log_dir = _get_gpAdminLogs_directory() @@ -2409,6 +2604,40 @@ def impl(context): for file in files_found: os.remove(file) + +@given('all files in gpAdminLogs directory are deleted on hosts {hosts}') +def impl(context, hosts): + host_list = hosts.split(',') + log_dir = _get_gpAdminLogs_directory() + for host in host_list: + rm_cmd = Command(name="remove files in gpAdminLogs", + cmdStr="rm -rf {}/*".format(log_dir), + remoteHost=host, ctxt=REMOTE) + rm_cmd.run(validateAfter=True) + +@given('all files in "{dir}" directory are deleted on all hosts in the cluster') +@then('all files in "{dir}" directory are deleted on all hosts in the cluster') +def impl(context, dir): + host_list = GpArray.initFromCatalog(dbconn.DbURL()).getHostList() + for host in host_list: + rm_cmd = Command(name="remove files in {}".format(dir), + cmdStr="rm -rf {}/*".format(dir), + remoteHost=host, ctxt=REMOTE) + rm_cmd.run(validateAfter=True) + +@given('all files in gpAdminLogs directory are deleted on all hosts in the cluster') +@when('all files in gpAdminLogs directory are deleted on all hosts in the cluster') +@then('all files in gpAdminLogs directory are deleted on all hosts in the cluster') +def impl(context): + host_list = GpArray.initFromCatalog(dbconn.DbURL()).getHostList() + log_dir = _get_gpAdminLogs_directory() + for host in host_list: + rm_cmd = Command(name="remove files in gpAdminLogs", + cmdStr="rm -rf {}/*".format(log_dir), + remoteHost=host, ctxt=REMOTE) + rm_cmd.run(validateAfter=True) + + @then('gpAdminLogs directory {has} "{expected_file}" files') def impl(context, has, expected_file): log_dir = _get_gpAdminLogs_directory() @@ -2418,6 +2647,61 @@ def impl(context, has, expected_file): if (not files_found) and (has == 'has'): raise 
Exception("expected %s file in %s, but not found" % (expected_file, log_dir)) + +@then('gpAdminLogs directory has "{expected_file}" files on respective hosts only for content {content_ids}') +def impl(context, expected_file, content_ids): + content_list = [int(c) for c in content_ids.split(',')] + all_segments = GpArray.initFromCatalog(dbconn.DbURL()).getDbList() + segments = filter(lambda seg: seg.getSegmentRole() == ROLE_MIRROR and + seg.content in content_list, all_segments) + host_to_seg_dbids = {} + for seg in segments: + segHost = seg.getSegmentHostName() + if segHost in host_to_seg_dbids: + host_to_seg_dbids[segHost].append('dbid{}'.format(seg.dbid)) + else: + host_to_seg_dbids[segHost] = ['dbid{}'.format(seg.dbid)] + + for segHost, expected_files_on_host in host_to_seg_dbids.items(): + log_dir = "%s/gpAdminLogs" % os.path.expanduser("~") + listdir_cmd = Command(name="list logfiles on host", + cmdStr="ls -l {}/{}".format(log_dir, expected_file), + remoteHost=segHost, ctxt=REMOTE) + listdir_cmd.run(validateAfter=True) + ls_outs = listdir_cmd.get_results().stdout.split('\n') + files_found = [ls_line.split(' ')[-1] for ls_line in ls_outs if ls_line] + + if not files_found: + raise Exception("expected {} files in {} on host {}, but not found".format(expected_file, log_dir, segHost)) + + if len(files_found) != len(expected_files_on_host): + raise Exception("expected {} {} files in {} on host {}, but found {}: {}" + .format(len(expected_files_on_host), expected_file, log_dir, segHost, len(files_found), + files_found)) + for file in files_found: + if file.split('.')[-2] not in expected_files_on_host: + raise Exception("Found unexpected file {} in {}".format(file, log_dir)) + + +@then('gpAdminLogs directory {has} "{expected_file}" files on all segment hosts') +def impl(context, has, expected_file): + all_segments = GpArray.initFromCatalog(dbconn.DbURL()).getDbList() + all_segment_hosts = [seg.getSegmentHostName() for seg in all_segments if seg.getSegmentContentId() 
>= 0] + + for seg_host in all_segment_hosts: + log_dir = "%s/gpAdminLogs" % os.path.expanduser("~") + listdir_cmd = Command(name="list logfiles on host", + cmdStr="ls -l {}/{} | wc -l".format(log_dir, expected_file), + remoteHost=seg_host, ctxt=REMOTE) + listdir_cmd.run(validateAfter=True) + ls_outs = listdir_cmd.get_results().stdout + files_found = int(ls_outs) + if files_found > 0 and (has == 'has no'): + raise Exception("expected no {} files in {} on host {}, but found".format(expected_file, log_dir, seg_host)) + if files_found == 0 and (has == 'has'): + raise Exception("expected {} files in {} on host {}, but not found".format(expected_file, log_dir, seg_host)) + + @given('"{filepath}" is copied to the install directory') def impl(context, filepath): gphome = os.getenv("GPHOME") @@ -2646,6 +2930,7 @@ def step_impl(context, segment_id): wait_for_unblocked_transactions(context) @given('the user runs gpexpand with a static inputfile for a two-node cluster with mirrors') +@when('the user runs gpexpand with a static inputfile for a two-node cluster with mirrors') def impl(context): inputfile_contents = """ sdw1|sdw1|20502|/tmp/gpexpand_behave/two_nodes/data/primary/gpseg2|6|2|p @@ -2886,7 +3171,7 @@ def impl(context, hostnames): @given("a temporary directory under '{tmp_base_dir}' with mode '{mode}' is created") @given('a temporary directory under "{tmp_base_dir}" to expand into') -def make_temp_dir(context,tmp_base_dir, mode=''): +def make_temp_dir(context, tmp_base_dir, mode=''): if not tmp_base_dir: raise Exception("tmp_base_dir cannot be empty") if not os.path.exists(tmp_base_dir): @@ -2894,7 +3179,7 @@ def make_temp_dir(context,tmp_base_dir, mode=''): context.temp_base_dir = tempfile.mkdtemp(dir=tmp_base_dir) if mode: os.chmod(path.normpath(path.join(tmp_base_dir, context.temp_base_dir)), - int(mode,8)) + int(mode, 8)) @given('the new host "{hostnames}" is ready to go') def impl(context, hostnames): @@ -3009,27 +3294,32 @@ def impl(context, config_file): 
@when('check segment conf: postgresql.conf') @then('check segment conf: postgresql.conf') +@given('check segment conf: postgresql.conf') def step_impl(context): query = "select dbid, port, hostname, datadir from gp_segment_configuration where content >= 0" conn = dbconn.connect(dbconn.DbURL(dbname='postgres'), unsetSearchPath=False) - segments = dbconn.query(conn, query).fetchall() - for segment in segments: - dbid = "'%s'" % segment[0] - port = "'%s'" % segment[1] - hostname = segment[2] - datadir = segment[3] - - ## check postgresql.conf - remote_postgresql_conf = "%s/%s" % (datadir, 'postgresql.conf') - local_conf_copy = os.path.join(gp.get_coordinatordatadir(), "%s.%s" % ('postgresql.conf', hostname)) - cmd = Command(name="Copy remote conf to local to diff", - cmdStr='scp %s:%s %s' % (hostname, remote_postgresql_conf, local_conf_copy)) - cmd.run(validateAfter=True) - - dic = pgconf.readfile(filename=local_conf_copy) - if str(dic['port']) != port: - raise Exception("port value in postgresql.conf of %s is incorrect. Expected:%s, given:%s" % - (hostname, port, dic['port'])) + try: + segments = dbconn.query(conn, query).fetchall() + for segment in segments: + dbid = "'%s'" % segment[0] + port = "'%s'" % segment[1] + hostname = segment[2] + datadir = segment[3] + + ## check postgresql.conf + remote_postgresql_conf = "%s/%s" % (datadir, 'postgresql.conf') + local_conf_copy = os.path.join(gp.get_coordinatordatadir(), "%s.%s" % ('postgresql.conf', hostname)) + cmd = Command(name="Copy remote conf to local to diff", + cmdStr='rsync %s:%s %s' % (hostname, remote_postgresql_conf, local_conf_copy)) + cmd.run(validateAfter=True) + + dic = pgconf.readfile(filename=local_conf_copy) + if str(dic['port']) != port: + raise Exception("port value in postgresql.conf of %s is incorrect. 
Expected:%s, given:%s" % + (hostname, port, dic['port'])) + finally: + if conn: + conn.close() @given('the transactions are started for dml') def impl(context): @@ -3370,10 +3660,10 @@ def impl(context): standby_local_gp_segment_configuration_file = "%s/%s.standby" % \ (coordinator_datadir, gp_segment_configuration_backup) - cmd = Command(name="Copy standby file to coordinator", cmdStr='scp %s %s' % \ + cmd = Command(name="Copy standby file to coordinator", cmdStr='rsync %s %s' % \ (standby_remote_statusfile, standby_local_statusfile)) cmd.run(validateAfter=True) - cmd = Command(name="Copy standby file to coordinator", cmdStr='scp %s %s' % \ + cmd = Command(name="Copy standby file to coordinator", cmdStr='rsync %s %s' % \ (standby_remote_gp_segment_configuration_file, standby_local_gp_segment_configuration_file)) cmd.run(validateAfter=True) @@ -3493,3 +3783,54 @@ def impl(context, args): def impl(context): locale = get_en_utf_locale() context.execute_steps('''When a demo cluster is created using gpinitsystem args "--lc-ctype=%s"''' % locale) + + +@given('the user asynchronously runs pg_basebackup with {segment} of content {contentid} as source and the process is saved') +@when('the user asynchronously runs pg_basebackup with {segment} of content {contentid} as source and the process is saved') +@then('the user asynchronously runs pg_basebackup with {segment} of content {contentid} as source and the process is saved') +def impl(context, segment, contentid): + if segment == 'mirror': + role = 'm' + elif segment == 'primary': + role = 'p' + + all_segments = GpArray.initFromCatalog(dbconn.DbURL()).getDbList() + + basebackup_target = all_segments[0] + basebackup_source = all_segments[0] + for seg in all_segments: + if role == seg.role and str(seg.content) == contentid: + basebackup_source = seg + elif str(seg.content) == contentid: + basebackup_target = seg + + make_temp_dir(context, '/tmp') + + cmd = PgBaseBackup(target_datadir=context.temp_base_dir, + 
source_host=basebackup_source.getSegmentHostName(), + source_port=str(basebackup_source.getSegmentPort()), + create_slot=True, + replication_slot_name="replication_slot", + forceoverwrite=True, + target_gp_dbid=basebackup_target.getSegmentDbId()) + asyncproc = cmd.runNoWait() + context.asyncproc = asyncproc + + +@given('gp_stat_replication table has pg_basebackup entry for content {contentid}') +@when('gp_stat_replication table has pg_basebackup entry for content {contentid}') +@then('gp_stat_replication table has pg_basebackup entry for content {contentid}') +def impl(context, contentid): + sql = "select gp_segment_id from gp_stat_replication where application_name = 'pg_basebackup'" + + try: + with closing(dbconn.connect(dbconn.DbURL())) as conn: + res = dbconn.query(conn, sql) + rows = res.fetchall() + except Exception as e: + raise Exception("Failed to query gp_stat_replication: %s" % str(e)) + + segments_with_running_basebackup = {str(row[0]) for row in rows} + + if str(contentid) not in segments_with_running_basebackup: + raise Exception("pg_basebackup entry was not found for content %s in gp_stat_replication" % contentid) From d1b53fe4e70930451a820dd660a2fa88298e24a6 Mon Sep 17 00:00:00 2001 From: Piyush Chandwadkar <65647926+piyushc01@users.noreply.github.com> Date: Fri, 20 Jan 2023 17:14:11 +0530 Subject: [PATCH 2/9] Fixing gpcheckperf failure on -V with -f option (#14310) Gpcheckperf was throwing an exception when run with -f and -V option together. This was happening at with -V option, gpssh command outpuot is having few extra lines which are causing trouble while parsing the output. With this change, provided flag to skip verbose mode when running ssh command and used this non-verbose SSH mode to execute the command when getting host-name. Corrected run-time errors due to python3 in gpcheckperf. Also added the test case to cover the scenario relating the host file and -V option. 
--- gpMgmt/bin/gpcheckperf | 37 +++-- .../behave/mgmt_utils/gpcheckperf.feature | 37 +++++ .../behave/mgmt_utils/steps/mgmt_utils.py | 153 +++++++++++++++++- 3 files changed, 210 insertions(+), 17 deletions(-) diff --git a/gpMgmt/bin/gpcheckperf b/gpMgmt/bin/gpcheckperf index 09bb496a669..c6c4a906158 100755 --- a/gpMgmt/bin/gpcheckperf +++ b/gpMgmt/bin/gpcheckperf @@ -82,9 +82,9 @@ def strcmd(cmd): return reduce(lambda x, y: x + ' ' + y, map(lambda x: x.find(' ') > 0 and "'" + x + "'" or x, cmd)) -def gpssh(cmd): +def gpssh(cmd, call_verbose=True): c = ['%s/bin/gpssh' % GPHOME] - if GV.opt['-V']: + if GV.opt['-V'] and call_verbose: c.append('-v') if GV.opt['-f']: c.append('-f') @@ -541,7 +541,7 @@ def spawnNetperfTestBetween(x, y, netperf_path, netserver_port, sec=5): x, cmd] proc = None try: - if GV.opt['-v']: + if GV.opt['-v'] or GV.opt['-V']: print('[Info]', strcmd(c)) proc = subprocess.Popen(c, stdout=subprocess.PIPE) except KeyboardInterrupt: @@ -740,13 +740,23 @@ def get_host_map(hostlist): uniqhosts = dict() # unique host list # get list of hostnames - rc, out = gpssh('hostname') + # disabling verbose mode for gpssh as it is adding extra lines of output + rc, out = gpssh('hostname', False) + if not rc: raise Exception('Encountered error running hostname') + ''' Sample output: + [sdw1] sdw1 + [sdw2] sdw2 + ''' + # get unique hostname list for line in out.splitlines(): - seg, host = line.translate(None, '[]').split() + seg, host = line.translate(str.maketrans('','','[]')).split() + # removing \r and b coming in the output of the command in hostname + host = host.replace('\\r\'', '') + host = host.replace('b\'', '') uniqhosts[host] = seg # get list of segments associated with each host (can't use gpssh since it de-dupes hosts) @@ -755,7 +765,8 @@ def get_host_map(hostlist): proc = None try: - if GV.opt['-v']: print('[Info]', strcmd(cmd)) + if GV.opt['-v'] or GV.opt['-V']: + print('[Info]', strcmd(cmd)) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT) out = proc.stdout.read(-1) rc = proc.wait() @@ -781,7 +792,7 @@ def runNetPerfTestMatrix(): ''' (netperf, hostlist, netserver_port) = setupNetPerfTest() if not netperf: - return None + return None, None # dict() of seglist[segname] = hostname, uniqhosts[hostname] = 1 segment name seglist, uniqhosts = get_host_map(hostlist) @@ -807,19 +818,21 @@ def runNetPerfTestMatrix(): def printMatrixResult(result, seglist): + if not result: + return print('Full matrix netperf bandwidth test') # sum up Rx/Tx rate for each host netTx = dict() netRx = dict() for h in result: - if netTx.has_key(h[0]): + if h[0] in netTx: netTx[h[0]] += float(h[6]) else: netTx[h[0]] = float(h[6]) # netRx requires that we lookup the hostname for a given segment name - if netRx.has_key(seglist[h[1]]): + if seglist[h[1]] in netRx: netRx[seglist[h[1]]] += float(h[6]) else: netRx[seglist[h[1]]] = float(h[6]) @@ -850,7 +863,7 @@ def printMatrixResult(result, seglist): copy = n[:] copy.sort() - median = copy[len(copy) / 2] + median = copy[len(copy) // 2] print('') print('Summary:') @@ -863,6 +876,8 @@ def printMatrixResult(result, seglist): def printNetResult(result): + if not result: + return print('Netperf bisection bandwidth test') for h in result: print('%s -> %s = %f' % (h[0], h[1], h[6])) @@ -894,6 +909,8 @@ def printNetResult(result): def printResult(title, result): + if not result: + return totTime = 0 totBytes = 0 totMBPS = 0 diff --git a/gpMgmt/test/behave/mgmt_utils/gpcheckperf.feature b/gpMgmt/test/behave/mgmt_utils/gpcheckperf.feature index a59f309f112..9b660b491a8 100644 --- a/gpMgmt/test/behave/mgmt_utils/gpcheckperf.feature +++ b/gpMgmt/test/behave/mgmt_utils/gpcheckperf.feature @@ -15,3 +15,40 @@ Feature: Tests for gpcheckperf Then gpcheckperf should return a return code of 0 And gpcheckperf should print "avg = " to stdout And gpcheckperf should not print "NOTICE: -t is deprecated " to stdout + + @concourse_cluster + Scenario: gpcheckperf runs tests by passing 
hostfile in super verbose mode + Given the database is running + And create a gpcheckperf input host file + When the user runs "gpcheckperf -f /tmp/hostfile1 -r M -d /data/gpdata/ --duration=3m -V" + Then gpcheckperf should return a return code of 0 + And gpcheckperf should print "Full matrix netperf bandwidth test" to stdout + And gpcheckperf should not print "IndexError: list index out of range" to stdout + + @concourse_cluster + Scenario: gpcheckperf runs tests by passing hostfile in verbose mode + Given the database is running + And create a gpcheckperf input host file + When the user runs "gpcheckperf -f /tmp/hostfile1 -r M -d /data/gpdata/ --duration=3m -v" + Then gpcheckperf should return a return code of 0 + And gpcheckperf should print "Full matrix netperf bandwidth test" to stdout + And gpcheckperf should not print "IndexError: list index out of range" to stdout + + @concourse_cluster + Scenario: gpcheckperf runs tests by passing hostfile in regular mode + Given the database is running + And create a gpcheckperf input host file + When the user runs "gpcheckperf -f /tmp/hostfile1 -r M -d /data/gpdata/ --duration=3m" + Then gpcheckperf should return a return code of 0 + And gpcheckperf should print "Full matrix netperf bandwidth test" to stdout + And gpcheckperf should not print "IndexError: list index out of range" to stdout + + @concourse_cluster + Scenario: gpcheckperf does not throws typeerror when run with single host + Given the database is running + And create a gpcheckperf input host file + When the user runs "gpcheckperf -h sdw1 -r M -d /data/gpdata/ --duration=3m" + Then gpcheckperf should return a return code of 0 + And gpcheckperf should print "single host only - abandon netperf test" to stdout + And gpcheckperf should not print "TypeError:" to stdout + diff --git a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py index 24049c068b5..7b346b04dde 100644 --- 
a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py +++ b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py @@ -46,7 +46,7 @@ def show_all_installed(gphome): name = x[0].lower() if 'ubuntu' in name: return "dpkg --get-selections --admindir=%s/share/packages/database/deb | awk '{print $1}'" % gphome - elif 'centos' in name or 'rhel' in name: + elif 'centos' in name or 'rhel' in name or 'rocky' in name or 'ol' in name: return "rpm -qa --dbpath %s/share/packages/database" % gphome else: raise Exception('UNKNOWN platform: %s' % str(x)) @@ -56,7 +56,7 @@ def remove_native_package_command(gphome, full_gppkg_name): name = x[0].lower() if 'ubuntu' in name: return 'fakeroot dpkg --force-not-root --log=/dev/null --instdir=%s --admindir=%s/share/packages/database/deb -r %s' % (gphome, gphome, full_gppkg_name) - elif 'centos' in name or 'rhel' in name: + elif 'centos' in name or 'rhel' in name or 'rocky' in name or 'ol' in name: return 'rpm -e %s --dbpath %s/share/packages/database' % (full_gppkg_name, gphome) else: raise Exception('UNKNOWN platform: %s' % str(x)) @@ -438,12 +438,30 @@ def impl(context, content): dburl = dbconn.DbURL(hostname=host, port=port, dbname='template1') wait_for_desired_query_result(dburl, query, desired_result, utility=True) + +@given('the user just waits until recovery_progress.file is created in {logdir}') +@when('the user just waits until recovery_progress.file is created in {logdir}') +@then('the user just waits until recovery_progress.file is created in {logdir}') +def impl(context, logdir): + attempt = 0 + num_retries = 6000 + log_dir = _get_gpAdminLogs_directory() if logdir == 'gpAdminLogs' else logdir + recovery_progress_file = '{}/recovery_progress.file'.format(log_dir) + while attempt < num_retries: + attempt += 1 + if os.path.exists(recovery_progress_file): + return + time.sleep(0.1) + if attempt == num_retries: + raise Exception('Timed out after {} retries'.format(num_retries)) + + @given('the user waits until recovery_progress.file is 
created in {logdir} and verifies its format') @when('the user waits until recovery_progress.file is created in {logdir} and verifies its format') @then('the user waits until recovery_progress.file is created in {logdir} and verifies its format') def impl(context, logdir): attempt = 0 - num_retries = 60000 + num_retries = 6000 log_dir = _get_gpAdminLogs_directory() if logdir == 'gpAdminLogs' else logdir recovery_progress_file = '{}/recovery_progress.file'.format(log_dir) while attempt < num_retries: @@ -459,7 +477,7 @@ def impl(context, logdir): return else: raise Exception('File present but incorrect format line "{}"'.format(line)) - time.sleep(0.01) + time.sleep(0.1) if attempt == num_retries: raise Exception('Timed out after {} retries'.format(num_retries)) @@ -3685,8 +3703,8 @@ def impl(context, command, input): context.error_message = stderr.decode() def are_on_different_subnets(primary_hostname, mirror_hostname): - primary_broadcast = check_output(['ssh', '-n', primary_hostname, "/sbin/ip addr show eth0 | grep 'inet .* brd' | awk '{ print $4 }'"]) - mirror_broadcast = check_output(['ssh', '-n', mirror_hostname, "/sbin/ip addr show eth0 | grep 'inet .* brd' | awk '{ print $4 }'"]) + primary_broadcast = check_output(['ssh', '-n', primary_hostname, "/sbin/ip addr show | grep 'inet .* brd' | awk '{ print $4 }'"]) + mirror_broadcast = check_output(['ssh', '-n', mirror_hostname, "/sbin/ip addr show | grep 'inet .* brd' | awk '{ print $4 }'"]) if not primary_broadcast: raise Exception("primary hostname %s has no broadcast address" % primary_hostname) if not mirror_broadcast: @@ -3784,7 +3802,6 @@ def impl(context): locale = get_en_utf_locale() context.execute_steps('''When a demo cluster is created using gpinitsystem args "--lc-ctype=%s"''' % locale) - @given('the user asynchronously runs pg_basebackup with {segment} of content {contentid} as source and the process is saved') @when('the user asynchronously runs pg_basebackup with {segment} of content {contentid} as 
source and the process is saved') @then('the user asynchronously runs pg_basebackup with {segment} of content {contentid} as source and the process is saved') @@ -3834,3 +3851,125 @@ def impl(context, contentid): if str(contentid) not in segments_with_running_basebackup: raise Exception("pg_basebackup entry was not found for content %s in gp_stat_replication" % contentid) + +@given('create a gpcheckperf input host file') +def impl(context): + cmd = Command(name='create input host file', cmdStr='echo sdw1 > /tmp/hostfile1;echo mdw >> /tmp/hostfile1;') + cmd.run(validateAfter=True) + +@given('backup /etc/hosts file and update hostname entry for localhost') +def impl(context): + # Backup current /etc/hosts file + cmd = Command(name='backup the hosts file', cmdStr='sudo cp /etc/hosts /tmp/hosts_orig') + cmd.run(validateAfter=True) + # Get the host-name + cmd = Command(name='get hostname', cmdStr='hostname') + cmd.run(validateAfter=True) + hostname = cmd.get_stdout() + # Update entry in current /etc/hosts file to add new host-address + cmd = Command(name='update hostlist with new hostname', cmdStr="sudo sed 's/%s/%s__1 %s/g' > /tmp/hosts; sudo cp -f /tmp/hosts /etc/hosts;rm /tmp/hosts" + %(hostname, hostname, hostname)) + cmd.run(validateAfter=True) + +@then('restore /etc/hosts file and cleanup hostlist file') +def impl(context): + cmd = "sudo mv -f /tmp/hosts_orig /etc/hosts; rm -f /tmp/clusterConfigFile-1; rm -f /tmp/hostfile--1" + context.execute_steps(u'''Then the user runs command "%s"''' % cmd) + +@given('update hostlist file with updated host-address') +def impl(context): + cmd = Command(name='get hostname', cmdStr='hostname') + cmd.run(validateAfter=True) + hostname = cmd.get_stdout() + # Update entry in hostfile to replace with address + cmd = Command(name='update temp hosts file', cmdStr= "sed 's/%s/%s__1/g' < ../gpAux/gpdemo/hostfile >> /tmp/hostfile--1" % (hostname, hostname)) + cmd.run(validateAfter=True) + +@given('update clusterConfig file with new port 
and host-address') +def impl(context): + cmd = Command(name='get hostname', cmdStr='hostname') + cmd.run(validateAfter=True) + hostname = cmd.get_stdout() + + # Create a copy of config file + cmd = Command(name='create a copy of config file', + cmdStr= "cp ../gpAux/gpdemo/clusterConfigFile /tmp/clusterConfigFile-1;") + cmd.run(validateAfter=True) + + # Update hostfile location + cmd = Command(name='update master hostname in config file', + cmdStr= "sed 's/MACHINE_LIST_FILE=.*/MACHINE_LIST_FILE=\/tmp\/hostfile--1/g' -i /tmp/clusterConfigFile-1") + cmd.run(validateAfter=True) + + +@then('verify that cluster config has host-name populated correctly') +def impl(context): + cmd = Command(name='get hostname', cmdStr='hostname') + cmd.run(validateAfter=True) + hostname_orig = cmd.get_stdout().strip() + hostname_new = "{}__1".format(hostname_orig) + # Verift host-address not populated in the config + with closing(dbconn.connect(dbconn.DbURL(), unsetSearchPath=False)) as conn: + sql = "SELECT count(*) FROM gp_segment_configuration WHERE hostname='%s'" % hostname_new + num_matching = dbconn.querySingleton(conn, sql) + if(num_matching != 0): + raise Exception("Found entries in gp_segment_configuration is host-address popoulated as host-name") + # Verify correct host-name is populated in the config + with closing(dbconn.connect(dbconn.DbURL(), unsetSearchPath=False)) as conn: + sql = "SELECT count( distinct hostname) FROM gp_segment_configuration WHERE hostname='%s'" % hostname_orig + num_matching = dbconn.querySingleton(conn, sql) + if(num_matching != 1): + raise Exception("Found no entries in gp_segment_configuration is host-address popoulated as host-name") + +@given('update the private keys for the new host address') +def impl(context): + cmd = Command(name='get hostname', cmdStr='hostname') + cmd.run(validateAfter=True) + hostname = "{}__1".format(cmd.get_stdout().strip()) + cmd_str = "rm -f ~/.ssh/id_rsa ~/.ssh/id_rsa.pub ~/.ssh/known_hosts; $GPHOME/bin/gpssh-exkeys -h 
{}".format(hostname) + cmd = Command(name='update ssh private keys', cmdStr=cmd_str) + cmd.run(validateAfter=True) + +@then('verify replication slot {slot} is available on all the segments') +@when('verify replication slot {slot} is available on all the segments') +@given('verify replication slot {slot} is available on all the segments') +def impl(context, slot): + gparray = GpArray.initFromCatalog(dbconn.DbURL()) + segments = gparray.getDbList() + dbname = "template1" + query = "SELECT count(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = '{}'".format(slot) + + for seg in segments: + if seg.isSegmentPrimary(): + host = seg.getSegmentHostName() + port = seg.getSegmentPort() + with closing(dbconn.connect(dbconn.DbURL(dbname=dbname, port=port, hostname=host), + utility=True, unsetSearchPath=False)) as conn: + result = dbconn.querySingleton(conn, query) + if result == 0: + raise Exception("Slot does not exist for host:{}, port:{}".format(host, port)) + + +@given('user waits until gp_stat_replication table has no pg_basebackup entries for content {contentids}') +@when('user waits until gp_stat_replication table has no pg_basebackup entries for content {contentids}') +@then('user waits until gp_stat_replication table has no pg_basebackup entries for content {contentids}') +def impl(context, contentids): + retries = 600 + content_ids = contentids.split(',') + content_ids = ', '.join(c for c in content_ids) + sql = "select count(*) from gp_stat_replication where application_name = 'pg_basebackup' and gp_segment_id in (%s)" %(content_ids) + no_basebackup = False + + for i in range(retries): + try: + with closing(dbconn.connect(dbconn.DbURL())) as conn: + res = dbconn.querySingleton(conn, sql) + except Exception as e: + raise Exception("Failed to query gp_stat_replication: %s" % str(e)) + if res == 0: + no_basebackup = True + break + time.sleep(1) + + if not no_basebackup: + raise Exception("pg_basebackup entry was found for contents %s in gp_stat_replication 
after %d retries" % (contentids, retries)) From 1c0c0eb39f8e4024c9829d04c75631fd0adc0641 Mon Sep 17 00:00:00 2001 From: Nihal Jain Date: Mon, 6 Mar 2023 11:31:12 +0530 Subject: [PATCH 3/9] gpcheckperf: Fix memory calculation function (#14865) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * gpcheckperf: Fix memory calculation function Currently, there is a bug in the `getMemory()` function in `gpcheckperf` because of the way we check the return code of the `run()` method which is called inside `getMemory()`. The `run()` method returns an integer value of `zero` in case of success and a `non-zero` value if it fails. We are checking this value using the condition `if not ok` which is incorrect because when the `run()` method succeeds (`ok = 0`), the condition would result as `False` causing the `getMemory()` function to assume that the `run()` method failed but in reality, it did not. A simple fix would be to change the condition from `if not ok` to `if ok != 0` to check for any possible failure from the `run()` method. Further, the way `getMemory()` handles errors is also incorrect. It just returns `0` whenever there is an error which can lead to the incorrect file size being used to perform disk performance tests. This is because the gpcheckperf utility internally calls `multidd` command to perform disk performance tests which also accepts a file size parameter with the -S option which when not set (or is equal to 0), uses its own default value of `2 * memory_size`, but instead of dividing it from the number of input directories, it uses this value for each directory i.e. the total file size value would be `2 * memory_size * no_of_input_directories`. Due to this, sometimes the user can meet File System Full error when the number of input directories is big. 
Hence we need to properly handle the error in the `getMemory()` function and exit from the code instead of just returning `0` Before: ``` $ gpcheckperf -d /tmp/test1 -d /tmp/test2 -d /tmp/test3 -h localhost -rd disk write avg time (sec): 5.46 disk write tot bytes: 12884901888 --> is equal to 2 * memory_size * no_of_input_directories disk write tot bandwidth (MB/s): 2250.55 disk write min bandwidth (MB/s): 2250.55 [localhost] disk write max bandwidth (MB/s): 2250.55 [localhost] ``` After: ``` $ gpcheckperf -d /tmp/test1 -d /tmp/test2 -d /tmp/test3 -h localhost -rd disk write avg time (sec): 1.87 disk write tot bytes: 4295000064 --> is equal to 2 * memory_size disk write tot bandwidth (MB/s): 2190.39 disk write min bandwidth (MB/s): 2190.39 [localhost] disk write max bandwidth (MB/s): 2190.39 [localhost] ``` Also added unit test cases to test the getMemory() function outputs and added a main section to the gpcheckperf code. --- gpMgmt/bin/gpcheckperf | 109 ++++++++++-------- .../test/unit/test_unit_gpcheckperf.py | 66 +++++++++++ .../behave/mgmt_utils/gpcheckperf.feature | 10 ++ 3 files changed, 136 insertions(+), 49 deletions(-) create mode 100644 gpMgmt/bin/gppylib/test/unit/test_unit_gpcheckperf.py diff --git a/gpMgmt/bin/gpcheckperf b/gpMgmt/bin/gpcheckperf index c6c4a906158..44c610861f8 100755 --- a/gpMgmt/bin/gpcheckperf +++ b/gpMgmt/bin/gpcheckperf @@ -133,26 +133,27 @@ def getPlatform(): def getMemory(): - if getPlatform() == 'linux': - ok, out = run("sh -c 'cat /proc/meminfo | grep MemTotal'") - if not ok: - return 0 + platform = getPlatform() + + if platform == 'linux': + rc, out = run("sh -c 'cat /proc/meminfo | grep MemTotal'") + if rc != 0: + return None word_list = out.strip().split(' ') val = int(word_list[len(word_list) - 2]) factor = word_list[len(word_list) - 1] if factor == 'kB': - return val * 1024 - return 0 + return val * 1024 if val else None - if getPlatform() == 'darwin': - ok, out = run("/usr/sbin/sysctl hw.physmem") - if not ok: - return 0 
+ if platform == 'darwin': + rc, out = run("/usr/sbin/sysctl hw.physmem") + if rc != 0: + return None word_list = out.strip().split(' ') val = int(word_list[1]) - return val + return val if val else None - return 0 + return None def parseMemorySize(line): @@ -261,7 +262,12 @@ def parseCommandLine(): usage('Error: maximum size for -B parameter is 1MB') if GV.opt['-S'] == 0: - GV.opt['-S'] = 2 * getMemory() / len(GV.opt['-d']) + system_mem_size = getMemory() + if system_mem_size is not None: + GV.opt['-S'] = 2 * system_mem_size / len(GV.opt['-d']) + else: + sys.exit('[Error] could not get system memory size. Instead, you can use the -S option to provide the file size value') + else: GV.opt['-S'] /= len(GV.opt['-d']) @@ -947,50 +953,55 @@ def printResult(title, result): print('') -try: - parseCommandLine() - runSetup() - diskWriteResult = diskReadResult = streamResult = netResult = None - tornDown = False +def main(): try: - if GV.opt['-r'].find('d') >= 0: - multidd = copyExecOver('multidd') - diskWriteResult = runDiskWriteTest(multidd) - diskReadResult = runDiskReadTest(multidd) + parseCommandLine() + runSetup() + diskWriteResult = diskReadResult = streamResult = netResult = None + tornDown = False + try: + if GV.opt['-r'].find('d') >= 0: + print('[Warning] Using %d bytes for disk performance test. 
This might take some time' % (GV.opt['-S'] * len(GV.opt['-d']))) + multidd = copyExecOver('multidd') + diskWriteResult = runDiskWriteTest(multidd) + diskReadResult = runDiskReadTest(multidd) - if GV.opt['-r'].find('s') >= 0: - streamResult = runStreamTest() + if GV.opt['-r'].find('s') >= 0: + streamResult = runStreamTest() - if GV.opt['--net'] == 'netperf': - netResult = runNetPerfTest() - elif GV.opt['--net'] == 'parallel': - netResult = runNetPerfTestParallel() - elif GV.opt['--net'] == 'matrix': - netResult, seglist = runNetPerfTestMatrix() + if GV.opt['--net'] == 'netperf': + netResult = runNetPerfTest() + elif GV.opt['--net'] == 'parallel': + netResult = runNetPerfTestParallel() + elif GV.opt['--net'] == 'matrix': + netResult, seglist = runNetPerfTestMatrix() - runTeardown() + runTeardown() + + finally: + print('') + print('====================') + print('== RESULT %s' % datetime.datetime.now().isoformat()) + print('====================') - finally: - print('') - print('====================') - print('== RESULT %s' % datetime.datetime.now().isoformat()) - print('====================') + if diskWriteResult: + printResult('disk write', diskWriteResult) - if diskWriteResult: - printResult('disk write', diskWriteResult) + if diskReadResult: + printResult('disk read', diskReadResult) - if diskReadResult: - printResult('disk read', diskReadResult) + if streamResult: + printResult('stream', streamResult) - if streamResult: - printResult('stream', streamResult) + if netResult and GV.opt['--net'] == 'matrix': + printMatrixResult(netResult, seglist) + elif netResult and GV.opt['--net']: + printNetResult(netResult) - if netResult and GV.opt['--net'] == 'matrix': - printMatrixResult(netResult, seglist) - elif netResult and GV.opt['--net']: - printNetResult(netResult) + runTeardown() - runTeardown() + except KeyboardInterrupt: + print('[Abort] Keyboard Interrupt ...') -except KeyboardInterrupt: - print('[Abort] Keyboard Interrupt ...') +if __name__ == '__main__': + main() 
diff --git a/gpMgmt/bin/gppylib/test/unit/test_unit_gpcheckperf.py b/gpMgmt/bin/gppylib/test/unit/test_unit_gpcheckperf.py new file mode 100644 index 00000000000..097d4a1b191 --- /dev/null +++ b/gpMgmt/bin/gppylib/test/unit/test_unit_gpcheckperf.py @@ -0,0 +1,66 @@ +import imp +import os +import sys +from mock import patch +from gppylib.test.unit.gp_unittest import GpTestCase,run_tests + +class GpCheckPerf(GpTestCase): + def setUp(self): + gpcheckcat_file = os.path.abspath(os.path.dirname(__file__) + "/../../../gpcheckperf") + self.subject = imp.load_source('gpcheckperf', gpcheckcat_file) + + def tearDown(self): + super(GpCheckPerf, self).tearDown() + + @patch('gpcheckperf.getPlatform', return_value='darwin') + @patch('gpcheckperf.run') + def test_get_memory_on_darwin(self, mock_run, mock_get_platform): + mock_run.return_value = [1, 'hw.physmem: 1234'] + actual_result = self.subject.getMemory() + self.assertEquals(actual_result, None) + + mock_run.return_value = [0, 'hw.physmem: 0'] + actual_result = self.subject.getMemory() + self.assertEquals(actual_result, None) + + mock_run.return_value = [0, 'hw.physmem: 1234'] + actual_result = self.subject.getMemory() + self.assertEquals(actual_result, 1234) + + @patch('gpcheckperf.getPlatform', return_value='linux') + @patch('gpcheckperf.run') + def test_get_memory_on_linux(self, mock_run, mock_get_platform): + mock_run.return_value = [1, 'MemTotal: 10 kB'] + actual_result = self.subject.getMemory() + self.assertEquals(actual_result, None) + + mock_run.return_value = [0, 'MemTotal: 0 kB'] + actual_result = self.subject.getMemory() + self.assertEquals(actual_result, None) + + mock_run.return_value = [0, 'MemTotal: 10 kB'] + actual_result = self.subject.getMemory() + self.assertEquals(actual_result, 10240) + + @patch('gpcheckperf.getPlatform', return_value='abc') + def test_get_memory_on_invalid_platform(self, mock_get_platform): + actual_result = self.subject.getMemory() + self.assertEquals(actual_result, None) + + 
@patch('gpcheckperf.getMemory', return_value=None) + def test_parseCommandLine_when_get_memory_fails(self, mock_get_memory): + sys.argv = ["gpcheckperf", "-h", "locahost", "-r", "d", "-d", "/tmp"] + with self.assertRaises(SystemExit) as e: + self.subject.parseCommandLine() + + self.assertEqual(e.exception.code, '[Error] could not get system memory size. Instead, you can use the -S option to provide the file size value') + + @patch('gpcheckperf.getMemory', return_value=123) + def test_parseCommandLine_when_get_memory_succeeds(self, mock_get_memory): + sys.argv = ["gpcheckperf", "-h", "locahost", "-r", "d", "-d", "/tmp"] + self.subject.parseCommandLine() + self.assertEqual(self.subject.GV.opt['-S'], 246.0) + + +if __name__ == '__main__': + run_tests() diff --git a/gpMgmt/test/behave/mgmt_utils/gpcheckperf.feature b/gpMgmt/test/behave/mgmt_utils/gpcheckperf.feature index 9b660b491a8..bfa13951c0b 100644 --- a/gpMgmt/test/behave/mgmt_utils/gpcheckperf.feature +++ b/gpMgmt/test/behave/mgmt_utils/gpcheckperf.feature @@ -52,3 +52,13 @@ Feature: Tests for gpcheckperf And gpcheckperf should print "single host only - abandon netperf test" to stdout And gpcheckperf should not print "TypeError:" to stdout + Scenario: gpcheckperf runs with -S option and prints a warning message + Given the database is running + When the user runs "gpcheckperf -h localhost -r d -d /tmp -S 1GB" + Then gpcheckperf should return a return code of 0 + And gpcheckperf should print "\[Warning] Using 1073741824 bytes for disk performance test. 
This might take some time" to stdout + + Scenario: gpcheckperf errors out when invalid value is passed to the -S option + Given the database is running + When the user runs "gpcheckperf -h localhost -r d -d /tmp -S abc" + Then gpcheckperf should return a return code of 1 From 1a367976d3a8645f789d77874e06a872043179f4 Mon Sep 17 00:00:00 2001 From: Ed Espino Date: Sun, 12 Mar 2023 12:04:37 -0700 Subject: [PATCH 4/9] gpcheckperf - Update a Python 3 reference of "python" to "python3" In gpcheckperf, a remaining "python" reference exists. This commit renames it to "python3" and allows it to run when the symbolic link /usr/bin/python pointing to /usr/bin/python3 does not exist. Tl;dr - An attempt was made to update all Greenplum utilities to reference 'python3'. A reference was left unchanged in gpcheckperf. Python 3 installations by default do not create the symbolic link /usr/bin/python --> /usr/bin/python3. This has been observed in Python 3 installations on Rockylinux 8 & 9 and Ubuntu 20.04 & 22.04. 
In the case of gpcheckperf, when invoked, the output below reveals the "Error" a user receives when /usr/bin/python does not exist: ``` gpadmin@cdw:~$ /usr/local/gp7/bin/gpcheckperf -h cdw -d /data -r d -S 1 -v -------------------- SETUP 2023-03-12T17:26:27.967030 -------------------- [Info] verify python interpreter exists [Info] /usr/local/gp7/bin/gpssh -h cdw 'python -c print' -------------------- TEARDOWN -------------------- [Info] /usr/local/gp7/bin/gpssh -h cdw 'rm -rf /data/gpcheckperf_$USER' [Error] unable to find python interpreter on some hosts verify PATH variables on the hosts gpadmin@cdw:~$ ``` --- gpMgmt/bin/gpcheckperf | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/gpMgmt/bin/gpcheckperf b/gpMgmt/bin/gpcheckperf index 44c610861f8..6847c18c475 100755 --- a/gpMgmt/bin/gpcheckperf +++ b/gpMgmt/bin/gpcheckperf @@ -346,14 +346,14 @@ def runSetup(): print('--------------------') okCount = 0 try: - # check python reachable + # Verify python3 is accessible if GV.opt['-v']: - print('[Info] verify python interpreter exists') - (ok, out) = gpssh('python -c print') + print('[Info] verify python3 interpreter exists') + (ok, out) = gpssh('python3 -c print') if not ok: if not GV.opt['-v']: print(out) - sys.exit("[Error] unable to find python interpreter on some hosts\n" + sys.exit("[Error] unable to find python3 interpreter on some hosts\n" + " verify PATH variables on the hosts") # mkdir cperf From 4f4d1cb98720637d6a1163ea891f75a078b976b2 Mon Sep 17 00:00:00 2001 From: Piyush Chandwadkar Date: Mon, 20 Mar 2023 18:25:26 +0530 Subject: [PATCH 5/9] gpcheckperf: incorporating parity changes from 6X PR Parity changes from 6X PR: https://github.com/greenplum-db/gpdb/pull/15192 Changes are as follows: 1. Controlling verbosity of gpssh() function through verbose parameter 2. Updating gpssh calls with verbose parameter 3. Removing non-required checks in print*Results() 4. 
Removing regex removal for b' and \r characters when getting hostname as it is fixed in gpssh. 5. Re-structured test cases for gpcheckperf --- gpMgmt/bin/gpcheckperf | 46 +++++----- .../behave/mgmt_utils/gpcheckperf.feature | 85 +++++++++++-------- 2 files changed, 70 insertions(+), 61 deletions(-) diff --git a/gpMgmt/bin/gpcheckperf b/gpMgmt/bin/gpcheckperf index 6847c18c475..a2f259cb22d 100755 --- a/gpMgmt/bin/gpcheckperf +++ b/gpMgmt/bin/gpcheckperf @@ -82,9 +82,9 @@ def strcmd(cmd): return reduce(lambda x, y: x + ' ' + y, map(lambda x: x.find(' ') > 0 and "'" + x + "'" or x, cmd)) -def gpssh(cmd, call_verbose=True): +def gpssh(cmd, verbose): c = ['%s/bin/gpssh' % GPHOME] - if GV.opt['-V'] and call_verbose: + if verbose: c.append('-v') if GV.opt['-f']: c.append('-f') @@ -326,13 +326,13 @@ def runTeardown(): for d in GV.opt['-d']: dirs = '%s %s/gpcheckperf_$USER' % (dirs, d) try: - gpssh('rm -rf ' + dirs) + gpssh('rm -rf ' + dirs, GV.opt['-V']) except: pass try: if GV.opt['--net']: - gpssh(killall(GV.opt['--netserver'])) + gpssh(killall(GV.opt['--netserver']), GV.opt['-V']) except: pass @@ -349,7 +349,7 @@ def runSetup(): # Verify python3 is accessible if GV.opt['-v']: print('[Info] verify python3 interpreter exists') - (ok, out) = gpssh('python3 -c print') + (ok, out) = gpssh('python3 -c print', GV.opt['-V']) if not ok: if not GV.opt['-v']: print(out) @@ -364,7 +364,7 @@ def runSetup(): dirs = '%s %s/gpcheckperf_$USER' % (dirs, d) cmd = 'rm -rf %s ; mkdir -p %s' % (dirs, dirs) - (ok, out) = gpssh(cmd) + (ok, out) = gpssh(cmd, GV.opt['-V']) if not ok: print('failed gpssh: %s' % out) sys.exit("[Error] unable to make gpcheckperf directory. 
\n" @@ -402,7 +402,7 @@ def copyExecOver(fname): sys.exit('[Error] command failed: gpsync %s =:%s with output: %s' % (path, target, out)) # chmod +x file - (ok, out) = gpssh('chmod a+rx %s' % target) + (ok, out) = gpssh('chmod a+rx %s' % target, GV.opt['-V']) if not ok: sys.exit('[Error] command failed: chmod a+rx %s with output: %s' % (target, out)) @@ -458,7 +458,7 @@ def runDiskWriteTest(multidd): cmd = cmd + (' -B %d' % GV.opt['-B']) if GV.opt['-S']: cmd = cmd + (' -S %d' % GV.opt['-S']) - (ok, out) = gpssh(cmd) + (ok, out) = gpssh(cmd, GV.opt['-V']) if not ok: sys.exit('[Error] command failed: %s with output: %s' % (cmd, out)) return parseMultiDDResult(out) @@ -477,7 +477,7 @@ def runDiskReadTest(multidd): cmd = cmd + (' -B %d' % GV.opt['-B']) if GV.opt['-S']: cmd = cmd + (' -S %d' % GV.opt['-S']) - (ok, out) = gpssh(cmd) + (ok, out) = gpssh(cmd, GV.opt['-V']) if not ok: sys.exit('[Error] command failed: %s with output: %s' % (cmd, out)) return parseMultiDDResult(out) @@ -490,7 +490,7 @@ def runStreamTest(): print('--------------------') cmd = copyExecOver('stream') - (ok, out) = gpssh(cmd) + (ok, out) = gpssh(cmd, GV.opt['-V']) if not ok: sys.exit('[Error] command failed: %s with output: %s' % (cmd, out)) out = io.StringIO(out) @@ -512,9 +512,9 @@ def startNetServer(): for i in range(5): if i > 0: print('[Warning] retrying with port %d' % port) - (ok, out) = gpssh(killall(GV.opt['--netserver'])) + (ok, out) = gpssh(killall(GV.opt['--netserver']), GV.opt['-V']) - (ok, out) = gpssh('%s -p %d > /dev/null 2>&1' % (rmtPath, port)) + (ok, out) = gpssh('%s -p %d > /dev/null 2>&1' % (rmtPath, port), GV.opt['-V']) if ok: return port @@ -745,8 +745,15 @@ def get_host_map(hostlist): seglist = dict() # segment list uniqhosts = dict() # unique host list - # get list of hostnames - # disabling verbose mode for gpssh as it is adding extra lines of output + ''' + Get hostnames using non-verbose mode since verbose output makes parsing difficult with extra lines as show: + 
Using delaybeforesend 0.05 and prompt_validation_timeout 1.0 + [Reset ...] + [INFO] login sdw2 + [sdw2] sdw2 + [INFO] completed successfully + [Cleanup...] + ''' rc, out = gpssh('hostname', False) if not rc: @@ -760,9 +767,6 @@ def get_host_map(hostlist): # get unique hostname list for line in out.splitlines(): seg, host = line.translate(str.maketrans('','','[]')).split() - # removing \r and b coming in the output of the command in hostname - host = host.replace('\\r\'', '') - host = host.replace('b\'', '') uniqhosts[host] = seg # get list of segments associated with each host (can't use gpssh since it de-dupes hosts) @@ -771,7 +775,7 @@ def get_host_map(hostlist): proc = None try: - if GV.opt['-v'] or GV.opt['-V']: + if GV.opt['-v']: print('[Info]', strcmd(cmd)) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) out = proc.stdout.read(-1) @@ -824,8 +828,6 @@ def runNetPerfTestMatrix(): def printMatrixResult(result, seglist): - if not result: - return print('Full matrix netperf bandwidth test') # sum up Rx/Tx rate for each host @@ -882,8 +884,6 @@ def printMatrixResult(result, seglist): def printNetResult(result): - if not result: - return print('Netperf bisection bandwidth test') for h in result: print('%s -> %s = %f' % (h[0], h[1], h[6])) @@ -915,8 +915,6 @@ def printNetResult(result): def printResult(title, result): - if not result: - return totTime = 0 totBytes = 0 totMBPS = 0 diff --git a/gpMgmt/test/behave/mgmt_utils/gpcheckperf.feature b/gpMgmt/test/behave/mgmt_utils/gpcheckperf.feature index bfa13951c0b..9fafafef7af 100644 --- a/gpMgmt/test/behave/mgmt_utils/gpcheckperf.feature +++ b/gpMgmt/test/behave/mgmt_utils/gpcheckperf.feature @@ -17,48 +17,59 @@ Feature: Tests for gpcheckperf And gpcheckperf should not print "NOTICE: -t is deprecated " to stdout @concourse_cluster - Scenario: gpcheckperf runs tests by passing hostfile in super verbose mode + Scenario Outline: gpcheckperf run test by passing hostfile in regular mode Given 
the database is running - And create a gpcheckperf input host file - When the user runs "gpcheckperf -f /tmp/hostfile1 -r M -d /data/gpdata/ --duration=3m -V" + And create a gpcheckperf input host file + When the user runs "gpcheckperf -f /tmp/hostfile1 -r -d /data/gpdata/ --duration=10s" Then gpcheckperf should return a return code of 0 - And gpcheckperf should print "Full matrix netperf bandwidth test" to stdout - And gpcheckperf should not print "IndexError: list index out of range" to stdout + And gpcheckperf should print "-- NETPERF TEST" to stdout + And gpcheckperf should print "" to stdout + And gpcheckperf should print "Summary:" to stdout + And gpcheckperf should print "sum =" to stdout + And gpcheckperf should print "min =" to stdout + And gpcheckperf should print "max =" to stdout + And gpcheckperf should print "avg =" to stdout + And gpcheckperf should print "median =" to stdout - @concourse_cluster - Scenario: gpcheckperf runs tests by passing hostfile in verbose mode - Given the database is running - And create a gpcheckperf input host file - When the user runs "gpcheckperf -f /tmp/hostfile1 -r M -d /data/gpdata/ --duration=3m -v" - Then gpcheckperf should return a return code of 0 - And gpcheckperf should print "Full matrix netperf bandwidth test" to stdout - And gpcheckperf should not print "IndexError: list index out of range" to stdout + Examples: + | test_type | cmd_param | print_message | + | network | N | Netperf bisection bandwidth test | + | matrix | M | Full matrix netperf bandwidth test | - @concourse_cluster - Scenario: gpcheckperf runs tests by passing hostfile in regular mode - Given the database is running - And create a gpcheckperf input host file - When the user runs "gpcheckperf -f /tmp/hostfile1 -r M -d /data/gpdata/ --duration=3m" - Then gpcheckperf should return a return code of 0 - And gpcheckperf should print "Full matrix netperf bandwidth test" to stdout - And gpcheckperf should not print "IndexError: list index out of range" 
to stdout + @concourse_cluster + Scenario Outline: gpcheckperf runs test with hostfile in mode + Given the database is running + And create a gpcheckperf input host file + When the user runs "gpcheckperf -f /tmp/hostfile1 -r -d /data/gpdata/ --duration=10s " + Then gpcheckperf should return a return code of 0 + And gpcheckperf should print "-- NETPERF TEST" to stdout + And gpcheckperf should print "" to stdout + And gpcheckperf should print "making gpcheckperf directory on all hosts ..." to stdout + And gpcheckperf should print "[Info].*gpssh .*hostfile1 .*gpnetbenchClient." to stdout + And gpcheckperf should print "[Info].*gpssh .*hostfile1 .*gpnetbenchServer." to stdout + And gpcheckperf should print "== RESULT*" to stdout + And gpcheckperf should print "Summary:" to stdout + And gpcheckperf should print "TEARDOWN" to stdout + + Examples: + | test_type | verbosity | cmd_param | verbose_flag | gpssh_param | print_message | + | network | verbose | N | -v | -f | Netperf bisection bandwidth test | + | network | extra verbose | N | -V | -v -f | Netperf bisection bandwidth test | + | matrix | verbose | M | -v | -f | Full matrix netperf bandwidth test | + | matrix | extra verbose | M | -V | -v -f | Full matrix netperf bandwidth test | @concourse_cluster - Scenario: gpcheckperf does not throws typeerror when run with single host - Given the database is running - And create a gpcheckperf input host file - When the user runs "gpcheckperf -h sdw1 -r M -d /data/gpdata/ --duration=3m" - Then gpcheckperf should return a return code of 0 - And gpcheckperf should print "single host only - abandon netperf test" to stdout - And gpcheckperf should not print "TypeError:" to stdout + Scenario Outline: running gpcheckperf single host test case + Given the database is running + And create a gpcheckperf input host file + When the user runs "gpcheckperf -h cdw -r -d /data/gpdata/ --duration=10s -v" + Then gpcheckperf should return a return code of 0 + And gpcheckperf should print "-- 
NETPERF TEST" to stdout + And gpcheckperf should print "single host only - abandon netperf test" to stdout + And gpcheckperf should print "TEARDOWN" to stdout - Scenario: gpcheckperf runs with -S option and prints a warning message - Given the database is running - When the user runs "gpcheckperf -h localhost -r d -d /tmp -S 1GB" - Then gpcheckperf should return a return code of 0 - And gpcheckperf should print "\[Warning] Using 1073741824 bytes for disk performance test. This might take some time" to stdout + Examples: + | test_name | cmd_param| + | matrix test | M | + | network test| N | - Scenario: gpcheckperf errors out when invalid value is passed to the -S option - Given the database is running - When the user runs "gpcheckperf -h localhost -r d -d /tmp -S abc" - Then gpcheckperf should return a return code of 1 From 39016c3c7c21cf3d6218262d0f91254f216b3d2b Mon Sep 17 00:00:00 2001 From: Piyush Chandwadkar Date: Mon, 20 Mar 2023 18:30:44 +0530 Subject: [PATCH 6/9] gpcheckperf: fixing string parsing in parseMultiDDResult() Issue: After updating gpssh to return string output, gpcheckperf failed to parse results. RCA: String before gpssh changes: "'multidd total bytes '" String after gpssh fix: "multidd total bytes '" In parseMultiDDResult(), str.find() before gpssh changes was returning index 1 as it was getting a string starting with "'". Post gpssh changes, the substring is at the beginning of the string hence returning index 0. Fix: str.find() returns -1 if the substring is not found otherwise returns index. As the substring searched in parseMultiDDResult() is appearing at index zero, corrected condition to include index zero. 
--- gpMgmt/bin/gpcheckperf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gpMgmt/bin/gpcheckperf b/gpMgmt/bin/gpcheckperf index a2f259cb22d..cc8cba62af9 100755 --- a/gpMgmt/bin/gpcheckperf +++ b/gpMgmt/bin/gpcheckperf @@ -420,7 +420,7 @@ def parseMultiDDResult(out): o = line[i + 2:] - if o.find('multidd total bytes ') > 0: + if o.find('multidd total bytes ') >= 0: h = line[1:i] o = o.split() m = re.search("(^\d+)", o[-1]) @@ -429,7 +429,7 @@ def parseMultiDDResult(out): bytes = int(m.group(1)) continue - if o.find('real') > 0: + if o.find('real') >= 0: h = line[1:i] o = o.split() m = re.search("(^\d+.\d+)", o[1]) From 60319e985a3d79394280163eeaaa0804ef491658 Mon Sep 17 00:00:00 2001 From: Evgeniy Ratkov Date: Tue, 27 Jun 2023 08:08:49 +0300 Subject: [PATCH 7/9] gpcheckperf: add buffer size parameter (#14848) * gpcheckperf: add buffer size parameter (#14848) Before this patch, while running gpcheckperf utility the buffer was set by default for the underlying gpnetbenchClient utility as 32Kb. It led to problem with receiving annoying and misleading warnings about connections between hosts. Use '--buffer-size' flag with size in kilobytes to set buffer size, which will be used at gpnetbenchClient. It is an optional parameter. The default value is 8Kb. 
--- gpMgmt/bin/gpcheckperf | 20 ++++++++++++++---- gpMgmt/doc/gpcheckperf_help | 7 ++++++- .../behave/mgmt_utils/gpcheckperf.feature | 21 +++++++++++++++++++ 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/gpMgmt/bin/gpcheckperf b/gpMgmt/bin/gpcheckperf index cc8cba62af9..98ade0638e7 100755 --- a/gpMgmt/bin/gpcheckperf +++ b/gpMgmt/bin/gpcheckperf @@ -29,6 +29,7 @@ Usage: gpcheckperf -f file : a file listing all hosts to connect to --duration : how long to run network test (default 5 seconds) --netperf : use netperf instead of gpnetbenchServer/gpnetbenchClient + --buffer-size : the size of the send buffer in kilobytes ( default 8 kilobytes) """ import datetime @@ -68,7 +69,7 @@ class Global(): opt = {'-d': [], '-D': False, '-v': False, '-V': False, '-r': '', '-B': 1024 * 32, '-S': 0, '-h': [], '-f': None, '--duration': 15, '--net': None, '--netserver': 'gpnetbenchServer', - '--netclient': 'gpnetbenchClient'} + '--netclient': 'gpnetbenchClient', '--buffer-size': 0} GV = Global() @@ -203,7 +204,7 @@ def print_version(): def parseCommandLine(): try: - (options, args) = getopt.getopt(sys.argv[1:], '?vVDd:r:B:S:p:h:f:', ['duration=', 'version', 'netperf']) + (options, args) = getopt.getopt(sys.argv[1:], '?vVDd:r:B:S:p:h:f:', ['duration=', 'version', 'netperf', 'buffer-size=']) except Exception as e: usage('Error: ' + str(e)) exit(1) @@ -226,6 +227,8 @@ def parseCommandLine(): elif switch == '--netperf': GV.opt['--netserver'] = 'netserver' GV.opt['--netclient'] = 'netperf' + elif switch == '--buffer-size': + GV.opt[switch] = int(val) # run default tests (if not specified) if GV.opt['-r'] == '': @@ -275,6 +278,14 @@ def parseCommandLine(): GV.opt['--duration'] = 15 print('[INFO] Invalid network duration specified. 
Using default (15 seconds)') + if GV.opt['--netclient'].find('netperf') >= 0: + if GV.opt['--buffer-size']: + print('[Warning] --buffer-size option will be ignored when the --netperf option is enabled') + else: + if GV.opt['--buffer-size'] <= 0: + print('[INFO] --buffer-size value is not specified or invalid. Using default (8 kilobytes)') + GV.opt['--buffer-size'] = 8 + # strip the last '/' from the dir dd = [] for d in GV.opt['-d']: @@ -540,8 +551,9 @@ def spawnNetperfTestBetween(x, y, netperf_path, netserver_port, sec=5): cmd = ('%s -H %s -p %d -t TCP_STREAM -l %s -f M -P 0 ' % (netperf_path, y, netserver_port, sec)) else: - cmd = ('%s -H %s -p %d -l %s -P 0 ' - % (netperf_path, y, netserver_port, sec)) + cmd = ('%s -H %s -p %d -l %s -P 0 -b %s' + % (netperf_path, y, netserver_port, sec, GV.opt['--buffer-size'])) + c = ['ssh', '-o', 'BatchMode yes', '-o', 'StrictHostKeyChecking no', x, cmd] diff --git a/gpMgmt/doc/gpcheckperf_help b/gpMgmt/doc/gpcheckperf_help index 23820d46ab1..8fd95568530 100755 --- a/gpMgmt/doc/gpcheckperf_help +++ b/gpMgmt/doc/gpcheckperf_help @@ -13,7 +13,7 @@ gpcheckperf -d [-d ...] gpcheckperf -d {-f | -h [-h ...]} - [ -r n|N|M [--duration