Skip to content

Commit

Permalink
Support comma separated list of zero addresses in alpha (#5116)
Browse files Browse the repository at this point in the history
Currently, alpha supports specifying only a single zero instance
address via the `--zero` flag in `dgraph alpha` command. This commit
adds supports for a comma-separated list of `zero` addresses in
`dgraph alpha` so that any zero address can be used from the list
of zero addresses if one of them is unavailable.

We'll pick zero addresses starting from the first one in the list and
try to establish a connection to it. If it fails, we'll pick
the next zero address.

The `dgraph bulk/live` command accepts only a single zero address.
The comma-separated list is only for `dgraph alpha` command.

(cherry picked from commit 82adb16)
  • Loading branch information
Ibrahim Jarif authored and jarifibrahim committed Apr 21, 2020
1 parent 6b57f23 commit ad82923
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 14 deletions.
2 changes: 1 addition & 1 deletion contrib/integration/testtxn/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ func TestCountIndexConcurrentTxns(t *testing.T) {
require.Error(t, err,
"the txn2 should be aborted due to concurrent update on the count index of <0x01>")

// retry the mutatiton
// retry the mutation
txn3 := dg.NewTxn()
_, err = txn3.Mutate(ctxb, &mu)
x.Check(err)
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ they form a Raft group and provide synchronous replication.
flag.String("my", "",
"IP_ADDRESS:PORT of this Dgraph Alpha, so other Dgraph Alphas can talk to this.")
flag.StringP("zero", "z", fmt.Sprintf("localhost:%d", x.PortZeroGrpc),
"IP_ADDRESS:PORT of a Dgraph Zero.")
"Comma separated list of Dgraph zero addresses of the form IP_ADDRESS:PORT.")
flag.Uint64("idx", 0,
"Optional Raft ID that this Dgraph Alpha will use to join RAFT groups.")
flag.Int("max_retries", -1,
Expand Down Expand Up @@ -571,7 +571,7 @@ func run() {
NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"),
Tracing: Alpha.Conf.GetFloat64("trace"),
MyAddr: Alpha.Conf.GetString("my"),
ZeroAddr: Alpha.Conf.GetString("zero"),
ZeroAddr: strings.Split(Alpha.Conf.GetString("zero"), ","),
RaftId: cast.ToUint64(Alpha.Conf.GetString("idx")),
WhiteListedIPRanges: ips,
MaxRetries: Alpha.Conf.GetInt("max_retries"),
Expand Down
8 changes: 3 additions & 5 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,12 @@ func (r *reducer) run() error {

func (r *reducer) createBadger(i int) *badger.DB {
if r.opt.BadgerKeyFile != "" {
// need to set zero addr in WorkerConfig before doing license check.
x.WorkerConfig.ZeroAddr = r.opt.ZeroAddr
// non-nil key file
// Need to set zero addr in WorkerConfig before checking the license.
x.WorkerConfig.ZeroAddr = []string{r.opt.ZeroAddr}
if !worker.EnterpriseEnabled() {
// not licensed --> crash.
// Crash since the enterprise license is not enabled..
log.Fatal("Enterprise License needed for the Encryption feature.")
} else {
// licensed --> OK.
log.Printf("Encryption feature enabled. Using encryption key file: %v", r.opt.BadgerKeyFile)
}
}
Expand Down
17 changes: 13 additions & 4 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
}

x.AssertTruef(len(x.WorkerConfig.ZeroAddr) > 0, "Providing dgraphzero address is mandatory.")
x.AssertTruef(x.WorkerConfig.ZeroAddr != x.WorkerConfig.MyAddr,
"Dgraph Zero address and Dgraph address (IP:Port) can't be the same.")
for _, zeroAddr := range x.WorkerConfig.ZeroAddr {
x.AssertTruef(zeroAddr != x.WorkerConfig.MyAddr,
"Dgraph Zero address %s and Dgraph address (IP:Port) %s can't be the same.",
zeroAddr, x.WorkerConfig.MyAddr)
}

if x.WorkerConfig.RaftId == 0 {
id, err := raftwal.RaftId(walStore)
Expand All @@ -114,6 +117,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
}
var connState *pb.ConnectionState
var err error

for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
pl := gr.connToZeroLeader()
if pl == nil {
Expand Down Expand Up @@ -630,14 +634,19 @@ func (g *groupi) connToZeroLeader() *conn.Pool {
// No leader found. Let's get the latest membership state from Zero.
delay := connBaseDelay
maxHalfDelay := time.Second
for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
for i := 0; ; i++ { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
time.Sleep(delay)
if delay <= maxHalfDelay {
delay *= 2
}

zAddrList := x.WorkerConfig.ZeroAddr
// Pick addresses in round robin manner.
addr := zAddrList[i%len(zAddrList)]

pl := g.AnyServer(0)
if pl == nil {
pl = conn.GetPools().Connect(x.WorkerConfig.ZeroAddr)
pl = conn.GetPools().Connect(addr)
}
if pl == nil {
glog.V(1).Infof("No healthy Zero server found. Retrying...")
Expand Down
6 changes: 4 additions & 2 deletions x/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ type WorkerOptions struct {
Tracing float64
// MyAddr stores the address and port for this alpha.
MyAddr string
// ZeroAddr stores the address and port for the zero instance associated with this alpha.
ZeroAddr string
// ZeroAddr stores the list of address:port for the zero instances associated with this alpha.
// Alpha would communicate via only one zero address from the list. All
// the other addresses serve as fallback.
ZeroAddr []string
// RaftId represents the id of this alpha instance for participating in the RAFT
// consensus protocol.
RaftId uint64
Expand Down

0 comments on commit ad82923

Please sign in to comment.