Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support comma separated list of zero addresses in alpha #5116

Merged
merged 3 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/integration/testtxn/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func TestCountIndexConcurrentTxns(t *testing.T) {
require.Error(t, err,
"the txn2 should be aborted due to concurrent update on the count index of <0x01>")

// retry the mutatiton
// retry the mutation
txn3 := dg.NewTxn()
_, err = txn3.Mutate(ctxb, &mu)
x.Check(err)
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ they form a Raft group and provide synchronous replication.
flag.String("my", "",
"IP_ADDRESS:PORT of this Dgraph Alpha, so other Dgraph Alphas can talk to this.")
flag.StringP("zero", "z", fmt.Sprintf("localhost:%d", x.PortZeroGrpc),
"IP_ADDRESS:PORT of a Dgraph Zero.")
"Comma separated list of Dgraph zero addresses of the form IP_ADDRESS:PORT.")
flag.Uint64("idx", 0,
"Optional Raft ID that this Dgraph Alpha will use to join RAFT groups.")
flag.Int("max_retries", -1,
Expand Down Expand Up @@ -575,7 +575,7 @@ func run() {
NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"),
Tracing: Alpha.Conf.GetFloat64("trace"),
MyAddr: Alpha.Conf.GetString("my"),
ZeroAddr: Alpha.Conf.GetString("zero"),
ZeroAddr: strings.Split(Alpha.Conf.GetString("zero"), ","),
RaftId: cast.ToUint64(Alpha.Conf.GetString("idx")),
WhiteListedIPRanges: ips,
MaxRetries: Alpha.Conf.GetInt("max_retries"),
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (r *reducer) run() error {
func (r *reducer) createBadger(i int) *badger.DB {
if r.opt.BadgerKeyFile != "" {
// Need to set zero addr in WorkerConfig before checking the license.
x.WorkerConfig.ZeroAddr = r.opt.ZeroAddr
x.WorkerConfig.ZeroAddr = []string{r.opt.ZeroAddr}

if !worker.EnterpriseEnabled() {
// Crash since the enterprise license is not enabled..
Expand Down
17 changes: 13 additions & 4 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
}

x.AssertTruef(len(x.WorkerConfig.ZeroAddr) > 0, "Providing dgraphzero address is mandatory.")
x.AssertTruef(x.WorkerConfig.ZeroAddr != x.WorkerConfig.MyAddr,
"Dgraph Zero address and Dgraph address (IP:Port) can't be the same.")
for _, zeroAddr := range x.WorkerConfig.ZeroAddr {
x.AssertTruef(zeroAddr != x.WorkerConfig.MyAddr,
jarifibrahim marked this conversation as resolved.
Show resolved Hide resolved
"Dgraph Zero address %d and Dgraph address (IP:Port) %d can't be the same.",
zeroAddr, x.WorkerConfig.MyAddr)
}

if x.WorkerConfig.RaftId == 0 {
id, err := raftwal.RaftId(walStore)
Expand All @@ -113,6 +116,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
}
var connState *pb.ConnectionState
var err error

for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
pl := gr.connToZeroLeader()
if pl == nil {
Expand Down Expand Up @@ -646,14 +650,19 @@ func (g *groupi) connToZeroLeader() *conn.Pool {
// No leader found. Let's get the latest membership state from Zero.
delay := connBaseDelay
maxHalfDelay := time.Second
for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
for i := 0; ; i++ { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
time.Sleep(delay)
if delay <= maxHalfDelay {
delay *= 2
}

zAddrList := x.WorkerConfig.ZeroAddr
// Pick addresses in round robin manner.
addr := zAddrList[i%len(zAddrList)]

pl := g.AnyServer(0)
if pl == nil {
pl = conn.GetPools().Connect(x.WorkerConfig.ZeroAddr)
pl = conn.GetPools().Connect(addr)
}
if pl == nil {
glog.V(1).Infof("No healthy Zero server found. Retrying...")
Expand Down
6 changes: 4 additions & 2 deletions x/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ type WorkerOptions struct {
Tracing float64
// MyAddr stores the address and port for this alpha.
MyAddr string
// ZeroAddr stores the address and port for the zero instance associated with this alpha.
ZeroAddr string
// ZeroAddr stores the list of address:port for the zero instances associated with this alpha.
// Alpha would communicate via only one zero address from the list. All
// the other addresses serve as fallback.
ZeroAddr []string
// RaftId represents the id of this alpha instance for participating in the RAFT
// consensus protocol.
RaftId uint64
Expand Down