Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@tus/server: add GCS locker #616

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open

Conversation

netdown
Copy link
Contributor

@netdown netdown commented May 7, 2024

This PR is not complete yet, it misses unit tests (the code is tested), readme updates and changeset. Despite all that, I would like to ask you to review my approach first so I won't write needless tests. I have documented the process in detail, but feel free to ask questions.

Copy link

changeset-bot bot commented May 7, 2024

⚠️ No Changeset found

Latest commit: 0ce3a90

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Copy link
Member

@Murderlon Murderlon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for putting in the time to contribute!

I'm not an expert on (distributed) locking, but conceptually I think GCS storage as a locker only makes sense if you're already deploying your server within GCS infrastructure (so it's faster) and you have a bucket in the region where the uploads happen. My assumption is if those conditions aren't met, things will be slow? AFAIK GCS has strong consistency within the same region but eventual consistency for multi-region.

Maybe you can elaborate on your use case?

@Murderlon Murderlon requested review from Acconut and fenos May 9, 2024 08:46
@netdown
Copy link
Contributor Author

netdown commented May 9, 2024

Indeed I haven't even thought about using this locker with a store other than GCS. In my case, the storage bucket and the locker bucket is the same, and I think the only case they should be separated is when the storage bucket is not in standard storage class.

Anyways, I'm not sure i.e. Firestore would greatly overperform GCS in case of different storage. Regarding region latency, the user should be aware of that and choose a suitable region. Of course a redis based implementation would be much better, but this may be a considerable alternative until thats not implemented.

Shall I move this locker to the gcs-store package to suggest the primary application?

Copy link
Member

@Acconut Acconut left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting because such approaches would allow tus server to implement lockers directly on top of cloud storages instead of using external tools like Redis. However, I would like to see some evidence that this approach actually provides exclusive access to uploads. Is there some blog post that looked into the mechanisms at play here? Are all involved operations strongly consistent?

@netdown
Copy link
Contributor Author

netdown commented May 14, 2024

GCS is strongly consistent, but indeed concurrency was not ensured in my previous approach. I have reworked the code based on this article.

Note that I had to upgrade @google-cloud/storage because previous version was missing a type export. Also, this feature should be moved to a separate package or into gcs-store, as I'm importing from @google-cloud/storage.

@Murderlon
Copy link
Member

Murderlon commented May 21, 2024

Really nice article, thanks for sharing. It does also say this:

A distributed lock like this is best suited for batch workloads. Such workloads typically take seconds to tens or even hundreds of seconds.

But here we are using it for individual uploads, not batches. Or even smaller with a resumed uploads (or where a client sets chunkSize on the client). It's probably fine, but hard to tell when this becomes a bottle neck. Have you ran some tests with this locally to your satisfaction?

@netdown
Copy link
Contributor Author

netdown commented May 28, 2024

For the last 10 days it has been running in production without problems. We have about 5000 uploads per day. In e2e tests it was indeed slightly slower for 140 files compared to xhr, but I could easily compensate this by increasing the number of parallel uploads. If I measure individual uploads, the time elapsed between lock and unlock is mostly 20-400 ms in case of memory locker, and 300-400 for gcs locker.

@Murderlon
Copy link
Member

That's great to hear! I'm in favor adding this into the package then.

Copy link
Member

@Murderlon Murderlon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall this looks very good! Also happy with the extensive code comments.

Some things needed:

  • The build is currently failing
  • We need to update the peerDependencies to not allow any version of @google-cloud/storage.
  • Docs. We should also talk about when to (not) use this lock and the things to watch out for, such as what values to set for the ttl and watch interval.
  • A test similar to this:
    describe('File Store with Locking', () => {
    before(() => {
    server = new Server({
    path: STORE_PATH,
    datastore: new FileStore({directory: `./${STORE_PATH}`}),
    locker: new MemoryLocker(),
    })
    listener = server.listen()
    agent = request.agent(listener)
    })
    after((done) => {
    // Remove the files directory
    rimraf(FILES_DIRECTORY, (err) => {
    if (err) {
    return done(err)
    }
    // Clear the config
    // @ts-expect-error we can consider a generic to pass to
    // datastore to narrow down the store type
    const uploads = (server.datastore.configstore as Configstore).list?.() ?? []
    for (const upload in uploads) {
    // @ts-expect-error we can consider a generic to pass to
    // datastore to narrow down the store type
    await(server.datastore.configstore as Configstore).delete(upload)
    }
    listener.close()
    return done()
    })
    })
    it('will allow another request to acquire the lock by cancelling the previous request', async () => {
    const res = await agent
    .post(STORE_PATH)
    .set('Tus-Resumable', TUS_RESUMABLE)
    .set('Upload-Length', TEST_FILE_SIZE)
    .set('Upload-Metadata', TEST_METADATA)
    .set('Tus-Resumable', TUS_RESUMABLE)
    .expect(201)
    assert.equal('location' in res.headers, true)
    assert.equal(res.headers['tus-resumable'], TUS_RESUMABLE)
    // Save the id for subsequent tests
    const file_id = res.headers.location.split('/').pop()
    const file_size = parseInt(TEST_FILE_SIZE, 10)
    // Slow down writing
    const originalWrite = server.datastore.write.bind(server.datastore)
    sinon.stub(server.datastore, 'write').callsFake((stream, ...args) => {
    const throttleStream = new Throttle({bps: file_size / 4})
    return originalWrite(stream.pipe(throttleStream), ...args)
    })
    const data = Buffer.alloc(parseInt(TEST_FILE_SIZE, 10), 'a')
    const httpAgent = new Agent({
    maxSockets: 2,
    maxFreeSockets: 10,
    timeout: 10000,
    keepAlive: true,
    })
    const createPatchReq = (offset: number) => {
    return agent
    .patch(`${STORE_PATH}/${file_id}`)
    .agent(httpAgent)
    .set('Tus-Resumable', TUS_RESUMABLE)
    .set('Upload-Offset', offset.toString())
    .set('Content-Type', 'application/offset+octet-stream')
    .send(data.subarray(offset))
    }
    const req1 = createPatchReq(0).then((e) => e)
    await wait(100)
    const req2 = agent
    .head(`${STORE_PATH}/${file_id}`)
    .agent(httpAgent)
    .set('Tus-Resumable', TUS_RESUMABLE)
    .expect(200)
    .then((e) => e)
    const [res1, res2] = await Promise.allSettled([req1, req2])
    assert.equal(res1.status, 'fulfilled')
    assert.equal(res2.status, 'fulfilled')
    assert.equal(res1.value.statusCode, 400)
    assert.equal(res1.value.headers['upload-offset'] !== TEST_FILE_SIZE, true)
    assert.equal(res2.value.statusCode, 200)
    // Verify that we are able to resume even if the first request
    // was cancelled by the second request trying to acquire the lock
    const offset = parseInt(res2.value.headers['upload-offset'], 10)
    const finishedUpload = await createPatchReq(offset)
    assert.equal(finishedUpload.statusCode, 204)
    assert.equal(finishedUpload.headers['upload-offset'], TEST_FILE_SIZE)
    }).timeout(20000)
    })
    })

