-
Notifications
You must be signed in to change notification settings - Fork 19
/
import.js
130 lines (117 loc) · 3.93 KB
/
import.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
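/**
* Import a pinlist (newline-separated list of CIDs) into nft.storage.
*
* Requires the API_KEY environment variable (an nft.storage API key).
* ENDPOINT optionally overrides the default API URL (https://api.nft.storage).
*
* Usage:
* node import.js pinlist.txt
* # start from line 1000
* node import.js pinlist.txt --start 1000
*/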
import fs from 'fs'
import ora from 'ora'
import { pipeline } from 'stream/promises'
import dotenv from 'dotenv'
import * as d3 from 'd3-format'
import fetch from '@web-std/fetch'
import { RateLimiter } from 'limiter'
import retry from 'p-retry'
import AbortController from 'abort-controller'
import batch from 'it-batch'
import split from './lib/split.js'
import drop from './lib/drop.js'
// Load variables from a local .env file into process.env (dotenv);
// parseArgs reads API_KEY and ENDPOINT from process.env.
dotenv.config()
// Number formatter with a thousands separator, used for the counts shown in
// log and spinner output.
const format = d3.format(',')
// Number of CIDs grouped together by `batch`; each group is pinned
// concurrently and awaited before the next group starts.
const BATCH_SIZE = 1000 // process CIDs in batches of this size
// [tokens per interval, interval] handed to the RateLimiter in Pinner.
const RATE_LIMIT = [2, /* per */ 'second'] // rate limit requests to nft.storage
// Passed to p-retry as `retries` for each pin request.
const RETRIES = 5 // failed request retries
/**
 * Read the importer's configuration from the environment and command line.
 *
 * Environment: `API_KEY` (required) and `ENDPOINT` (optional, defaults to
 * https://api.nft.storage). Command line: `argv[2]` is the pinlist path,
 * optionally followed by `--start N` or `--start=N`.
 *
 * @returns {{ endpoint: string, apiKey: string, filePath: string, startLine: number }}
 * @throws {Error} If the API key or file path is missing, or if the `--start`
 *   value is not a non-negative integer.
 */
function parseArgs () {
  const apiKey = process.env.API_KEY
  if (!apiKey) throw new Error('missing nft.storage API key')
  const filePath = process.argv[2]
  if (!filePath) throw new Error('missing path to newline delimited CID list')

  let startLine = 0
  const startFlag = process.argv[3] || ''
  if (startFlag.startsWith('--start')) {
    // Accept both "--start 1000" (value in the next argument) and "--start=1000".
    const rawValue = startFlag === '--start' ? process.argv[4] : startFlag.split('=')[1]
    startLine = Number.parseInt(rawValue, 10)
    // Fail loudly instead of letting NaN or a negative offset reach the pipeline.
    if (!Number.isInteger(startLine) || startLine < 0) {
      throw new Error(`invalid --start value: ${rawValue}`)
    }
  }

  const endpoint = process.env.ENDPOINT || 'https://api.nft.storage'
  return { endpoint, apiKey, filePath, startLine }
}
/**
 * Stream the pinlist file, skip the lines before the requested start line,
 * and pin the remaining CIDs one batch at a time while reporting progress on
 * a terminal spinner.
 *
 * @returns {Promise<void>} Resolves once every batch has been processed.
 * @throws Rethrows any pipeline error after reporting it on the spinner.
 */
async function main () {
  const config = parseArgs()
  const { endpoint, startLine } = config
  const spinner = ora()
  const pinner = new Pinner({ apiKey: config.apiKey, endpoint })
  const start = Date.now()
  const totals = { total: 0, running: 0, requests: 0, reqsPerSec: 0 }

  // Redraw the spinner from the current counters.
  const showProgress = () => {
    spinner.text = toText('Importing...', totals)
  }

  // Pin a single CID, keeping the counters current before and after.
  const pinOne = async cid => {
    try {
      totals.running += 1
      showProgress()
      await pinner.pin(cid)
    } finally {
      // Counters advance whether the pin succeeded or failed.
      totals.running -= 1
      totals.total += 1
      totals.requests += 1
      const elapsedSeconds = (Date.now() - start) / 1000
      totals.reqsPerSec = totals.requests / elapsedSeconds
      showProgress()
    }
  }

  // Final pipeline stage: each batch is pinned concurrently and must finish
  // before the next batch is pulled from the stream.
  const pinAllBatches = async batches => {
    for await (const cids of batches) {
      const pending = cids.map(cid => pinOne(cid))
      await Promise.all(pending)
    }
  }

  console.log(`🔌 Using endpoint: ${endpoint}`)
  if (startLine > 0) {
    console.log(`⏩ starting from line ${format(startLine)}`)
  }
  spinner.start()

  try {
    await pipeline(
      fs.createReadStream(config.filePath),
      split,
      lines => drop(lines, startLine),
      lines => batch(lines, BATCH_SIZE),
      pinAllBatches
    )
  } catch (err) {
    // Keep the last totals on screen, then print the failure itself.
    spinner.stopAndPersist({ text: toText('Errored', totals) })
    spinner.fail(`Error: ${err.message}`)
    throw err
  }

  spinner.succeed(toText('Complete!', totals))
}
/**
 * Build the multi-line status message shown on the spinner.
 *
 * @param {string} prefix - First line of the message (e.g. the current phase).
 * @param {{ total: number, running: number, reqsPerSec: number }} totals -
 *   Progress counters; the "Sending" line is omitted when `running` is falsy.
 * @returns {string} The prefix followed by one counter per line.
 */
function toText (prefix, totals) {
  const lines = [
    `💖 Total sent: ${format(totals.total)}`,
    ...(totals.running ? [`📌 Sending: ${totals.running}`] : []),
    `🔁 Requests/sec: ${totals.reqsPerSec.toFixed(1)}`
  ]
  return `${prefix}\n${lines.join('\n')}`
}
/**
 * Rate-limited client that submits CIDs to the `pins` endpoint of an
 * nft.storage-compatible API.
 */
class Pinner {
  /**
   * @param {object} config
   * @param {string} config.apiKey - Sent as a Bearer token on every request.
   * @param {string} config.endpoint - Base URL; `pins` is resolved against it.
   */
  constructor ({ apiKey, endpoint }) {
    this.apiKey = apiKey
    this.pinsURL = new URL('pins', endpoint).toString()
    this.limiter = new RateLimiter({ tokensPerInterval: RATE_LIMIT[0], interval: RATE_LIMIT[1] })
  }

  /**
   * Request that a CID be pinned, retrying failed attempts up to RETRIES times.
   *
   * @param {string} cid - Content identifier to pin.
   * @returns {Promise<Response>} The response of the successful attempt. Its
   *   body has already been read, so only status and headers remain useful.
   * @throws {Error} The last attempt's error once all retries are exhausted
   *   (non-2xx status, network failure, or the 60 second timeout).
   */
  async pin (cid) {
    const requestTimeoutMs = 60000
    return retry(async () => {
      // Take a token for every attempt so retries are rate limited as well.
      await this.limiter.removeTokens(1)
      const controller = new AbortController()
      const abortID = setTimeout(() => controller.abort(), requestTimeoutMs)
      try {
        const res = await fetch(this.pinsURL, {
          method: 'POST',
          headers: {
            Authorization: `Bearer ${this.apiKey}`,
            // The body is JSON; without this a string body is sent as text/plain.
            'Content-Type': 'application/json'
          },
          body: JSON.stringify({ cid }),
          signal: controller.signal
        })
        // Always drain the body: it is needed for the error message, and the
        // timeout stays armed while it is being read.
        const text = await res.text()
        if (!res.ok) {
          throw new Error(`pinning ${cid}: ${res.status}: ${res.statusText}\nHeaders: ${Array.from(res.headers.entries())}\nBody: ${text}`)
        }
        return res
      } finally {
        clearTimeout(abortID)
      }
    }, { retries: RETRIES })
  }
}
// Run the importer. Any failure (including argument errors raised before the
// spinner starts) is logged and turned into a non-zero exit code rather than
// being left as an unhandled promise rejection.
main().catch(err => {
  console.error(err)
  process.exitCode = 1
})