Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extract out content manager queue #741

Merged
merged 1 commit into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions api/v1/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,14 +436,12 @@ func (s *apiV1) handleAddIpfs(c echo.Context, u *util.User) error {
}

makeDeal := true
pinstatus, pinOp, contID, err := s.CM.PinContent(ctx, u.ID, rcid, filename, cols, origins, 0, nil, makeDeal)
pinstatus, pinOp, err := s.CM.PinContent(ctx, u.ID, rcid, filename, cols, origins, 0, nil, makeDeal)
if err != nil {
return err
}

s.pinMgr.Add(pinOp)
s.CM.ToCheck(contID)

return c.JSON(http.StatusAccepted, pinstatus)
}

Expand Down Expand Up @@ -3412,13 +3410,11 @@ func (s *apiV1) handleCommitCollection(c echo.Context, u *util.User) error {
ctx := c.Request().Context()
makeDeal := false

pinstatus, pinOp, contID, err := s.CM.PinContent(ctx, u.ID, collectionNode.Cid(), collectionNode.Cid().String(), nil, origins, 0, nil, makeDeal)
pinstatus, pinOp, err := s.CM.PinContent(ctx, u.ID, collectionNode.Cid(), collectionNode.Cid().String(), nil, origins, 0, nil, makeDeal)
if err != nil {
return err
}

s.pinMgr.Add(pinOp)
s.CM.ToCheck(contID)

return c.JSON(http.StatusOK, pinstatus)
}
Expand Down
8 changes: 2 additions & 6 deletions api/v1/pinning.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,11 @@ func (s *apiV1) handleAddPin(e echo.Context, u *util.User) error {
}

makeDeal := true
status, pinOp, contID, err := s.CM.PinContent(ctx, u.ID, obj, pin.Name, cols, origins, 0, pin.Meta, makeDeal)
status, pinOp, err := s.CM.PinContent(ctx, u.ID, obj, pin.Name, cols, origins, 0, pin.Meta, makeDeal)
if err != nil {
return err
}

s.pinMgr.Add(pinOp)
s.CM.ToCheck(contID)

return e.JSON(http.StatusAccepted, status)
}
Expand Down Expand Up @@ -406,13 +404,11 @@ func (s *apiV1) handleReplacePin(e echo.Context, u *util.User) error {
}

makeDeal := true
status, pinOp, contID, err := s.CM.PinContent(e.Request().Context(), u.ID, pinCID, pin.Name, nil, origins, uint(pinID), pin.Meta, makeDeal)
status, pinOp, err := s.CM.PinContent(e.Request().Context(), u.ID, pinCID, pin.Name, nil, origins, uint(pinID), pin.Meta, makeDeal)
if err != nil {
return err
}

s.pinMgr.Add(pinOp)
s.CM.ToCheck(contID)

return e.JSON(http.StatusAccepted, status)
}
Expand Down
47 changes: 37 additions & 10 deletions contentmgr/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package contentmgr

import (
"sync"
"time"

"github.com/application-research/estuary/config"
contentqueue "github.com/application-research/estuary/contentmgr/queue"
"github.com/application-research/estuary/drpc"
"github.com/application-research/estuary/miner"
"github.com/application-research/estuary/node"
"golang.org/x/xerrors"

"github.com/application-research/estuary/util"
"github.com/application-research/filclient"
datatransfer "github.com/filecoin-project/go-data-transfer"
Expand All @@ -28,8 +32,7 @@ type ContentManager struct {
tracer trace.Tracer
blockstore node.EstuaryBlockstore
notifyBlockstore *node.NotifyBlockstore
toCheck chan uint
queueMgr *queueManager
queueMgr contentqueue.IQueueManager
retrLk sync.Mutex
retrievalsInProgress map[uint]*util.RetrievalProgress
contentLk sync.RWMutex
Expand Down Expand Up @@ -61,12 +64,14 @@ type ContentManager struct {
TrackingChannels map[string]*util.ChanTrack
}

func NewContentManager(db *gorm.DB, api api.Gateway, fc *filclient.FilClient, tbs *util.TrackingBlockstore, nd *node.Node, cfg *config.Estuary, minerManager miner.IMinerManager, log *zap.SugaredLogger) (*ContentManager, error) {
func NewContentManager(db *gorm.DB, api api.Gateway, fc *filclient.FilClient, tbs *util.TrackingBlockstore, nd *node.Node, cfg *config.Estuary, minerManager miner.IMinerManager, log *zap.SugaredLogger) (*ContentManager, contentqueue.IQueueManager, error) {
cache, err := lru.NewARC(50000)
if err != nil {
return nil, err
return nil, nil, err
}

queueMgr := contentqueue.NewQueueManager(cfg.DisableFilecoinStorage)

cm := &ContentManager{
cfg: cfg,
db: db,
Expand All @@ -75,7 +80,6 @@ func NewContentManager(db *gorm.DB, api api.Gateway, fc *filclient.FilClient, tb
blockstore: tbs.Under().(node.EstuaryBlockstore),
node: nd,
notifyBlockstore: nd.NotifBlockstore,
toCheck: make(chan uint, 100000),
retrievalsInProgress: make(map[uint]*util.RetrievalProgress),
remoteTransferStatus: cache,
Shuttles: make(map[string]*ShuttleConnection),
Expand All @@ -88,14 +92,12 @@ func NewContentManager(db *gorm.DB, api api.Gateway, fc *filclient.FilClient, tb
consolidatingZones: make(map[uint]bool),
aggregatingZones: make(map[uint]bool),

queueMgr: queueMgr,

// TODO move out to filc package
TrackingChannels: make(map[string]*util.ChanTrack),
}

cm.queueMgr = newQueueManager(func(c uint) {
cm.ToCheck(c)
})
return cm, nil
return cm, queueMgr, nil
}

// TODO move out to filc package
Expand All @@ -108,3 +110,28 @@ func (cm *ContentManager) TrackTransfer(chanid *datatransfer.ChannelID, dealdbid
Last: st,
}
}

func (cm *ContentManager) rebuildToCheckQueue() error {
cm.log.Info("rebuilding contents queue .......")

var allcontent []util.Content
if err := cm.db.Find(&allcontent, "active AND NOT aggregated_in > 0").Error; err != nil {
return xerrors.Errorf("finding all content in database: %w", err)
}

go func() {
for i, c := range allcontent {
// every 100 contents re-queued, wait 5 seconds to avoid over-saturating queues
// time to requeue all: 10m / 100 * 5 seconds = 5.78 days
if i%100 == 0 {
time.Sleep(time.Second * 5)
}
cm.queueMgr.ToCheck(c.ID)
}
}()
return nil
}

func (cm *ContentManager) ToCheck(contID uint) {
cm.queueMgr.ToCheck(contID)
}
21 changes: 11 additions & 10 deletions contentmgr/pinning.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,24 @@ func (cm *ContentManager) PinDelegatesForContent(cont util.Content) []string {
}
}

func (cm *ContentManager) PinContent(ctx context.Context, user uint, obj cid.Cid, filename string, cols []*collections.CollectionRef, origins []*peer.AddrInfo, replaceID uint, meta map[string]interface{}, makeDeal bool) (*types.IpfsPinStatusResponse, *operation.PinningOperation, uint, error) {
func (cm *ContentManager) PinContent(ctx context.Context, user uint, obj cid.Cid, filename string, cols []*collections.CollectionRef, origins []*peer.AddrInfo, replaceID uint, meta map[string]interface{}, makeDeal bool) (*types.IpfsPinStatusResponse, *operation.PinningOperation, error) {
loc, err := cm.selectLocationForContent(ctx, obj, user)
if err != nil {
return nil, nil, 0, xerrors.Errorf("selecting location for content failed: %w", err)
return nil, nil, xerrors.Errorf("selecting location for content failed: %w", err)
}

if replaceID > 0 {
// mark as replace since it will removed and so it should not be fetched anymore
if err := cm.db.Model(&util.Content{}).Where("id = ?", replaceID).Update("replace", true).Error; err != nil {
return nil, nil, 0, err
return nil, nil, err
}
}

var metaStr string
if meta != nil {
b, err := json.Marshal(meta)
if err != nil {
return nil, nil, 0, err
return nil, nil, err
}
metaStr = string(b)
}
Expand All @@ -115,7 +115,7 @@ func (cm *ContentManager) PinContent(ctx context.Context, user uint, obj cid.Cid
if origins != nil {
b, err := json.Marshal(origins)
if err != nil {
return nil, nil, 0, err
return nil, nil, err
}
originsStr = string(b)
}
Expand All @@ -132,7 +132,7 @@ func (cm *ContentManager) PinContent(ctx context.Context, user uint, obj cid.Cid
Origins: originsStr,
}
if err := cm.db.Create(&cont).Error; err != nil {
return nil, nil, 0, err
return nil, nil, err
}

if len(cols) > 0 {
Expand All @@ -144,7 +144,7 @@ func (cm *ContentManager) PinContent(ctx context.Context, user uint, obj cid.Cid
Columns: []clause.Column{{Name: "path"}, {Name: "collection"}},
DoUpdates: clause.AssignmentColumns([]string{"created_at", "content"}),
}).Create(cols).Error; err != nil {
return nil, nil, 0, err
return nil, nil, err
}
}

Expand All @@ -153,15 +153,15 @@ func (cm *ContentManager) PinContent(ctx context.Context, user uint, obj cid.Cid
pinOp = cm.GetPinOperation(cont, origins, replaceID, makeDeal)
} else {
if err := cm.PinContentOnShuttle(ctx, cont, origins, replaceID, loc, makeDeal); err != nil {
return nil, nil, 0, err
return nil, nil, err
}
}

ipfsRes, err := cm.PinStatus(cont, origins)
if err != nil {
return nil, nil, 0, err
return nil, nil, err
}
return ipfsRes, pinOp, cont.ID, nil
return ipfsRes, pinOp, nil
}

func (cm *ContentManager) GetPinOperation(cont util.Content, peers []*peer.AddrInfo, replaceID uint, makeDeal bool) *operation.PinningOperation {
Expand Down Expand Up @@ -393,6 +393,7 @@ func (cm *ContentManager) handlePinningComplete(ctx context.Context, handle stri
}).Error; err != nil {
return xerrors.Errorf("failed to update content in database: %w", err)
}
cm.ToCheck(cont.ID)
return nil
}

Expand Down
55 changes: 29 additions & 26 deletions contentmgr/queue.go → contentmgr/queue/queue.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package contentmgr
package contentqueue

import (
"container/heap"
"context"
"sync"
"time"

"github.com/application-research/estuary/util"
"github.com/ipfs/go-metrics-interface"
"golang.org/x/xerrors"
)

type IQueueManager interface {
ToCheck(contID uint)
Add(content uint, wait time.Duration)
NextContent() chan uint
Len() int
NextEvent() time.Time
}

type queueManager struct {
queue *entryQueue
cb func(uint)
Expand All @@ -21,6 +27,9 @@ type queueManager struct {

qsizeMetr metrics.Gauge
qnextMetr metrics.Gauge

toCheck chan uint
isDisabled bool
}

type queueEntry struct {
Expand Down Expand Up @@ -58,24 +67,27 @@ func (eq *entryQueue) PopEntry() *queueEntry {
return heap.Pop(eq).(*queueEntry)
}

func newQueueManager(cb func(c uint)) *queueManager {
func NewQueueManager(isDisabled bool) *queueManager {
metCtx := metrics.CtxScope(context.Background(), "content_manager")
qsizeMetr := metrics.NewCtx(metCtx, "queue_size", "number of items in the replicator queue").Gauge()
qnextMetr := metrics.NewCtx(metCtx, "queue_next", "next event time for queue").Gauge()

qm := &queueManager{
queue: new(entryQueue),
cb: cb,

qsizeMetr: qsizeMetr,
qnextMetr: qnextMetr,

toCheck: make(chan uint, 100000),
isDisabled: isDisabled,
}
qm.cb = qm.ToCheck

heap.Init(qm.queue)
return qm
}

func (qm *queueManager) add(content uint, wait time.Duration) {
func (qm *queueManager) Add(content uint, wait time.Duration) {
qm.qlk.Lock()
defer qm.qlk.Unlock()

Expand Down Expand Up @@ -122,30 +134,21 @@ func (qm *queueManager) processQueue() {
qm.nextEvent = time.Time{}
}

func (cm *ContentManager) ToCheck(contID uint) {
func (qm *queueManager) ToCheck(contID uint) {
// if DisableFilecoinStorage is not enabled, queue content for deal making
if !cm.cfg.DisableFilecoinStorage {
cm.toCheck <- contID
if !qm.isDisabled {
qm.toCheck <- contID
}
}

func (cm *ContentManager) rebuildToCheckQueue() error {
cm.log.Info("rebuilding contents queue .......")
func (qm *queueManager) NextContent() chan uint {
return qm.toCheck
}

var allcontent []util.Content
if err := cm.db.Find(&allcontent, "active AND NOT aggregated_in > 0").Error; err != nil {
return xerrors.Errorf("finding all content in database: %w", err)
}
func (qm *queueManager) Len() int {
return len(qm.queue.elems)
}

go func() {
for i, c := range allcontent {
// every 100 contents re-queued, wait 5 seconds to avoid over-saturating queues
// time to requeue all: 10m / 100 * 5 seconds = 5.78 days
if i%100 == 0 {
time.Sleep(time.Second * 5)
}
cm.ToCheck(c.ID)
}
}()
return nil
func (qm *queueManager) NextEvent() time.Time {
return qm.nextEvent
}
6 changes: 3 additions & 3 deletions contentmgr/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (cm *ContentManager) runDealWorker(ctx context.Context) {
// run the deal reconciliation and deal making worker
for {
select {
case c := <-cm.toCheck:
case c := <-cm.queueMgr.NextContent():
cm.log.Debugf("checking content: %d", c)

var content util.Content
Expand All @@ -62,11 +62,11 @@ func (cm *ContentManager) runDealWorker(ctx context.Context) {
}

err := cm.ensureStorage(context.TODO(), content, func(dur time.Duration) {
cm.queueMgr.add(content.ID, dur)
cm.queueMgr.Add(content.ID, dur)
})
if err != nil {
cm.log.Errorf("failed to ensure replication of content %d: %s", content.ID, err)
cm.queueMgr.add(content.ID, time.Minute*5)
cm.queueMgr.Add(content.ID, time.Minute*5)
}
}
}
Expand Down
Loading