Enhancement: Add gpshrink to support elastic scaling
To support gpshrink, the counterpart of gpexpand, we first add
"alter table <tablename> shrink table to <segnum>"
to redistribute a table's data across a specific number of segments.

Like gpexpand, the gpshrink implementation is divided into two
stages:
1. Collect the tables that need to be shrunk and write them
into gpshrink.status_detail.
2. Redistribute data for the tables that need to be shrunk, and
delete the corresponding segments from gp_segment_configuration.
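A minimal sketch of driving the new per-table syntax from Python with
psycopg2; the connection parameters, schema filter, and target of 2
segments are illustrative assumptions, not part of this commit:

    import psycopg2

    # Hypothetical driver for the two-stage flow described above; the real
    # gpshrink records candidates in gpshrink.status_detail before
    # redistributing.
    conn = psycopg2.connect(dbname="postgres", port=15432)
    conn.autocommit = True
    cur = conn.cursor()

    # Stage 1 (simplified): collect the user tables that would need shrinking.
    cur.execute("""
        SELECT n.nspname, c.relname
          FROM pg_class c
          JOIN pg_namespace n ON n.oid = c.relnamespace
         WHERE c.relkind = 'r'
           AND n.nspname NOT IN ('pg_catalog', 'information_schema')
    """)

    # Stage 2 (simplified): redistribute each table onto the surviving segments.
    for nspname, relname in cur.fetchall():
        cur.execute('ALTER TABLE "%s"."%s" SHRINK TABLE TO 2' % (nspname, relname))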
lss602726449 committed Mar 15, 2024
1 parent f8a6bf8 commit 9aa6272
Showing 17 changed files with 3,613 additions and 26 deletions.
43 changes: 43 additions & 0 deletions .github/workflows/build.yml
@@ -125,6 +125,49 @@ jobs:
/code/gpdb_src/src/test/isolation/expected/
/code/gpdb_src/gpAux/gpdemo/datadirs/standby/log/
/code/gpdb_src/gpAux/gpdemo/datadirs/singlenodedir/demoDataDir-1/log/
icw-expandshrink-test:
needs: build
runs-on: [ self-hosted, example ]
env:
MAKE_TEST_COMMAND: "-C src/test/isolation2 installcheck-expandshrink"
TEST_OS: "centos"
DUMP_DB: "true"
steps:
- uses: actions/checkout@v3
with:
path: "gpdb_src"
- uses: actions/download-artifact@v3
with:
name: cbdb-variables
path: /opt/
- uses: actions/download-artifact@v3
with:
name: cbdb-package
path: /opt/
- name: Run icw-test script
run: |
mkdir /code
cp -a gpdb_src/ /code
cd /code
echo $GITHUB_RUN_ID > gpdb_src/BUILD_NUMBER
gpdb_src/hd-ci/icw_cbdb.bash $FTS_MODE
- uses: actions/upload-artifact@v3
if: failure()
with:
name: cbdb-icw-expandshrink-test-log
path: |
/code/gpdb_src/src/test/isolation2/regression.out
/code/gpdb_src/src/test/isolation2/regression.diffs
/code/gpdb_src/src/test/isolation2/results/
/code/gpdb_src/src/test/isolation2/expected/
/code/gpdb_src/gpAux/gpdemo/datadirs/standby/log/
/code/gpdb_src/gpAux/gpdemo/datadirs/qddir/demoDataDir-1/log/
/code/gpdb_src/gpAux/gpdemo/datadirs/dbfast1/demoDataDir0/log/
/code/gpdb_src/gpAux/gpdemo/datadirs/dbfast2/demoDataDir1/log/
/code/gpdb_src/gpAux/gpdemo/datadirs/dbfast3/demoDataDir2/log/
/code/gpdb_src/gpAux/gpdemo/datadirs/dbfast_mirror1/demoDataDir0/log/
/code/gpdb_src/gpAux/gpdemo/datadirs/dbfast_mirror2/demoDataDir1/log/
/code/gpdb_src/gpAux/gpdemo/datadirs/dbfast_mirror3/demoDataDir2/log/
icw-orca-test:
needs: build
runs-on: [self-hosted, example]
4 changes: 2 additions & 2 deletions gpMgmt/bin/Makefile
@@ -13,7 +13,7 @@ SUBDIRS += ifaddrs
$(recurse)

PROGRAMS= analyzedb gpactivatestandby gpaddmirrors gpcheckcat gpcheckperf \
gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpinitstandby \
gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpshrink gpinitstandby \
gpinitsystem gpload gpload.py gplogfilter gpmovemirrors \
gppkg gprecoverseg gpreload gpscp gpsd gpssh gpssh-exkeys gpstart \
gpstate gpstop minirepro gpmemwatcher gpmemreport gpdemo
@@ -194,7 +194,7 @@ clean distclean:
rm -rf *.pyc
rm -f analyzedbc gpactivatestandbyc gpaddmirrorsc gpcheckcatc \
gpcheckperfc gpcheckresgroupimplc gpchecksubnetcfgc gpconfigc \
gpdeletesystemc gpexpandc gpinitstandbyc gplogfilterc gpmovemirrorsc \
gpdeletesystemc gpexpandc gpshrinkc gpinitstandbyc gplogfilterc gpmovemirrorsc \
gppkgc gprecoversegc gpreloadc gpscpc gpsdc gpssh-exkeysc gpsshc \
gpstartc gpstatec gpstopc minireproc
rm -f gpconfig_modules/gucs_disallowed_in_file.txt
30 changes: 30 additions & 0 deletions gpMgmt/bin/gpexpand
@@ -1109,6 +1109,8 @@ class gpexpand:
tblspc_info = {}

for oid in tblspc_oids:
if oid not in tblspc_oid_names:
continue
location = os.path.dirname(os.readlink(os.path.join(coordinator_tblspc_dir,
oid)))
tblspc_info[oid] = {"location": location,
@@ -1222,6 +1224,15 @@ class gpexpand:
coordinator_tblspc_dir = self.gparray.coordinator.getSegmentTableSpaceDirectory()
if not os.listdir(coordinator_tblspc_dir):
return None

tblspc_oids = os.listdir(coordinator_tblspc_dir)
tblspc_oid_names = self.get_tablespace_oid_names()
# Nothing to do unless at least one tablespace symlink maps to a known oid.
if not any(oid in tblspc_oid_names for oid in tblspc_oids):
    return None

if not self.options.filename:
raise ExpansionError('Missing tablespace input file')
@@ -1385,6 +1396,25 @@ class gpexpand:
self.pool.join()
self.pool.check_results()


# Poll the new primaries (up to 11 tries, 10 seconds apart) until every
# one of them answers pg_isready; mirrors are skipped.
for i in range(1, 12):
    all_ready = True
    for segment in newSegments:
        if segment.isSegmentMirror():
            continue

        cmd = Command('pg_isready for segment',
                      "pg_isready -q -h %s -p %d -d %s" % (segment.getSegmentHostName(), segment.getSegmentPort(), segment.getSegmentDataDirectory()))
        cmd.run()
        if cmd.get_return_code() != 0:
            all_ready = False
    if all_ready:
        break
    time.sleep(10)
    self.logger.info("Waited %s seconds for the new segments to become ready" % (i * 10))


"""
Build the list of delete statements based on the COORDINATOR_ONLY_TABLES
defined in gpcatalog.py
123 changes: 117 additions & 6 deletions gpMgmt/bin/gppylib/gparray.py
@@ -148,7 +148,7 @@ def __equal(self, other, ignoreAttr=[]):
return True

def __eq__(self, other):
return self.__equal(other)
return self.__equal(other, ['mode'])


def __hash__(self):
@@ -429,6 +429,9 @@ def __str__(self):
return "(Primary: %s, Mirror: %s)" % (str(self.primaryDB),
str(self.mirrorDB))

def __eq__(self, other):
return self.primaryDB == other.primaryDB and self.mirrorDB == other.mirrorDB

# --------------------------------------------------------------------
def addPrimary(self,segDB):
self.primaryDB=segDB
@@ -799,6 +802,7 @@ def __init__(self, segments, segmentsAsLoadedFromDb=None):
self.standbyCoordinator = None
self.segmentPairs = []
self.expansionSegmentPairs=[]
self.shrinkSegmentPairs=[]
self.numPrimarySegments = 0

self.recoveredSegmentDbids = []
@@ -1045,7 +1049,7 @@ def dumpToFile(self, filename):
fp.close()

# --------------------------------------------------------------------
def getDbList(self, includeExpansionSegs=False):
def getDbList(self, includeExpansionSegs=False, removeShrinkSegs=False):
"""
Return a list of all Segment objects that make up the array
"""
@@ -1054,8 +1058,8 @@ def getDbList(self, includeExpansionSegs=False):
dbs.append(self.coordinator)
if self.standbyCoordinator:
dbs.append(self.standbyCoordinator)
if includeExpansionSegs:
dbs.extend(self.getSegDbList(True))
if includeExpansionSegs or removeShrinkSegs:
dbs.extend(self.getSegDbList(includeExpansionSegs, removeShrinkSegs))
else:
dbs.extend(self.getSegDbList())
return dbs
@@ -1105,23 +1109,29 @@ def getDbIdToPeerMap(self):


# --------------------------------------------------------------------
def getSegDbList(self, includeExpansionSegs=False):
def getSegDbList(self, includeExpansionSegs=False, removeShrinkSegs=False):
"""Return a list of all Segment objects for all segments in the array"""
dbs=[]
for segPair in self.segmentPairs:
dbs.extend(segPair.get_dbs())
if includeExpansionSegs:
for segPair in self.expansionSegmentPairs:
dbs.extend(segPair.get_dbs())
if removeShrinkSegs:
for segPair in self.shrinkSegmentPairs:
dbs = list(filter(lambda x: segPair.primaryDB != x and segPair.mirrorDB != x, dbs))
return dbs

# --------------------------------------------------------------------
def getSegmentList(self, includeExpansionSegs=False):
def getSegmentList(self, includeExpansionSegs=False, removeShrinkSegs=False):
"""Return a list of SegmentPair objects for all segments in the array"""
dbs=[]
dbs.extend(self.segmentPairs)
if includeExpansionSegs:
dbs.extend(self.expansionSegmentPairs)
if removeShrinkSegs:
for segPair in self.shrinkSegmentPairs:
dbs.remove(segPair)
return dbs

# --------------------------------------------------------------------
Expand All @@ -1148,6 +1158,21 @@ def getExpansionSegPairList(self):
"""Returns a list of all SegmentPair objects that make up the new segments
of an expansion"""
return self.expansionSegmentPairs

# --------------------------------------------------------------------
def getShrinkSegDbList(self):
"""Returns a list of all Segment objects that make up the new segments
of an expansion"""
dbs=[]
for segPair in self.shrinkSegmentPairs:
dbs.extend(segPair.get_dbs())
return dbs

# --------------------------------------------------------------------
def getShrinkSegPairList(self):
"""Returns a list of all SegmentPair objects that make up the new segments
of an expansion"""
return self.shrinkSegmentPairs

# --------------------------------------------------------------------
def getSegmentContainingDb(self, db):
Expand All @@ -1164,6 +1189,15 @@ def getExpansionSegmentContainingDb(self, db):
if db.getSegmentDbId() == segDb.getSegmentDbId():
return segPair
return None

# --------------------------------------------------------------------
def getShrinkSegmentContainingDb(self, db):
for segPair in self.shrinkSegmentPairs:
for segDb in segPair.get_dbs():
if db.getSegmentDbId() == segDb.getSegmentDbId():
return segPair
return None

# --------------------------------------------------------------------
def get_invalid_segdbs(self):
dbs=[]
@@ -1488,6 +1522,37 @@ def addExpansionSeg(self, content, preferred_role, dbid, role,
else:
seg.addMirror(segdb)

# --------------------------------------------------------------------
def addShrinkSeg(self, content, preferred_role, dbid, role,
hostname, address, port, datadir):
"""
Add a segment to the gparray as a shrink segment.
Note: may work better to construct the new Segment in gpshrink and
simply pass it in.
"""

segdb = Segment(content = content,
preferred_role = preferred_role,
dbid = dbid,
role = role,
mode = MODE_SYNCHRONIZED,
status = STATUS_UP,
hostname = hostname,
address = address,
port = port,
datadir = datadir)

if preferred_role == ROLE_PRIMARY:
self.shrinkSegmentPairs.append(SegmentPair())
seg = self.shrinkSegmentPairs[-1]
if seg.primaryDB:
raise Exception('Duplicate content id for primary segment')
seg.addPrimary(segdb)
else:
seg = self.shrinkSegmentPairs[-1]
seg.addMirror(segdb)

# --------------------------------------------------------------------
def reOrderExpansionSegs(self):
"""
@@ -1595,6 +1660,52 @@ def validateExpansionSegs(self):
else:
used_ports[hostname] = []
used_ports[hostname].append(db.port)

# --------------------------------------------------------------------
def validateShrinkSegs(self):
""" Checks the segments added for various inconsistencies and errors.
"""

# make sure at least one segment was marked for removal
if len(self.shrinkSegmentPairs) == 0:
raise Exception('No shrink segments defined')

totalsize = len(self.segmentPairs)
removesize = len(self.shrinkSegmentPairs)

if removesize >= totalsize:
    self.logger.error('number of segments to remove (%d) must be less than the total segment count (%d)', removesize, totalsize)
    exit(1)
elif removesize < 1:
    self.logger.error('number of segments to remove (%d) is less than 1', removesize)
    exit(1)

for segPair in self.shrinkSegmentPairs:
    if self.hasMirrors:
        if segPair.mirrorDB is None:
            raise Exception('primaryDB and mirrorDB must be removed together')

        if segPair.primaryDB.content != segPair.mirrorDB.content:
            raise Exception('primaryDB content is not equal to mirrorDB content')

    # Every shrink pair must already exist in gp_segment_configuration.
    if not any(segPair_ == segPair for segPair_ in self.segmentPairs):
        raise Exception('Shrink segments not in the gp_segment_configuration table')

# The shrink segments must be the highest, contiguous contentids.
self.shrinkSegmentPairs.sort(key=lambda segPair: segPair.primaryDB.content)

if self.shrinkSegmentPairs[-1].primaryDB.content != self.get_max_contentid():
    raise Exception('please remove segments starting from the max contentid')

if self.shrinkSegmentPairs[0].primaryDB.content != self.get_max_contentid() - len(self.shrinkSegmentPairs) + 1:
    raise Exception('please remove segments over a contiguous contentid range')


# --------------------------------------------------------------------
def addExpansionHosts(self, hosts, mirror_type):
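To make the ordering rules in validateShrinkSegs concrete: on a cluster
whose primaries hold contents 0 through 5, removing two segments is only
valid for contents 4 and 5. A self-contained toy restatement of the check
(plain lists, not the gparray API):

    # Toy restatement of the validateShrinkSegs contiguity rules; the
    # content ids are illustrative, not read from a real cluster.
    def validate_shrink(contents, to_remove):
        if not to_remove:
            raise Exception('No shrink segments defined')
        if len(to_remove) >= len(contents):
            raise Exception('cannot remove every segment in the cluster')
        to_remove = sorted(to_remove)
        if to_remove[-1] != max(contents):
            raise Exception('please remove segments starting from the max contentid')
        if to_remove[0] != max(contents) - len(to_remove) + 1:
            raise Exception('please remove segments over a contiguous contentid range')

    validate_shrink([0, 1, 2, 3, 4, 5], [4, 5])    # passes: top two contents
    # validate_shrink([0, 1, 2, 3, 4, 5], [2, 5])  # raises: not contiguous from the top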
2 changes: 1 addition & 1 deletion gpMgmt/bin/gppylib/system/ComputeCatalogUpdate.py
@@ -54,7 +54,7 @@ def __init__(self, gpArray, forceMap, useUtilityMode, allowPrimary):
self.dbsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getSegmentsAsLoadedFromDb()])

# 'goalsegmap' reflects the desired state of the catalog
self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True)])
self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True, removeShrinkSegs=True)])

# find mirrors and primaries to remove
self.mirror_to_remove = [
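Because goalsegmap now excludes the shrink segments, ComputeCatalogUpdate
sees them in dbsegmap (the catalog as loaded) but not in the goal state, so
they fall out as removals. A rough illustration with plain dicts; the dbids
and the string stand-ins for Segment objects are assumptions:

    # Rough illustration: dbids 5 and 6 are queued for shrink, so they appear
    # in the current catalog map but not in the goal map, and get planned as
    # removals.
    dbsegmap = {dbid: 'seg%d' % dbid for dbid in (1, 2, 3, 4, 5, 6)}
    shrink_dbids = {5, 6}
    goalsegmap = {dbid: seg for dbid, seg in dbsegmap.items()
                  if dbid not in shrink_dbids}

    to_remove = [seg for dbid, seg in dbsegmap.items() if dbid not in goalsegmap]
    print(to_remove)  # ['seg5', 'seg6']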