Change Lambdas to create and update in parallel #3976

Merged
merged 10 commits into from
Jun 7, 2024
47 changes: 47 additions & 0 deletions patches/0059-Parallelize-Lambda-Function-resource-operations.patch
@@ -0,0 +1,47 @@
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Florian Stadler <florian@pulumi.com>
Date: Wed, 22 May 2024 17:01:32 +0200
Subject: [PATCH] Parallelize Lambda Function resource operations

Upstream introduced serialization of Lambda Function resource
operations to fight high memory usage when managing a lot of
Lambda functions.
We think this was an optimization for a special edge case that
drastically worsens the UX for the majority of users.

diff --git a/internal/service/lambda/function.go b/internal/service/lambda/function.go
index 4c49c5fb21..f2ce387f67 100644
--- a/internal/service/lambda/function.go
+++ b/internal/service/lambda/function.go
@@ -35,7 +35,6 @@ import (

const (
FunctionVersionLatest = "$LATEST"
- mutexKey = `aws_lambda_function`
listVersionsMaxItems = 10000
)

@@ -481,11 +480,6 @@ func resourceFunctionCreate(ctx context.Context, d *schema.ResourceData, meta in
}

if v, ok := d.GetOk("filename"); ok {
- // Grab an exclusive lock so that we're only reading one function into memory at a time.
Member

Nice.

- // See https://github.com/hashicorp/terraform/issues/9364.
- conns.GlobalMutexKV.Lock(mutexKey)
- defer conns.GlobalMutexKV.Unlock(mutexKey)
-
zipFile, err := readFileContents(v.(string))

if err != nil {
@@ -942,11 +936,6 @@ func resourceFunctionUpdate(ctx context.Context, d *schema.ResourceData, meta in
}

if v, ok := d.GetOk("filename"); ok {
- // Grab an exclusive lock so that we're only reading one function into memory at a time.
- // See https://github.com/hashicorp/terraform/issues/9364
- conns.GlobalMutexKV.Lock(mutexKey)
- defer conns.GlobalMutexKV.Unlock(mutexKey)
-
zipFile, err := readFileContents(v.(string))

if err != nil {
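
The effect of the removed lock can be illustrated with a small standalone sketch. It assumes a hypothetical `keyedMutex` type standing in for `conns.GlobalMutexKV` (the real implementation is not shown in this diff) and simulates each archive read with a short sleep. With the shared key held, at most one simulated read is in flight at a time; without it, reads may overlap.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// keyedMutex is a hypothetical stand-in for conns.GlobalMutexKV:
// one mutex per string key, created on first use.
type keyedMutex struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newKeyedMutex() *keyedMutex {
	return &keyedMutex{locks: map[string]*sync.Mutex{}}
}

func (k *keyedMutex) get(key string) *sync.Mutex {
	k.mu.Lock()
	defer k.mu.Unlock()
	m, ok := k.locks[key]
	if !ok {
		m = &sync.Mutex{}
		k.locks[key] = m
	}
	return m
}

func (k *keyedMutex) Lock(key string)   { k.get(key).Lock() }
func (k *keyedMutex) Unlock(key string) { k.get(key).Unlock() }

// peakConcurrency starts n simulated archive reads and returns the highest
// number that were in flight at the same time.
func peakConcurrency(n int, serialize bool) int64 {
	kv := newKeyedMutex()
	var inFlight, peak int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if serialize {
				// Same pattern as the removed lines: every function shares one key.
				kv.Lock("aws_lambda_function")
				defer kv.Unlock("aws_lambda_function")
			}
			cur := atomic.AddInt64(&inFlight, 1)
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // stands in for reading and uploading the zip
			atomic.AddInt64(&inFlight, -1)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak with lock:   ", peakConcurrency(8, true))
	fmt.Println("peak without lock:", peakConcurrency(8, false))
}
```

The trade-off the patch makes is visible here: dropping the lock lets reads overlap, so total time falls while peak memory can rise with the number of archives held at once.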
@@ -0,0 +1,158 @@
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Florian Stadler <florian@pulumi.com>
Date: Fri, 31 May 2024 12:29:36 +0200
Subject: [PATCH] Create Logging Middleware for Lambda service that does not
log the lambda code archive

When a Lambda function is created and its code is uploaded directly, the whole archive
is logged as a base64-encoded string by the HTTP request logger.
To do so, multiple copies of the body are created in memory, which bloats
memory usage.
This change fixes that by redacting the body in the logs for the Create/Update Lambda
calls.

diff --git a/internal/service/lambda/request_response_logger.go b/internal/service/lambda/request_response_logger.go
new file mode 100644
index 0000000000..58563221ab
--- /dev/null
+++ b/internal/service/lambda/request_response_logger.go
@@ -0,0 +1,107 @@
+package lambda
+
+import (
+ "context"
+ "fmt"
+ "github.com/aws/aws-sdk-go-v2/aws"
+ awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware"
+ "github.com/aws/smithy-go/middleware"
+ smithyhttp "github.com/aws/smithy-go/transport/http"
+ "github.com/hashicorp/aws-sdk-go-base/v2/logging"
+ "io"
+ "net/http"
+ "strings"
+ "time"
+ _ "unsafe"
+)
+
+const (
+ lambdaCreateOperation = "CreateFunction"
+ lambdaUpdateFunctionCodeOperation = "UpdateFunctionCode"
+)
+
+// Replaces the upstream logging middleware from https://github.com/hashicorp/aws-sdk-go-base/blob/main/logger.go#L107
+// We do not want to log the Lambda Archive that is part of the request body because this leads to bloating memory
+type wrappedRequestResponseLogger struct {
+ wrapped middleware.DeserializeMiddleware
+}
+
+// ID is the middleware identifier.
+func (r *wrappedRequestResponseLogger) ID() string {
+ return "PULUMI_AWS_RequestResponseLogger"
+}
+
+func NewWrappedRequestResponseLogger(wrapped middleware.DeserializeMiddleware) middleware.DeserializeMiddleware {
+ return &wrappedRequestResponseLogger{wrapped: wrapped}
+}
+
+//go:linkname decomposeHTTPResponse github.com/hashicorp/aws-sdk-go-base/v2.decomposeHTTPResponse
Contributor Author

I wanna highlight this. If we don't do this, we need to fork aws-sdk-go-base or not log the response at all.

In case upstream changes the symbol, we catch it during compile time

+func decomposeHTTPResponse(ctx context.Context, resp *http.Response, elapsed time.Duration) (map[string]any, error)
+
+func (r *wrappedRequestResponseLogger) HandleDeserialize(ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler,
Member

Guessing this was inlined and edited from upstream? I think that's great and worth doing; it would just be helpful to indicate that in the comment and highlight the edits, just in case. Since we forked it, upstream changes to this handler will no longer propagate, but that sounds OK.

+) (
+ out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
+) {
+ if awsmiddleware.GetServiceID(ctx) == "Lambda" {
+ if op := awsmiddleware.GetOperationName(ctx); op != lambdaCreateOperation && op != lambdaUpdateFunctionCodeOperation {
+ // pass through to the wrapped response logger for all other lambda operations that do not send the code as part of the request body
+ return r.wrapped.HandleDeserialize(ctx, in, next)
+ }
+ }
+
+ logger := logging.RetrieveLogger(ctx)
+ region := awsmiddleware.GetRegion(ctx)
+
+ if signingRegion := awsmiddleware.GetSigningRegion(ctx); signingRegion != region { //nolint:staticcheck // Not retrievable elsewhere
+ ctx = logger.SetField(ctx, string(logging.SigningRegionKey), signingRegion)
+ }
+ if awsmiddleware.GetEndpointSource(ctx) == aws.EndpointSourceCustom {
+ ctx = logger.SetField(ctx, string(logging.CustomEndpointKey), true)
+ }
+
+ req, ok := in.Request.(*smithyhttp.Request)
+ if !ok {
+ return out, metadata, fmt.Errorf("unexpected request middleware type %T", in.Request)
+ }
+
+ rc := req.Build(ctx)
+
+ originalBody := rc.Body
+ // remove the body from the logging output
+ redactedBody := strings.NewReader("[Redacted]")
+ rc.Body = io.NopCloser(redactedBody)
+ rc.ContentLength = redactedBody.Size()
+
+ requestFields, err := logging.DecomposeHTTPRequest(ctx, rc)
+ if err != nil {
+ return out, metadata, fmt.Errorf("decomposing request: %w", err)
+ }
+ logger.Debug(ctx, "HTTP Request Sent", requestFields)
+
+ // reconstruct the original request
+ req, err = req.SetStream(originalBody)
+ if err != nil {
+ return out, metadata, err
+ }
+ in.Request = req
+
+ start := time.Now()
+ out, metadata, err = next.HandleDeserialize(ctx, in)
+ duration := time.Since(start)
+
+ if err != nil {
+ return out, metadata, err
+ }
+
+ if res, ok := out.RawResponse.(*smithyhttp.Response); !ok {
+ return out, metadata, fmt.Errorf("unknown response type: %T", out.RawResponse)
+ } else {
+ responseFields, err := decomposeHTTPResponse(ctx, res.Response, duration)
+ if err != nil {
+ return out, metadata, fmt.Errorf("decomposing response: %w", err)
+ }
+ logger.Debug(ctx, "HTTP Response Received", responseFields)
+ }
+
+ return out, metadata, err
+}
diff --git a/internal/service/lambda/service_package_extra.go b/internal/service/lambda/service_package_extra.go
index 54f6aac15a..1f2440d3e3 100644
--- a/internal/service/lambda/service_package_extra.go
+++ b/internal/service/lambda/service_package_extra.go
@@ -6,6 +6,7 @@ import (
aws_sdkv2 "github.com/aws/aws-sdk-go-v2/aws"
retry_sdkv2 "github.com/aws/aws-sdk-go-v2/aws/retry"
lambda_sdkv2 "github.com/aws/aws-sdk-go-v2/service/lambda"
+ "github.com/aws/smithy-go/middleware"
tfawserr_sdkv2 "github.com/hashicorp/aws-sdk-go-base/v2/tfawserr"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/names"
@@ -34,6 +35,19 @@ func (p *servicePackage) NewClient(ctx context.Context, config map[string]any) (
if endpoint := config[names.AttrEndpoint].(string); endpoint != "" {
o.BaseEndpoint = aws_sdkv2.String(endpoint)
}
+
+ // Switch out the terraform http logging middleware with a custom logging middleware that does not log the
+ // lambda code. Logging the lambda code leads to memory bloating because it allocates a lot of copies of the
+ // body
+ o.APIOptions = append(o.APIOptions, func(stack *middleware.Stack) error {
+ loggingMiddleware, err := stack.Deserialize.Remove("TF_AWS_RequestResponseLogger")
Contributor Author

If the name of the logging middleware ever changes it'll fail during the integration tests

Member

Nice !!

+ if err != nil {
+ return err
+ }
+
+ err = stack.Deserialize.Add(NewWrappedRequestResponseLogger(loggingMiddleware), middleware.After)
+ return err
+ })
o.Retryer = conns.AddIsErrorRetryables(cfg.Retryer().(aws_sdkv2.RetryerV2), retry)
}), nil
}
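
The remove-by-ID-then-wrap pattern used in `NewClient` can be shown with a toy model. Everything below is hypothetical and deliberately much simpler than smithy-go's `middleware.Stack`: `toyStack`, `logMiddleware`, `verboseLogger`, `redactingLogger`, and `swapLogger` exist only for this sketch. It shows that the lookup fails loudly when the upstream ID is absent, and that non-upload operations still reach the original logger.

```go
package main

import "fmt"

const (
	upstreamLoggerID = "TF_AWS_RequestResponseLogger"
	wrappedLoggerID  = "PULUMI_AWS_RequestResponseLogger"
)

// logMiddleware is a hypothetical, simplified middleware: it turns an
// operation name and request body into a log line.
type logMiddleware interface {
	ID() string
	Log(operation, body string) string
}

// toyStack is a minimal ordered list of middlewares addressed by ID.
type toyStack struct{ items []logMiddleware }

func (s *toyStack) Add(m logMiddleware) { s.items = append(s.items, m) }

// Remove deletes the middleware with the given ID and returns it,
// or returns an error if no such middleware is registered.
func (s *toyStack) Remove(id string) (logMiddleware, error) {
	for i, m := range s.items {
		if m.ID() == id {
			s.items = append(s.items[:i], s.items[i+1:]...)
			return m, nil
		}
	}
	return nil, fmt.Errorf("middleware %q not found", id)
}

// verboseLogger plays the role of the upstream logger: it logs the full body.
type verboseLogger struct{}

func (verboseLogger) ID() string { return upstreamLoggerID }
func (verboseLogger) Log(operation, body string) string {
	return operation + " body=" + body
}

// redactingLogger hides the body for code-upload operations and delegates
// everything else to the logger it wraps.
type redactingLogger struct{ wrapped logMiddleware }

func (redactingLogger) ID() string { return wrappedLoggerID }
func (r redactingLogger) Log(operation, body string) string {
	switch operation {
	case "CreateFunction", "UpdateFunctionCode":
		return operation + " body=[Redacted]"
	default:
		return r.wrapped.Log(operation, body)
	}
}

// swapLogger replaces the upstream logger with the redacting wrapper.
func swapLogger(s *toyStack) error {
	original, err := s.Remove(upstreamLoggerID)
	if err != nil {
		return err // surfaces a renamed or missing upstream middleware
	}
	s.Add(redactingLogger{wrapped: original})
	return nil
}

func main() {
	s := &toyStack{}
	s.Add(verboseLogger{})
	if err := swapLogger(s); err != nil {
		panic(err)
	}
	fmt.Println(s.items[0].Log("CreateFunction", "<zip bytes>"))
	fmt.Println(s.items[0].Log("GetFunction", "{}"))
}
```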
69 changes: 69 additions & 0 deletions provider/provider_nodejs_test.go
@@ -94,3 +94,72 @@ func TestRegressAttributeMustBeWholeNumber(t *testing.T) {
result := test.Preview()
t.Logf("#%v", result.ChangeSummary)
}

func TestParallelLambdaCreation(t *testing.T) {
if testing.Short() {
t.Skipf("Skipping test in -short mode because it needs cloud credentials")
return
}

tempFile, err := createLambdaArchive(25 * 1024 * 1024)
require.NoError(t, err)
defer os.Remove(tempFile)

maxDuration(5*time.Minute, t, func(t *testing.T) {
test := getJSBaseOptions(t).
With(integration.ProgramTestOptions{
Dir: filepath.Join("test-programs", "parallel-lambdas"),
Config: map[string]string{
"lambda:archivePath": tempFile,
},
// Lambdas have diffs on every update (source code hash)
AllowEmptyPreviewChanges: true,
SkipRefresh: true,
})

integration.ProgramTest(t, &test)
})
}

func createLambdaArchive(size int64) (string, error) {
// Create a temporary file to save the zip archive
tempFile, err := os.CreateTemp("", "archive-*.zip")
if err != nil {
return "", err
}
defer tempFile.Close()

// Create a new zip archive
zipWriter := zip.NewWriter(tempFile)
defer zipWriter.Close()

randomDataReader := io.LimitReader(rand.Reader, size)

// Create the index.js file for the lambda
indexWriter, err := zipWriter.Create("index.js")
if err != nil {
return "", err
}
_, err = indexWriter.Write([]byte("const { version } = require(\"@aws-sdk/client-s3/package.json\");\n\nexports.handler = async () => ({ version });\n"))
if err != nil {
return "", err
}

randomDataWriter, err := zipWriter.Create("random.txt")
if err != nil {
return "", err
}
_, err = io.Copy(randomDataWriter, randomDataReader)
if err != nil {
return "", err
}

// Get the path of the temporary file
archivePath, err := filepath.Abs(tempFile.Name())
if err != nil {
return "", err
}

return archivePath, nil
}
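
In `createLambdaArchive` the zip writer and file are closed through `defer`, so any error from `Close` is discarded. A possible variant, sketched here under the assumption that an in-memory buffer is acceptable for illustration, closes the writer explicitly and reads the archive back to confirm its entries. `buildArchive` and `entrySizes` are hypothetical names, and the handler source is a simplified placeholder.

```go
package main

import (
	"archive/zip"
	"bytes"
	"crypto/rand"
	"fmt"
	"io"
)

// buildArchive writes a zip with a tiny index.js and size bytes of random
// padding, and returns the finished archive.
func buildArchive(size int64) ([]byte, error) {
	var buf bytes.Buffer
	zw := zip.NewWriter(&buf)

	w, err := zw.Create("index.js")
	if err != nil {
		return nil, err
	}
	if _, err := w.Write([]byte("exports.handler = async () => ({});\n")); err != nil {
		return nil, err
	}

	w, err = zw.Create("random.txt")
	if err != nil {
		return nil, err
	}
	// Random bytes barely compress, so the archive stays close to the requested size.
	if _, err := io.Copy(w, io.LimitReader(rand.Reader, size)); err != nil {
		return nil, err
	}

	// Close writes the central directory; checking its error catches a
	// truncated archive instead of silently returning one.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// entrySizes reopens the archive and returns each entry's uncompressed size.
func entrySizes(archive []byte) (map[string]uint64, error) {
	zr, err := zip.NewReader(bytes.NewReader(archive), int64(len(archive)))
	if err != nil {
		return nil, err
	}
	sizes := make(map[string]uint64, len(zr.File))
	for _, f := range zr.File {
		sizes[f.Name] = f.UncompressedSize64
	}
	return sizes, nil
}

func main() {
	archive, err := buildArchive(1024)
	if err != nil {
		panic(err)
	}
	sizes, err := entrySizes(archive)
	if err != nil {
		panic(err)
	}
	fmt.Println("entries:", len(sizes), "random.txt bytes:", sizes["random.txt"])
}
```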

3 changes: 3 additions & 0 deletions provider/test-programs/parallel-lambdas/Pulumi.yaml
@@ -0,0 +1,3 @@
name: parallel-lambdas
runtime: nodejs
description: Parallel Lambdas example
46 changes: 46 additions & 0 deletions provider/test-programs/parallel-lambdas/index.ts
@@ -0,0 +1,46 @@
// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const config = new pulumi.Config("aws");
const providerOpts = { provider: new aws.Provider("prov", { region: <aws.Region>config.require("envRegion") }) };
const lambdaConfig = new pulumi.Config("lambda");

const assumeRole = aws.iam.getPolicyDocument({
statements: [{
effect: "Allow",
principals: [{
type: "Service",
identifiers: ["lambda.amazonaws.com"],
}],
actions: ["sts:AssumeRole"],
}],
}, providerOpts);
const role = new aws.iam.Role("parallel-iam-role", {
assumeRolePolicy: assumeRole.then(assumeRole => assumeRole.json)
}, providerOpts);

for (let i = 0; i < 25; i++) {
new aws.lambda.Function(`lambda-${i}`, {
code: new pulumi.asset.FileArchive(lambdaConfig.require("archivePath")),
role: role.arn,
handler: "index.handler",
runtime: "nodejs20.x",
}, {
...providerOpts,
ignoreChanges: ["replacementSecurityGroupIds"],
});
}
16 changes: 16 additions & 0 deletions provider/test-programs/parallel-lambdas/package.json
@@ -0,0 +1,16 @@
{
"name": "parallel-lambdas",
"version": "0.0.1",
"license": "Apache-2.0",
"scripts": {
"build": "tsc"
},
"dependencies": {
"@pulumi/pulumi": "^3.0.0",
"@pulumi/aws": "^6.0.0",
"@pulumi/std": "^1.6.2"
},
"devDependencies": {
"@types/node": "^8.0.0"
}
}
18 changes: 18 additions & 0 deletions provider/test-programs/parallel-lambdas/tsconfig.json
@@ -0,0 +1,18 @@
{
"compilerOptions": {
"strict": true,
"outDir": "bin",
"target": "es2016",
"module": "commonjs",
"moduleResolution": "node",
"sourceMap": true,
"experimentalDecorators": true,
"pretty": true,
"noFallthroughCasesInSwitch": true,
"noImplicitReturns": true,
"forceConsistentCasingInFileNames": true
},
"files": [
"index.ts"
]
}