Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add S3 Downstream Span Pointers #587

Merged
merged 15 commits into from
Dec 6, 2024
28 changes: 26 additions & 2 deletions src/trace/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Context } from "aws-lambda";

import { patchHttp, unpatchHttp } from "./patch-http";

import { extractTriggerTags, extractHTTPStatusCodeTag } from "./trigger";
import { extractTriggerTags, extractHTTPStatusCodeTag, parseEventSource } from "./trigger";
import { ColdStartTracerConfig, ColdStartTracer } from "./cold-start-tracer";
import { logDebug, tagObject } from "../utils";
import { didFunctionColdStart, isProactiveInitialization } from "../utils/cold-start";
Expand All @@ -17,6 +17,7 @@ import { TraceContext, TraceContextService, TraceSource } from "./trace-context-
import { StepFunctionContext, StepFunctionContextService } from "./step-function-service";
import { XrayService } from "./xray-service";
import { AUTHORIZING_REQUEST_ID_HEADER } from "./context/extractors/http";
import { getSpanPointerAttributes } from "../utils/span-pointers";
export type TraceExtractor = (event: any, context: Context) => Promise<TraceContext> | TraceContext;

export interface TraceConfig {
Expand Down Expand Up @@ -70,6 +71,12 @@ export interface TraceConfig {
coldStartTraceSkipLib: string;
}

interface SpanPointerAttributes {
pointerKind: string;
pointerDirection: string;
pointerHash: string;
}
nhulston marked this conversation as resolved.
Show resolved Hide resolved

export class TraceListener {
private contextService: TraceContextService;
private context?: Context;
Expand All @@ -80,6 +87,7 @@ export class TraceListener {
private wrappedCurrentSpan?: SpanWrapper;
private triggerTags?: { [key: string]: string };
private lambdaSpanParentContext?: SpanContext;
private spanPointerAttributesList: SpanPointerAttributes[] = [];

public get currentTraceHeaders() {
return this.contextService.currentTraceHeaders;
Expand Down Expand Up @@ -131,8 +139,14 @@ export class TraceListener {

this.lambdaSpanParentContext = this.inferredSpan?.span || parentSpanContext;
this.context = context;
this.triggerTags = extractTriggerTags(event, context);
const eventSource = parseEventSource(event);
this.triggerTags = extractTriggerTags(event, context, eventSource);
this.stepFunctionContext = StepFunctionContextService.instance().context;

const result = getSpanPointerAttributes(eventSource, event);
if (result) {
this.spanPointerAttributesList.push(...result);
nhulston marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
Expand Down Expand Up @@ -195,6 +209,16 @@ export class TraceListener {
}
}
}

if (this.wrappedCurrentSpan) {
for (const attributes of this.spanPointerAttributesList) {
this.wrappedCurrentSpan.span.addSpanPointer(
attributes.pointerKind,
attributes.pointerDirection,
attributes.pointerHash,
);
}
}
return false;
}

Expand Down
9 changes: 6 additions & 3 deletions src/trace/trigger.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,17 @@ describe("parseEventSource", () => {
it("extracts all trigger tags", () => {
for (let event of events) {
const eventData = JSON.parse(readFileSync(`./event_samples/${event.file}`, "utf8"));
const triggerTags = extractTriggerTags(eventData, mockContext);
const eventSource = parseEventSource(eventData);
const triggerTags = extractTriggerTags(eventData, mockContext, eventSource);
expect(triggerTags).toEqual(event.result);
}
});

it("extracts the status code if API Gateway, ALB, or Function URL, otherwise do nothing, for buffered functions", () => {
for (const event of events) {
const eventData = JSON.parse(readFileSync(`./event_samples/${event.file}`, "utf8"));
const triggerTags = extractTriggerTags(eventData, mockContext);
const eventSource = parseEventSource(eventData);
const triggerTags = extractTriggerTags(eventData, mockContext, eventSource);
const isResponseStreamingFunction = false;
for (const response of bufferedResponses) {
const statusCode = extractHTTPStatusCodeTag(triggerTags, response.responseBody, isResponseStreamingFunction);
Expand All @@ -206,7 +208,8 @@ describe("parseEventSource", () => {
it("extracts the status code if API Gateway, ALB, or Function URL, otherwise do nothing, for streaming functions", () => {
for (let event of events) {
const eventData = JSON.parse(readFileSync(`./event_samples/${event.file}`, "utf8"));
const triggerTags = extractTriggerTags(eventData, mockContext);
const eventSource = parseEventSource(eventData);
const triggerTags = extractTriggerTags(eventData, mockContext, eventSource);
const isResponseStreamingFunction = true;
for (const response of streamingResponses) {
const statusCode = extractHTTPStatusCodeTag(triggerTags, response.responseBody, isResponseStreamingFunction);
Expand Down
3 changes: 1 addition & 2 deletions src/trace/trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,8 @@ function extractHTTPTags(event: APIGatewayEvent | APIGatewayProxyEventV2 | ALBEv
/**
* extractTriggerTags parses the trigger event object for tags to be added to the span metadata
*/
export function extractTriggerTags(event: any, context: Context) {
export function extractTriggerTags(event: any, context: Context, eventSource: eventTypes | undefined) {
nhulston marked this conversation as resolved.
Show resolved Hide resolved
let triggerTags: { [key: string]: string } = {};
const eventSource = parseEventSource(event);
if (eventSource) {
triggerTags["function_trigger.event_source"] = eventSource;

Expand Down
159 changes: 159 additions & 0 deletions src/utils/span-pointers.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import { getSpanPointerAttributes } from "./span-pointers";
import { eventTypes } from "../trace/trigger";

// tslint:disable-next-line:no-var-requires
nhulston marked this conversation as resolved.
Show resolved Hide resolved
const { S3_PTR_KIND, SPAN_POINTER_DIRECTION } = require("dd-trace/packages/dd-trace/src/constants");
nhulston marked this conversation as resolved.
Show resolved Hide resolved
// tslint:disable-next-line:no-var-requires
nhulston marked this conversation as resolved.
Show resolved Hide resolved
const util = require("dd-trace/packages/dd-trace/src/util");
nhulston marked this conversation as resolved.
Show resolved Hide resolved

// Mock the external dependencies
jest.mock("./log", () => ({
logDebug: jest.fn(),
}));

interface SpanPointerAttributes {
pointerKind: string;
pointerDirection: string;
pointerHash: string;
}

describe("span-pointers utils", () => {
const mockPointerHash = "mock-hash-123";

beforeEach(() => {
jest.spyOn(util, "generatePointerHash").mockReturnValue(mockPointerHash);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not let the generate pointer hash function actually run?

Copy link
Contributor Author

@nhulston nhulston Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already have tests for generatePointerHash in the tracer; i just wanted to test only the logic behind the functions created in this PR. That way, if we ever make changes to how the generatePointerHash function works in the tracer, it won't break the tests here. And the changes will be automatically propagated to here, requiring no code updates.

});

afterEach(() => {
jest.clearAllMocks();
});

describe("getSpanPointerAttributes", () => {
it("returns undefined when eventSource is undefined", () => {
const result = getSpanPointerAttributes(undefined, {});
expect(result).toBeUndefined();
});

it("returns undefined for unsupported event types", () => {
const result = getSpanPointerAttributes("unsupported" as eventTypes, {});
expect(result).toBeUndefined();
});

describe("S3 event processing", () => {
it("processes single S3 record correctly", () => {
const event = {
Records: [
{
s3: {
bucket: { name: "test-bucket" },
object: {
key: "test-key",
eTag: "test-etag",
},
},
eventName: "ObjectCreated:SomeEventName",
},
],
};

const expected: SpanPointerAttributes[] = [
{
pointerKind: S3_PTR_KIND,
pointerDirection: SPAN_POINTER_DIRECTION.UPSTREAM,
pointerHash: mockPointerHash,
},
];

const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual(expected);
expect(util.generatePointerHash).toHaveBeenCalledWith(["test-bucket", "test-key", "test-etag"]);
});

it("processes multiple S3 records correctly", () => {
const event = {
Records: [
{
s3: {
bucket: { name: "bucket1" },
object: {
key: "key1",
eTag: "etag1",
},
},
eventName: "ObjectCreated:SomeEventName",
},
{
s3: {
bucket: { name: "bucket2" },
object: {
key: "key2",
eTag: "etag2",
},
},
eventName: "ObjectCreated:SomeEventName",
},
],
};

const expected: SpanPointerAttributes[] = [
{
pointerKind: S3_PTR_KIND,
pointerDirection: SPAN_POINTER_DIRECTION.UPSTREAM,
pointerHash: mockPointerHash,
},
{
pointerKind: S3_PTR_KIND,
pointerDirection: SPAN_POINTER_DIRECTION.UPSTREAM,
pointerHash: mockPointerHash,
},
];

const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual(expected);
});

it("handles empty Records array", () => {
const event = { Records: [] };
const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual([]);
});

it("handles missing Records property", () => {
const event = {};
const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual([]);
});

it("skips invalid records but processes valid ones", () => {
const event = {
Records: [
{
// Invalid record missing s3 property
},
{
s3: {
bucket: { name: "valid-bucket" },
object: {
key: "valid-key",
eTag: "valid-etag",
},
},
eventName: "ObjectCreated:SomeEventName",
},
],
};

const expected: SpanPointerAttributes[] = [
{
pointerKind: S3_PTR_KIND,
pointerDirection: SPAN_POINTER_DIRECTION.UPSTREAM,
pointerHash: mockPointerHash,
},
];

const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual(expected);
});
});
});
});
85 changes: 85 additions & 0 deletions src/utils/span-pointers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { eventTypes } from "../trace/trigger";
import { logDebug } from "./log";

interface SpanPointerAttributes {
pointerKind: string;
pointerDirection: string;
pointerHash: string;
nhulston marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Computes span pointer attributes
*
* @param {eventTypes} eventSource - The type of event being processed (e.g., S3, DynamoDB).
* @param {any} event - The event object containing source-specific data.
* @returns {SpanPointerAttributes[] | undefined} An array of span pointer attribute objects, or undefined if none could be computed.
*/
export function getSpanPointerAttributes(
eventSource: eventTypes | undefined,
event: any,
nhulston marked this conversation as resolved.
Show resolved Hide resolved
): SpanPointerAttributes[] | undefined {
if (!eventSource) {
return;
}

switch (eventSource) {
case eventTypes.s3:
return processS3Event(event);
default:
logDebug(`Event type ${eventSource} not supported by span pointers.`);
nhulston marked this conversation as resolved.
Show resolved Hide resolved
return;
}
}

function processS3Event(event: any): SpanPointerAttributes[] {
nhulston marked this conversation as resolved.
Show resolved Hide resolved
const records = event.Records || [];
const spanPointerAttributesList: SpanPointerAttributes[] = [];

// Get dependencies from tracer only when needed
let constants;
let util;
try {
constants = require("dd-trace/packages/dd-trace/src/constants");
util = require("dd-trace/packages/dd-trace/src/util");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@duncanista, can you help answer a question for me?

I recall when you added support for w3c trace propagation to datadog-lambda-js, you said that instead of directly importing the tracer from dd-trace, we create an interface to wrap it.

If that's correct, then is there any issue with us requiring other files from ddtrace here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be an issue when customer is not using the tracer (some do it), it would essentially crash, ideally, we'd have a better way to get this type of data

Copy link
Contributor Author

@nhulston nhulston Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@duncanista Could you take another look at the updated code from my latest commit? It should be safe and avoid any crashing:

  let S3_PTR_KIND;
  let SPAN_POINTER_DIRECTION;
  let generatePointerHash;
  try {
    const constants = require("dd-trace/packages/dd-trace/src/constants");
    const util = require("dd-trace/packages/dd-trace/src/util");

    ({ S3_PTR_KIND, SPAN_POINTER_DIRECTION } = constants);
    ({ generatePointerHash } = util);
  } catch (err) {
    if (err instanceof Error) {
      logDebug("Failed to load dd-trace span pointer dependencies", err);
    }
    return spanPointerAttributesList;
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@duncanista, Can you give more details (or point me to a doc somewhere) about how "customer is not using the tracer" is implemented? Do you mean they just aren't including ddtrace in their package.json file? I'd like to be able to create a small sample app that I can test this PR with that "is not using the tracer".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah correct, just the lambda library, not the tracer. Let me review the whole PR

} catch (err) {
if (err instanceof Error) {
logDebug("Failed to load dd-trace span pointer dependencies", err);
}
return spanPointerAttributesList;
}

const { S3_PTR_KIND, SPAN_POINTER_DIRECTION } = constants;
const { generatePointerHash } = util;

for (const record of records) {
const eventName = record.eventName;
if (!eventName || !eventName.startsWith("ObjectCreated")) {
continue;
}
// Values are stored in the same place, regardless of AWS SDK v2/v3 or the event type.
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
const s3Event = record?.s3;
const bucketName = s3Event?.bucket?.name;
const objectKey = s3Event?.object?.key;
let eTag = s3Event?.object?.eTag;

if (!bucketName || !objectKey || !eTag) {
logDebug("Unable to calculate span pointer hash because of missing parameters.");
continue;
}

// https://github.com/DataDog/dd-span-pointer-rules/blob/main/AWS/S3/Object/README.md
if (eTag.startsWith('"') && eTag.endsWith('"')) {
eTag = eTag.slice(1, -1);
}
const pointerHash = generatePointerHash([bucketName, objectKey, eTag]);
const spanPointerAttributes: SpanPointerAttributes = {
pointerKind: S3_PTR_KIND,
pointerDirection: SPAN_POINTER_DIRECTION.UPSTREAM,
pointerHash,
};
spanPointerAttributesList.push(spanPointerAttributes);
}

return spanPointerAttributesList;
}
Loading