-
-
Notifications
You must be signed in to change notification settings - Fork 306
/
attestationPool.ts
223 lines (197 loc) · 7.86 KB
/
attestationPool.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import {PointFormat, Signature} from "@chainsafe/bls/types";
import bls from "@chainsafe/bls";
import {BitArray, toHexString} from "@chainsafe/ssz";
import {phase0, Slot, Root, RootHex} from "@lodestar/types";
import {MapDef} from "@lodestar/utils";
import {IClock} from "../../util/clock.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
/**
* The number of slots handed to `pruneBySlot` as the retention window whenever the pool is pruned.
*
* NOTE(review): the exact cut-off applied to stored slots is decided inside `pruneBySlot`, which is
* imported from `./utils.js` and not visible here — confirm the boundary there.
*
* Whether a *new* attestation is refused as too old is governed separately by
* `lowestPermissibleSlot`, which `AttestationPool.prune()` derives from `preaggregateSlotDistance`,
* not from this constant.
*/
const SLOTS_RETAINED = 3;
/**
* The maximum number of distinct `AttestationData` that will be stored in each slot.
*
* This is a DoS protection measure: it bounds the size of each per-slot map held by the pool.
*/
const MAX_ATTESTATIONS_PER_SLOT = 16_384;
/**
* Aggregation-friendly representation of an attestation as it is kept inside the pool.
*
* - `data`: the attestation data shared by every contribution merged into this entry.
* - `aggregationBits`: participation bits; mutated in place as contributions are merged.
* - `signature`: kept as a deserialized `Signature` object (rather than bytes) so that further
*   signatures can be aggregated into it; serialized back to bytes only when read out of the pool.
*/
type AggregateFast = {
data: phase0.Attestation["data"];
aggregationBits: BitArray;
signature: Signature;
};
/** Hex string of the attestation data root; used as the key of each per-slot map in the pool. */
type DataRootHex = string;
/**
 * A pool of `Attestation` that is specially designed to store "unaggregated" attestations from
 * the naive aggregation scheme.
 *
 * **The pool does not do any signature or attestation verification. It assumes that all
 * `Attestation` objects provided are valid.**
 *
 * ## Details
 *
 * Entries are indexed by `attestation.data.slot`, then by the hex root of `attestation.data`
 * (supplied by the caller of `add()`).
 *
 * As each unaggregated attestation is added it is aggregated with any existing entry stored under
 * the same slot and data root. Considering that the pool only accepts attestations with a single
 * bit set, there is only ever a single aggregated entry for any given data root.
 *
 * Each slot stores at most `MAX_ATTESTATIONS_PER_SLOT` distinct data roots. The limit applies only
 * to the creation of new entries; contributions to already-stored entries are always merged.
 *
 * Pruning is explicit: `prune(clockSlot)` drops old slots (retention window `SLOTS_RETAINED`) and
 * updates the lowest slot that `add()` will still accept. `add()` itself never prunes.
 */
export class AttestationPool {
  /** slot -> (data root hex -> aggregate). The per-slot map is obtained through `getOrDefault`. */
  private readonly attestationByRootBySlot = new MapDef<Slot, Map<DataRootHex, AggregateFast>>(
    () => new Map<DataRootHex, AggregateFast>()
  );
  /** Attestations with a slot below this value are refused by `add()`. Updated by `prune()`. */
  private lowestPermissibleSlot = 0;

  /**
   * @param clock used to measure how far into its slot an attestation arrives (`secFromSlot`)
   * @param cutOffSecFromSlot attestations for which `clock.secFromSlot(slot)` exceeds this value
   * are refused as late
   * @param preaggregateSlotDistance how many slots behind the clock slot passed to `prune()` are
   * still accepted by `add()`; `0` means only the clock slot itself (and later)
   */
  constructor(
    private readonly clock: IClock,
    private readonly cutOffSecFromSlot: number,
    private readonly preaggregateSlotDistance = 0
  ) {}

  /** Returns current count of pre-aggregated attestations with unique data, across all slots */
  getAttestationCount(): number {
    let attestationCount = 0;
    for (const attestationByRoot of this.attestationByRootBySlot.values()) {
      attestationCount += attestationByRoot.size;
    }
    return attestationCount;
  }

