Skip to content

Commit a6f2457

Browse files
yiweichi and colinlyguo authored
feat(coordinator): assign static prover first and avoid reassigning failed task to same prover (#1584)
Co-authored-by: yiweichi <yiweichi@users.noreply.github.com> Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
1 parent fa0927c commit a6f2457

File tree

19 files changed

+287
-44
lines changed

19 files changed

+287
-44
lines changed

common/version/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"runtime/debug"
66
)
77

8-
var tag = "v4.4.85"
8+
var tag = "v4.4.86"
99

1010
var commit = func() string {
1111
if info, ok := debug.ReadBuildInfo(); ok {

coordinator/conf/config.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
"prover_manager": {
33
"provers_per_session": 1,
44
"session_attempts": 5,
5+
"external_prover_threshold": 32,
56
"bundle_collection_time_sec": 180,
67
"batch_collection_time_sec": 180,
78
"chunk_collection_time_sec": 180,

coordinator/internal/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type ProverManager struct {
1616
// Number of attempts that a session can be retried if previous attempts failed.
1717
// Currently we only consider proving timeout as failure here.
1818
SessionAttempts uint8 `json:"session_attempts"`
19+
// Threshold for activating the external prover based on unassigned task count.
20+
ExternalProverThreshold int64 `json:"external_prover_threshold"`
1921
// Zk verifier config.
2022
Verifier *VerifierConfig `json:"verifier"`
2123
// BatchCollectionTimeSec batch Proof collection time (in seconds).

coordinator/internal/controller/api/auth.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,11 @@ func (a *AuthController) PayloadFunc(data interface{}) jwt.MapClaims {
7070
}
7171

7272
return jwt.MapClaims{
73-
types.HardForkName: v.HardForkName,
74-
types.PublicKey: v.PublicKey,
75-
types.ProverName: v.Message.ProverName,
76-
types.ProverVersion: v.Message.ProverVersion,
73+
types.HardForkName: v.HardForkName,
74+
types.PublicKey: v.PublicKey,
75+
types.ProverName: v.Message.ProverName,
76+
types.ProverVersion: v.Message.ProverVersion,
77+
types.ProverProviderTypeKey: v.Message.ProverProviderType,
7778
}
7879
}
7980

@@ -96,5 +97,9 @@ func (a *AuthController) IdentityHandler(c *gin.Context) interface{} {
9697
c.Set(types.HardForkName, hardForkName)
9798
}
9899

100+
if providerType, ok := claims[types.ProverProviderTypeKey]; ok {
101+
c.Set(types.ProverProviderTypeKey, providerType)
102+
}
103+
99104
return nil
100105
}

coordinator/internal/logic/auth/login.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,17 @@ func (l *LoginLogic) Check(login *types.LoginParameter) error {
106106
}
107107
}
108108
}
109+
110+
if login.Message.ProverProviderType != types.ProverProviderTypeInternal && login.Message.ProverProviderType != types.ProverProviderTypeExternal {
111+
// for backward compatibility, set ProverProviderType as internal
112+
if login.Message.ProverProviderType == types.ProverProviderTypeUndefined {
113+
login.Message.ProverProviderType = types.ProverProviderTypeInternal
114+
} else {
115+
log.Error("invalid prover_provider_type", "value", login.Message.ProverProviderType, "prover name", login.Message.ProverName, "prover version", login.Message.ProverVersion)
116+
return errors.New("invalid prover provider type.")
117+
}
118+
}
119+
109120
return nil
110121
}
111122

coordinator/internal/logic/provertask/batch_prover_task.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"scroll-tech/coordinator/internal/config"
2323
"scroll-tech/coordinator/internal/orm"
2424
coordinatorType "scroll-tech/coordinator/internal/types"
25+
cutils "scroll-tech/coordinator/internal/utils"
2526
)
2627

2728
// BatchProverTask is prover task implement for batch proof
@@ -63,6 +64,18 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
6364

6465
maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
6566
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
67+
if taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) {
68+
unassignedBatchCount, getCountError := bp.batchOrm.GetUnassignedBatchCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
69+
if getCountError != nil {
70+
log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", getCountError)
71+
return nil, ErrCoordinatorInternalFailure
72+
}
73+
// Assign external prover if unassigned task number exceeds threshold
74+
if unassignedBatchCount < bp.cfg.ProverManager.ExternalProverThreshold {
75+
return nil, nil
76+
}
77+
}
78+
6679
var batchTask *orm.Batch
6780
for i := 0; i < 5; i++ {
6881
var getTaskError error
@@ -88,6 +101,20 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
88101
return nil, nil
89102
}
90103

104+
// Don't dispatch the same failing job to the same prover
105+
proverTasks, getTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, 2)
106+
if getTaskError != nil {
107+
log.Error("failed to get prover tasks", "proof type", message.ProofTypeBatch.String(), "task ID", tmpBatchTask.Hash, "error", getTaskError)
108+
return nil, ErrCoordinatorInternalFailure
109+
}
110+
for i := 0; i < len(proverTasks); i++ {
111+
if proverTasks[i].ProverPublicKey == taskCtx.PublicKey ||
112+
taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) {
113+
log.Debug("get empty batch, the prover already failed this task", "height", getTaskParameter.ProverHeight)
114+
return nil, nil
115+
}
116+
}
117+
91118
rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx.Copy(), tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts)
92119
if updateAttemptsErr != nil {
93120
log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)

coordinator/internal/logic/provertask/bundle_prover_task.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"scroll-tech/coordinator/internal/config"
2222
"scroll-tech/coordinator/internal/orm"
2323
coordinatorType "scroll-tech/coordinator/internal/types"
24+
cutils "scroll-tech/coordinator/internal/utils"
2425
)
2526

2627
// BundleProverTask is prover task implement for bundle proof
@@ -63,6 +64,18 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
6364

6465
maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
6566
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
67+
if taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) {
68+
unassignedBundleCount, getCountError := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
69+
if getCountError != nil {
70+
log.Error("failed to get unassigned bundle proving tasks count", "height", getTaskParameter.ProverHeight, "err", getCountError)
71+
return nil, ErrCoordinatorInternalFailure
72+
}
73+
// Assign external prover if unassigned task number exceeds threshold
74+
if unassignedBundleCount < bp.cfg.ProverManager.ExternalProverThreshold {
75+
return nil, nil
76+
}
77+
}
78+
6679
var bundleTask *orm.Bundle
6780
for i := 0; i < 5; i++ {
6881
var getTaskError error
@@ -88,6 +101,20 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
88101
return nil, nil
89102
}
90103

104+
// Don't dispatch the same failing job to the same prover
105+
proverTasks, getTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBundle, tmpBundleTask.Hash, 2)
106+
if getTaskError != nil {
107+
log.Error("failed to get prover tasks", "proof type", message.ProofTypeBundle.String(), "task ID", tmpBundleTask.Hash, "error", getTaskError)
108+
return nil, ErrCoordinatorInternalFailure
109+
}
110+
for i := 0; i < len(proverTasks); i++ {
111+
if proverTasks[i].ProverPublicKey == taskCtx.PublicKey ||
112+
taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) {
113+
log.Debug("get empty bundle, the prover already failed this task", "height", getTaskParameter.ProverHeight)
114+
return nil, nil
115+
}
116+
}
117+
91118
rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts)
92119
if updateAttemptsErr != nil {
93120
log.Error("failed to update bundle attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)

coordinator/internal/logic/provertask/chunk_prover_task.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"scroll-tech/coordinator/internal/config"
2222
"scroll-tech/coordinator/internal/orm"
2323
coordinatorType "scroll-tech/coordinator/internal/types"
24+
cutils "scroll-tech/coordinator/internal/utils"
2425
)
2526

2627
// ChunkProverTask the chunk prover task
@@ -61,6 +62,18 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
6162

6263
maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession
6364
maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts
65+
if taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) {
66+
unassignedChunkCount, getCountError := cp.chunkOrm.GetUnassignedChunkCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight)
67+
if getCountError != nil {
68+
log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", getCountError)
69+
return nil, ErrCoordinatorInternalFailure
70+
}
71+
// Assign external prover if unassigned task number exceeds threshold
72+
if unassignedChunkCount < cp.cfg.ProverManager.ExternalProverThreshold {
73+
return nil, nil
74+
}
75+
}
76+
6477
var chunkTask *orm.Chunk
6578
for i := 0; i < 5; i++ {
6679
var getTaskError error
@@ -86,6 +99,20 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
8699
return nil, nil
87100
}
88101

102+
// Don't dispatch the same failing job to the same prover
103+
proverTasks, getTaskError := cp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, 2)
104+
if getTaskError != nil {
105+
log.Error("failed to get prover tasks", "proof type", message.ProofTypeChunk.String(), "task ID", tmpChunkTask.Hash, "error", getTaskError)
106+
return nil, ErrCoordinatorInternalFailure
107+
}
108+
for i := 0; i < len(proverTasks); i++ {
109+
if proverTasks[i].ProverPublicKey == taskCtx.PublicKey ||
110+
taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) {
111+
log.Debug("get empty chunk, the prover already failed this task", "height", getTaskParameter.ProverHeight)
112+
return nil, nil
113+
}
114+
}
115+
89116
rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)
90117
if updateAttemptsErr != nil {
91118
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)

coordinator/internal/logic/provertask/prover_task.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,11 @@ type BaseProverTask struct {
4747
}
4848

4949
type proverTaskContext struct {
50-
PublicKey string
51-
ProverName string
52-
ProverVersion string
53-
HardForkNames map[string]struct{}
50+
PublicKey string
51+
ProverName string
52+
ProverVersion string
53+
ProverProviderType uint8
54+
HardForkNames map[string]struct{}
5455
}
5556

5657
// checkParameter check the prover task parameter illegal
@@ -76,6 +77,12 @@ func (b *BaseProverTask) checkParameter(ctx *gin.Context) (*proverTaskContext, e
7677
}
7778
ptc.ProverVersion = proverVersion.(string)
7879

80+
ProverProviderType, ProverProviderTypeExist := ctx.Get(coordinatorType.ProverProviderTypeKey)
81+
if !ProverProviderTypeExist {
82+
return nil, errors.New("get prover provider type from context failed")
83+
}
84+
ptc.ProverProviderType = uint8(ProverProviderType.(float64))
85+
7986
hardForkNamesStr, hardForkNameExist := ctx.Get(coordinatorType.HardForkName)
8087
if !hardForkNameExist {
8188
return nil, errors.New("get hard fork name from context failed")

coordinator/internal/orm/batch.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,22 @@ func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTo
9595
return &batch, nil
9696
}
9797

98+
// GetUnassignedBatchCount returns the number of batch rows that satisfy all
// of the following filters:
//   - proving_status equals types.ProvingTaskUnassigned,
//   - total_attempts is strictly below maxTotalAttempts,
//   - active_attempts is strictly below maxActiveAttempts,
//   - chunk_proofs_status equals types.ChunkProofsStatusReady,
//   - batch.deleted_at IS NULL.
//
// The query is bound to ctx. On a query failure it returns 0 together with
// the underlying error wrapped with the method name.
99+
func (o *Batch) GetUnassignedBatchCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) {
100+
var count int64
101+
db := o.db.WithContext(ctx)
102+
db = db.Model(&Batch{})
103+
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
104+
db = db.Where("total_attempts < ?", maxTotalAttempts)
105+
db = db.Where("active_attempts < ?", maxActiveAttempts)
106+
db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))
107+
db = db.Where("batch.deleted_at IS NULL")
108+
if err := db.Count(&count).Error; err != nil {
109+
return 0, fmt.Errorf("Batch.GetUnassignedBatchCount error: %w", err)
110+
}
111+
return count, nil
112+
}
113+
98114
// GetAssignedBatch retrieves assigned batch based on the specified limit.
99115
// The returned batches are sorted in ascending order by their index.
100116
func (o *Batch) GetAssignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {

0 commit comments

Comments
 (0)