Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fails to stream to S3 using AWS SDK v3 #662

Open
Peterbyte opened this issue May 5, 2023 · 18 comments
Open

Fails to stream to S3 using AWS SDK v3 #662

Peterbyte opened this issue May 5, 2023 · 18 comments

Comments

@Peterbyte
Copy link

I need to stream a number of files from S3, zip them and stream them back to S3 as a zip file.

I had a previous working version using aws-sdk V2, but I'm attempting to now replace that implementation with V3.

Here's a simplified example of some code to reproduce the issue I'm seeing. This correctly streams the input file down from S3, but then never seems to pipe any output back into the upload stream.

The console output will usually output the zip entry (inconsistently), but I never see any console logs related to upload, and no object gets created in S3. The command completes with exit code 0, but never outputs 'done' from the log statement at the end of the file.

import { PassThrough } from 'stream';
import { Upload } from '@aws-sdk/lib-storage';
import archiver from 'archiver';

const { AWS_REGION, S3_BUCKET } = process.env;
const s3FileKey = 'example.jpg';
const outputZipKey = `test.zip`;

const s3Client = new S3Client({ region: AWS_REGION });

(async () => {
  const s3UploadStream = new PassThrough();
  s3UploadStream.on('error', (err) => {
    console.error('error', err);
    throw err;
  });

  const s3Upload = new Upload({
    client: s3Client,
    params: {
      Body: s3UploadStream,
      Bucket: S3_BUCKET,
      ContentType: 'application/zip',
      Key: outputZipKey
    }
  });

  s3Upload.on('httpUploadProgress', (progress) => {
    console.log(JSON.stringify(progress));
  });

  const archive = archiver('zip', { zlib: { level: 0 } });
  archive.pipe(s3UploadStream);

  archive.on('entry', (f) => {
    console.log(f);
  });

  const downloadStream = (await s3Client.send(new GetObjectCommand({ Bucket: MEDIA_S3_BUCKET, Key: s3FileKey }))).Body;

  archive.append(downloadStream, { name: 'file.jpg' });

  await archive.finalize();
  await s3Upload.done();
  console.log('done');
})();

I suspect there is some incompatibility that's been introduced with V3, and noticed there are some discussions relating to the version of readable-stream in archiver here: https://stackoverflow.com/questions/69751320/upload-stream-to-amazon-s3-using-nodejs-sdk-v3

Can anyone reproduce or provide a workaround?

Archiver 5.3.1
Node 18.16.0
@aws-sdk/client-s3 3.327.0
@aws-sdk/lib-storage 3.327.0

@Peterbyte Peterbyte changed the title fails to stream to S3 using AWS SDK v3 Fails to stream to S3 using AWS SDK v3 May 5, 2023
@danielkochengineer
Copy link

I am having the same problem as well. For me it seems especially files with larger file size tend to cause this issue. For some small files it works fine. I hope this can be solved.

@michael-raymond
Copy link

I ran into this today, and managed to get it working better, though perhaps not entirely. I don't fully understand what's happening, and came to the issues to look for an explanation.

If you start the S3 Upload promise first, but don't await it, then append to the zip, then await the .finalize() call, then await the Upload promise, it will go faster.

...

(async () => {

  ...

  const uploadPromise = s3Upload.done()

  const archive = archiver('zip', { zlib: { level: 0 } });
  archive.pipe(s3UploadStream);

  ...

  archive.append(downloadStream, { name: 'file.jpg' });

  await archive.finalize();
  await uploadPromise;
  console.log('done');
})();

I also tried await Promise.all([archive.finalize(), s3Upload.done()]), which also worked for my case, but was slower. I was zipping ~90 small files, and it took 18 seconds if I started both promises at the same time, but only 11 if I started the upload promise before appending. I didn't get any progress callbacks until after the finalisation had completed though. So I'm still confused as to what's actually happening.

@punkuz
Copy link

punkuz commented Jul 27, 2023

