aws-s3 upload aws credentials timing out for long-running chunked payloads. #996

Closed
thadeusb opened this issue Jun 4, 2018 · 2 comments

thadeusb commented Jun 4, 2018

We are getting this error when using multipartUploadWithHeaders.

akka.stream.alpakka.s3.impl.FailedUpload: Cannot find etag in HttpResponse(403 Forbidden,List(x-amz-request-id: **, x-amz-id-2: **, Date: Fri, 01 Jun 2018 16:38:45 GMT, Connection: close, Server: AmazonS3),HttpEntity.Chunked(application/xml),HttpProtocol(HTTP/1.1))

This is because the stream we are uploading to S3 can take a couple of hours to complete. The S3 authentication credentials we are using are provided by HashiCorp Vault and are only valid for 15 minutes, after which a new set of credentials needs to be generated. This means that in the middle of the multipart upload we need to switch to a new set of credentials every 15 minutes for the remaining upload chunks.

We attempted to get around this by passing in a custom AWS credential provider that will refresh the credentials every 15 minutes.

import java.time.Instant

import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicSessionCredentials}

class AWSVaultCredentialsProvider(vaultSecrets: VaultSecrets) extends AWSCredentialsProvider {

  /**
    * Lifetime of the Vault-issued credentials in seconds (15 minutes).
    * New credentials are fetched once this window has elapsed.
    */
  private[this] lazy val EXPIRATION_THRESHOLD = 900
  private[this] var expiration: Instant = Instant.now
  private[this] var credentials: AWSCredentials = null

  def getCredentials(): AWSCredentials = {
    if (needsToFetchCredentials()) {
      fetchCredentials()
    }

    credentials
  }

  def refresh() = {
    credentials = null
  }

  private[this] def genCreds() = vaultSecrets.read()

  private[this] def needsToFetchCredentials(): Boolean = {
    (credentials == null) || (Instant.now.isAfter(expiration))
  }

  private[this] def fetchCredentials() = {
    val creds = genCreds()
    credentials = new BasicSessionCredentials(creds("access_key"), creds("secret_key"), creds("security_token"))
    expiration = Instant.now.plusSeconds(EXPIRATION_THRESHOLD)
  }
}
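
For illustration, here is a minimal sketch of how the provider is meant to behave (hypothetical usage; it assumes a vaultSecrets instance is in scope): credentials are cached and only re-read from Vault once the 15-minute window has elapsed or refresh() is called.

// Hypothetical usage sketch of AWSVaultCredentialsProvider.
val provider = new AWSVaultCredentialsProvider(vaultSecrets)

val first  = provider.getCredentials()  // first call fetches fresh credentials from Vault
val cached = provider.getCredentials()  // subsequent calls reuse the cached credentials

provider.refresh()                      // drop the cache explicitly
val renewed = provider.getCredentials() // next call fetches fresh credentials again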

The problem is that the alpakka S3 connector does not call getCredentials for every sub-request.

See https://github.com/akka/alpakka/blob/master/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala#L334.

    // use the same key for all sub-requests (chunks)
    val key: SigningKey = signingKey

    val headers: S3Headers = S3Headers(sse.fold[Seq[HttpHeader]](Seq.empty) { _.headersFor(UploadPart) })

    SplitAfterSize(chunkSize)(atLeastOneByteString)
      .via(getChunkBuffer(chunkSize)) //creates the chunks
      .concatSubstreams
      .zipWith(requestInfo) {
        case (chunkedPayload, (uploadInfo, chunkIndex)) =>
          //each of the payload requests are created
          val partRequest =
            uploadPartRequest(uploadInfo, chunkIndex, chunkedPayload.data, chunkedPayload.size, headers)
          (partRequest, (uploadInfo, chunkIndex))
      }
      .mapAsync(parallelism) { case (req, info) => Signer.signedRequest(req, key).zip(Future.successful(info)) }
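
For context on why reusing the same key across all chunks breaks once the Vault credentials rotate: the Signature V4 signing key is derived from the secret access key itself, so a SigningKey built once before the upload permanently embeds the original, soon-to-expire secret. The following is a rough sketch of the standard SigV4 key derivation, for illustration only; it is not alpakka's Signer/SigningKey implementation, and the object and method names are made up.

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

object SigV4KeySketch {

  // HMAC-SHA256 of `data` under `key`.
  private def hmacSha256(key: Array[Byte], data: String): Array[Byte] = {
    val mac = Mac.getInstance("HmacSHA256")
    mac.init(new SecretKeySpec(key, "HmacSHA256"))
    mac.doFinal(data.getBytes("UTF-8"))
  }

  // Standard SigV4 derivation chain: the secret access key is the root of the
  // HMAC chain, so the resulting signing key is only valid for the credentials
  // that existed when it was built.
  def signingKey(secretAccessKey: String, region: String, service: String = "s3"): Array[Byte] = {
    val date     = LocalDate.now.format(DateTimeFormatter.BASIC_ISO_DATE) // yyyyMMdd
    val kDate    = hmacSha256(("AWS4" + secretAccessKey).getBytes("UTF-8"), date)
    val kRegion  = hmacSha256(kDate, region)
    val kService = hmacSha256(kRegion, service)
    hmacSha256(kService, "aws4_request")
  }
}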

It appears the issue could be solved by calling getCredentials for every sub-request of the multipart upload, instead of caching the credentials (and the signing key derived from them) once, up front, before the upload starts.

Would this change work here?

    // use the same key for all sub-requests (chunks)
    // val key: SigningKey = signingKey

    val headers: S3Headers = S3Headers(sse.fold[Seq[HttpHeader]](Seq.empty) { _.headersFor(UploadPart) })

    SplitAfterSize(chunkSize)(atLeastOneByteString)
      .via(getChunkBuffer(chunkSize)) //creates the chunks
      .concatSubstreams
      .zipWith(requestInfo) {
        case (chunkedPayload, (uploadInfo, chunkIndex)) =>
          //each of the payload requests are created
          val partRequest =
            uploadPartRequest(uploadInfo, chunkIndex, chunkedPayload.data, chunkedPayload.size, headers)
          (partRequest, (uploadInfo, chunkIndex))
      }
      .mapAsync(parallelism) { case (req, info) => Signer.signedRequest(req, signingKey).zip(Future.successful(info)) }

Is this something that can be fixed in alpakka itself, or is there a recommended way we can override this method in our codebase to get this result?

Thank you for your time.

ennru added the p:aws-s3 label Jun 5, 2018

ennru (Member) commented Jun 5, 2018

Yes, your proposed change seems reasonable. Please open a Pull Request.

cc @sfali

ahd985 added a commit to ahd985/alpakka that referenced this issue Jun 5, 2018:
Adjust S3 MultiPartUpload to use non-static signing key. See issue: akka#996
ennru added this to the 0.20 milestone Jun 29, 2018
ennru (Member) commented Jun 29, 2018

Fixed with #998
