diff --git a/integration-tests/gatsby-pipeline/__tests__/fetch-remote-file/index.js b/integration-tests/gatsby-pipeline/__tests__/fetch-remote-file/index.js
new file mode 100644
index 0000000000000..8078622a170d4
--- /dev/null
+++ b/integration-tests/gatsby-pipeline/__tests__/fetch-remote-file/index.js
@@ -0,0 +1,66 @@
+/**
+ * We want to make sure that fetch-remote-file is working with multiple workers.
+ */
+
+const execa = require(`execa`)
+const path = require(`path`)
+const glob = require(`glob`)
+const fs = require(`fs-extra`)
+const md5File = require(`md5-file`)
+const basePath = path.resolve(__dirname, `../../`)
+
+const cleanDirs = () =>
+  Promise.all([
+    fs.emptyDir(`${basePath}/public`),
+    fs.emptyDir(`${basePath}/.cache`),
+  ])
+
+describe(`fetch-remote-file`, () => {
+  beforeAll(async () => {
+    await cleanDirs()
+    await execa(`yarn`, [`build`], {
+      cwd: basePath,
+      // we want to force 1 query per worker
+      env: { NODE_ENV: `production`, GATSBY_PARALLEL_QUERY_CHUNK_SIZE: `1` },
+    })
+  }, 60 * 1000)
+
+  it("should have the correct md5", async () => {
+    expect(
+      await md5File(
+        path.join(
+          __dirname,
+          "../..",
+          "public/images/50c58a791de3c2303e62084d731799eb/photoA.jpg"
+        )
+      )
+    ).toEqual("a9e57a66a10b2d26a1999a4685d7c9ef")
+    expect(
+      await md5File(
+        path.join(
+          __dirname,
+          "../..",
+          "public/images/4910e745c3c453b8795d6ba65c79d99b/photoB.jpg"
+        )
+      )
+    ).toEqual("c305dc5c5db45cc773231a507af5116d")
+    expect(
+      await md5File(
+        path.join(
+          __dirname,
+          "../..",
+          "public/images/fb673e75e9534b3cc2d2e24085386d48/photoC.jpg"
+        )
+      )
+    ).toEqual("4ba953ba27236727d7abe7d5b8916432")
+  })
+
+  /**
+   * This is a bit of a cheeky test, but we just want to make sure we're actually running on multiple workers.
+   */
+  it("should have conflict between workers", async () => {
+    const files = await fs.readdir(path.join(__dirname, "../../.cache/workers"))
+
+    expect(files.length).toBeGreaterThan(1)
+  })
+})
diff --git a/integration-tests/gatsby-pipeline/gatsby-node.js b/integration-tests/gatsby-pipeline/gatsby-node.js
new file mode 100644
index 0000000000000..07cf62385916c
--- /dev/null
+++ b/integration-tests/gatsby-pipeline/gatsby-node.js
@@ -0,0 +1,69 @@
+const { fetchRemoteFile } = require("gatsby-core-utils/fetch-remote-file")
+const { slash } = require("gatsby-core-utils")
+const path = require("path")
+const fs = require("fs-extra")
+
+/** @type {import('gatsby').createSchemaCustomization} */
+exports.createSchemaCustomization = ({ actions, schema, cache, reporter }) => {
+  actions.createTypes(
+    schema.buildObjectType({
+      name: "MyRemoteFile",
+      fields: {
+        url: "String!",
+        publicUrl: {
+          type: "String!",
+          async resolve(source) {
+            const filePath = await fetchRemoteFile({
+              name: path.basename(source.name, path.extname(source.name)),
+              ext: path.extname(source.name),
+              url: source.url,
+              directory: "./public/images",
+            })
+
+            const dir = path.join(global.__GATSBY.root, ".cache", "workers")
+            await fs.ensureDir(dir)
+            await fs.createFile(
+              `${path.join(dir, `worker-${process.env.GATSBY_WORKER_ID}`)}`
+            )
+
+            const workers = (await cache.get("workers")) ?? []
+            workers.push(process.env.GATSBY_WORKER_ID)
+
+            return `${slash(filePath.replace(/^public/, ""))}`
+          },
+        },
+      },
+      interfaces: ["Node"],
+    })
+  )
+}
+
+/** @type {import('gatsby').sourceNodes} */
+exports.sourceNodes = ({ actions, createNodeId, createContentDigest }) => {
+  const items = [
+    {
+      name: "photoA.jpg",
+      url: "https://images.unsplash.com/photo-1517849845537-4d257902454a?ixlib=rb-1.2.1&ixid=MnwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8&auto=format&fit=crop&w=300&q=80",
+    },
+    {
+      name: "photoB.jpg",
+      url: "https://images.unsplash.com/photo-1552053831-71594a27632d?ixlib=rb-1.2.1&ixid=MnwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8&auto=format&fit=crop&w=300&q=80",
+    },
+    {
+      name: "photoC.jpg",
+      url: "https://images.unsplash.com/photo-1561037404-61cd46aa615b?ixlib=rb-1.2.1&ixid=MnwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8&auto=format&fit=crop&w=300&q=80",
+    },
+  ]
+
+  items.forEach((item, index) => {
+    actions.createNode({
+      id: createNodeId(`remote-file-${index}`),
+      name: item.name,
+      url: item.url,
+      internal: {
+        type: "MyRemoteFile",
+        contentDigest: createContentDigest(item.url),
+      },
+    })
+  })
+}
diff --git a/integration-tests/gatsby-pipeline/src/pages/fetch-remote-a.js b/integration-tests/gatsby-pipeline/src/pages/fetch-remote-a.js
new file mode 100644
index 0000000000000..578769d050771
--- /dev/null
+++ b/integration-tests/gatsby-pipeline/src/pages/fetch-remote-a.js
@@ -0,0 +1,31 @@
+import React from "react"
+import { graphql, Link } from "gatsby"
+
+import Layout from "../components/layout"
+import SEO from "../components/seo"
+
+const FetchRemoteA = ({ data }) => {
+  return (
+    <Layout>
+      <SEO title="Fetch Remote A" />
+
+      <Link to="/">Go back to the homepage</Link>
+    </Layout>
+  )
+}
+
+export default FetchRemoteA
+
+export const pageQuery = graphql`
+  {
+    allMyRemoteFile {
+      nodes {
+        url
+        publicUrl
+      }
+    }
+  }
+`
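The integration setup above verifies multi-worker execution indirectly: every query worker that resolves `publicUrl` drops a `worker-<id>` marker file, and the test asserts that more than one marker exists. A minimal sketch of that trick in isolation, assuming `fs-extra`; the `markWorker` name is invented here for illustration:

```ts
import path from "path"
import fs from "fs-extra"

// Each worker process records its presence; the test later counts the
// marker files under .cache/workers to prove that more than one worker ran.
export async function markWorker(cacheRoot: string): Promise<void> {
  const dir = path.join(cacheRoot, `workers`)
  await fs.ensureDir(dir)
  await fs.createFile(path.join(dir, `worker-${process.env.GATSBY_WORKER_ID}`))
}
```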
diff --git a/integration-tests/gatsby-pipeline/src/pages/fetch-remote-b.js b/integration-tests/gatsby-pipeline/src/pages/fetch-remote-b.js
new file mode 100644
index 0000000000000..e7e86751904a4
--- /dev/null
+++ b/integration-tests/gatsby-pipeline/src/pages/fetch-remote-b.js
@@ -0,0 +1,32 @@
+import React from "react"
+import { graphql, Link } from "gatsby"
+
+import Layout from "../components/layout"
+import SEO from "../components/seo"
+
+const FetchRemoteB = ({ data }) => {
+  return (
+    <Layout>
+      <SEO title="Fetch Remote B" />
+
+      <Link to="/">Go back to the homepage</Link>
+    </Layout>
+  )
+}
+
+export default FetchRemoteB
+
+export const pageQuery = graphql`
+  {
+    allMyRemoteFile {
+      nodes {
+        url
+        publicUrl
+      }
+    }
+  }
+`
diff --git a/integration-tests/gatsby-pipeline/src/pages/fetch-remote-c.js b/integration-tests/gatsby-pipeline/src/pages/fetch-remote-c.js
new file mode 100644
index 0000000000000..6e3535c399090
--- /dev/null
+++ b/integration-tests/gatsby-pipeline/src/pages/fetch-remote-c.js
@@ -0,0 +1,31 @@
+import React from "react"
+import { graphql, Link } from "gatsby"
+
+import Layout from "../components/layout"
+import SEO from "../components/seo"
+
+const FetchRemoteC = ({ data }) => {
+  return (
+    <Layout>
+      <SEO title="Fetch Remote C" />
+
+      <Link to="/">Go back to the homepage</Link>
+    </Layout>
+  )
+}
+
+export default FetchRemoteC
+
+export const pageQuery = graphql`
+  {
+    allMyRemoteFile {
+      nodes {
+        url
+        publicUrl
+      }
+    }
+  }
+`
diff --git a/integration-tests/gatsby-pipeline/src/pages/fetch-remote-d.js b/integration-tests/gatsby-pipeline/src/pages/fetch-remote-d.js
new file mode 100644
index 0000000000000..88312818acc3c
--- /dev/null
+++ b/integration-tests/gatsby-pipeline/src/pages/fetch-remote-d.js
@@ -0,0 +1,31 @@
+import React from "react"
+import { graphql, Link } from "gatsby"
+
+import Layout from "../components/layout"
+import SEO from "../components/seo"
+
+const FetchRemoteD = ({ data }) => {
+  return (
+    <Layout>
+      <SEO title="Fetch Remote D" />
+
+      <Link to="/">Go back to the homepage</Link>
+    </Layout>
+  )
+}
+
+export default FetchRemoteD
+
+export const pageQuery = graphql`
+  {
+    allMyRemoteFile {
+      nodes {
+        url
+        publicUrl
+      }
+    }
+  }
+`
diff --git a/integration-tests/gatsby-source-wordpress/jest.config.js b/integration-tests/gatsby-source-wordpress/jest.config.js
index 9248bce5aebb8..d468a716f3da8 100644
--- a/integration-tests/gatsby-source-wordpress/jest.config.js
+++ b/integration-tests/gatsby-source-wordpress/jest.config.js
@@ -1,4 +1,7 @@
 module.exports = {
   testPathIgnorePatterns: [`/node_modules/`, `__tests__/fixtures`, `.cache`],
   bail: true,
+  moduleNameMapper: {
+    "^gatsby-core-utils/(.*)$": `gatsby-core-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
+  },
 }
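The `moduleNameMapper` entry above works around Jest resolvers that do not honor the package `exports` map (the linked issue). A sketch of the effect, under the assumption of a hypothetical test file importing the new subpath:

```ts
// Hypothetical test file, shown only to illustrate the mapping above.
// Older Jest resolvers ignore package.json "exports" subpaths, so this
// specifier fails to resolve on its own:
import { fetchRemoteFile } from "gatsby-core-utils/fetch-remote-file"

// With `"^gatsby-core-utils/(.*)$": "gatsby-core-utils/dist/$1"`, Jest
// rewrites the request and loads the compiled file, as if it were:
// import { fetchRemoteFile } from "gatsby-core-utils/dist/fetch-remote-file"

console.log(typeof fetchRemoteFile) // "function"
```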
diff --git a/packages/gatsby-core-utils/src/__tests__/fetch-remote-file.js b/packages/gatsby-core-utils/src/__tests__/fetch-remote-file.js
index 42965850d2f8e..5c846355e19b3 100644
--- a/packages/gatsby-core-utils/src/__tests__/fetch-remote-file.js
+++ b/packages/gatsby-core-utils/src/__tests__/fetch-remote-file.js
@@ -2,33 +2,19 @@
 
 import path from "path"
 import zlib from "zlib"
-import os from "os"
 import { rest } from "msw"
 import { setupServer } from "msw/node"
 import { Writable } from "stream"
 import got from "got"
 import fs from "fs-extra"
+import { fetchRemoteFile } from "../fetch-remote-file"
+import * as storage from "../utils/get-storage"
 
-jest.mock(`got`, () => {
-  const realGot = jest.requireActual(`got`)
+jest.spyOn(storage, `getDatabaseDir`)
+jest.spyOn(got, `stream`)
+jest.spyOn(fs, `move`)
 
-  return {
-    ...realGot,
-    default: {
-      ...realGot,
-      stream: jest.fn(realGot.stream),
-    },
-  }
-})
 const gotStream = got.stream
-jest.mock(`fs-extra`, () => {
-  const realFs = jest.requireActual(`fs-extra`)
-
-  return {
-    ...realFs,
-    move: jest.fn(realFs.move),
-  }
-})
 const fsMove = fs.move
 
 const urlCount = new Map()
@@ -233,31 +219,7 @@ const server = setupServer(
   )
 )
 
-function getFetchInWorkerContext(workerId) {
-  let fetchRemoteInstance
-  jest.isolateModules(() => {
-    const send = process.send
-    process.env.GATSBY_WORKER_ID = workerId
-    process.send = jest.fn()
-    process.env.GATSBY_WORKER_MODULE_PATH = `123`
-
-    fetchRemoteInstance = require(`../fetch-remote-file`).fetchRemoteFile
-
-    delete process.env.GATSBY_WORKER_MODULE_PATH
-    delete process.env.GATSBY_WORKER_ID
-    process.send = send
-  })
-
-  return fetchRemoteInstance
-}
-
-async function createMockCache() {
-  const tmpDir = fs.mkdtempSync(
-    path.join(os.tmpdir(), `gatsby-source-filesystem-`)
-  )
-
-  fs.ensureDir(tmpDir)
-
+async function createMockCache(tmpDir) {
   return {
     get: jest.fn(() => Promise.resolve(null)),
     set: jest.fn(() => Promise.resolve(null)),
@@ -267,21 +229,21 @@ async function createMockCache() {
 
 describe(`fetch-remote-file`, () => {
   let cache
-  let fetchRemoteFile
+  const cachePath = path.join(__dirname, `.cache-fetch`)
 
   beforeAll(async () => {
-    cache = await createMockCache()
     // Establish requests interception layer before all tests.
     server.listen()
+
+    cache = await createMockCache(cachePath)
+    await fs.ensureDir(cachePath)
+    storage.getDatabaseDir.mockReturnValue(cachePath)
   })
-  afterAll(() => {
-    if (cache) {
-      try {
-        fs.removeSync(cache.directory)
-      } catch (err) {
-        // ignore
-      }
-    }
+
+  afterAll(async () => {
+    await storage.closeDatabase()
+    await fs.remove(cachePath)
+    delete global.__GATSBY
 
     // Clean up after all tests are done, preventing this
     // interception layer from affecting irrelevant tests.
@@ -289,18 +251,15 @@ describe(`fetch-remote-file`, () => {
   })
 
   beforeEach(() => {
+    // simulate a new build each run
+    global.__GATSBY = {
+      buildId: global.__GATSBY?.buildId
+        ? String(Number(global.__GATSBY.buildId) + 1)
+        : `1`,
+    }
     gotStream.mockClear()
     fsMove.mockClear()
     urlCount.clear()
-
-    jest.isolateModules(() => {
-      // we need to bypass the cache for each test
-      fetchRemoteFile = require(`../fetch-remote-file`).fetchRemoteFile
-    })
-  })
-
-  afterEach(() => {
-    jest.useRealTimers()
   })
 
   it(`downloads and create a svg file`, async () => {
@@ -380,314 +339,24 @@ describe(`fetch-remote-file`, () => {
     expect(gotStream).toBeCalledTimes(1)
   })
 
-  it(`only writes the file once when multiple workers fetch at the same time`, async () => {
-    // we don't want to wait for polling to finish
-    jest.useFakeTimers()
-    jest.runAllTimers()
-
-    const cacheInternals = new Map()
-    const workerCache = {
-      get(key) {
-        return Promise.resolve(cacheInternals.get(key))
-      },
-      set(key, value) {
-        return Promise.resolve(cacheInternals.set(key, value))
-      },
-      directory: cache.directory,
-    }
-
-    const fetchRemoteFileInstanceOne = getFetchInWorkerContext(`1`)
-    const fetchRemoteFileInstanceTwo = getFetchInWorkerContext(`2`)
-
-    const requests = [
-      fetchRemoteFileInstanceOne({
-        url: `http://external.com/logo.svg`,
-        cache: workerCache,
-      }),
-      fetchRemoteFileInstanceTwo({
-        url: `http://external.com/logo.svg`,
-        cache: workerCache,
-      }),
-    ]
-
-    // reverse order as last writer wins
-    await requests[1]
-    jest.runAllTimers()
-    await requests[0]
-
-    // we still expect 2 fetches because cache can't save fast enough
-    expect(gotStream).toBeCalledTimes(2)
-    expect(fsMove).toBeCalledTimes(1)
-  })
-
-  it(`it clears the mutex cache when new build id is present`, async () => {
-    // we don't want to wait for polling to finish
-    jest.useFakeTimers()
-    jest.runAllTimers()
-
-    const cacheInternals = new Map()
-    const workerCache = {
-      get(key) {
-        return Promise.resolve(cacheInternals.get(key))
-      },
-      set(key, value) {
-        return Promise.resolve(cacheInternals.set(key, value))
-      },
-      directory: cache.directory,
-    }
-
-    const fetchRemoteFileInstanceOne = getFetchInWorkerContext(`1`)
-    const fetchRemoteFileInstanceTwo = getFetchInWorkerContext(`2`)
-
-    global.__GATSBY = { buildId: `1` }
-    let requests = [
-      fetchRemoteFileInstanceOne({
-        url: `http://external.com/logo.svg`,
-        cache: workerCache,
-      }),
-      fetchRemoteFileInstanceTwo({
-        url: `http://external.com/logo.svg`,
-        cache: workerCache,
-      }),
-    ]
-
-    // reverse order as last writer wins
-    await requests[1]
-    jest.runAllTimers()
-    await requests[0]
-    jest.runAllTimers()
-
-    global.__GATSBY = { buildId: `2` }
-    requests = [
-      fetchRemoteFileInstanceOne({
-        url: `http://external.com/logo.svg`,
-        cache: workerCache,
-      }),
-      fetchRemoteFileInstanceTwo({
-        url: `http://external.com/logo.svg`,
-        cache: workerCache,
-      }),
-    ]
-
-    // reverse order as last writer wins
-    await requests[1]
-    jest.runAllTimers()
-    await requests[0]
-
-    // we still expect 4 fetches because cache can't save fast enough
-    expect(gotStream).toBeCalledTimes(4)
-    expect(fsMove).toBeCalledTimes(2)
-  })
-
-  it(`handles 304 responses correctly in different builds`, async () => {
-    const cacheInternals = new Map()
-    const workerCache = {
-      get(key) {
-        return Promise.resolve(cacheInternals.get(key))
-      },
-      set(key, value) {
-        return Promise.resolve(cacheInternals.set(key, value))
-      },
-      directory: cache.directory,
-    }
-
-    global.__GATSBY = { buildId: `1` }
+  it(`handles 304 responses correctly`, async () => {
+    const currentGlobal = global.__GATSBY
+    global.__GATSBY = { buildId: `304-1` }
     const filePath = await fetchRemoteFile({
       url: `http://external.com/dog-304.jpg`,
-      cache: workerCache,
+      directory: cachePath,
     })
 
-    global.__GATSBY = { buildId: `2` }
+    global.__GATSBY = { buildId: `304-2` }
     const filePathCached = await fetchRemoteFile({
       url: `http://external.com/dog-304.jpg`,
-      cache: workerCache,
-    })
-
-    expect(filePathCached).toBe(filePath)
-    expect(fsMove).toBeCalledTimes(1)
-    expect(gotStream).toBeCalledTimes(2)
-  })
-
-  it(`doesn't keep lock when file download failed`, async () => {
-    const cacheInternals = new Map()
-    const workerCache = {
-      get(key) {
-        return Promise.resolve(cacheInternals.get(key))
-      },
-      set(key, value) {
-        return Promise.resolve(cacheInternals.set(key, value))
-      },
-      directory: cache.directory,
-    }
-
-    const fetchRemoteFileInstanceOne = getFetchInWorkerContext(`1`)
-    const fetchRemoteFileInstanceTwo = getFetchInWorkerContext(`2`)
-
-    await expect(
-      fetchRemoteFileInstanceOne({
-        url: `http://external.com/500.jpg`,
-        cache: workerCache,
-      })
-    ).rejects.toThrow()
-
-    await expect(
-      fetchRemoteFileInstanceTwo({
-        url: `http://external.com/500.jpg`,
-        cache: workerCache,
-      })
-    ).rejects.toThrow()
-
-    expect(gotStream).toBeCalledTimes(3)
-    expect(fsMove).toBeCalledTimes(0)
-  })
-
-  it(`downloading a file in main process after downloading it in worker`, async () => {
-    // we don't want to wait for polling to finish
-    jest.useFakeTimers()
-    jest.runAllTimers()
-
-    const cacheInternals = new Map()
-    const workerCache = {
-      get(key) {
-        return Promise.resolve(cacheInternals.get(key))
-      },
-      set(key, value) {
-        return Promise.resolve(cacheInternals.set(key, value))
-      },
-      directory: cache.directory,
-    }
-
-    const fetchRemoteFileInstanceOne = getFetchInWorkerContext(`1`)
-
-    const resultFromWorker = await fetchRemoteFileInstanceOne({
-      url: `http://external.com/logo.svg`,
-      cache: workerCache,
-    })
-
-    jest.runAllTimers()
-
-    const resultFromMain = await fetchRemoteFile({
-      url: `http://external.com/logo.svg`,
-      cache: workerCache,
-    })
-
-    expect(resultFromWorker).not.toBeUndefined()
-    expect(resultFromMain).not.toBeUndefined()
-
-    jest.useRealTimers()
-
-    expect(gotStream).toBeCalledTimes(1)
-    expect(fsMove).toBeCalledTimes(1)
-  })
-
-  it(`downloading a file in worker process after downloading it in main`, async () => {
-    // we don't want to wait for polling to finish
-    jest.useFakeTimers()
-    jest.runAllTimers()
-
-    const cacheInternals = new Map()
-    const workerCache = {
-      get(key) {
-        return Promise.resolve(cacheInternals.get(key))
-      },
-      set(key, value) {
-        return Promise.resolve(cacheInternals.set(key, value))
-      },
-      directory: cache.directory,
-    }
-
-    const fetchRemoteFileInstanceOne = getFetchInWorkerContext(`1`)
-
-    const resultFromMain = await fetchRemoteFile({
-      url: `http://external.com/logo.svg`,
-      cache: workerCache,
-    })
-
-    jest.runAllTimers()
-
-    const resultFromWorker = await fetchRemoteFileInstanceOne({
-      url: `http://external.com/logo.svg`,
-      cache: workerCache,
-    })
-
-    jest.runAllTimers()
-    jest.useRealTimers()
-
-    expect(resultFromWorker).not.toBeUndefined()
-    expect(resultFromMain).not.toBeUndefined()
-    expect(gotStream).toBeCalledTimes(1)
-    expect(fsMove).toBeCalledTimes(1)
-  })
-
-  it(`downloading a file in worker process after downloading it in another worker`, async () => {
-    // we don't want to wait for polling to finish
-    jest.useFakeTimers()
-    jest.runAllTimers()
-
-    const cacheInternals = new Map()
-    const workerCache = {
-      get(key) {
-        return Promise.resolve(cacheInternals.get(key))
-      },
-      set(key, value) {
-        return Promise.resolve(cacheInternals.set(key, value))
-      },
-      directory: cache.directory,
-    }
-
-    const fetchRemoteFileInstanceOne = getFetchInWorkerContext(`1`)
-    const fetchRemoteFileInstanceTwo = getFetchInWorkerContext(`2`)
-
-    const resultFromWorker1 = await fetchRemoteFileInstanceOne({
-      url: `http://external.com/logo.svg`,
-      cache: workerCache,
-    })
-    jest.runAllTimers()
-
-    const resultFromWorker2 = await fetchRemoteFileInstanceTwo({
-      url: `http://external.com/logo.svg`,
-      cache: workerCache,
-    })
-
-    jest.runAllTimers()
-    jest.useRealTimers()
-
-    expect(resultFromWorker1).not.toBeUndefined()
-    expect(resultFromWorker2).not.toBeUndefined()
-    expect(gotStream).toBeCalledTimes(1)
-    expect(fsMove).toBeCalledTimes(1)
-  })
-
-  it(`handles 304 responses correctly in different builds and workers`, async () => {
-    const cacheInternals = new Map()
-    const workerCache = {
-      get(key) {
-        return Promise.resolve(cacheInternals.get(key))
-      },
-      set(key, value) {
-        return Promise.resolve(cacheInternals.set(key, value))
-      },
-      directory: cache.directory,
-    }
-
-    const fetchRemoteFileInstanceOne = getFetchInWorkerContext(`1`)
-    const fetchRemoteFileInstanceTwo = getFetchInWorkerContext(`2`)
-
-    global.__GATSBY = { buildId: `1` }
-    const filePath = await fetchRemoteFileInstanceOne({
-      url: `http://external.com/dog-304.jpg`,
-      cache: workerCache,
-    })
-
-    global.__GATSBY = { buildId: `2` }
-    const filePathCached = await fetchRemoteFileInstanceTwo({
-      url: `http://external.com/dog-304.jpg`,
-      cache: workerCache,
+      directory: cachePath,
     })
 
     expect(filePathCached).toBe(filePath)
     expect(fsMove).toBeCalledTimes(1)
     expect(gotStream).toBeCalledTimes(2)
+    global.__GATSBY = currentGlobal
   })
 
   it(`fails when 404 is triggered`, async () => {
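The reworked test setup above replaces module-level `jest.mock` factories with `jest.spyOn`, and pins the shared lmdb store to a disposable directory. A minimal sketch of that pattern, assuming Jest globals and the `get-storage` module from this PR:

```ts
import path from "path"
import fs from "fs-extra"
import * as storage from "../utils/get-storage"

const cachePath = path.join(__dirname, `.cache-fetch`)

// spyOn wraps the real export, so calls are still counted while behavior is
// preserved; mockReturnValue then pins the lmdb store to a scratch directory.
jest.spyOn(storage, `getDatabaseDir`).mockReturnValue(cachePath)

beforeAll(async () => {
  await fs.ensureDir(cachePath)
})

afterAll(async () => {
  await storage.closeDatabase() // flush and close lmdb before deleting it
  await fs.remove(cachePath)
})
```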
diff --git a/packages/gatsby-core-utils/src/fetch-remote-file.ts b/packages/gatsby-core-utils/src/fetch-remote-file.ts
index f739bc4104e4b..966f31ccc362b 100644
--- a/packages/gatsby-core-utils/src/fetch-remote-file.ts
+++ b/packages/gatsby-core-utils/src/fetch-remote-file.ts
@@ -1,109 +1,101 @@
-import got, { Headers, Options, RequestError } from "got"
 import fileType from "file-type"
 import path from "path"
 import fs from "fs-extra"
+import Queue from "fastq"
 import { createContentDigest } from "./create-content-digest"
 import {
   getRemoteFileName,
   getRemoteFileExtension,
   createFilePath,
 } from "./filename-utils"
-import type { IncomingMessage } from "http"
-import type { GatsbyCache } from "gatsby"
-import Queue from "fastq"
-import type { queue, done } from "fastq"
-
-export interface IFetchRemoteFileOptions {
-  url: string
-  cache: GatsbyCache
-  auth?: {
-    htaccess_pass?: string
-    htaccess_user?: string
-  }
-  httpHeaders?: Headers
-  ext?: string
-  name?: string
-  maxAttempts?: number
-}
-
-// copied from gatsby-worker
-const IS_WORKER = !!(process.send && process.env.GATSBY_WORKER_MODULE_PATH)
-const WORKER_ID = process.env.GATSBY_WORKER_ID
-
-const cacheIdForWorkers = (url: string): string => `remote-file-workers-${url}`
-const cacheIdForHeaders = (url: string): string => `remote-file-headers-${url}`
-const cacheIdForExtensions = (url: string): string =>
-  `remote-file-extension-${url}`
-
-const STALL_RETRY_LIMIT = process.env.GATSBY_STALL_RETRY_LIMIT
-  ? parseInt(process.env.GATSBY_STALL_RETRY_LIMIT, 10)
-  : 3
-const STALL_TIMEOUT = process.env.GATSBY_STALL_TIMEOUT
-  ? parseInt(process.env.GATSBY_STALL_TIMEOUT, 10)
-  : 30000
-
-const CONNECTION_TIMEOUT = process.env.GATSBY_CONNECTION_TIMEOUT
-  ? parseInt(process.env.GATSBY_CONNECTION_TIMEOUT, 10)
-  : 30000
-
-const INCOMPLETE_RETRY_LIMIT = process.env.GATSBY_INCOMPLETE_RETRY_LIMIT
-  ? parseInt(process.env.GATSBY_INCOMPLETE_RETRY_LIMIT, 10)
-  : 3
-
-// jest doesn't allow us to run all timings infinitely, so we set it 0  in tests
-const BACKOFF_TIME = process.env.NODE_ENV === `test` ? 0 : 1000
-
-function range(start: number, end: number): Array<number> {
-  return Array(end - start)
-    .fill(null)
-    .map((_, i) => start + i)
+import { slash } from "./path"
+import { requestRemoteNode } from "./remote-file-utils/fetch-file"
+import { getStorage, getDatabaseDir } from "./utils/get-storage"
+import { createMutex } from "./mutex"
+import type { Options } from "got"
+import type { IFetchRemoteFileOptions } from "./remote-file-utils/fetch-file"
+
+interface ITask {
+  args: IFetchRemoteFileOptions
 }
 
-// Based on the defaults of https://github.com/JustinBeckwith/retry-axios
-const STATUS_CODES_TO_RETRY = [...range(100, 200), 429, ...range(500, 600)]
-const ERROR_CODES_TO_RETRY = [
-  `ETIMEDOUT`,
-  `ECONNRESET`,
-  `EADDRINUSE`,
-  `ECONNREFUSED`,
-  `EPIPE`,
-  `ENOTFOUND`,
-  `ENETUNREACH`,
-  `EAI_AGAIN`,
-  `ERR_NON_2XX_3XX_RESPONSE`,
-  `ERR_GOT_REQUEST_ERROR`,
-]
-
-/********************
- * Queue Management *
- ********************/
-
 const GATSBY_CONCURRENT_DOWNLOAD = process.env.GATSBY_CONCURRENT_DOWNLOAD
   ? parseInt(process.env.GATSBY_CONCURRENT_DOWNLOAD, 10) || 0
   : 50
 
-const q: queue<IFetchRemoteFileOptions, string> = Queue(
-  fetchWorker,
-  GATSBY_CONCURRENT_DOWNLOAD
-)
+const alreadyCopiedFiles = new Set<string>()
+
+export type { IFetchRemoteFileOptions }
 
 /**
- * fetchWorker
- * --
- * Handle fetch requests that are pushed in to the Queue
+ * Downloads a remote file to disk
  */
-async function fetchWorker(
-  task: IFetchRemoteFileOptions,
-  cb: done
-): Promise<void> {
-  try {
-    const node = await fetchFile(task)
-    return void cb(null, node)
-  } catch (e) {
-    return void cb(e)
+export async function fetchRemoteFile(
+  args: IFetchRemoteFileOptions
+): Promise<string> {
+  // when cacheKey is present we can do more persistence
+  if (args.cacheKey) {
+    const storage = getStorage(getDatabaseDir())
+    const info = storage.remoteFileInfo.get(args.url)
+
+    const fileDirectory = (
+      args.cache ? args.cache.directory : args.directory
+    ) as string
+
+    if (info?.cacheKey === args.cacheKey && fileDirectory) {
+      const cachedPath = path.join(info.directory, info.path)
+      const downloadPath = path.join(fileDirectory, info.path)
+
+      if (await fs.pathExists(cachedPath)) {
+        // If the cached directory is not part of the public directory, we don't need to copy it
+        // as it won't be part of the build.
+        if (
+          !cachedPath.startsWith(
+            path.join(global.__GATSBY?.root ?? process.cwd(), `public`)
+          )
+        ) {
+          return cachedPath
+        }
+
+        // Create a mutex to do our copy - we could do an md5 hash check as well, but that's also expensive
+        if (!alreadyCopiedFiles.has(downloadPath)) {
+          alreadyCopiedFiles.add(downloadPath)
+
+          const copyFileMutex = createMutex(
+            `gatsby-core-utils:copy-fetch:${downloadPath}`,
+            200
+          )
+          await copyFileMutex.acquire()
+          await fs.copy(cachedPath, downloadPath, {
+            overwrite: true,
+          })
+          await copyFileMutex.release()
+        }
+
+        return downloadPath
+      }
+    }
   }
+
+  return pushTask({ args })
 }
 
+const queue = Queue<ITask, string>(
+  /**
+   * fetchWorker
+   * --
+   * Handle fetch requests that are pushed in to the Queue
+   */
+  async function fetchWorker(task, cb): Promise<void> {
+    try {
+      return void cb(null, await fetchFile(task.args))
+    } catch (e) {
+      return void cb(e)
+    }
+  },
+  GATSBY_CONCURRENT_DOWNLOAD
+)
+
 /**
  * pushTask
  * --
@@ -111,152 +103,79 @@ async function fetchWorker(
  *
  * Promisfy a task in queue
  * @param {CreateRemoteFileNodePayload} task
- * @return {Promise<Object>}
+ * @return {Promise<string>}
  */
-async function pushTask(task: IFetchRemoteFileOptions): Promise {
+async function pushTask(task: ITask): Promise<string> {
   return new Promise((resolve, reject) => {
-    q.push(task, (err, node) => {
+    queue.push(task, (err, node) => {
       if (!err) {
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        resolve(node!)
+        resolve(node as string)
       } else {
         reject(err)
       }
     })
   })
 }
-let fetchCache = new Map()
-let latestBuildId = ``
 
-/***************************
- * Fetch remote file logic *
- ***************************/
-
-export async function fetchRemoteFile(
-  args: IFetchRemoteFileOptions
-): Promise<string> {
-  const BUILD_ID = global.__GATSBY?.buildId ?? ``
-  if (BUILD_ID !== latestBuildId) {
-    latestBuildId = BUILD_ID
-    fetchCache = new Map()
-  }
-
-  // If we are already fetching the file, return the unresolved promise
-  const inFlight = fetchCache.get(args.url)
-  if (inFlight) {
-    return inFlight
-  }
-
-  // Create file fetch promise and store it into cache
-  const fetchPromise = pushTask(args)
-  fetchCache.set(args.url, fetchPromise)
-
-  return fetchPromise.catch(err => {
-    fetchCache.delete(args.url)
-
-    throw err
-  })
-}
-
-function pollUntilComplete(
-  cache: GatsbyCache,
-  url: string,
-  buildId: string,
-  cb: (err?: Error, result?: string) => void
-): void {
-  cache.get(cacheIdForWorkers(url)).then(entry => {
-    if (!entry || entry.buildId !== buildId) {
-      return void cb()
-    }
-
-    if (entry.status === `complete`) {
-      cb(undefined, entry.result)
-    } else if (entry.status === `failed`) {
-      cb(new Error(entry.result))
-    } else {
-      setTimeout(() => {
-        pollUntilComplete(cache, url, buildId, cb)
-        // Magic number
-      }, 500)
-    }
-
-    return undefined
-  })
-
-  return undefined
-}
-
-// TODO Add proper mutex instead of file cache hacks
 async function fetchFile({
   url,
   cache,
+  directory,
   auth = {},
   httpHeaders = {},
   ext,
   name,
+  cacheKey,
+}: IFetchRemoteFileOptions): Promise<string> {
   // global introduced in gatsby 4.0.0
   const BUILD_ID = global.__GATSBY?.buildId ?? ``
-  const pluginCacheDir = cache.directory
-
-  // when a cache entry is present we wait until it completes
-  const result = await new Promise((resolve, reject) => {
-    pollUntilComplete(cache, url, BUILD_ID, (err, result) => {
-      if (err) {
-        return reject(err)
-      }
+  const fileDirectory = (cache ? cache.directory : directory) as string
+  const storage = getStorage(getDatabaseDir())
 
-      return resolve(result)
-    })
-  })
-
-  if (result) {
-    return result
+  if (!cache && !directory) {
+    throw new Error(`You must specify either a cache or a directory`)
   }
 
-  await cache.set(cacheIdForWorkers(url), {
-    status: `pending`,
-    result: null,
-    workerId: WORKER_ID,
-    buildId: BUILD_ID,
-  })
+  const fetchFileMutex = createMutex(`gatsby-core-utils:fetch:${url}`)
+  await fetchFileMutex.acquire()
 
-  // See if there's response headers for this url
-  // from a previous request.
-  const { headers: cachedHeaders, digest: originalDigest } =
-    (await cache.get(cacheIdForHeaders(url))) ?? {}
-  const headers = { ...httpHeaders }
-  if (cachedHeaders && cachedHeaders.etag) {
-    headers[`If-None-Match`] = cachedHeaders.etag
-  }
+  // Fetch the file.
+  try {
+    const inFlightValue = getInFlightObject(url, BUILD_ID)
+    if (inFlightValue) {
+      return inFlightValue
+    }
 
-  // Add htaccess authentication if passed in. This isn't particularly
-  // extensible. We should define a proper API that we validate.
-  const httpOptions: Options = {}
-  if (auth && (auth.htaccess_pass || auth.htaccess_user)) {
-    httpOptions.username = auth.htaccess_user
-    httpOptions.password = auth.htaccess_pass
-  }
+    const cachedEntry = await storage.remoteFileInfo.get(url)
 
-  // Create the temp and permanent file names for the url.
-  let digest = createContentDigest(url)
+    // See if there's response headers for this url
+    // from a previous request.
+    const headers = { ...httpHeaders }
+    if (cachedEntry?.headers?.etag) {
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      headers[`If-None-Match`] = cachedEntry.headers.etag
+    }
 
-  // if worker id is present - we also append the worker id until we have a proper mutex
-  if (IS_WORKER) {
-    digest += `-${WORKER_ID}`
-  }
+    // Add htaccess authentication if passed in. This isn't particularly
+    // extensible. We should define a proper API that we validate.
+    const httpOptions: Options = {}
+    if (auth && (auth.htaccess_pass || auth.htaccess_user)) {
+      httpOptions.username = auth.htaccess_user
+      httpOptions.password = auth.htaccess_pass
+    }
 
-  if (!name) {
-    name = getRemoteFileName(url)
-  }
-  if (!ext) {
-    ext = getRemoteFileExtension(url)
-  }
+    if (!name) {
+      name = getRemoteFileName(url)
+    }
+
+    if (!ext) {
+      ext = getRemoteFileExtension(url)
+    }
 
-  const tmpFilename = createFilePath(pluginCacheDir, `tmp-${digest}`, ext)
+    const digest = createContentDigest(url)
+    await fs.ensureDir(path.join(fileDirectory, digest))
 
-  // Fetch the file.
-  try {
+    const tmpFilename = createFilePath(fileDirectory, `tmp-${digest}`, ext)
     const response = await requestRemoteNode(
       url,
       headers,
@@ -264,278 +183,67 @@ async function fetchFile({
       httpOptions
     )
 
+    const filename = createFilePath(path.join(fileDirectory, digest), name, ext)
     if (response.statusCode === 200) {
       // Save the response headers for future requests.
-      await cache.set(cacheIdForHeaders(url), {
-        headers: response.headers,
-        digest,
-      })
-
       // If the user did not provide an extension and we couldn't get one from remote file, try and guess one
       if (!ext) {
         // if this is fresh response - try to guess extension and cache result for future
         const filetype = await fileType.fromFile(tmpFilename)
         if (filetype) {
           ext = `.${filetype.ext}`
-          await cache.set(cacheIdForExtensions(url), ext)
         }
       }
-    } else if (response.statusCode === 304) {
-      if (!ext) {
-        ext = await cache.get(cacheIdForExtensions(url))
-      }
-    }
 
-    // Multiple processes have started the fetch and we need another check to only let one complete
-    const cacheEntry = await cache.get(cacheIdForWorkers(url))
-    if (cacheEntry && cacheEntry.workerId !== WORKER_ID) {
-      return new Promise((resolve, reject) => {
-        pollUntilComplete(cache, url, BUILD_ID, (err, result) => {
-          if (err) {
-            return reject(err)
-          }
-
-          return resolve(result as string)
-        })
-      })
-    }
-
-    // If the status code is 200, move the piped temp file to the real name.
-    const filename = createFilePath(
-      path.join(pluginCacheDir, originalDigest ?? digest),
-      name,
-      ext as string
-    )
-
-    if (response.statusCode === 200) {
       await fs.move(tmpFilename, filename, { overwrite: true })
-      // Else if 304, remove the empty response.
-    } else {
-      await fs.remove(tmpFilename)
-    }
 
-    await cache.set(cacheIdForWorkers(url), {
-      status: `complete`,
-      result: filename,
-      workerId: WORKER_ID,
-      buildId: BUILD_ID,
-    })
-
-    return filename
-  } catch (err) {
-    // enable multiple processes to continue when done
-    const cacheEntry = await cache.get(cacheIdForWorkers(url))
-
-    if (!cacheEntry || cacheEntry.workerId === WORKER_ID) {
-      await cache.set(cacheIdForWorkers(url), {
-        status: `failed`,
-        result: err.toString ? err.toString() : err.message ? err.message : err,
-        workerId: WORKER_ID,
-        buildId: BUILD_ID,
+      await setInFlightObject(url, BUILD_ID, {
+        cacheKey,
+        extension: ext,
+        headers: response.headers.etag ? { etag: response.headers.etag } : {},
+        directory: slash(fileDirectory),
+        path: slash(filename.replace(fileDirectory, ``)),
       })
+    } else if (response.statusCode === 304) {
+      await fs.remove(tmpFilename)
     }
 
-    throw err
+    return filename
+  } finally {
+    await fetchFileMutex.release()
   }
 }
 
-/**
- * requestRemoteNode
- * --
- * Download the requested file
- *
- * @param  {String}   url
- * @param  {Headers}  headers
- * @param  {String}   tmpFilename
- * @param  {Object}   httpOptions
- * @param  {number}   attempt
- * @return {Promise}  Resolves with the [http Result Object]{@link https://nodejs.org/api/http.html#http_class_http_serverresponse}
- */
-function requestRemoteNode(
-  url: string | URL,
-  headers: Headers,
-  tmpFilename: string,
-  httpOptions?: Options,
-  attempt: number = 1
-): Promise<IncomingMessage> {
-  return new Promise((resolve, reject) => {
-    let timeout: NodeJS.Timeout
-    const fsWriteStream = fs.createWriteStream(tmpFilename)
-    fsWriteStream.on(`error`, (error: unknown) => {
-      if (timeout) {
-        clearTimeout(timeout)
-      }
-
-      reject(error)
-    })
-
-    // Called if we stall for 30s without receiving any data
-    const handleTimeout = async (): Promise<void> => {
-      fsWriteStream.close()
-      fs.removeSync(tmpFilename)
-
-      if (attempt < STALL_RETRY_LIMIT) {
-        // Retry by calling ourself recursively
-        resolve(
-          requestRemoteNode(url, headers, tmpFilename, httpOptions, attempt + 1)
-        )
-      } else {
-        // TODO move to new Error type
-        // eslint-disable-next-line prefer-promise-reject-errors
-        reject(`Failed to download ${url} after ${STALL_RETRY_LIMIT} attempts`)
-      }
-    }
-
-    const resetTimeout = (): void => {
-      if (timeout) {
-        clearTimeout(timeout)
-      }
-      timeout = setTimeout(handleTimeout, STALL_TIMEOUT)
-    }
-    const responseStream = got.stream(url, {
-      headers,
-      timeout: {
-        send: CONNECTION_TIMEOUT, // https://github.com/sindresorhus/got#timeout
-      },
-      ...httpOptions,
-      isStream: true,
-    })
-
-    let haveAllBytesBeenWritten = false
-    // Fixes a bug in latest got where progress.total gets reset when stream ends, even if it wasn't complete.
-    let totalSize: number | null = null
-    responseStream.on(`downloadProgress`, progress => {
-      // reset the timeout on each progress event to make sure large files don't timeout
-      resetTimeout()
-
-      if (
-        progress.total != null &&
-        (!totalSize || totalSize < progress.total)
-      ) {
-        totalSize = progress.total
-      }
-
-      if (progress.transferred === totalSize || totalSize === null) {
-        haveAllBytesBeenWritten = true
-      }
-    })
-
-    responseStream.pipe(fsWriteStream)
-
-    // If there's a 400/500 response or other error.
-    // it will trigger a finish event on fsWriteStream
-    responseStream.on(`error`, error => {
-      if (timeout) {
-        clearTimeout(timeout)
-      }
-
-      fsWriteStream.close()
-      fs.removeSync(tmpFilename)
-
-      if (!(error instanceof RequestError)) {
-        return reject(error)
-      }
-
-      // This is a replacement for the stream retry logic of got
-      // till we can update all got instances to v12
-      // https://github.com/sindresorhus/got/blob/main/documentation/7-retry.md
-      // https://github.com/sindresorhus/got/blob/main/documentation/3-streams.md#retry
-      const statusCode = error.response?.statusCode
-      const errorCode = error.code || error.message // got gives error.code, but msw/node returns the error codes in the message only
-
-      if (
-        // HTTP STATUS CODE ERRORS
-        (statusCode && STATUS_CODES_TO_RETRY.includes(statusCode)) ||
-        // GENERAL NETWORK ERRORS
-        (errorCode && ERROR_CODES_TO_RETRY.includes(errorCode))
-      ) {
-        if (attempt < INCOMPLETE_RETRY_LIMIT) {
-          setTimeout(() => {
-            resolve(
-              requestRemoteNode(
-                url,
-                headers,
-                tmpFilename,
-                httpOptions,
-                attempt + 1
-              )
-            )
-          }, BACKOFF_TIME * attempt)
-
-          return undefined
-        }
-        // Throw user friendly error
-        error.message = [
-          `Unable to fetch:`,
-          url,
-          `---`,
-          `Reason: ${error.message}`,
-          `---`,
-        ].join(`\n`)
-
-        // Gather details about what went wrong from the error object and the request
-        const details = Object.entries({
-          attempt,
-          method: error.options?.method,
-          errorCode: error.code,
-          responseStatusCode: error.response?.statusCode,
-          responseStatusMessage: error.response?.statusMessage,
-          requestHeaders: error.options?.headers,
-          responseHeaders: error.response?.headers,
-        })
-          // Remove undefined values from the details to keep it clean
-          .reduce((a, [k, v]) => (v === undefined ? a : ((a[k] = v), a)), {})
-
-        if (Object.keys(details).length) {
-          error.message = [
-            error.message,
-            `Fetch details:`,
-            JSON.stringify(details, null, 2),
-            `---`,
-          ].join(`\n`)
-        }
-      }
-
-      return reject(error)
-    })
+const inFlightMap = new Map<string, string>()
+function getInFlightObject(key: string, buildId?: string): string | undefined {
+  if (!buildId) {
+    return inFlightMap.get(key)
+  }
 
-    responseStream.on(`response`, response => {
-      resetTimeout()
+  const remoteFile = getStorage(getDatabaseDir()).remoteFileInfo.get(key)
+  // if the buildId matches, we know it's the same build and that it already processed this url during this build
+  if (remoteFile && remoteFile.buildId === buildId) {
+    return path.join(remoteFile.directory, remoteFile.path)
+  }
 
-      fsWriteStream.once(`finish`, () => {
-        if (timeout) {
-          clearTimeout(timeout)
-        }
+  return undefined
+}
+async function setInFlightObject(
+  key: string,
+  buildId: string,
+  value: { buildId?: string } & Omit<
+    NonNullable<
+      ReturnType<ReturnType<typeof getStorage>["remoteFileInfo"]["get"]>
+    >,
+    "buildId"
+  >
+): Promise<void> {
+  if (!buildId) {
+    inFlightMap.set(key, path.join(value.directory, value.path))
+  }
 
-        // We have an incomplete download
-        if (!haveAllBytesBeenWritten) {
-          fs.removeSync(tmpFilename)
-
-          if (attempt < INCOMPLETE_RETRY_LIMIT) {
-            // let's give node time to remove the file
-            process.nextTick(() =>
-              resolve(
-                requestRemoteNode(
-                  url,
-                  headers,
-                  tmpFilename,
-                  httpOptions,
-                  attempt + 1
-                )
-              )
-            )
-
-            return undefined
-          } else {
-            // TODO move to new Error type
-            // eslint-disable-next-line prefer-promise-reject-errors
-            return reject(
-              `Failed to download ${url} after ${INCOMPLETE_RETRY_LIMIT} attempts`
-            )
-          }
-        }
-        return resolve(response)
-      })
-    })
+  await getStorage(getDatabaseDir()).remoteFileInfo.put(key, {
+    ...value,
+    buildId,
   })
 }
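Taken together, the rewritten `fetch-remote-file.ts` accepts either a plugin `cache` or a plain `directory`, plus an optional `cacheKey` that opts into the persistent lmdb fast path. A hedged usage sketch; the URL, directory, and digest below are placeholders, not values from the PR:

```ts
import { fetchRemoteFile } from "gatsby-core-utils/fetch-remote-file"

async function example(): Promise<string> {
  // Either `cache` or `directory` must be set; the options type makes them
  // mutually exclusive. All values below are placeholders.
  return await fetchRemoteFile({
    url: `https://example.com/photo.jpg`,
    directory: `/tmp/remote-files`,
    // Optional: a stable digest opts into the persistent lmdb fast path,
    // letting unchanged files be reused (or copied) across builds.
    cacheKey: `content-digest-123`,
  })
}
```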
diff --git a/packages/gatsby-core-utils/src/remote-file-utils/fetch-file.ts b/packages/gatsby-core-utils/src/remote-file-utils/fetch-file.ts
new file mode 100644
index 0000000000000..1d715e49fd5cf
--- /dev/null
+++ b/packages/gatsby-core-utils/src/remote-file-utils/fetch-file.ts
@@ -0,0 +1,271 @@
+import fs from "fs-extra"
+import type { IncomingMessage } from "http"
+import type { Headers, Options } from "got"
+import type { GatsbyCache } from "gatsby"
+
+// keeping the I for backward compatibility
+export type IFetchRemoteFileOptions = {
+  url: string
+  auth?: {
+    htaccess_pass?: string
+    htaccess_user?: string
+  }
+  httpHeaders?: Headers
+  ext?: string
+  name?: string
+  cacheKey?: string
+} & (
+  | {
+      directory: string
+      cache?: never
+    }
+  | {
+      directory?: never
+      cache: GatsbyCache
+    }
+)
+
+const STALL_RETRY_LIMIT = process.env.GATSBY_STALL_RETRY_LIMIT
+  ? parseInt(process.env.GATSBY_STALL_RETRY_LIMIT, 10)
+  : 3
+const STALL_TIMEOUT = process.env.GATSBY_STALL_TIMEOUT
+  ? parseInt(process.env.GATSBY_STALL_TIMEOUT, 10)
+  : 30000
+
+const CONNECTION_TIMEOUT = process.env.GATSBY_CONNECTION_TIMEOUT
+  ? parseInt(process.env.GATSBY_CONNECTION_TIMEOUT, 10)
+  : 30000
+
+const INCOMPLETE_RETRY_LIMIT = process.env.GATSBY_INCOMPLETE_RETRY_LIMIT
+  ? parseInt(process.env.GATSBY_INCOMPLETE_RETRY_LIMIT, 10)
+  : 3
+
+// jest doesn't allow us to run all timings infinitely, so we set it to 0 in tests
+const BACKOFF_TIME = process.env.NODE_ENV === `test` ? 0 : 1000
+
+function range(start: number, end: number): Array<number> {
+  return Array(end - start)
+    .fill(null)
+    .map((_, i) => start + i)
+}
+
+// Based on the defaults of https://github.com/JustinBeckwith/retry-axios
+const STATUS_CODES_TO_RETRY = [...range(100, 200), 429, ...range(500, 600)]
+const ERROR_CODES_TO_RETRY = [
+  `ETIMEDOUT`,
+  `ECONNRESET`,
+  `EADDRINUSE`,
+  `ECONNREFUSED`,
+  `EPIPE`,
+  `ENOTFOUND`,
+  `ENETUNREACH`,
+  `EAI_AGAIN`,
+  `ERR_NON_2XX_3XX_RESPONSE`,
+  `ERR_GOT_REQUEST_ERROR`,
+]
+
+/**
+ * requestRemoteNode
+ * --
+ * Download the requested file
+ *
+ * @param  {String}   url
+ * @param  {Headers}  headers
+ * @param  {String}   tmpFilename
+ * @param  {Object}   httpOptions
+ * @param  {number}   attempt
+ * @return {Promise}  Resolves with the [http Result Object]{@link https://nodejs.org/api/http.html#http_class_http_serverresponse}
+ */
+export async function requestRemoteNode(
+  url: string | URL,
+  headers: Headers,
+  tmpFilename: string,
+  httpOptions?: Options,
+  attempt: number = 1
+): Promise<IncomingMessage> {
+  // TODO(v5): use dynamic import syntax - it's currently blocked because older v4 versions have V8-compile-cache
+  // const { default: got, RequestError } = await import(`got`)
+  const { default: got, RequestError } = require(`got`)
+
+  return new Promise((resolve, reject) => {
+    let timeout: NodeJS.Timeout
+    const fsWriteStream = fs.createWriteStream(tmpFilename)
+    fsWriteStream.on(`error`, (error: unknown) => {
+      if (timeout) {
+        clearTimeout(timeout)
+      }
+
+      reject(error)
+    })
+
+    // Called if we stall for 30s without receiving any data
+    const handleTimeout = async (): Promise<void> => {
+      fsWriteStream.close()
+      await fs.remove(tmpFilename)
+
+      if (attempt < STALL_RETRY_LIMIT) {
+        // Retry by calling ourself recursively
+        resolve(
+          requestRemoteNode(url, headers, tmpFilename, httpOptions, attempt + 1)
+        )
+      } else {
+        // TODO move to new Error type
+        // eslint-disable-next-line prefer-promise-reject-errors
+        reject(`Failed to download ${url} after ${STALL_RETRY_LIMIT} attempts`)
+      }
+    }
+
+    const resetTimeout = (): void => {
+      if (timeout) {
+        clearTimeout(timeout)
+      }
+      timeout = setTimeout(handleTimeout, STALL_TIMEOUT)
+    }
+    const responseStream = got.stream(url, {
+      headers,
+      timeout: {
+        send: CONNECTION_TIMEOUT, // https://github.com/sindresorhus/got#timeout
+      },
+      ...httpOptions,
+      isStream: true,
+    })
+
+    let haveAllBytesBeenWritten = false
+    // Fixes a bug in latest got where progress.total gets reset when stream ends, even if it wasn't complete.
+    let totalSize: number | null = null
+    responseStream.on(`downloadProgress`, progress => {
+      // reset the timeout on each progress event to make sure large files don't timeout
+      resetTimeout()
+
+      if (
+        progress.total != null &&
+        (!totalSize || totalSize < progress.total)
+      ) {
+        totalSize = progress.total
+      }
+
+      if (progress.transferred === totalSize || totalSize === null) {
+        haveAllBytesBeenWritten = true
+      }
+    })
+
+    responseStream.pipe(fsWriteStream)
+
+    // If there's a 400/500 response or other error.
+    // it will trigger a finish event on fsWriteStream
+    responseStream.on(`error`, async error => {
+      if (timeout) {
+        clearTimeout(timeout)
+      }
+
+      fsWriteStream.close()
+      await fs.remove(tmpFilename)
+
+      if (!(error instanceof RequestError)) {
+        return reject(error)
+      }
+
+      // This is a replacement for the stream retry logic of got
+      // till we can update all got instances to v12
+      // https://github.com/sindresorhus/got/blob/main/documentation/7-retry.md
+      // https://github.com/sindresorhus/got/blob/main/documentation/3-streams.md#retry
+      const statusCode = error.response?.statusCode
+      const errorCode = error.code || error.message // got gives error.code, but msw/node returns the error codes in the message only
+
+      if (
+        // HTTP STATUS CODE ERRORS
+        (statusCode && STATUS_CODES_TO_RETRY.includes(statusCode)) ||
+        // GENERAL NETWORK ERRORS
+        (errorCode && ERROR_CODES_TO_RETRY.includes(errorCode))
+      ) {
+        if (attempt < INCOMPLETE_RETRY_LIMIT) {
+          setTimeout(() => {
+            resolve(
+              requestRemoteNode(
+                url,
+                headers,
+                tmpFilename,
+                httpOptions,
+                attempt + 1
+              )
+            )
+          }, BACKOFF_TIME * attempt)
+
+          return undefined
+        }
+        // Throw user friendly error
+        error.message = [
+          `Unable to fetch:`,
+          url,
+          `---`,
+          `Reason: ${error.message}`,
+          `---`,
+        ].join(`\n`)
+
+        // Gather details about what went wrong from the error object and the request
+        const details = Object.entries({
+          attempt,
+          method: error.options?.method,
+          errorCode: error.code,
+          responseStatusCode: error.response?.statusCode,
+          responseStatusMessage: error.response?.statusMessage,
+          requestHeaders: error.options?.headers,
+          responseHeaders: error.response?.headers,
+        })
+          // Remove undefined values from the details to keep it clean
+          .reduce((a, [k, v]) => (v === undefined ? a : ((a[k] = v), a)), {})
+
+        if (Object.keys(details).length) {
+          error.message = [
+            error.message,
+            `Fetch details:`,
+            JSON.stringify(details, null, 2),
+            `---`,
+          ].join(`\n`)
+        }
+      }
+
+      return reject(error)
+    })
+
+    responseStream.on(`response`, response => {
+      resetTimeout()
+
+      fsWriteStream.once(`finish`, async () => {
+        if (timeout) {
+          clearTimeout(timeout)
+        }
+
+        // We have an incomplete download
+        if (!haveAllBytesBeenWritten) {
+          await fs.remove(tmpFilename)
+
+          if (attempt < INCOMPLETE_RETRY_LIMIT) {
+            // let's give node time to remove the file
+            setImmediate(() =>
+              resolve(
+                requestRemoteNode(
+                  url,
+                  headers,
+                  tmpFilename,
+                  httpOptions,
+                  attempt + 1
+                )
+              )
+            )
+
+            return undefined
+          } else {
+            // TODO move to new Error type
+            // eslint-disable-next-line prefer-promise-reject-errors
+            return reject(
+              `Failed to download ${url} after ${INCOMPLETE_RETRY_LIMIT} attempts`
+            )
+          }
+        }
+
+        return resolve(response)
+      })
+    })
+  })
+}
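The retry policy in `requestRemoteNode` above is linear backoff: each eligible failure waits `BACKOFF_TIME * attempt` before retrying, capped by `INCOMPLETE_RETRY_LIMIT` (or `STALL_RETRY_LIMIT` for stalls). A small illustration of the resulting schedule with the defaults from this file:

```ts
// Defaults from the file above (both overridable via environment variables).
const BACKOFF_TIME = 1000
const INCOMPLETE_RETRY_LIMIT = 3

for (let attempt = 1; attempt < INCOMPLETE_RETRY_LIMIT; attempt++) {
  // A failure on attempt N schedules attempt N+1 after BACKOFF_TIME * N ms:
  // 1000 ms after the first failure, 2000 ms after the second; the third
  // failure is surfaced to the caller instead of retried.
  console.log(`retry ${attempt + 1} starts ${BACKOFF_TIME * attempt}ms after failure ${attempt}`)
}
```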
diff --git a/packages/gatsby-core-utils/src/utils/get-storage.ts b/packages/gatsby-core-utils/src/utils/get-storage.ts
index 63441fbbbed7e..71ed0c5b8eae1 100644
--- a/packages/gatsby-core-utils/src/utils/get-storage.ts
+++ b/packages/gatsby-core-utils/src/utils/get-storage.ts
@@ -1,6 +1,7 @@
 import path from "path"
 import { getLmdb } from "./get-lmdb"
 import type { RootDatabase, Database } from "lmdb"
+import type { Headers } from "got"
 
 export enum LockStatus {
   Locked = 0,
@@ -8,6 +9,17 @@ export enum LockStatus {
 }
 
 interface ICoreUtilsDatabase {
+  remoteFileInfo: Database<
+    {
+      extension: string
+      headers: Headers
+      path: string
+      directory: string
+      cacheKey?: string
+      buildId: string
+    },
+    string
+  >
   mutex: Database<LockStatus, string>
 }
 
@@ -50,6 +62,9 @@ export function getStorage(fullDbPath: string): ICoreUtilsDatabase {
     })
 
     databases = {
+      remoteFileInfo: rootDb.openDB({
+        name: `remote-file`,
+      }),
       mutex: rootDb.openDB({
         name: `mutex`,
       }),
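The new `remoteFileInfo` database gives `fetch-remote-file.ts` a process-shared record per URL. A hedged sketch of the access pattern; the relative import path and all stored values are illustrative:

```ts
import { getStorage, getDatabaseDir } from "./utils/get-storage"

async function sketch(): Promise<void> {
  const db = getStorage(getDatabaseDir()).remoteFileInfo

  // lmdb reads are synchronous; writes return a promise that resolves
  // once the transaction commits.
  const info = db.get(`https://example.com/photo.jpg`)

  if (!info) {
    await db.put(`https://example.com/photo.jpg`, {
      extension: `.jpg`, // illustrative values
      headers: { etag: `W/"abc"` },
      path: `photo.jpg`,
      directory: `/tmp/remote-files`,
      buildId: `1`,
    })
  }
}
```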
diff --git a/packages/gatsby-source-contentful/src/__tests__/gatsby-plugin-image.js b/packages/gatsby-source-contentful/src/__tests__/gatsby-plugin-image.js
index 571691402f9bd..85eeb6e801ea8 100644
--- a/packages/gatsby-source-contentful/src/__tests__/gatsby-plugin-image.js
+++ b/packages/gatsby-source-contentful/src/__tests__/gatsby-plugin-image.js
@@ -4,7 +4,7 @@ import _ from "lodash"
 import nock from "nock"
 import path from "path"
 import { generateImageSource, getBase64Image } from "../gatsby-plugin-image"
-import * as coreUtils from "gatsby-core-utils"
+import * as coreUtils from "gatsby-core-utils/fetch-remote-file"
 
 nock.disableNetConnect()
 
@@ -99,6 +99,9 @@ describe(`contentful extend node type`, () => {
         title: `Contentful Logo PNG`,
         description: ``,
         node_locale: `en-US`,
+        internal: {
+          contentDigest: `123`,
+        },
       },
       options: {
         width: 200,
diff --git a/packages/gatsby-source-contentful/src/gatsby-plugin-image.js b/packages/gatsby-source-contentful/src/gatsby-plugin-image.js
index 959b3a1fd9a77..10684cb3161bf 100644
--- a/packages/gatsby-source-contentful/src/gatsby-plugin-image.js
+++ b/packages/gatsby-source-contentful/src/gatsby-plugin-image.js
@@ -1,6 +1,6 @@
 // @ts-check
 import fs from "fs-extra"
-import { fetchRemoteFile } from "gatsby-core-utils"
+import { fetchRemoteFile } from "gatsby-core-utils/fetch-remote-file"
 import path from "path"
 import {
   createUrl,
@@ -61,8 +61,9 @@ export const getBase64Image = (imageProps, cache) => {
 
     const absolutePath = await fetchRemoteFile({
       url: requestUrl,
-      cache,
+      directory: cache.directory,
       ext: extension,
+      cacheKey: imageProps.image.internal.contentDigest,
     })
 
     const base64 = (await fs.readFile(absolutePath)).toString(`base64`)
@@ -97,8 +98,9 @@ const getTracedSVG = async ({ image, options, cache }) => {
   const absolutePath = await fetchRemoteFile({
     url,
     name,
-    cache,
+    directory: cache.directory,
     ext: extension,
+    cacheKey: image.internal.contentDigest,
   })
 
   return traceSVG({
@@ -147,8 +149,9 @@ const getDominantColor = async ({ image, options, cache }) => {
     const absolutePath = await fetchRemoteFile({
       url,
       name,
-      cache,
+      directory: cache.directory,
       ext: extension,
+      cacheKey: image.internal.contentDigest,
     })
 
     if (!(`getDominantColor` in pluginSharp)) {
diff --git a/packages/gatsby-source-filesystem/src/__tests__/create-remote-file-node.js b/packages/gatsby-source-filesystem/src/__tests__/create-remote-file-node.js
index b64bf6d21c6fe..e90addb78ecc8 100644
--- a/packages/gatsby-source-filesystem/src/__tests__/create-remote-file-node.js
+++ b/packages/gatsby-source-filesystem/src/__tests__/create-remote-file-node.js
@@ -22,7 +22,7 @@ jest.mock(`got`, () => {
   }
 })
 
-jest.mock(`gatsby-core-utils`, () => {
+jest.mock(`gatsby-core-utils/fetch-remote-file`, () => {
   return {
     fetchRemoteFile: jest.fn(),
   }
@@ -37,7 +37,7 @@ const reporter = {}
 
 const createRemoteFileNode = require(`../create-remote-file-node`)
 const { createFileNode } = require(`../create-file-node`)
-const { fetchRemoteFile } = require(`gatsby-core-utils`)
+const { fetchRemoteFile } = require(`gatsby-core-utils/fetch-remote-file`)
 
 const createMockCache = () => {
   return {
diff --git a/packages/gatsby-source-filesystem/src/create-remote-file-node.js b/packages/gatsby-source-filesystem/src/create-remote-file-node.js
index bfb4574d79eed..66655132bec74 100644
--- a/packages/gatsby-source-filesystem/src/create-remote-file-node.js
+++ b/packages/gatsby-source-filesystem/src/create-remote-file-node.js
@@ -1,4 +1,4 @@
-const { fetchRemoteFile } = require(`gatsby-core-utils`)
+const { fetchRemoteFile } = require(`gatsby-core-utils/fetch-remote-file`)
 const { isWebUri } = require(`valid-url`)
 const { createFileNode } = require(`./create-file-node`)
 
diff --git a/packages/gatsby-source-shopify/src/resolve-gatsby-image-data.ts b/packages/gatsby-source-shopify/src/resolve-gatsby-image-data.ts
index 3ff2307e253dd..401232f4fe0ac 100644
--- a/packages/gatsby-source-shopify/src/resolve-gatsby-image-data.ts
+++ b/packages/gatsby-source-shopify/src/resolve-gatsby-image-data.ts
@@ -1,4 +1,4 @@
-import { fetchRemoteFile } from "gatsby-core-utils"
+import { fetchRemoteFile } from "gatsby-core-utils/fetch-remote-file"
 import {
   generateImageData,
   getLowResolutionImageURL,
@@ -10,6 +10,7 @@ import {
 import { IGatsbyImageFieldArgs } from "gatsby-plugin-image/graphql-utils"
 import { readFileSync } from "fs"
 import { IShopifyImage, urlBuilder } from "./get-shopify-image"
+import type { Node } from "gatsby"
 
 type IImageWithPlaceholder = IImage & {
   placeholder: string
@@ -17,15 +18,18 @@ type IImageWithPlaceholder = IImage & {
 
 async function getImageBase64({
   imageAddress,
-  cache,
+  directory,
+  contentDigest,
 }: {
   imageAddress: string
-  cache: any
+  directory: string
+  contentDigest: string
 }): Promise<string> {
   // Downloads file to the site cache and returns the file path for the given image (this is a path on the host system, not a URL)
   const filePath = await fetchRemoteFile({
     url: imageAddress,
-    cache,
+    directory,
+    cacheKey: contentDigest,
   })
   const buffer = readFileSync(filePath)
   return buffer.toString(`base64`)
@@ -99,7 +103,8 @@ export function makeResolveGatsbyImageData(cache: any) {
       })
       const imageBase64 = await getImageBase64({
         imageAddress: lowResImageURL,
-        cache,
+        directory: cache.directory as string,
+        contentDigest: image.internal.contentDigest,
       })
 
       // This would be your own function to download and generate a low-resolution placeholder
diff --git a/packages/gatsby-transformer-sqip/src/extend-node-type.js b/packages/gatsby-transformer-sqip/src/extend-node-type.js
index d07b79a382f2d..b64bef2e3e501 100644
--- a/packages/gatsby-transformer-sqip/src/extend-node-type.js
+++ b/packages/gatsby-transformer-sqip/src/extend-node-type.js
@@ -12,7 +12,7 @@ const {
   GraphQLBoolean,
 } = require(`gatsby/graphql`)
 const { queueImageResizing } = require(`gatsby-plugin-sharp`)
-const { fetchRemoteFile } = require(`gatsby-core-utils`)
+const { fetchRemoteFile } = require(`gatsby-core-utils/fetch-remote-file`)
 const {
   DuotoneGradientType,
   ImageCropFocusType,