I am facing very similar issue, where I am pushing files by zipping them to a sftp server, for small multiple files its working, but for large files, for example I have 3 files each one above 1.12 gb, then I am getting "Archive Error Error: Aborted". has anyone tried such large files, or this library can't handle such big files

@6eyu
Copy link

6eyu commented Aug 31, 2023

I am having the same problem as well. For me it seems especially files with larger file size tend to cause this issue. For some small files it works fine. I hope this can be solved.

Similar problem here today. I solved this by adding httpsAgent in the s3Client config

const { S3Client} = require("@aws-sdk/client-s3");
const { NodeHttpHandler } = require("@aws-sdk/node-http-handler");
const https = require('https');

const s3Client = new S3Client({ 
    region: "ap-southeast-2",
    requestHandler: new NodeHttpHandler({
        httpsAgent: new https.Agent({
            KeepAlive: true,
            rejectUnauthorized: true
        })
})

@kvzaytsev
Copy link

Hello ! Is there any update on this issue? I am experiencing the same problem.
Adding httpsAgent did not solve the issue.

I am testing on archiving s3 prefix which contains 200 objects.

@kuhe
Copy link

kuhe commented Oct 27, 2023

Try increasing the highWaterMark of your PassThrough stream.

@deathemperor
Copy link

Try increasing the highWaterMark of your PassThrough stream.

do you mean like this?

const archive = Archiver("zip", {
      zlib: { level: 0 },
      highWaterMark: 1000 * 1024 * 1024,
    });

You can see I set it as 1GB but still not working

@thovden
Copy link

thovden commented Oct 29, 2023

Try increasing the highWaterMark of your PassThrough stream.

I have the same problem. To me it looks it is not streaming at all. Increasing the highWaterMark in the Passthrough stream seemingly works only because it loads more data in memory. Once the size of the input files are larger than the highWaterMark / memory it fails again. I have an event handler for httpUploadProgress (I use the S3 SDK uploader: import { Upload } from '@aws-sdk/lib-storage') and all of those logs come at the end, indicating there is no streaming until archiver.finalize().

@noahw3
Copy link

noahw3 commented Nov 2, 2023

Also experiencing this issue.

I dug into it a bit. The problem seems to be that the underlying _finalize call is never made, so node recognizes this as a hanging promise that will never resolve and exits.

In Archiver.prototype.finalize, this._queue.idle() is false so _finalize is not called directly from there.

Changing the order of promises as @michael-raymond mentioned above seems to make it work. In this case, _finalize gets called when onQueueDrain fires. Without the promise reordering, however, this never fires normally.

Anyway that's as far as I dug, hopefully this is helpful.

@CentralMatthew
Copy link

CentralMatthew commented Dec 23, 2023

Any updates on this?
Have the same issue

@vgjenks
Copy link

vgjenks commented Jan 8, 2024

Try increasing the highWaterMark of your PassThrough stream.

Thank you. This was extremely helpful when nothing else was, for a problem related to another package.

@fullstackzach
Copy link

fullstackzach commented Jan 18, 2024

We had trouble with our process ending with no error while trying to implement zip streaming with the s3 v3 sdk. The thing that helped us:

  1. creating a new s3 client for each readable / writeable stream (roger that this isn't optimal..)
  2. not using await on archive.finalize, but instead returning a new promise, and resolving on "end"

Hope this is helpful in getting someone get past this issue.

import {
  GetObjectCommand,
  GetObjectCommandOutput,
  ListObjectsV2Command,
  PutObjectCommand,
  S3Client,
} from "@aws-sdk/client-s3";
import { PassThrough } from "stream";
import { Upload } from "@aws-sdk/lib-storage";
import * as archiver from "archiver";

....

export const getReadableStreamFromS3 = async (
  key: string,
  bucketName: string,
): Promise<GetObjectCommandOutput["Body"] | undefined> => {
  const client = new S3Client({
    forcePathStyle: true,
    region,
    endpoint,
  });

  const command = new GetObjectCommand({
    Bucket: bucketName,
    Key: key,
  });

  const response = await client.send(command);

  return response.Body;
};

export const getWritableStreamFromS3 = (zipFileKey: string, bucketName: string): PassThrough => {
  const passthrough = new PassThrough();

  const client = new S3Client({
    forcePathStyle: true,
    region,
    endpoint,
  });

  new Upload({
    client,
    params: {
      Bucket: bucketName,
      Key: zipFileKey,
      Body: passthrough,
    },
  }).done();

  return passthrough;
};


export const generateAndStreamZipfileToS3 = async (
  s3KeyList: string[],
  zipFileS3Key: string,
  bucketName: string,
): Promise<void> => {
  // eslint-disable-next-line no-async-promise-executor
  return new Promise(async (resolve, reject) => {
    const pass = new PassThrough();
    const archive = archiver("zip", { zlib: { level: 9 } });
    const chunks: Buffer[] = [];

    archive.on("error", (err) => reject(err));
    pass.on("error", (err) => reject(err));
    pass.on("data", (chunk) => chunks.push(chunk));
    pass.on("end", async () => {
      const buffer = Buffer.concat(chunks);

      const uploadParams = {
        Bucket: bucketName,
        Key: zipFileS3Key,
        Body: buffer,
      };

      await s3Client.send(new PutObjectCommand(uploadParams));

      resolve();
    });

    archive.pipe(pass);

    for (const s3Key of s3KeyList) {
      const stream = (await getReadableStreamFromS3(s3Key, bucketName)) as Readable;
      archive.append(stream, { name: s3Key.split("/").pop()! });
    }

    archive.finalize();
  });
};

@Dirshant
Copy link

Dirshant commented Feb 1, 2024

A similar problem with a Heap out of memory error,
I am appending 100000 file streams to archive.append(file_stream, { name: file_name }) one by one in a loop
seems like the archiver keeps the reference of all streams in heap memory finally throwing the Heap out of memory error.

@marsilinou97
Copy link

I have the same problem, when I add ~200 images to the archive everything works fine, when I increase number of images to 1k my lambda exits with success status after 5 minutes (my lambda timeout is 15 minutes).

I tried the following:

  • Increase highWaterMark to 1GB
  • Override requestHandler using NodeHttpHandler
  • Await promises in order as mentioned above
    await zip.finalize();
    console.log("Finalized the zip archive");
    await uploadPromise

@Keithcat767
Copy link

Keithcat767 commented Mar 19, 2024

This gist helped me a ton after running into swallowed errors and my lambdas ending without any notice: https://gist.github.com/amiantos/16bacc9ed742c91151fcf1a41012445e?permalink_comment_id=3804034#gistcomment-3804034. Might help you too!

@xuxucode
Copy link

xuxucode commented Oct 12, 2024

Hi, I create a zip for 35 files (50M per file) from GCS(Google Cloud Storage), since they are remote files, streaming them one by one works for me:

const files = [/* your files */]
const archive = archiver('zip', {
  zlib: { level: 1 },
})

archive
  .on('entry', (entry) => { // fires when the file been processed and appended to the archive
    // Stream the next file.
    // In contrast to local files, it's reasonable to stream remote files
    // one by one, or else when streaming massive remote files at once
    // (i.e. call `archive.append()` for all files), it's likely the
    // streaming will be stuck.
    streamNext()
  })

let index = 0
const streamNext = () => {
  const file = files[index++]
  if (!file) {
    // Finalize when all files are processed.
    archive.finalize().then(() => console.log('archive finalized'))
    return
  }

  const stream = createGcsReadStream(file.url)
  archive.append(stream, { name: 'something' })
}

// Start streaming.
streamNext()

@mikecbrant
Copy link

Had similar setup to @fullstackzach (archiver + @aws-sdk/lib-storage) and found that the advice on initiating the S3 Upload command with the PassThrough stream BEFORE piping archiver into it.

We found that we could use same S3 Client for all operations (we get Readable stream from S3 for each file using same client).

# in main zip+upload function

  const bodyStream = new PassThrough()

  // we consider streams to be finished on 'end' events as opposed to 'close'
  // as it seems when working without file descriptors, 'close' may not be fired

  bodyStream.on('finish', () => {
    logger.info('Body stream finished receiving data from archive')
  })

  bodyStream.on('pipe', () => {
    logger.info('Body stream piped')
  })

  const bodyStreamPromise = new Promise<void>((resolve, reject) => {
    bodyStream.on('error', () => {
      reject(error)
    })
    bodyStream.on('end', () => {
      logger.info('Body stream finished writing')
      resolve()
    })
  })

  // initialize upload stream prior to piping to archive

  // see below for upload code
  const uploadPromise = upload({
    body: bodyStream,
    bucket: s3Bucket,
    client: s3Client,
    key: zipKey,
    logger
  })
  
  const archive = archiver('zip', archiverOpts)

  archive.on('entry', (event) => {
    // @todo make logging debug once happy with behavior
    logger.info('Archive entry', { event })
  })

  archive.on('progress', (event) => {
    // @todo make debug once happy with behavior
    logger.info('Archive progress', { event })
  })

  archive.on('finish', () => {
    // @todo make debug once happy with behavior
    logger.info('Archive finished reading data streams')
  })

  // the promise returned by archive.finalize() does not seem to work quite correctly
  // see https://github.com/archiverjs/node-archiver/issues/662
  // so we make our own promise
  const archivePromise = new Promise<void>((resolve, reject) => {
    archive.on('error', (error) => {
      reject(error)
    })

    // documentation indicates errors like ENOENT could come through on this event
    // for now, throw all of these until we understand if any error types are recoverable
    archive.on('warning', (error) => {
      reject(error)
    })

    archive.on('end', () => {
      logger.info('Archive finished writing')
      resolve()
    })
  })

  // start the upload stream
  archive.pipe(bodyStream)
 
  for (...) {
    ...
    // readable is S3 Readable stream
    archive.append(readable as Readable, { name: `${entityPath}/${pdfName}` })
    ...
  }

  logger.info('Zip archive finalized')
  archive.finalize()

  // code not shown, just a convenience method we use for Promise.allSettled()
  const { error } = await settlePromises<void | boolean>([archivePromise, bodyStreamPromise, uploadPromise])

  if (error) {
    logger.error('Error writing zip to S3', { error, s3Bucket, zipKey })
    return failExport('Zip upload error', { cnt, logger, state })
  }
# S3 upload function

export type S3UploadInput = {
  body: StreamingBlobPayloadInputTypes
  bucket: string
  client: S3Client
  key: string
  logger?: AppLogger | Console
}

export const upload = async ({ body: Body, bucket: Bucket, client, key: Key, logger = console }: S3UploadInput): Promise<boolean> => {
  logger.info('Upload requested', { Bucket, Key })

  const up = new Upload({
    client,
    params: { Body, Bucket, Key }
  })

  logger.info('Upload initiated')

  up.on('httpUploadProgress', (progress) => {
    logger.debug('Upload progress', { progress })
  })

  await up.done()
  logger.info('Upload complete')

  return true
}

@tynan-cr
Copy link

tynan-cr commented Dec 4, 2024

This gist helped me a ton after running into swallowed errors and my lambdas ending without any notice: https://gist.github.com/amiantos/16bacc9ed742c91151fcf1a41012445e?permalink_comment_id=3804034#gistcomment-3804034. Might help you too!

I got it working - I followed the linked instructions exactly, but I had to downgrade to archiver 5.3.2! Once I followed those instructions with 5.3.2 it worked perfectly. 6.0.0 and newer do not work with the solution in the linked gist comment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests