-
Notifications
You must be signed in to change notification settings - Fork 931
/
cascade.go
153 lines (129 loc) · 4.32 KB
/
cascade.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package getters
import (
"context"
"errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/celestiaorg/rsmt2d"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/utils"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
)
var _ share.Getter = (*CascadeGetter)(nil)
// CascadeGetter implements custom share.Getter that composes multiple Getter implementations in
// "cascading" order.
//
// See cascade func for details on cascading.
type CascadeGetter struct {
getters []share.Getter
}
// NewCascadeGetter instantiates a new CascadeGetter from given share.Getters with given interval.
func NewCascadeGetter(getters []share.Getter) *CascadeGetter {
return &CascadeGetter{
getters: getters,
}
}
// GetShare gets a share from any of registered share.Getters in cascading order.
func (cg *CascadeGetter) GetShare(
ctx context.Context, header *header.ExtendedHeader, row, col int,
) (share.Share, error) {
ctx, span := tracer.Start(ctx, "cascade/get-share", trace.WithAttributes(
attribute.Int("row", row),
attribute.Int("col", col),
))
defer span.End()
upperBound := len(header.DAH.RowRoots)
if row >= upperBound || col >= upperBound {
err := share.ErrOutOfBounds
span.RecordError(err)
return nil, err
}
get := func(ctx context.Context, get share.Getter) (share.Share, error) {
return get.GetShare(ctx, header, row, col)
}
return cascadeGetters(ctx, cg.getters, get)
}
// GetEDS gets a full EDS from any of registered share.Getters in cascading order.
func (cg *CascadeGetter) GetEDS(
ctx context.Context, header *header.ExtendedHeader,
) (*rsmt2d.ExtendedDataSquare, error) {
ctx, span := tracer.Start(ctx, "cascade/get-eds")
defer span.End()
get := func(ctx context.Context, get share.Getter) (*rsmt2d.ExtendedDataSquare, error) {
return get.GetEDS(ctx, header)
}
return cascadeGetters(ctx, cg.getters, get)
}
// GetSharesByNamespace gets NamespacedShares from any of registered share.Getters in cascading
// order.
func (cg *CascadeGetter) GetSharesByNamespace(
ctx context.Context,
header *header.ExtendedHeader,
namespace share.Namespace,
) (share.NamespacedShares, error) {
ctx, span := tracer.Start(ctx, "cascade/get-shares-by-namespace", trace.WithAttributes(
attribute.String("namespace", namespace.String()),
))
defer span.End()
get := func(ctx context.Context, get share.Getter) (share.NamespacedShares, error) {
return get.GetSharesByNamespace(ctx, header, namespace)
}
return cascadeGetters(ctx, cg.getters, get)
}
// cascade implements a cascading retry algorithm for getting a value from multiple sources.
// Cascading implies trying the sources one-by-one in the given order with the
// given interval until either:
// - One of the sources returns the value
// - All of the sources errors
// - Context is canceled
//
// NOTE: New source attempts after interval do suspend running sources in progress.
func cascadeGetters[V any](
ctx context.Context,
getters []share.Getter,
get func(context.Context, share.Getter) (V, error),
) (V, error) {
var (
zero V
err error
)
if len(getters) == 0 {
return zero, errors.New("no getters provided")
}
ctx, span := tracer.Start(ctx, "cascade", trace.WithAttributes(
attribute.Int("total-getters", len(getters)),
))
defer func() {
if err != nil {
utils.SetStatusAndEnd(span, errors.New("all getters failed"))
}
}()
for i, getter := range getters {
log.Debugf("cascade: launching getter #%d", i)
span.AddEvent("getter launched", trace.WithAttributes(attribute.Int("getter_idx", i)))
// we split the timeout between left getters
// once async cascadegetter is implemented, we can remove this
getCtx, cancel := ctxWithSplitTimeout(ctx, len(getters)-i, 0)
val, getErr := get(getCtx, getter)
cancel()
if getErr == nil {
return val, nil
}
if errors.Is(getErr, errOperationNotSupported) {
continue
}
span.RecordError(getErr, trace.WithAttributes(attribute.Int("getter_idx", i)))
var byzantineErr *byzantine.ErrByzantine
if errors.As(getErr, &byzantineErr) {
// short circuit if byzantine error was detected (to be able to handle it correctly
// and create the BEFP)
return zero, byzantineErr
}
err = errors.Join(err, getErr)
if ctx.Err() != nil {
return zero, err
}
}
return zero, err
}