  /**
   * Accepts a verified unaggregated attestation and attempts to apply it to the naive
   * aggregation pool.
   *
   * The naive aggregation pool is used by local validators to produce
   * `SignedAggregateAndProof`.
   *
   * If the attestation is too old (slot below `lowestPermissibleSlot`) it is simply dropped and
   * `InsertOutcome.Old` is returned. If `clock.secFromSlot(slot)` exceeds `cutOffSecFromSlot` it
   * is dropped with `InsertOutcome.Late`, since it is not helpful for the validator anymore.
   *
   * Expects the attestation to be fully validated:
   * - Valid signature
   * - Consistent bitlength
   * - Valid committeeIndex
   * - Valid data
   *
   * @param attestation attestation with exactly one aggregation bit set
   * @param attDataRootHex hex root of `attestation.data`, used as the key within the slot
   * @returns the outcome of the insertion (`Old`, `Late`, `NewData`, or whatever
   * `aggregateAttestationInto` reports when merging into an existing entry)
   * @throws OpPoolError with code `REACHED_MAX_PER_SLOT` if a *new* data root would have to be
   * stored in a slot that already holds `MAX_ATTESTATIONS_PER_SLOT` distinct roots
   */
  add(attestation: phase0.Attestation, attDataRootHex: RootHex): InsertOutcome {
    const slot = attestation.data.slot;

    // Reject any attestations that are too old.
    if (slot < this.lowestPermissibleSlot) {
      return InsertOutcome.Old;
    }

    // Reject attestations that come to this pool too late into their slot
    if (this.clock.secFromSlot(slot) > this.cutOffSecFromSlot) {
      return InsertOutcome.Late;
    }

    const aggregateByRoot = this.attestationByRootBySlot.getOrDefault(slot);

    // Pre-aggregate the contribution with an existing item. This must be attempted before the
    // capacity check: merging does not grow the map, so a slot that is at capacity must keep
    // aggregating into the entries it already stores.
    const aggregate = aggregateByRoot.get(attDataRootHex);
    if (aggregate) {
      // Aggregate mutating
      return aggregateAttestationInto(aggregate, attestation);
    }

    // Limit distinct objects per slot (DoS protection); only relevant when inserting a new root
    if (aggregateByRoot.size >= MAX_ATTESTATIONS_PER_SLOT) {
      throw new OpPoolError({code: OpPoolErrorCode.REACHED_MAX_PER_SLOT});
    }

    // Create new aggregate
    aggregateByRoot.set(attDataRootHex, attestationToAggregate(attestation));
    return InsertOutcome.NewData;
  }

  /**
   * For validator API to get an aggregate
   *
   * @param slot slot of the requested attestation data
   * @param dataRoot root of the requested attestation data
   * @returns the aggregate currently stored for `slot` and `dataRoot`
   * @throws Error if the pool holds no entry for that slot and data root
   */
  getAggregate(slot: Slot, dataRoot: Root): phase0.Attestation {
    const dataRootHex = toHexString(dataRoot);
    const aggregate = this.attestationByRootBySlot.get(slot)?.get(dataRootHex);
    if (!aggregate) {
      // TODO: Add metric for missing aggregates
      throw Error(`No attestation for slot=${slot} dataRoot=${dataRootHex}`);
    }
    return fastToAttestation(aggregate);
  }

  /**
   * Prunes stored slots and updates the lowest slot accepted by `add()`.
   *
   * - Stored slots are pruned by `pruneBySlot` with a retention window of `SLOTS_RETAINED`.
   * - `lowestPermissibleSlot` becomes `max(clockSlot - preaggregateSlotDistance, 0)`. By default
   *   `preaggregateSlotDistance` is 0, i.e. we are not interested in attestations of old slots
   *   and only pre-aggregate attestations from the clock slot onwards.
   */
  prune(clockSlot: Slot): void {
    pruneBySlot(this.attestationByRootBySlot, clockSlot, SLOTS_RETAINED);
    this.lowestPermissibleSlot = Math.max(clockSlot - this.preaggregateSlotDistance, 0);
  }

