Skip to content

Commit

Permalink
comment(lock): add lock comments (#290)
Browse files Browse the repository at this point in the history
  • Loading branch information
LXPWing authored Oct 28, 2021
1 parent b3a9740 commit b695318
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 0 deletions.
11 changes: 11 additions & 0 deletions components/lock/etcd/etcd_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"mosn.io/pkg/log"
)

// Etcd lock store
type EtcdLock struct {
client *clientv3.Client
metadata utils.EtcdMetadata
Expand All @@ -44,6 +45,7 @@ func NewEtcdLock(logger log.ErrorLogger) *EtcdLock {
return s
}

// Init EtcdLock
func (e *EtcdLock) Init(metadata lock.Metadata) error {
// 1. parse config
m, err := utils.ParseEtcdMetadata(metadata.Properties)
Expand All @@ -61,10 +63,12 @@ func (e *EtcdLock) Init(metadata lock.Metadata) error {
return err
}

// Features is to get EtcdLock's features
func (e *EtcdLock) Features() []lock.Feature {
return e.features
}

// Node tries to acquire a etcd lock
func (e *EtcdLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
var leaseId clientv3.LeaseID
//1.Create new lease
Expand Down Expand Up @@ -95,14 +99,18 @@ func (e *EtcdLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, err
}, nil
}

// Node tries to release a etcd lock
func (e *EtcdLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
key := e.getKey(req.ResourceId)

// 1.Create new KV
kv := clientv3.NewKV(e.client)
// 2.Create txn
txn := kv.Txn(e.ctx)
txn.If(clientv3.Compare(clientv3.Value(key), "=", req.LockOwner)).Then(
clientv3.OpDelete(key)).Else(
clientv3.OpGet(key))
// 3.Commit and try release lock
txnResponse, err := txn.Commit()
if err != nil {
return newInternalErrorUnlockResponse(), fmt.Errorf("[etcdLock]: Unlock returned error: %s.ResourceId: %s", err, req.ResourceId)
Expand All @@ -120,16 +128,19 @@ func (e *EtcdLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error)
}
}

// Close shuts down the client's etcd connections.
func (e *EtcdLock) Close() error {
e.cancel()

return e.client.Close()
}

// getkey is to return string of type KeyPrefix + resourceId
func (e *EtcdLock) getKey(resourceId string) string {
return fmt.Sprintf("%s%s", e.metadata.KeyPrefix, resourceId)
}

// newInternalErrorUnlockResponse is to return lock release error
func newInternalErrorUnlockResponse() *lock.UnlockResponse {
return &lock.UnlockResponse{
Status: lock.INTERNAL_ERROR,
Expand Down
4 changes: 4 additions & 0 deletions components/lock/lock_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
package lock

type LockStore interface {
// Init lock
Init(metadata Metadata) error
// Get lock's features
Features() []Feature
// Node tries to acquire a lock
TryLock(req *TryLockRequest) (*TryLockResponse, error)
// Node tries to release a lock
Unlock(req *UnlockRequest) (*UnlockResponse, error)
}
8 changes: 8 additions & 0 deletions components/lock/redis/standalone_redis_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func NewStandaloneRedisLock(logger log.ErrorLogger) *StandaloneRedisLock {
return s
}

// Init StandaloneRedisLock
func (p *StandaloneRedisLock) Init(metadata lock.Metadata) error {
// 1. parse config
m, err := utils.ParseRedisMetadata(metadata.Properties)
Expand All @@ -63,15 +64,19 @@ func (p *StandaloneRedisLock) Init(metadata lock.Metadata) error {
return err
}

// Features is to get StandaloneRedisLock's features
func (p *StandaloneRedisLock) Features() []lock.Feature {
return p.features
}

// Node tries to acquire a redis lock
func (p *StandaloneRedisLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
// 1.Setting redis expiration time
nx := p.client.SetNX(p.ctx, req.ResourceId, req.LockOwner, time.Second*time.Duration(req.Expire))
if nx == nil {
return &lock.TryLockResponse{}, fmt.Errorf("[standaloneRedisLock]: SetNX returned nil.ResourceId: %s", req.ResourceId)
}
// 2. check error
err := nx.Err()
if err != nil {
return &lock.TryLockResponse{}, err
Expand All @@ -84,6 +89,7 @@ func (p *StandaloneRedisLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockRe

const unlockScript = "local v = redis.call(\"get\",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call(\"del\",KEYS[1]) end"

// Node tries to release a redis lock
func (p *StandaloneRedisLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
// 1. delegate to client.eval lua script
eval := p.client.Eval(p.ctx, unlockScript, []string{req.ResourceId}, req.LockOwner)
Expand Down Expand Up @@ -115,12 +121,14 @@ func (p *StandaloneRedisLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockRespo
}, nil
}

// newInternalErrorUnlockResponse is to return lock release error
func newInternalErrorUnlockResponse() *lock.UnlockResponse {
return &lock.UnlockResponse{
Status: lock.INTERNAL_ERROR,
}
}

// Close shuts down the client's redis connections.
func (p *StandaloneRedisLock) Close() error {
p.cancel()

Expand Down
7 changes: 7 additions & 0 deletions components/lock/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,42 @@ package lock

type Feature string

// Lock's metadata
type Config struct {
Metadata map[string]string `json:"metadata"`
}

// Lock's properties
type Metadata struct {
Properties map[string]string `json:"properties"`
}

// Lock acquire request
type TryLockRequest struct {
ResourceId string
LockOwner string
Expire int32
}

// Lock acquire request was successful or not
type TryLockResponse struct {
Success bool
}

// Lock release request
type UnlockRequest struct {
ResourceId string
LockOwner string
}

// Status when releasing the lock
type UnlockResponse struct {
Status LockStatus
}

type LockStatus int32

// lock status
const (
SUCCESS LockStatus = 0
LOCK_UNEXIST LockStatus = 1
Expand Down
8 changes: 8 additions & 0 deletions components/lock/zookeeper/zookeeper_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"
)

// Zookeeper lock store
type ZookeeperLock struct {
//trylock reestablish connection every time
factory utils.ConnectionFactory
Expand All @@ -31,13 +32,15 @@ type ZookeeperLock struct {
logger log.ErrorLogger
}

// Create ZookeeperLock
func NewZookeeperLock(logger log.ErrorLogger) *ZookeeperLock {
lock := &ZookeeperLock{
logger: logger,
}
return lock
}

// Init ZookeeperLock
func (p *ZookeeperLock) Init(metadata lock.Metadata) error {

m, err := utils.ParseZookeeperMetadata(metadata.Properties)
Expand All @@ -57,9 +60,12 @@ func (p *ZookeeperLock) Init(metadata lock.Metadata) error {
return nil
}

// Features is to get ZookeeperLock's features
func (p *ZookeeperLock) Features() []lock.Feature {
return nil
}

// Node tries to acquire a zookeeper lock
func (p *ZookeeperLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {

conn, err := p.factory.NewConnection(time.Duration(req.Expire)*time.Second, p.metadata)
Expand Down Expand Up @@ -97,6 +103,8 @@ func (p *ZookeeperLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse
}, nil

}

// Node tries to release a zookeeper lock
func (p *ZookeeperLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {

conn := p.unlockConn
Expand Down

0 comments on commit b695318

Please sign in to comment.