If you need help with any of these let me know.

packages/server/src/lockers/GCSLockFile.ts Outdated Show resolved Hide resolved
packages/server/src/lockers/GCSLockFile.ts Outdated Show resolved Hide resolved
@Murderlon Murderlon requested a review from Acconut May 28, 2024 07:33
@Acconut
Copy link
Member

Acconut commented Jun 1, 2024

GCS is strongly consistent, but indeed concurrency was not ensured in my previous approach. I have reworked the code based on this article.

Thank you for the article, I will have a look at it! I am wondering if S3 has similar capabilities and a locker can be implemented nowadays ontop of it as well.

@Murderlon
Copy link
Member

Murderlon commented Jun 12, 2024

@netdown still interested in getting this over the finish line?

@netdown
Copy link
Contributor Author

netdown commented Jun 12, 2024

Yes, but I've been busy the last few weeks and I expect the same at least until July. Feel free to complete the PR if you have the time.

Copy link
Member

@Acconut Acconut left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for my delayed review! I just read the accompanying blog post and wanted to leave some comments about it first. Some additional background information can be found at gcslock, which was a previous GCS-based lock. The only valuable comment online I was able to find about the proposed algorithm is on Lobsters by Aphyr, who is quite experienced in testing distributed systems and databases. However, his comment was more about a general issue with distributed locks and not about this GCS-based approach in particular. The same critique can also be applied to Redis-based locks and there is not much we can do on our end as far as I know.

