-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: Add a new way to store metadata of regions #1237
Changes from 4 commits
8aad56c
1a741f7
abcb224
cbcd851
d48d655
308cdde
cb41f3d
9a0360e
54187e2
ed35f84
da984d2
1241a98
9f31426
547dd57
f50e3ca
155ebb5
81508b9
4db127e
1b02452
454c27b
fa03500
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ import ( | |
"sync" | ||
"time" | ||
|
||
"github.com/gogo/protobuf/proto" | ||
"github.com/pingcap/kvproto/pkg/metapb" | ||
"github.com/pingcap/kvproto/pkg/pdpb" | ||
"github.com/pingcap/pd/pkg/error_code" | ||
|
@@ -55,8 +56,10 @@ type RaftCluster struct { | |
|
||
coordinator *coordinator | ||
|
||
wg sync.WaitGroup | ||
quit chan struct{} | ||
wg sync.WaitGroup | ||
quit chan struct{} | ||
clients map[string]pdpb.PDClient | ||
regionSyncer *regionSyncer | ||
} | ||
|
||
// ClusterStatus saves some state information | ||
|
@@ -66,10 +69,12 @@ type ClusterStatus struct { | |
|
||
func newRaftCluster(s *Server, clusterID uint64) *RaftCluster { | ||
return &RaftCluster{ | ||
s: s, | ||
running: false, | ||
clusterID: clusterID, | ||
clusterRoot: s.getClusterRootPath(), | ||
s: s, | ||
running: false, | ||
clusterID: clusterID, | ||
clusterRoot: s.getClusterRootPath(), | ||
clients: make(map[string]pdpb.PDClient), | ||
regionSyncer: newRegionSyncer(s), | ||
} | ||
} | ||
|
||
|
@@ -115,15 +120,55 @@ func (c *RaftCluster) start() error { | |
c.cachedCluster.regionStats = newRegionStatistics(c.s.scheduleOpt, c.s.classifier) | ||
c.quit = make(chan struct{}) | ||
|
||
c.wg.Add(2) | ||
c.wg.Add(3) | ||
go c.runCoordinator() | ||
go c.runBackgroundJobs(backgroundJobInterval) | ||
|
||
go c.runSyncRegions() | ||
c.running = true | ||
|
||
return nil | ||
} | ||
|
||
func (c *RaftCluster) runSyncRegions() { | ||
defer logutil.LogPanic() | ||
defer c.wg.Done() | ||
var requests []*metapb.Region | ||
// grpc has limit on message size | ||
maxBatchSize := 100 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. better to use a constant? |
||
for { | ||
select { | ||
case <-c.quit: | ||
return | ||
case first := <-c.cachedCluster.getChangedRegions(): | ||
requests = append(requests, first.GetMeta()) | ||
pending := len(c.cachedCluster.getChangedRegions()) | ||
for i := 0; i < pending && i < maxBatchSize; i++ { | ||
region := <-c.cachedCluster.getChangedRegions() | ||
requests = append(requests, region.GetMeta()) | ||
} | ||
msg := &pdpb.MetaRegions{ | ||
Count: uint32(len(requests)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. em, do we really need the |
||
Regions: requests} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better to put '}' in next new line... |
||
data, err := proto.Marshal(msg) | ||
if err != nil { | ||
log.Errorf("Report regions meet error: %s", err) | ||
continue | ||
} | ||
req := &pdpb.SyncRegionResponse{ | ||
Header: &pdpb.ResponseHeader{ClusterId: c.s.clusterID}, | ||
Data: data, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why marshal region metas instead of including |
||
} | ||
for _, stream := range c.regionSyncer.streams { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Read |
||
err := stream.Send(req) | ||
if err != nil { | ||
log.Errorf("Report regions meet error: %s", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to close the stream and let followers to restart a new stream? |
||
} | ||
} | ||
} | ||
requests = requests[:0] | ||
} | ||
} | ||
|
||
func (c *RaftCluster) runCoordinator() { | ||
defer logutil.LogPanic() | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,6 +78,8 @@ type Config struct { | |
|
||
Namespace map[string]NamespaceConfig `json:"namespace"` | ||
|
||
PDServerCfg PDServerConfig `toml:"pd-server" json:"pd-server"` | ||
|
||
ClusterVersion semver.Version `json:"cluster-version"` | ||
|
||
// QuotaBackendBytes Raise alarms when backend size exceeds the given quota. 0 means use the default quota. | ||
|
@@ -633,6 +635,12 @@ func (s SecurityConfig) ToTLSConfig() (*tls.Config, error) { | |
return tlsConfig, nil | ||
} | ||
|
||
// PDServerConfig is the configuration for pd server. | ||
type PDServerConfig struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need this? Can we put There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We use it to persist the config of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you mean by persist to |
||
// EnableRegionStorage enable the independent region storage. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. enables and remove the extra space |
||
EnableRegionStorage bool `toml:"enable-region-storage" json:"enable-region-storage"` | ||
} | ||
|
||
// StoreLabel is the config item of LabelPropertyConfig. | ||
type StoreLabel struct { | ||
Key string `toml:"key" json:"key"` | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused?