Skip to content

Commit

Permalink
add l2 caching for alloc (#1258)
Browse files Browse the repository at this point in the history
* add l2 caching for alloc

* rmv return from commit repo

* use update

* fix update

* add log

* rmv update object

* add log for allocation

* move lock to middleware

* fix save alloc update

* empty commit

* fix blobber size update

* fix unit test

* cleanup

* rmv commit in initMap

* fix unit test

* fix renamefile mock db

* add commit method to enhancedDB

---------

Co-authored-by: Yury <yuderbasov@gmail.com>
  • Loading branch information
Hitenjain14 and dabasov authored Oct 6, 2023
1 parent 80ed5b7 commit bfe27aa
Show file tree
Hide file tree
Showing 20 changed files with 287 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func setupMockForFileManagerInit(mock sqlmock.Sqlmock) {
WillReturnRows(
sqlmock.NewRows([]string{"file_size"}).AddRow(6553600),
)

mock.ExpectCommit()
}

func init() {
Expand Down
183 changes: 163 additions & 20 deletions code/go/0chain.net/blobbercore/allocation/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
lru "github.com/hashicorp/golang-lru/v2"
"go.uber.org/zap"
"gorm.io/gorm"
"gorm.io/gorm/clause"
Expand All @@ -14,17 +15,29 @@ import (
const (
SQLWhereGetById = "allocations.id = ?"
SQLWhereGetByTx = "allocations.tx = ?"
lruSize = 100
)

var (
Repo *Repository
)

type AllocationUpdate func(a *Allocation)

func init() {
Repo = &Repository{}
allocCache, _ := lru.New[string, *Allocation](lruSize)
Repo = &Repository{
allocCache: allocCache,
}
}

type Repository struct {
allocCache *lru.Cache[string, *Allocation]
}

type AllocationCache struct {
Allocation *Allocation
AllocationUpdates []AllocationUpdate
}

type Res struct {
Expand All @@ -43,6 +56,11 @@ func (r *Repository) GetById(ctx context.Context, id string) (*Allocation, error
}

if a, ok := cache[id]; ok {
return a.Allocation, nil
}

a := r.getAllocFromGlobalCache(id)
if a != nil {
return a, nil
}

Expand All @@ -52,8 +70,10 @@ func (r *Repository) GetById(ctx context.Context, id string) (*Allocation, error
return alloc, err
}

cache[id] = alloc

cache[id] = AllocationCache{
Allocation: alloc,
}
r.setAllocToGlobalCache(alloc)
return alloc, nil
}

Expand All @@ -69,16 +89,17 @@ func (r *Repository) GetByIdAndLock(ctx context.Context, id string) (*Allocation
}

alloc := &Allocation{}

err = tx.Model(&Allocation{}).
Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).
Where("id=?", id).
Take(alloc).Error
if err != nil {
return alloc, err
}
cache[id] = alloc

cache[id] = AllocationCache{
Allocation: alloc,
}
r.setAllocToGlobalCache(alloc)
return alloc, err
}

Expand All @@ -93,32 +114,49 @@ func (r *Repository) GetByTx(ctx context.Context, allocationID, txHash string) (
return nil, err
}
if a, ok := cache[allocationID]; ok {
if a.Tx == txHash {
return a, nil
if a.Allocation.Tx == txHash {
return a.Allocation, nil
}
}

a := r.getAllocFromGlobalCache(allocationID)
if a != nil && a.Tx == txHash {
return a, nil
}

alloc := &Allocation{}
err = tx.Table(TableNameAllocation).Where(SQLWhereGetByTx, txHash).Take(alloc).Error
if err != nil {
return alloc, err
}
cache[allocationID] = alloc

cache[allocationID] = AllocationCache{
Allocation: alloc,
}
r.setAllocToGlobalCache(alloc)
return alloc, err
}

func (r *Repository) GetAllocations(ctx context.Context, offset int64) ([]*Allocation, error) {
var tx = datastore.GetStore().GetTransaction(ctx)

const query = `finalized = false AND cleaned_up = false`
allocs := make([]*Allocation, 0)
return allocs, tx.Model(&Allocation{}).
allocs := make([]*Allocation, 0, 10)
err := tx.Model(&Allocation{}).
Where(query).
Limit(UPDATE_LIMIT).
Offset(int(offset)).
Order("id ASC").
Find(&allocs).Error
if err != nil {
return allocs, err
}
for ind, alloc := range allocs {
if ind == lruSize {
break
}
r.setAllocToGlobalCache(alloc)
}
return allocs, nil
}

func (r *Repository) GetAllocationIds(ctx context.Context) []Res {
Expand Down Expand Up @@ -149,40 +187,145 @@ func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, A
if err != nil {
return err
}
delete(cache, allocationID)

allocationUpdates := make(map[string]interface{})
allocationUpdates["latest_redeemed_write_marker"] = AllocationRoot
allocationUpdates["is_redeem_required"] = false
err = tx.Model(allocationObj).Updates(allocationUpdates).Error
return err
if err != nil {
return err
}
allocationObj.LatestRedeemedWM = AllocationRoot
allocationObj.IsRedeemRequired = false
txnCache := cache[allocationID]
txnCache.Allocation = allocationObj
updateAlloc := func(a *Allocation) {
a.LatestRedeemedWM = AllocationRoot
a.IsRedeemRequired = false
}
txnCache.AllocationUpdates = append(txnCache.AllocationUpdates, updateAlloc)
cache[allocationID] = txnCache
return nil
}

func (r *Repository) Save(ctx context.Context, a *Allocation) error {
func (r *Repository) UpdateAllocation(ctx context.Context, allocationObj *Allocation, updateMap map[string]interface{}, updateOption AllocationUpdate) error {
var tx = datastore.GetStore().GetTransaction(ctx)
if tx == nil {
logging.Logger.Panic("no transaction in the context")
}
cache, err := getCache(tx)
if err != nil {
return err
}
err = tx.Model(allocationObj).Updates(updateMap).Error
if err != nil {
return err
}
txnCache := cache[allocationObj.ID]
txnCache.Allocation = allocationObj
txnCache.AllocationUpdates = append(txnCache.AllocationUpdates, updateOption)
cache[allocationObj.ID] = txnCache
return nil
}

func (r *Repository) Commit(tx *datastore.EnhancedDB) {
if tx == nil {
logging.Logger.Panic("no transaction in the context")
}
cache, _ := getCache(tx)
if cache == nil {
return
}
for _, txnCache := range cache {
var alloc *Allocation
for _, update := range txnCache.AllocationUpdates {
alloc = r.getAllocFromGlobalCache(txnCache.Allocation.ID)
if alloc != nil {
update(alloc)
}
}
if alloc != nil {
r.setAllocToGlobalCache(alloc)
}
}
}

func (r *Repository) Save(ctx context.Context, alloc *Allocation) error {
var tx = datastore.GetStore().GetTransaction(ctx)
if tx == nil {
logging.Logger.Panic("no transaction in the context")
}

cache, err := getCache(tx)
if err != nil {
return err
}

txnCache := cache[alloc.ID]
txnCache.Allocation = alloc
err = tx.Save(alloc).Error
if err != nil {
return err
}
updateAlloc := func(a *Allocation) {
*a = *alloc
}
txnCache.AllocationUpdates = append(txnCache.AllocationUpdates, updateAlloc)
cache[alloc.ID] = txnCache
return nil
}

func (r *Repository) Create(ctx context.Context, alloc *Allocation) error {
var tx = datastore.GetStore().GetTransaction(ctx)
if tx == nil {
logging.Logger.Panic("no transaction in the context")
}
cache, err := getCache(tx)
if err != nil {
return err
}

cache[a.ID] = a
return tx.Save(a).Error
txnCache := cache[alloc.ID]
txnCache.Allocation = alloc
err = tx.Create(alloc).Error
if err != nil {
return err
}
cache[alloc.ID] = txnCache
return nil
}

func getCache(tx *datastore.EnhancedDB) (map[string]*Allocation, error) {
func getCache(tx *datastore.EnhancedDB) (map[string]AllocationCache, error) {
c, ok := tx.SessionCache[TableNameAllocation]
if ok {
cache, ok := c.(map[string]*Allocation)
cache, ok := c.(map[string]AllocationCache)
if !ok {
return nil, fmt.Errorf("type assertion failed")
}
return cache, nil
}
cache := make(map[string]*Allocation)
cache := make(map[string]AllocationCache)
tx.SessionCache[TableNameAllocation] = cache
if tx.CommitAllocCache == nil {
tx.CommitAllocCache = func(tx *datastore.EnhancedDB) {
Repo.Commit(tx)
}
}
return cache, nil
}

func (r *Repository) getAllocFromGlobalCache(id string) *Allocation {
a, ok := r.allocCache.Get(id)
if !ok {
return nil
}
return a
}

func (r *Repository) setAllocToGlobalCache(a *Allocation) {
r.allocCache.Add(a.ID, a)
}

func (r *Repository) DeleteAllocation(allocationID string) {
r.allocCache.Remove(allocationID)
}
41 changes: 33 additions & 8 deletions code/go/0chain.net/blobbercore/allocation/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package allocation
import (
"context"
"encoding/json"
"github.com/0chain/blobber/code/go/0chain.net/core/node"
"math"
"time"

"github.com/0chain/blobber/code/go/0chain.net/core/node"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
Expand Down Expand Up @@ -49,10 +51,12 @@ func UpdateWorker(ctx context.Context, interval time.Duration) {
for {
select {
case <-tick:
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
updateCtx := datastore.GetStore().CreateTransaction(context.TODO())
_ = datastore.GetStore().WithTransaction(updateCtx, func(ctx context.Context) error {
updateWork(ctx)
return nil
})
updateCtx.Done()
case <-quit:
return
}
Expand Down Expand Up @@ -239,17 +243,42 @@ func requestExpiredAllocations() (allocs []string, err error) {
func updateAllocationInDB(ctx context.Context, a *Allocation, sa *transaction.StorageAllocation) (ua *Allocation, err error) {
var tx = datastore.GetStore().GetTransaction(ctx)
var changed bool = a.Tx != sa.Tx
if !changed {
return a, nil
}

// transaction
a.Tx = sa.Tx
a.OwnerID = sa.OwnerID
a.OwnerPublicKey = sa.OwnerPublicKey

// update fields
// // update fields
a.Expiration = sa.Expiration
a.TotalSize = sa.Size
a.Finalized = sa.Finalized
a.FileOptions = sa.FileOptions
a.BlobberSize = int64(math.Ceil(float64(sa.Size) / float64(sa.DataShards)))

updateMap := make(map[string]interface{})
updateMap["tx"] = a.Tx
updateMap["owner_id"] = a.OwnerID
updateMap["owner_public_key"] = a.OwnerPublicKey
updateMap["expiration"] = a.Expiration
updateMap["total_size"] = a.TotalSize
updateMap["finalized"] = a.Finalized
updateMap["file_options"] = a.FileOptions
updateMap["blobber_size"] = a.BlobberSize

updateOption := func(alloc *Allocation) {
alloc.Tx = a.Tx
alloc.OwnerID = a.OwnerID
alloc.OwnerPublicKey = a.OwnerPublicKey
alloc.Expiration = a.Expiration
alloc.TotalSize = a.TotalSize
alloc.Finalized = a.Finalized
alloc.FileOptions = a.FileOptions
alloc.BlobberSize = a.BlobberSize
}

// update terms
a.Terms = make([]*Terms, 0, len(sa.BlobberDetails))
Expand All @@ -263,14 +292,10 @@ func updateAllocationInDB(ctx context.Context, a *Allocation, sa *transaction.St
}

// save allocations
if err := Repo.Save(ctx, a); err != nil {
if err := Repo.UpdateAllocation(ctx, a, updateMap, updateOption); err != nil {
return nil, err
}

if !changed {
return a, nil
}

// save allocation terms
for _, t := range a.Terms {
if err := tx.Save(t).Error; err != nil {
Expand Down
2 changes: 1 addition & 1 deletion code/go/0chain.net/blobbercore/allocation/zcn.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func SyncAllocation(allocationId string) (*Allocation, error) {

err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
var e error
if e := Repo.Save(ctx, alloc); e != nil {
if e := Repo.Create(ctx, alloc); e != nil {
return e
}
tx := datastore.GetStore().GetTransaction(ctx)
Expand Down
Loading

0 comments on commit bfe27aa

Please sign in to comment.