diff --git a/contrib/integration/testtxn/main_test.go b/contrib/integration/testtxn/main_test.go index e2258ed7e20..3259d5f9462 100644 --- a/contrib/integration/testtxn/main_test.go +++ b/contrib/integration/testtxn/main_test.go @@ -757,7 +757,7 @@ func TestCountIndexConcurrentTxns(t *testing.T) { require.Error(t, err, "the txn2 should be aborted due to concurrent update on the count index of <0x01>") - // retry the mutatiton + // retry the mutation txn3 := dg.NewTxn() _, err = txn3.Mutate(ctxb, &mu) x.Check(err) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 3984c297b18..77cfa58bdd8 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -138,7 +138,7 @@ they form a Raft group and provide synchronous replication. flag.String("my", "", "IP_ADDRESS:PORT of this Dgraph Alpha, so other Dgraph Alphas can talk to this.") flag.StringP("zero", "z", fmt.Sprintf("localhost:%d", x.PortZeroGrpc), - "IP_ADDRESS:PORT of a Dgraph Zero.") + "Comma separated list of Dgraph zero addresses of the form IP_ADDRESS:PORT.") flag.Uint64("idx", 0, "Optional Raft ID that this Dgraph Alpha will use to join RAFT groups.") flag.Int("max_retries", -1, @@ -575,7 +575,7 @@ func run() { NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"), Tracing: Alpha.Conf.GetFloat64("trace"), MyAddr: Alpha.Conf.GetString("my"), - ZeroAddr: Alpha.Conf.GetString("zero"), + ZeroAddr: strings.Split(Alpha.Conf.GetString("zero"), ","), RaftId: cast.ToUint64(Alpha.Conf.GetString("idx")), WhiteListedIPRanges: ips, MaxRetries: Alpha.Conf.GetInt("max_retries"), diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 1ee488471a9..d7eaa3ba1c6 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -105,7 +105,7 @@ func (r *reducer) run() error { func (r *reducer) createBadger(i int) *badger.DB { if r.opt.BadgerKeyFile != "" { // Need to set zero addr in WorkerConfig before checking the license. - x.WorkerConfig.ZeroAddr = r.opt.ZeroAddr + x.WorkerConfig.ZeroAddr = []string{r.opt.ZeroAddr} if !worker.EnterpriseEnabled() { // Crash since the enterprise license is not enabled.. diff --git a/worker/groups.go b/worker/groups.go index aa5ef180746..7896dfd675c 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -87,8 +87,11 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { } x.AssertTruef(len(x.WorkerConfig.ZeroAddr) > 0, "Providing dgraphzero address is mandatory.") - x.AssertTruef(x.WorkerConfig.ZeroAddr != x.WorkerConfig.MyAddr, - "Dgraph Zero address and Dgraph address (IP:Port) can't be the same.") + for _, zeroAddr := range x.WorkerConfig.ZeroAddr { + x.AssertTruef(zeroAddr != x.WorkerConfig.MyAddr, + "Dgraph Zero address %s and Dgraph address (IP:Port) %s can't be the same.", + zeroAddr, x.WorkerConfig.MyAddr) + } if x.WorkerConfig.RaftId == 0 { id, err := raftwal.RaftId(walStore) @@ -113,6 +116,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { } var connState *pb.ConnectionState var err error + for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289 pl := gr.connToZeroLeader() if pl == nil { @@ -646,14 +650,19 @@ func (g *groupi) connToZeroLeader() *conn.Pool { // No leader found. Let's get the latest membership state from Zero. delay := connBaseDelay maxHalfDelay := time.Second - for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289 + for i := 0; ; i++ { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289 time.Sleep(delay) if delay <= maxHalfDelay { delay *= 2 } + + zAddrList := x.WorkerConfig.ZeroAddr + // Pick addresses in round robin manner. + addr := zAddrList[i%len(zAddrList)] + pl := g.AnyServer(0) if pl == nil { - pl = conn.GetPools().Connect(x.WorkerConfig.ZeroAddr) + pl = conn.GetPools().Connect(addr) } if pl == nil { glog.V(1).Infof("No healthy Zero server found. Retrying...") diff --git a/x/config.go b/x/config.go index a43de03e3ad..cdd85acf27f 100644 --- a/x/config.go +++ b/x/config.go @@ -53,8 +53,10 @@ type WorkerOptions struct { Tracing float64 // MyAddr stores the address and port for this alpha. MyAddr string - // ZeroAddr stores the address and port for the zero instance associated with this alpha. - ZeroAddr string + // ZeroAddr stores the list of address:port for the zero instances associated with this alpha. + // Alpha would communicate via only one zero address from the list. All + // the other addresses serve as fallback. + ZeroAddr []string // RaftId represents the id of this alpha instance for participating in the RAFT // consensus protocol. RaftId uint64