Skip to content

Commit

Permalink
Merge pull request #1336
Browse files Browse the repository at this point in the history
* rmv wm lock table

* fix unit tests

* rmv write_locks table

* rmv gorm tag

* empty commit
  • Loading branch information
Hitenjain14 authored Nov 29, 2023
1 parent c934e10 commit c72a891
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 121 deletions.
71 changes: 25 additions & 46 deletions code/go/0chain.net/blobbercore/writemarker/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ package writemarker

import (
"context"
"sync"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/errors"
"github.com/0chain/gosdk/constants"
"go.uber.org/zap"
"gorm.io/gorm"
)

// LockStatus lock status
Expand All @@ -28,6 +27,11 @@ type LockResult struct {
CreatedAt int64 `json:"created_at,omitempty"`
}

var (
lockPool = make(map[string]*WriteLock)
lockMutex sync.Mutex
)

// Mutex WriteMarker mutex
type Mutex struct {
// ML MapLocker
Expand All @@ -51,45 +55,28 @@ func (m *Mutex) Lock(ctx context.Context, allocationID, connectionID string) (*L
l, _ := m.ML.GetLock(allocationID)
l.Lock()
defer l.Unlock()

db := datastore.GetStore().GetTransaction(ctx)

var lock WriteLock
err := db.Table(TableNameWriteLock).Where("allocation_id=?", allocationID).First(&lock).Error
if err != nil {
lockMutex.Lock()
defer lockMutex.Unlock()
lock, ok := lockPool[allocationID]
if !ok {
// new lock
logging.Logger.Info("Creating new lock")
if errors.Is(err, gorm.ErrRecordNotFound) {
lock = WriteLock{
AllocationID: allocationID,
ConnectionID: connectionID,
CreatedAt: time.Now(),
}

err = db.Table(TableNameWriteLock).Create(&lock).Error
if err != nil {
return nil, errors.ThrowLog(err.Error(), common.ErrBadDataStore)
}

return &LockResult{
Status: LockStatusOK,
CreatedAt: lock.CreatedAt.Unix(),
}, nil
lock = &WriteLock{
CreatedAt: time.Now(),
ConnectionID: connectionID,
}
logging.Logger.Error("Could not create lock")
//native postgres error
return nil, errors.ThrowLog(err.Error(), common.ErrBadDataStore)
lockPool[allocationID] = lock
return &LockResult{
Status: LockStatusOK,
CreatedAt: lock.CreatedAt.Unix(),
}, nil
}

if lock.ConnectionID != connectionID {
if time.Since(lock.CreatedAt) > config.Configuration.WriteMarkerLockTimeout {
if time.Since(lock.CreatedAt) > config.Configuration.WriteMarkerLockTimeout || lock.ConnectionID == "" {
// Lock expired. Provide lock to other connection id
lock.ConnectionID = connectionID
lock.CreatedAt = time.Now()
err = db.Model(&WriteLock{}).Where("allocation_id=?", allocationID).Save(&lock).Error
if err != nil {
return nil, errors.New("db_error", err.Error())
}
return &LockResult{
Status: LockStatusOK,
CreatedAt: lock.CreatedAt.Unix(),
Expand All @@ -103,10 +90,6 @@ func (m *Mutex) Lock(ctx context.Context, allocationID, connectionID string) (*L
}

lock.CreatedAt = time.Now()
err = db.Table(TableNameWriteLock).Where("allocation_id=?", allocationID).Save(&lock).Error
if err != nil {
return nil, errors.ThrowLog(err.Error(), common.ErrBadDataStore)
}

return &LockResult{
Status: LockStatusOK,
Expand All @@ -118,16 +101,12 @@ func (*Mutex) Unlock(ctx context.Context, allocationID string, connectionID stri
if allocationID == "" || connectionID == "" {
return nil
}

db := datastore.GetStore().GetTransaction(ctx)

err := db.Exec("DELETE FROM write_locks WHERE allocation_id = ? and connection_id = ? ", allocationID, connectionID).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil
}
return errors.ThrowLog(err.Error(), common.ErrBadDataStore)
lockMutex.Lock()
defer lockMutex.Unlock()
lock, ok := lockPool[allocationID]
// reset lock if connection id matches
if ok && lock.ConnectionID == connectionID {
lock.ConnectionID = ""
}

return nil
}
57 changes: 16 additions & 41 deletions code/go/0chain.net/blobbercore/writemarker/mutext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
gomocket "github.com/selvatico/go-mocket"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -56,16 +55,10 @@ func TestMutext_LockShouldWork(t *testing.T) {
connectionID: "lock_same_connection_id",
requestTime: now,
mock: func() {
gomocket.Catcher.NewMock().
WithQuery(`SELECT * FROM "write_locks" WHERE allocation_id=$1 ORDER BY "write_locks"."allocation_id" LIMIT 1`).
WithArgs("lock_same_allocation_id").
WithReply([]map[string]interface{}{
{
"allocation_id": "lock_same_allocation_id",
"connection_id": "lock_same_connection_id",
"created_at": now,
},
})
lockPool["lock_same_allocation_id"] = &WriteLock{
CreatedAt: now,
ConnectionID: "lock_same_connection_id",
}
},
assert: func(test *testing.T, r *LockResult, err error) {
require.Nil(test, err)
Expand All @@ -79,16 +72,10 @@ func TestMutext_LockShouldWork(t *testing.T) {
connectionID: "lock_pending_connection_id",
requestTime: time.Now(),
mock: func() {
gomocket.Catcher.NewMock().
WithQuery(`SELECT * FROM "write_locks" WHERE allocation_id=$1 ORDER BY "write_locks"."allocation_id" LIMIT 1`).
WithArgs("lock_allocation_id").
WithReply([]map[string]interface{}{
{
"allocation_id": "lock_allocation_id",
"connection_id": "lock_connection_id",
"created_at": time.Now().Add(-5 * time.Second),
},
})
lockPool["lock_allocation_id"] = &WriteLock{
CreatedAt: time.Now().Add(-5 * time.Second),
ConnectionID: "lock_connection_id",
}
},
assert: func(test *testing.T, r *LockResult, err error) {
require.Nil(test, err)
Expand All @@ -101,16 +88,10 @@ func TestMutext_LockShouldWork(t *testing.T) {
connectionID: "lock_timeout_2nd_connection_id",
requestTime: now,
mock: func() {
gomocket.Catcher.NewMock().
WithQuery(`SELECT * FROM "write_locks" WHERE allocation_id=$1 ORDER BY "write_locks"."allocation_id" LIMIT 1`).
WithArgs("lock_timeout_allocation_id").
WithReply([]map[string]interface{}{
{
"allocation_id": "lock_timeout_allocation_id",
"connection_id": "lock_timeout_1st_connection_id",
"created_at": time.Now().Add(31 * time.Second),
},
})
lockPool["lock_timeout_allocation_id"] = &WriteLock{
CreatedAt: time.Now().Add(31 * time.Second),
ConnectionID: "lock_timeout_1st_connection_id",
}
},
assert: func(test *testing.T, r *LockResult, err error) {
require.Nil(test, err)
Expand All @@ -123,16 +104,10 @@ func TestMutext_LockShouldWork(t *testing.T) {
connectionID: "lock_same_timeout_connection_id",
requestTime: now,
mock: func() {
gomocket.Catcher.NewMock().
WithQuery(`SELECT * FROM "write_locks" WHERE allocation_id=$1 ORDER BY "write_locks"."allocation_id" LIMIT 1`).
WithArgs("lock_same_timeout_allocation_id").
WithReply([]map[string]interface{}{
{
"allocation_id": "lock_same_timeout_allocation_id",
"connection_id": "lock_same_timeout_connection_id",
"created_at": now.Add(-config.Configuration.WriteMarkerLockTimeout),
},
})
lockPool["lock_same_timeout_allocation_id"] = &WriteLock{
CreatedAt: now.Add(-config.Configuration.WriteMarkerLockTimeout),
ConnectionID: "lock_same_timeout_connection_id",
}
},
assert: func(test *testing.T, r *LockResult, err error) {
require.Nil(test, err)
Expand Down
14 changes: 2 additions & 12 deletions code/go/0chain.net/blobbercore/writemarker/write_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,8 @@ package writemarker

import "time"

const (
TableNameWriteLock = "write_locks"
)

// WriteLock WriteMarker lock
type WriteLock struct {
AllocationID string `gorm:"column:allocation_id;size:64;primaryKey"`
ConnectionID string `gorm:"column:connection_id;size:64"`
CreatedAt time.Time `gorm:"column:created_at"`
}

// TableName get table name of migrate
func (WriteLock) TableName() string {
return TableNameWriteLock
ConnectionID string
CreatedAt time.Time
}
22 changes: 0 additions & 22 deletions goose/migrations/1698861371_full_db_snapshot.sql
Original file line number Diff line number Diff line change
Expand Up @@ -447,19 +447,6 @@ ALTER TABLE terms_id_seq OWNER TO blobber_user;
ALTER SEQUENCE terms_id_seq OWNED BY terms.id;


--
-- Name: write_locks; Type: TABLE; Schema: public; Owner: blobber_user
--

CREATE TABLE write_locks (
allocation_id character varying(64) NOT NULL,
connection_id character varying(64),
created_at timestamp with time zone
);


ALTER TABLE write_locks OWNER TO blobber_user;

--
-- Name: write_markers; Type: TABLE; Schema: public; Owner: blobber_user
--
Expand Down Expand Up @@ -737,15 +724,6 @@ ALTER TABLE ONLY settings
ALTER TABLE ONLY terms
ADD CONSTRAINT terms_pkey PRIMARY KEY (id);


--
-- Name: write_locks write_locks_pkey; Type: CONSTRAINT; Schema: public; Owner: blobber_user
--

ALTER TABLE ONLY write_locks
ADD CONSTRAINT write_locks_pkey PRIMARY KEY (allocation_id);


--
-- Name: write_markers write_markers_pkey; Type: CONSTRAINT; Schema: public; Owner: blobber_user
--
Expand Down

0 comments on commit c72a891

Please sign in to comment.