The proposed algorithm on its own seems sound to me (although I am no expert). It relies on the storage offering strong consistency which is the case with GCS. While there are many S3-compatible storage I am not aware of any GCS-compatible storages. So we don't have to worry much about storages with a GCS-like interface that are not strongly consistent.

In addition, the propsed algorithm also provides "instant recovery from stale locks" if the lock was left stale by the same actor that now tries to acquire it. This functionality attaches an identity to each lock, which is dangerous for tus-node-server as we do not want two requests that are processed by the same tus-node-server instance to interfere with the same lock. This PR does not implement this feature but this difference to the blog post should still be noted in the code somewhere.
The author also acknowledges that this algorithm does not offer low-latency:

A locking operation's average speed is in the order of hundreds of milliseconds.

This is probably fine for large file uploads, which are I/O-bound, but still work documenting somewhere.

Finally, while reading the article, I hoped that a similar approach might be possible for S3, but this does not seem possible at the first glance as it does not offer conditional writes like GCS does.

//On the first attempt, retry after current I/O operations are done, else use an exponential backoff
const waitFn = (then: () => void) =>
attempt > 0
? setTimeout(then, (attempt * this.locker.watchInterval) / 3)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice if it also added random jitter.

packages/server/src/lockers/GCSLocker.ts Outdated Show resolved Hide resolved
preconditionOpts: {ifGenerationMatch: 0},
})
} catch (err) {
//Release file already created, no need to report
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer check explicitly which error conditions should be ignored and then re-throwing all other errors. This applies to multiple try-catch block in this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure which error conditions this would be then

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can only safely ignore the error if it's a 412 Precondition Failed because then the release file exists already. All other errors (e.g. failure to reach GCS API entirely) should be thrown.

packages/server/src/lockers/GCSLockFile.ts Outdated Show resolved Hide resolved
*/
protected startWatcher(cancelHandler: RequestRelease) {
this.watcher = setInterval(() => {
const handleError = () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error handling here needs some extension. The error should probably be logged somewhere. In addition, if we cannot extend the lock's lifetime, we should stop all processing of the resource once the locker expires. This is probably hard to implement with the current lock interface but this is also something I would like to address with tusd in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this can be done in a future PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do have this already though:

if (isAborted) {
// This condition handles situations where the request has been flagged as aborted.
// In such cases, the server informs the client that the connection will be closed.
// This is communicated by setting the 'Connection' header to 'close' in the response.
// This step is essential to prevent the server from continuing to process a request
// that is no longer needed, thereby saving resources.
// @ts-expect-error not explicitly typed but possible
headers['Connection'] = 'close'
// An event listener is added to the response ('res') for the 'finish' event.
// The 'finish' event is triggered when the response has been sent to the client.
// Once the response is complete, the request ('req') object is destroyed.
// Destroying the request object is a crucial step to release any resources
// tied to this request, as it has already been aborted.
res.on('finish', () => {
req.destroy()
})
}
res.writeHead(status, headers)
res.write(body)
return res.end()

/**
* Release the lock - clear watcher and delete the file.
*/
public async release() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need some internal mutex to ensure that deleting a lock and extending it cannot run simultaneously and thus interfere with each other? Async requests can run concurrently.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK GCS should already take care of concurrent access for us.

* main:
  Add issue templates
  [ci] release (tus#625)
  Add .npmrc
  @tus/server: add `lastPath` arg to `getFileIdFromRequest` (tus#626)
  Add basic storage info to Upload model (tus#624)
  Release
  v0.0.0
  @tus/server: allow onUploadFinish hook to override response data (tus#615)
* main:
  Bump @shopify/semaphore from 3.0.2 to 3.1.0 (tus#637)
  Remove turbo for linting (tus#640)
  Refactor TypeScript setup (tus#641)
Copy link

socket-security bot commented Aug 5, 2024

New and removed dependencies detected. Learn more about Socket for GitHub ↗︎

Package New capabilities Transitives Size Publisher
npm/@google-cloud/storage@7.12.0 environment, filesystem Transitive: network, shell +67 6.11 MB google-wombot

🚮 Removed packages: npm/@google-cloud/storage@6.12.0

View full report↗︎

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants