Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add max replica constraint #408

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/run/dealpusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ var DealPusherCmd = &cli.Command{
Aliases: []string{"d"},
Value: 3,
},
&cli.UintFlag{
Name: "max-replication-factor",
Usage: "Max number of replicas for each individual PieceCID across all clients and providers",
Aliases: []string{"M"},
DefaultText: "Unlimited",
},
},
Action: func(c *cli.Context) error {
db, closer, err := database.OpenFromCLI(c)
Expand All @@ -33,7 +39,7 @@ var DealPusherCmd = &cli.Command{
return errors.WithStack(err)
}

dm, err := dealpusher.NewDealPusher(db, c.String("lotus-api"), c.String("lotus-token"), c.Uint("deal-attempts"))
dm, err := dealpusher.NewDealPusher(db, c.String("lotus-api"), c.String("lotus-token"), c.Uint("deal-attempts"), c.Uint("max-replication-factor"))
if err != nil {
return errors.WithStack(err)
}
Expand Down
5 changes: 3 additions & 2 deletions docs/en/cli-reference/run/deal-pusher.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 21 additions & 5 deletions service/dealpusher/dealpusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type DealPusher struct {
mutex sync.RWMutex // Mutex for providing mutual exclusion to protect shared resources.
sendDealAttempts uint // Number of attempts for sending a deal.
host host.Host // Libp2p host for making deals.
maxReplicas uint // Maximum number of replicas for each individual PieceCID across all clients and providers.
}

func (*DealPusher) Name() string {
Expand Down Expand Up @@ -227,6 +228,12 @@ func (d *DealPusher) updateScheduleUnsafe(ctx context.Context, schedule model.Sc
// 2. An error if any step of the process encounters an issue, otherwise nil.
func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule) (model.ScheduleState, error) {
db := d.dbNoContext.WithContext(ctx)
overReplicatedCIDs := db.
Table("deals").
Select("piece_cid").
Where("state in ?", []model.DealState{model.DealProposed, model.DealPublished, model.DealActive}).
Group("piece_cid").
Having("count(*) >= ?", d.maxReplicas)
var allowedPieceCIDs []model.CID
for _, c := range schedule.AllowedPieceCIDs {
c2, err := cid.Parse(c)
Expand Down Expand Up @@ -304,15 +311,23 @@ func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule)
existingPieceCIDQuery = db.Table("deals").Select("piece_cid").Where("schedule_id = ?", schedule.ID)
}
if len(allowedPieceCIDs) == 0 {
err = db.Where("attachment_id IN ? AND piece_cid NOT IN (?)",
query := db.Where("attachment_id IN ? AND piece_cid NOT IN (?)",
underscore.Map(attachments, func(a model.SourceAttachment) model.SourceAttachmentID { return a.ID }),
existingPieceCIDQuery).First(&car).Error
existingPieceCIDQuery)
if d.maxReplicas > 0 && !schedule.Force {
query = query.Where("piece_cid NOT IN (?)", overReplicatedCIDs)
}
err = query.First(&car).Error
} else {
pieceCIDChunks := util.ChunkSlice(allowedPieceCIDs, util.BatchSize)
for _, pieceCIDChunk := range pieceCIDChunks {
err = db.Where("attachment_id IN ? AND piece_cid NOT IN (?) AND piece_cid IN ?",
query := db.Where("attachment_id IN ? AND piece_cid NOT IN (?) AND piece_cid IN ?",
underscore.Map(attachments, func(a model.SourceAttachment) model.SourceAttachmentID { return a.ID }),
existingPieceCIDQuery, pieceCIDChunk).First(&car).Error
existingPieceCIDQuery, pieceCIDChunk)
if d.maxReplicas > 0 && !schedule.Force {
query = query.Where("piece_cid NOT IN (?)", overReplicatedCIDs)
}
err = query.First(&car).Error
if err == nil {
break
}
Expand Down Expand Up @@ -389,7 +404,7 @@ func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule)
}

func NewDealPusher(db *gorm.DB, lotusURL string,
lotusToken string, numAttempts uint) (*DealPusher, error) {
lotusToken string, numAttempts uint, maxReplicas uint) (*DealPusher, error) {
if numAttempts <= 1 {
numAttempts = 1
}
Expand All @@ -414,6 +429,7 @@ func NewDealPusher(db *gorm.DB, lotusURL string,
cron.WithParser(cron.NewParser(cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor))),
sendDealAttempts: numAttempts,
host: h,
maxReplicas: maxReplicas,
}, nil
}

Expand Down
73 changes: 65 additions & 8 deletions service/dealpusher/dealpusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (m *MockDealMaker) MakeDeal(ctx context.Context, walletObj model.Wallet, ca

func TestDealMakerService_Start(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1)
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1, 10)
require.NoError(t, err)
ctx, cancel := context.WithCancel(ctx)
dones, _, err := service.Start(ctx)
Expand All @@ -79,9 +79,9 @@ func TestDealMakerService_Start(t *testing.T) {

func TestDealMakerService_MultipleInstances(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
service1, err := NewDealPusher(db, "https://api.node.glif.io", "", 1)
service1, err := NewDealPusher(db, "https://api.node.glif.io", "", 1, 10)
require.NoError(t, err)
service2, err := NewDealPusher(db, "https://api.node.glif.io", "", 1)
service2, err := NewDealPusher(db, "https://api.node.glif.io", "", 1, 10)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
Expand All @@ -104,7 +104,7 @@ func TestDealMakerService_FailtoSend(t *testing.T) {
waitPendingInterval = time.Minute
}()
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 2)
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 2, 0)
require.NoError(t, err)
mockDealmaker := new(MockDealMaker)
service.dealMaker = mockDealmaker
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestDealMakerService_Cron(t *testing.T) {
waitPendingInterval = time.Minute
}()
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1)
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1, 10)
require.NoError(t, err)
mockDealmaker := new(MockDealMaker)
service.dealMaker = mockDealmaker
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestDealMakerService_ScheduleWithConstraints(t *testing.T) {
waitPendingInterval = time.Minute
}()
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1)
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1, 10)
require.NoError(t, err)
mockDealmaker := new(MockDealMaker)
service.dealMaker = mockDealmaker
Expand Down Expand Up @@ -364,7 +364,7 @@ func TestDealMakerService_ScheduleWithConstraints(t *testing.T) {

func TestDealmakerService_Force(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1)
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1, 10)
require.NoError(t, err)
mockDealmaker := new(MockDealMaker)
service.dealMaker = mockDealmaker
Expand Down Expand Up @@ -421,9 +421,66 @@ func TestDealmakerService_Force(t *testing.T) {
})
}

func TestDealMakerService_MaxReplica(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1, 1)
require.NoError(t, err)
mockDealmaker := new(MockDealMaker)
service.dealMaker = mockDealmaker
pieceCID := model.CID(calculateCommp(t, generateRandomBytes(1000), 1024))
ctx, cancel := context.WithCancel(ctx)
defer cancel()
provider := "f0miner"
client := "f0client"
schedule := model.Schedule{
Preparation: &model.Preparation{
Wallets: []model.Wallet{
{
ID: client, Address: "f0xx",
},
},
SourceStorages: []model.Storage{{}},
},
State: model.ScheduleActive,
Provider: provider,
}
err = db.Create(&schedule).Error
require.NoError(t, err)
mockDealmaker.On("MakeDeal", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.Deal{
ScheduleID: &schedule.ID,
}, nil)
err = db.Create([]model.Car{
{
AttachmentID: ptr.Of(model.SourceAttachmentID(1)),
PreparationID: 1,
PieceCID: pieceCID,
PieceSize: 1024,
StoragePath: "0",
},
}).Error
require.NoError(t, err)
err = db.Create([]model.Deal{
{
ScheduleID: &schedule.ID,
Provider: "another",
ClientID: client,
PieceCID: pieceCID,
PieceSize: 1024,
State: model.DealProposed,
}}).Error
require.NoError(t, err)
service.runOnce(ctx)
time.Sleep(time.Second)
var deals []model.Deal
err = db.Find(&deals).Error
require.NoError(t, err)
require.Len(t, deals, 1)
})
}

func TestDealMakerService_NewScheduleOneOff(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1)
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1, 10)
require.NoError(t, err)
mockDealmaker := new(MockDealMaker)
service.dealMaker = mockDealmaker
Expand Down
1 change: 1 addition & 0 deletions util/testutil/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func getTestDB(t *testing.T, dialect string) (db *gorm.DB, closer io.Closer, con
if errors.As(err, &opError) {
return
}
require.NoError(t, err)
err = db1.Exec("CREATE DATABASE " + dbName + "").Error
require.NoError(t, err)
connStr = strings.ReplaceAll(connStr, "singularity?", dbName+"?")
Expand Down