Skip to content
This repository has been archived by the owner on Sep 22, 2020. It is now read-only.

Commit

Permalink
Merge pull request #253 from sgotti/renew_lease_if_expired
Browse files Browse the repository at this point in the history
server: create new lease if expired
  • Loading branch information
barakmich authored Jun 13, 2016
2 parents dc28574 + 4c377de commit b00c7f7
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 11 deletions.
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,7 @@ var (

// ErrLocked is returned if the resource is locked.
ErrLocked = errors.New("torus: locked")

// ErrLeaseNotFound is returned if the lease cannot be found.
ErrLeaseNotFound = errors.New("torus: lease not found")
)
12 changes: 8 additions & 4 deletions heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *Server) BeginHeartbeat(addr *url.URL) error {
s.peerInfo.Address = advertiseURI.String()
}
var err error
s.lease, err = s.MDS.GetLease()
err = s.createOrRenewLease(context.Background())
if err != nil {
return err
}
Expand Down Expand Up @@ -89,11 +89,15 @@ func (s *Server) oneHeartbeat() {
s.peerInfo.UsedBlocks = s.Blocks.UsedBlocks()
s.mut.Unlock()

s.infoMut.Lock()
defer s.infoMut.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), heartbeatTimeout)
defer cancel()
err := s.MDS.WithContext(ctx).RegisterPeer(s.lease, s.peerInfo)
err := s.createOrRenewLease(ctx)
if err != nil {
clog.Warningf("failed to create or renew lease: %s", err)
}
s.infoMut.Lock()
defer s.infoMut.Unlock()
err = s.MDS.WithContext(ctx).RegisterPeer(s.lease, s.peerInfo)
if err != nil {
clog.Warningf("couldn't register heartbeat: %s", err)
}
Expand Down
1 change: 1 addition & 0 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type MetadataService interface {
WithContext(ctx context.Context) MetadataService

GetLease() (int64, error)
RenewLease(int64) error
RegisterPeer(lease int64, pi *models.PeerInfo) error
GetPeers() (PeerInfoList, error)

Expand Down
18 changes: 13 additions & 5 deletions metadata/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/coreos/torus/ring"

etcdv3 "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/pkg/capnslog"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
Expand Down Expand Up @@ -190,11 +191,6 @@ func (c *etcdCtx) RegisterPeer(lease int64, p *models.PeerInfo) error {
}

lid := etcdv3.LeaseID(lease)
resp, err := c.etcd.Client.KeepAliveOnce(c.getContext(), lid)
if err != nil {
return err
}
clog.Tracef("updated lease for %d, TTL %d", resp.ID, resp.TTL)
_, err = c.etcd.Client.Put(
c.getContext(), MkKey("nodes", p.UUID), string(data), etcdv3.WithLease(lid))
return err
Expand Down Expand Up @@ -345,6 +341,18 @@ func (c *etcdCtx) GetLease() (int64, error) {
return int64(resp.ID), nil
}

func (c *etcdCtx) RenewLease(lease int64) error {
lid := etcdv3.LeaseID(lease)
resp, err := c.etcd.Client.KeepAliveOnce(c.getContext(), lid)
if err != nil {
if err == rpctypes.ErrLeaseNotFound {
return torus.ErrLeaseNotFound
}
return err
}
clog.Tracef("updated lease for %d, TTL %d", resp.ID, resp.TTL)
return nil
}
func (c *etcdCtx) GetRing() (torus.Ring, error) {
r, _, err := c.getRing()
return r, err
Expand Down
4 changes: 3 additions & 1 deletion metadata/temp/temp.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ func (t *Client) UUID() string {
return t.uuid
}

func (t *Client) GetLease() (int64, error) { return 1, nil }
func (t *Client) GetLease() (int64, error) { return 1, nil }
func (t *Client) RenewLease(lease int64) error { return nil }

func (t *Client) GetPeers() (torus.PeerInfoList, error) {
t.srv.mut.Lock()
defer t.srv.mut.Unlock()
Expand Down
20 changes: 19 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,31 @@ type Server struct {
peerInfo *models.PeerInfo
ctx context.Context

lease int64
lease int64
leaseMut sync.Mutex

heartbeating bool
ReplicationOpen bool
timeoutCallbacks []func(string)
}

func (s *Server) createOrRenewLease(ctx context.Context) error {
s.infoMut.Lock()
defer s.infoMut.Unlock()
if s.lease != 0 {
err := s.MDS.WithContext(ctx).RenewLease(s.lease)
if err == nil {
return nil
}
}
var err error
s.lease, err = s.MDS.WithContext(ctx).GetLease()
return err
}

func (s *Server) Lease() int64 {
s.infoMut.Lock()
defer s.infoMut.Unlock()
return s.lease
}

Expand Down

0 comments on commit b00c7f7

Please sign in to comment.