diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index 3a0d8db1705..2c50191823d 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -820,6 +820,7 @@ udhcpc UEFI Unauthed UNCONFIGURED +Undici unexpose unexposing unfetch diff --git a/scripts/lib/download.ts b/scripts/lib/download.ts index e717447b009..3a0eb1c4dd9 100644 --- a/scripts/lib/download.ts +++ b/scripts/lib/download.ts @@ -8,6 +8,7 @@ import fs from 'fs'; import os from 'os'; import path from 'path'; import stream from 'stream'; +import timers from 'timers/promises'; import { simpleSpawn } from 'scripts/simple_process'; @@ -29,17 +30,123 @@ export type ArchiveDownloadOptions = DownloadOptions & { entryName?: string; }; -async function fetchWithRetry(url: string) { +/** + * GrowingWritable is an implementation of stream.Writable that just buffers + * everything in memory. + */ +class GrowingWritable extends stream.Writable { + protected name: string; + protected buffer = Buffer.alloc(0); + constructor(name: string) { + super(); + this.name = name; + } + + _writev(chunks: Array<{ chunk: Buffer; encoding: BufferEncoding | 'buffer'; }>, callback: (error?: Error | null) => void): void { + // Check that all chunks have 'buffer' encoding. + const unexpectedEncoding = chunks.map(({ encoding }) => encoding).find(e => e !== 'buffer'); + + if (unexpectedEncoding) { + console.log(`${ this.name }: failed to buffer to memory: ${ unexpectedEncoding }`); + callback(new Error(`Only buffer chunks are accepted, not string with encoding ${ unexpectedEncoding }`)); + + return; + } + // Copy the buffer to avoid it being lost. + try { + this.buffer = Buffer.concat([this.buffer, ...chunks.map(({ chunk }) => chunk)]); + } catch (ex: any) { + callback(ex); + + return; + } + callback(null); + } + + get text() { + return this.buffer.toString('utf-8'); + } +} + +async function fetchWithRetry(url: string): Promise; +async function fetchWithRetry(url: string, writable: fs.WriteStream): Promise; +async function fetchWithRetry(url: string, writable?: fs.WriteStream): Promise { while (true) { try { - return await fetch(url, { redirect: 'follow' }); + const response = await fetch(url, { redirect: 'follow' }); + + if (!response.ok) { + if ([429, 500, 502, 503, 504].includes(response.status)) { + // For these responses, retry the download. + await timers.setTimeout(1_000); + continue; + } + throw new Error(`Error downloading ${ url }: ${ response.statusText }`); + } + if (!response.body) { + throw new Error(`Error downloading ${ url }: did not receive response body`); + } + const outStream = writable || new GrowingWritable(url); + const streamFinished = stream.promises.finished(outStream); + const progressTimeout = 5_000; // body timeout, in milliseconds. + let abortSignal = AbortSignal.timeout(progressTimeout); + const abortedError = new Error(`Timed out reading body`, { cause: { code: 'EAI_AGAIN' } }); + const reader = response.body.getReader(); + + while (!abortSignal.aborted) { + const abortPromise = new Promise>>((resolve, reject) => { + abortSignal.onabort = () => reject(abortedError); + }); + const { value, done } = await Promise.race([reader.read(), abortPromise]); + + // Reset the abort signal on progress; we set up `onabort` on next iteration. + abortSignal.onabort = null; + abortSignal = AbortSignal.timeout(progressTimeout); + if (done) { + await new Promise(resolve => outStream.end(resolve)); + break; + } + await new Promise((resolve) => { + if (outStream.write(value)) { + resolve(); + } else { + outStream.once('drain', resolve); + } + }); + } + if (abortSignal.aborted) { + // This can happen if we timed out waiting on `outStream.write()` etc. + throw abortedError; + } + await streamFinished; + if (!writable) { + return (outStream as GrowingWritable).text; + } + + return; } catch (ex: any) { - if (ex && ex.errno === 'EAI_AGAIN') { - console.log(`Recoverable error downloading ${ url }, retrying...`); + const hasError = (ex: any, ...codes: string[]) => { + while (ex) { + if (codes.includes(ex.errno) || codes.includes(ex.code)) { + return true; + } + ex = ex.cause; + } + + return false; + }; + const errorCodes = [ + 'EAI_AGAIN', 'ECONNRESET', 'ECONNREFUSED', 'EHOSTDOWN', + 'ENETDOWN', 'ENETUNREACH', 'ENOTFOUND']; + const UndiciPrefix = 'UND_ERR_'; // spellcheck-ignore-line + + errorCodes.push(...['BODY_TIMEOUT', 'CONNECT_TIMEOUT', 'REQ_RETRY', 'SOCKET'].map(e => UndiciPrefix + e )); + if (hasError(ex, ...errorCodes)) { + console.log(`Recoverable error ${ ex } downloading ${ url }, retrying...`); continue; } console.dir(ex); - throw ex; + throw new Error(`Error downloading ${ url }`, { cause: ex }); } } } @@ -68,22 +175,22 @@ export async function download(url: string, destPath: string, options: DownloadO } } } - console.log(`Downloading ${ url } to ${ destPath }...`); + const destPathDisplay = [ + path.dirname(destPath), + path.sep, + '\x1B[0;1;33;40m', + path.basename(destPath), + '\x1B[0m', + ].join(''); + + console.log(`Downloading ${ url } to ${ destPathDisplay }`); await fs.promises.mkdir(path.dirname(destPath), { recursive: true }); - const response = await fetchWithRetry(url); - - if (!response.ok) { - throw new Error(`Error downloading ${ url }: ${ response.statusText }`); - } - if (!response.body) { - throw new Error(`Error downloading ${ url }: did not receive response body`); - } const tempPath = `${ destPath }.download`; try { const file = fs.createWriteStream(tempPath); - await response.body.pipeTo(stream.Writable.toWeb(file)); + await fetchWithRetry(url, file); if (expectedChecksum) { const actualChecksum = await getChecksumForFile(tempPath, checksumAlgorithm); @@ -139,13 +246,7 @@ async function getChecksumForFile(inputPath: string, checksumAlgorithm: Checksum * @returns The file contents. */ export async function getResource(url: string): Promise { - const response = await fetchWithRetry(url); - - if (!response.ok) { - throw new Error(`Error downloading ${ url }: ${ response.statusText }`); - } - - return await response.text(); + return await fetchWithRetry(url); } /**