Skip to content

Commit

Permalink
[serverless] Add DynamoDB Span Pointers (#4912)
Browse files Browse the repository at this point in the history
* Add span pointer support for updateItem and deleteItem

* putItem support

* transactWriteItem support

* batchWriteItem support

* Add unit+integration tests (very large commit)

* Move `DD_AWS_SDK_DYNAMODB_TABLE_PRIMARY_KEYS` parsing logic to config.js

* Code refactoring

* Move util functions to packages/datadog-plugin-aws-sdk/

* lint

* log when encountering errors in `encodeValue`; fix test

* Send config env var as string to telemetry; handle parsing logic in dynamodb.js

* Update config_norm_rules.json

* fix test

* Add unit tests for DynamoDB generatePointerHash

* better logging + checks
  • Loading branch information
nhulston authored and rochdev committed Dec 18, 2024
1 parent 6fa6620 commit f4bc89c
Show file tree
Hide file tree
Showing 10 changed files with 1,300 additions and 35 deletions.
154 changes: 154 additions & 0 deletions packages/datadog-plugin-aws-sdk/src/services/dynamodb.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
'use strict'

const BaseAwsSdkPlugin = require('../base')
const log = require('../../../dd-trace/src/log')
const { DYNAMODB_PTR_KIND, SPAN_POINTER_DIRECTION } = require('../../../dd-trace/src/constants')
const { extractPrimaryKeys, generatePointerHash } = require('../util')

class DynamoDb extends BaseAwsSdkPlugin {
static get id () { return 'dynamodb' }
Expand Down Expand Up @@ -48,6 +51,157 @@ class DynamoDb extends BaseAwsSdkPlugin {

return tags
}

addSpanPointers (span, response) {
const request = response?.request
const operationName = request?.operation

const hashes = []
switch (operationName) {
case 'putItem': {
const hash = DynamoDb.calculatePutItemHash(
request?.params?.TableName,
request?.params?.Item,
this.getPrimaryKeyConfig()
)
if (hash) hashes.push(hash)
break
}
case 'updateItem':
case 'deleteItem': {
const hash = DynamoDb.calculateHashWithKnownKeys(request?.params?.TableName, request?.params?.Key)
if (hash) hashes.push(hash)
break
}
case 'transactWriteItems': {
const transactItems = request?.params?.TransactItems || []
for (const item of transactItems) {
if (item.Put) {
const hash =
DynamoDb.calculatePutItemHash(item.Put.TableName, item.Put.Item, this.getPrimaryKeyConfig())
if (hash) hashes.push(hash)
} else if (item.Update || item.Delete) {
const operation = item.Update ? item.Update : item.Delete
const hash = DynamoDb.calculateHashWithKnownKeys(operation.TableName, operation.Key)
if (hash) hashes.push(hash)
}
}
break
}
case 'batchWriteItem': {
const requestItems = request?.params.RequestItems || {}
for (const [tableName, operations] of Object.entries(requestItems)) {
if (!Array.isArray(operations)) continue
for (const operation of operations) {
if (operation?.PutRequest) {
const hash =
DynamoDb.calculatePutItemHash(tableName, operation.PutRequest.Item, this.getPrimaryKeyConfig())
if (hash) hashes.push(hash)
} else if (operation?.DeleteRequest) {
const hash = DynamoDb.calculateHashWithKnownKeys(tableName, operation.DeleteRequest.Key)
if (hash) hashes.push(hash)
}
}
}
break
}
}

for (const hash of hashes) {
span.addSpanPointer(DYNAMODB_PTR_KIND, SPAN_POINTER_DIRECTION.DOWNSTREAM, hash)
}
}

/**
* Parses primary key config from the `DD_AWS_SDK_DYNAMODB_TABLE_PRIMARY_KEYS` env var.
* Only runs when needed, and warns when missing or invalid config.
* @returns {Object|undefined} Parsed config from env var or undefined if empty/missing/invalid config.
*/
getPrimaryKeyConfig () {
if (this.dynamoPrimaryKeyConfig) {
// Return cached config if it exists
return this.dynamoPrimaryKeyConfig
}

const configStr = this._tracerConfig?.aws?.dynamoDb?.tablePrimaryKeys
if (!configStr) {
log.warn('Missing DD_AWS_SDK_DYNAMODB_TABLE_PRIMARY_KEYS env variable. ' +
'Please add your table\'s primary keys under this env variable.')
return
}

try {
const parsedConfig = JSON.parse(configStr)
const config = {}
for (const [tableName, primaryKeys] of Object.entries(parsedConfig)) {
if (Array.isArray(primaryKeys) && primaryKeys.length > 0 && primaryKeys.length <= 2) {
config[tableName] = new Set(primaryKeys)
} else {
log.warn(`Invalid primary key configuration for table: ${tableName}.` +
'Please fix the DD_AWS_SDK_DYNAMODB_TABLE_PRIMARY_KEYS env var.')
}
}

this.dynamoPrimaryKeyConfig = config
return config
} catch (err) {
log.warn('Failed to parse DD_AWS_SDK_DYNAMODB_TABLE_PRIMARY_KEYS:', err.message)
}
}

/**
* Calculates a hash for DynamoDB PutItem operations using table's configured primary keys.
* @param {string} tableName - Name of the DynamoDB table.
* @param {Object} item - Complete PutItem item parameter to be put.
* @param {Object.<string, Set<string>>} primaryKeyConfig - Mapping of table names to Sets of primary key names
* loaded from DD_AWS_SDK_DYNAMODB_TABLE_PRIMARY_KEYS.
* @returns {string|undefined} Hash combining table name and primary key/value pairs, or undefined if unable.
*/
static calculatePutItemHash (tableName, item, primaryKeyConfig) {
if (!tableName || !item) {
log.debug('Unable to calculate hash because missing required parameters')
return
}
if (!primaryKeyConfig) {
log.warn('Missing DD_AWS_SDK_DYNAMODB_TABLE_PRIMARY_KEYS env variable')
return
}
const primaryKeySet = primaryKeyConfig[tableName]
if (!primaryKeySet || !(primaryKeySet instanceof Set) || primaryKeySet.size === 0 || primaryKeySet.size > 2) {
log.warn(
`span pointers: failed to extract PutItem span pointer: table ${tableName} ` +
'not found in primary key names or the DD_AWS_SDK_DYNAMODB_TABLE_PRIMARY_KEYS env var was invalid.' +
'Please update the env var.'
)
return
}
const keyValues = extractPrimaryKeys(primaryKeySet, item)
if (keyValues) {
return generatePointerHash([tableName, ...keyValues])
}
}

/**
* Calculates a hash for DynamoDB operations that have keys provided (UpdateItem, DeleteItem).
* @param {string} tableName - Name of the DynamoDB table.
* @param {Object} keysObject - Object containing primary key/value attributes in DynamoDB format.
* (e.g., { userId: { S: "123" }, sortKey: { N: "456" } })
* @returns {string|undefined} Hash value combining table name and primary key/value pairs, or undefined if unable.
*
* @example
* calculateHashWithKnownKeys('UserTable', { userId: { S: "user123" }, timestamp: { N: "1234567" } })
*/
static calculateHashWithKnownKeys (tableName, keysObject) {
if (!tableName || !keysObject) {
log.debug('Unable to calculate hash because missing parameters')
return
}
const keyNamesSet = new Set(Object.keys(keysObject))
const keyValues = extractPrimaryKeys(keyNamesSet, keysObject)
if (keyValues) {
return generatePointerHash([tableName, ...keyValues])
}
}
}

module.exports = DynamoDb
2 changes: 1 addition & 1 deletion packages/datadog-plugin-aws-sdk/src/services/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const BaseAwsSdkPlugin = require('../base')
const log = require('../../../dd-trace/src/log')
const { generatePointerHash } = require('../../../dd-trace/src/util')
const { generatePointerHash } = require('../util')
const { S3_PTR_KIND, SPAN_POINTER_DIRECTION } = require('../../../dd-trace/src/constants')

class S3 extends BaseAwsSdkPlugin {
Expand Down
92 changes: 92 additions & 0 deletions packages/datadog-plugin-aws-sdk/src/util.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
'use strict'

const crypto = require('crypto')
const log = require('../../dd-trace/src/log')

/**
* Generates a unique hash from an array of strings by joining them with | before hashing.
* Used to uniquely identify AWS requests for span pointers.
* @param {string[]} components - Array of strings to hash
* @returns {string} A 32-character hash uniquely identifying the components
*/
function generatePointerHash (components) {
// If passing S3's ETag as a component, make sure any quotes have already been removed!
const dataToHash = components.join('|')
const hash = crypto.createHash('sha256').update(dataToHash).digest('hex')
return hash.substring(0, 32)
}

/**
* Encodes a DynamoDB attribute value to Buffer for span pointer hashing.
* @param {Object} valueObject - DynamoDB value in AWS format ({ S: string } or { N: string } or { B: Buffer })
* @returns {Buffer|undefined} Encoded value as Buffer, or undefined if invalid input.
*
* @example
* encodeValue({ S: "user123" }) -> Buffer("user123")
* encodeValue({ N: "42" }) -> Buffer("42")
* encodeValue({ B: Buffer([1, 2, 3]) }) -> Buffer([1, 2, 3])
*/
function encodeValue (valueObject) {
if (!valueObject) {
return
}

try {
const type = Object.keys(valueObject)[0]
const value = valueObject[type]

switch (type) {
case 'S':
return Buffer.from(value)
case 'N':
return Buffer.from(value.toString())
case 'B':
return Buffer.isBuffer(value) ? value : Buffer.from(value)
default:
log.debug(`Found unknown type while trying to create DynamoDB span pointer: ${type}`)
}
} catch (err) {
log.debug(`Failed to encode value while trying to create DynamoDB span pointer: ${err.message}`)
}
}

/**
* Extracts and encodes primary key values from a DynamoDB item.
* Handles tables with single-key and two-key scenarios.
*
* @param {Set<string>} keySet - Set of primary key names.
* @param {Object} keyValuePairs - Object containing key/value pairs.
* @returns {Array|undefined} [key1Name, key1Value, key2Name, key2Value], or undefined if invalid input.
* key2 entries are empty strings in the single-key case.
* @example
* extractPrimaryKeys(new Set(['userId']), {userId: {S: "user123"}})
* // Returns ["userId", Buffer("user123"), "", ""]
* extractPrimaryKeys(new Set(['userId', 'timestamp']), {userId: {S: "user123"}, timestamp: {N: "1234}})
* // Returns ["timestamp", Buffer.from("1234"), "userId", Buffer.from("user123")]
*/
const extractPrimaryKeys = (keySet, keyValuePairs) => {
const keyNames = Array.from(keySet)
if (keyNames.length === 0) {
return
}

if (keyNames.length === 1) {
const value = encodeValue(keyValuePairs[keyNames[0]])
if (value) {
return [keyNames[0], value, '', '']
}
} else {
const [key1, key2] = keyNames.sort()
const value1 = encodeValue(keyValuePairs[key1])
const value2 = encodeValue(keyValuePairs[key2])
if (value1 && value2) {
return [key1, value1, key2, value2]
}
}
}

module.exports = {
generatePointerHash,
encodeValue,
extractPrimaryKeys
}
Loading

0 comments on commit f4bc89c

Please sign in to comment.