-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathspark.js
188 lines (165 loc) · 5.54 KB
/
spark.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/* global Zinnia */
import { ActivityState } from './activity-state.js'
import { SPARK_VERSION, DELAY_BETWEEN_RETRIEVALS, MAX_CAR_SIZE } from './constants.js'
import { encodeHex } from './deno-encoding-hex.js'
/**
 * Pause for a given amount of time.
 *
 * @param {number} dt Delay in milliseconds, passed straight to `setTimeout`.
 * @returns {Promise<void>} Promise fulfilled (with no value) once the timer fires.
 */
function sleep (dt) {
  return new Promise((resolve) => {
    setTimeout(resolve, dt)
  })
}
/**
 * Client loop for SPARK retrieval measurements.
 *
 * Each iteration picks a random retrieval task from the current SPARK round,
 * tries to download the referenced content, and submits the collected
 * statistics to the SPARK API.
 */
export default class Spark {
  #fetch
  #activity = new ActivityState()

  /**
   * @param {object} [options]
   * @param {typeof globalThis.fetch} [options.fetch] Fetch implementation used for
   *   every request made by this instance; defaults to the global `fetch`.
   */
  constructor ({ fetch = globalThis.fetch } = {}) {
    this.#fetch = fetch
  }

  /**
   * Fetch the current SPARK round and pick one of its retrieval tasks at random.
   *
   * @returns {Promise<object|undefined>} The selected task, or `undefined` when
   *   the round's `retrievalTasks` list is empty.
   * @throws {Error} When the API responds with a non-OK status (raised by
   *   `assertOkResponse`), or when the request fails or times out after 10 seconds.
   */
  async getRetrieval () {
    console.log('Getting current SPARK round details...')
    const res = await this.#fetch('https://api.filspark.com/rounds/current', {
      method: 'GET',
      headers: { 'Content-Type': 'application/json' },
      signal: AbortSignal.timeout(10_000)
    })
    await assertOkResponse(res, 'Failed to fetch the current SPARK round')
    this.#activity.onHealthy()

    const { retrievalTasks, ...round } = await res.json()
    console.log('Current SPARK round:', round)
    console.log(' %s retrieval tasks', retrievalTasks.length)

    const retrieval = retrievalTasks[Math.floor(Math.random() * retrievalTasks.length)]
    console.log({ retrieval })
    return retrieval
  }

  /**
   * Download `url` and record the outcome in `stats`, which is mutated in place.
   *
   * Fields written: `statusCode`, `firstByteAt`, `byteLength`, `carTooLarge`,
   * `carChecksum`, `timeout` and `endAt`. `endAt` is only set when this method
   * returns normally; if the request throws (for example because it was aborted
   * after 60 seconds without progress) the error propagates to the caller and
   * `endAt` keeps its previous value.
   *
   * @param {string} url Address to fetch.
   * @param {object} stats Measurement record to update.
   * @returns {Promise<void>}
   */
  async fetchCAR (url, stats) {
    console.log(`Fetching ${url}...`)

    // Abort if no progress was made for 60 seconds
    const controller = new AbortController()
    const { signal } = controller

    let timeout
    const resetTimeout = () => {
      if (timeout) {
        clearTimeout(timeout)
      }
      timeout = setTimeout(() => {
        stats.timeout = true
        controller.abort()
      }, 60_000)
    }

    // WebCrypto API does not support streams yet, the hashing function requires entire data
    // to be provided at once. See https://github.com/w3c/webcrypto/issues/73
    // The buffer is resizable (capped at MAX_CAR_SIZE) and `carBytes` is a view
    // over it, so the view grows together with the buffer.
    const carBuffer = new ArrayBuffer(0, { maxByteLength: MAX_CAR_SIZE })
    const carBytes = new Uint8Array(carBuffer)

    try {
      resetTimeout()
      const res = await this.#fetch(url, { signal })
      stats.statusCode = res.status

      if (res.ok) {
        resetTimeout()
        for await (const value of res.body) {
          if (stats.firstByteAt === null) {
            stats.firstByteAt = new Date()
          }
          stats.byteLength += value.byteLength

          // We want to limit how large content we are willing to download.
          // 1. To make sure we don't spend too much time (and network bandwidth) on a single task,
          //    so that we can complete more tasks per round
          // 2. Until we have streaming hashes, we need to keep the entire payload in memory, and so
          //    we need to put an upper limit on how much memory we consume.
          if (stats.byteLength > MAX_CAR_SIZE) {
            stats.carTooLarge = true
            break
          }

          // Append the chunk at the end of the data collected so far.
          const offset = carBuffer.byteLength
          carBuffer.resize(offset + value.byteLength)
          carBytes.set(value, offset)

          resetTimeout()
        }

        if (!stats.carTooLarge) {
          const digest = await crypto.subtle.digest('sha-256', carBytes)
          // Multihash prefix, written in hexadecimal:
          // 0x12 is the code for sha2-256
          // 0x20 is the digest length in bytes (32 bytes = 256 bits)
          stats.carChecksum = '1220' + encodeHex(digest)
        }
      } else {
        console.error('Retrieval failed with status code %s: %s',
          res.status, await res.text())
      }
    } finally {
      // Always release the inactivity timer, even when the request failed.
      clearTimeout(timeout)
    }

    stats.endAt = new Date()
    console.log(stats)
  }

  /**
   * Report a finished retrieval attempt to the SPARK API.
   *
   * The payload merges, in this order, the SPARK and Zinnia versions, the task
   * fields, the stats fields and the participant's wallet address; later
   * entries override earlier ones with the same key.
   *
   * @param {object} task Retrieval task fields to include in the payload.
   * @param {object} stats Statistics collected for the attempt.
   * @returns {Promise<*>} The `id` field of the API's JSON response.
   * @throws {Error} When the API responds with a non-OK status (raised by
   *   `assertOkResponse`), or when the request fails or times out after 10 seconds.
   */
  async submitMeasurement (task, stats) {
    console.log('Submitting measurement...')
    const payload = {
      sparkVersion: SPARK_VERSION,
      zinniaVersion: Zinnia.versions.zinnia,
      ...task,
      ...stats,
      participantAddress: Zinnia.walletAddress
    }
    console.log('%o', payload)

    const res = await this.#fetch('https://api.filspark.com/measurements', {
      method: 'POST',
      body: JSON.stringify(payload),
      headers: {
        'Content-Type': 'application/json'
      },
      signal: AbortSignal.timeout(10_000)
    })
    await assertOkResponse(res, 'Failed to submit measurement')

    const { id } = await res.json()
    console.log('Measurement submitted (id: %s)', id)
    return id
  }

  /**
   * Run one full cycle: pick a task, attempt the retrieval, submit the result.
   *
   * A failed download is logged and still reported (with whatever `stats` were
   * collected before the failure); only errors from picking the task or from
   * submitting the measurement are propagated.
   *
   * @returns {Promise<*>} The id of the submitted measurement.
   * @throws {Error} When the current round offers no retrieval task, or when
   *   `getRetrieval` / `submitMeasurement` fail.
   */
  async nextRetrieval () {
    const task = await this.getRetrieval()
    if (task == null) {
      // An empty task list makes getRetrieval() return undefined. Fail with a
      // clear message instead of an opaque destructuring TypeError.
      throw new Error('The current SPARK round has no retrieval tasks')
    }

    // `id` is split off so that it is not part of the submitted payload.
    const { id: retrievalId, ...retrieval } = task

    const stats = {
      timeout: false,
      startAt: new Date(),
      firstByteAt: null,
      endAt: null,
      carTooLarge: false,
      byteLength: 0,
      carChecksum: null,
      statusCode: null
    }

    const searchParams = new URLSearchParams({
      protocols: retrieval.protocol,
      providers: retrieval.providerAddress
    })
    const url = `ipfs://${retrieval.cid}?${searchParams.toString()}`

    try {
      await this.fetchCAR(url, stats)
    } catch (err) {
      console.error(`Failed to fetch ${url}`)
      console.error(err)
    }

    const measurementId = await this.submitMeasurement(retrieval, { ...stats })
    Zinnia.jobCompleted()
    return measurementId
  }

  /**
   * Run retrieval cycles forever, pausing DELAY_BETWEEN_RETRIEVALS between them.
   *
   * Errors never escape the loop: they are logged and reported to the activity
   * tracker, with a dedicated signal when the API rejects the client as outdated.
   *
   * @returns {Promise<never>} Never settles under normal operation.
   */
  async run () {
    while (true) {
      try {
        await this.nextRetrieval()
        this.#activity.onHealthy()
      } catch (err) {
        if (err.statusCode === 400 && err.serverMessage === 'OUTDATED CLIENT') {
          this.#activity.onOutdatedClient()
        } else {
          this.#activity.onError()
        }
        console.error(err)
      }
      await sleep(DELAY_BETWEEN_RETRIEVALS)
    }
  }
}
/**
 * Reject unless `res` is a successful response.
 *
 * @param {{ok: boolean, status: number, text: Function}} res Response to check.
 * @param {string} [errorMsg] Prefix for the error message; 'Fetch failed' is
 *   used when it is null or undefined.
 * @returns {Promise<void>} Fulfils with no value when `res.ok` is truthy.
 * @throws {Error} Otherwise. The error carries `statusCode` (the response
 *   status) and `serverMessage` (the response body text, or `undefined` when
 *   the body could not be read).
 */
async function assertOkResponse (res, errorMsg) {
  if (!res.ok) {
    // Best effort: the body is only used for diagnostics, so a failure to read
    // it is ignored and the server message stays undefined.
    let serverMessage
    try {
      serverMessage = await res.text()
    } catch {}

    const prefix = errorMsg ?? 'Fetch failed'
    throw Object.assign(
      new Error(`${prefix} (${res.status}): ${serverMessage}`),
      { statusCode: res.status, serverMessage }
    )
  }
}