  /**
   * Get all attestations optionally filtered by `attestation.data.slot`
   * @param bySlot slot to filter, `bySlot === attestation.data.slot`. When omitted, the
   * aggregates of every stored slot are returned.
   */
  getAll(bySlot?: Slot): phase0.Attestation[] {
    const aggregateByRoots =
      bySlot === undefined
        ? Array.from(this.attestationByRootBySlot.values())
        : [this.attestationByRootBySlot.get(bySlot)];

    const attestations: phase0.Attestation[] = [];
    for (const aggregateByRoot of aggregateByRoots) {
      // `get(bySlot)` yields undefined when nothing is stored for the requested slot
      if (aggregateByRoot) {
        for (const aggFast of aggregateByRoot.values()) {
          attestations.push(fastToAttestation(aggFast));
        }
      }
    }
    return attestations;
  }
}
// Helpers backing the two main uses of the pool:
// - Retrieve aggregated attestations by slot and data root
// - Insert attestations coming from gossip and API
/**
 * Aggregate a new single-bit attestation into `aggregate`, mutating it.
 *
 * @param aggregate pool entry to merge into; its `aggregationBits` and `signature` are updated
 * in place when the attestation contributes a new bit
 * @param attestation unaggregated attestation; must have exactly one aggregation bit set
 * @returns `InsertOutcome.AlreadyKnown` if the attestation's bit is already set in `aggregate`
 * (nothing is changed), otherwise `InsertOutcome.Aggregated`
 * @throws Error if `attestation.aggregationBits` does not have exactly one bit set, or whatever
 * signature deserialization / aggregation throws. In every throwing case `aggregate` is left
 * unmodified.
 */
function aggregateAttestationInto(aggregate: AggregateFast, attestation: phase0.Attestation): InsertOutcome {
  const bitIndex = attestation.aggregationBits.getSingleTrueBit();

  // Should never happen, attestations are verified against this exact condition before
  if (bitIndex === null) {
    throw Error("Invalid attestation not exactly one bit set");
  }

  if (aggregate.aggregationBits.get(bitIndex) === true) {
    return InsertOutcome.AlreadyKnown;
  }

  // Build the merged signature BEFORE touching the aggregate: if deserialization or aggregation
  // throws, the aggregate must not be left with a participation bit that has no matching
  // signature contribution.
  const mergedSignature = bls.Signature.aggregate([
    aggregate.signature,
    signatureFromBytesNoCheck(attestation.signature),
  ]);

  aggregate.aggregationBits.set(bitIndex, true);
  aggregate.signature = mergedSignature;
  return InsertOutcome.Aggregated;
}
/**
 * Convert an unaggregated `attestation` into the pool's `AggregateFast` form, so that further
 * attestations can be merged into it with aggregateAttestationInto()
 *
 * - `data` is carried over by reference.
 * - `aggregationBits` is copied, because the pool mutates the stored bits on later merges.
 * - `signature` is deserialized from bytes with `signatureFromBytesNoCheck`.
 */
function attestationToAggregate(attestation: phase0.Attestation): AggregateFast {
  const {data, aggregationBits, signature} = attestation;

  // Own copy of the bits: the stored aggregate is mutated in place later on
  const ownedBits = aggregationBits.clone();
  const parsedSignature = signatureFromBytesNoCheck(signature);

  return {data, aggregationBits: ownedBits, signature: parsedSignature};
}
/**
 * Unwrap AggregateFast to phase0.Attestation
 *
 * The result is a snapshot of the aggregate at call time:
 * - `signature` is serialized to compressed bytes.
 * - `aggregationBits` is cloned. The pool keeps mutating the stored bits as further attestations
 *   are merged; handing out the live bit array would let a previously returned attestation gain
 *   bits that its (already serialized) signature does not cover.
 * - `data` is returned by reference.
 */
function fastToAttestation(aggFast: AggregateFast): phase0.Attestation {
  return {
    data: aggFast.data,
    // clone so the returned bits stay consistent with the serialized signature below
    aggregationBits: aggFast.aggregationBits.clone(),
    signature: aggFast.signature.toBytes(PointFormat.compressed),
  };
}