Skip to content

Commit

Permalink
Add transactWrite APIs to provide non-polymorphic build APIs within a…
Browse files Browse the repository at this point in the history
… transaction. (#23)
  • Loading branch information
tachyonics authored Sep 14, 2024
1 parent c578ef3 commit 3501167
Show file tree
Hide file tree
Showing 9 changed files with 736 additions and 22 deletions.
20 changes: 17 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,12 @@ let entryList: [TestTypeAWriteEntry] = [
]

try await table.bulkWrite(entryList)
//try await table.transactWrite(entryList) <<-- When implemented
```

Or alternatively executed within a DynamoDB transaction-

```swift
try await table.transactWrite(entryList)
```

and similarly for polymorphic queries-
Expand Down Expand Up @@ -384,20 +389,29 @@ let entryList: [TestPolymorphicWriteEntry] = [
]

try await table.polymorphicBulkWrite(entryList)
```

Or alternatively executed within a DynamoDB transaction-

```swift
try await table.polymorphicTransactWrite(entryList)
```

For transactions, you can additionally specify a set of constraints to be part of the transaction-

```swift
typealias TestTypeAStandardTransactionConstraintEntry = StandardTransactionConstraintEntry<TestTypeA>
let constraintList: [StandardTransactionConstraintEntry<TestTypeA>] = [
.required(existing: databaseItem3),
.required(existing: databaseItem4),
]

// Update when `transactWrite` API implemented
try await table.transactWrite(entryList, constraints: constraintList)
```

and similarly for polymorphic queries-

```swift
typealias TestTypeAStandardTransactionConstraintEntry = StandardTransactionConstraintEntry<TestTypeA>
typealias TestTypeBStandardTransactionConstraintEntry = StandardTransactionConstraintEntry<TestTypeB>

enum TestPolymorphicTransactionConstraintEntry: PolymorphicTransactionConstraintEntry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,31 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
throw DynamoDBTableError.batchErrorsReturned(errorCount: errorCount, messageMap: errorMap)
}

private func writeTransactionItems<AttributesType, ItemType>(
_ entries: [WriteEntry<AttributesType, ItemType>], constraints: [TransactionConstraintEntry<AttributesType, ItemType>]) async throws
{
// if there are no items, there is nothing to update
guard entries.count > 0 else {
return
}

let entryStatements = try entries.map { entry -> DynamoDBClientTypes.ParameterizedStatement in
let statement = try self.entryToStatement(entry)

return DynamoDBClientTypes.ParameterizedStatement(statement: statement)
}

let requiredItemsStatements = try constraints.map { entry -> DynamoDBClientTypes.ParameterizedStatement in
let statement = try self.entryToStatement(entry)

return DynamoDBClientTypes.ParameterizedStatement(statement: statement)
}

let transactionInput = ExecuteTransactionInput(transactStatements: entryStatements + requiredItemsStatements)

_ = try await dynamodb.executeTransaction(input: transactionInput)
}

private func writeTransactionItems(
_ entries: [some PolymorphicWriteEntry], constraints: [some PolymorphicTransactionConstraintEntry]) async throws
{
Expand Down Expand Up @@ -179,6 +204,18 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
_ = try await dynamodb.executeTransaction(input: transactionInput)
}

func transactWrite(_ entries: [WriteEntry<some Any, some Any>]) async throws {
try await self.transactWrite(entries, constraints: [],
retriesRemaining: self.retryConfiguration.numRetries)
}

func transactWrite<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>],
constraints: [TransactionConstraintEntry<AttributesType, ItemType>]) async throws
{
try await self.transactWrite(entries, constraints: constraints,
retriesRemaining: self.retryConfiguration.numRetries)
}

func polymorphicTransactWrite(_ entries: [some PolymorphicWriteEntry]) async throws {
let noConstraints: [EmptyPolymorphicTransactionConstraintEntry] = []
return try await self.polymorphicTransactWrite(entries, constraints: noConstraints,
Expand All @@ -192,6 +229,113 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
retriesRemaining: self.retryConfiguration.numRetries)
}

private func transactWrite<AttributesType, ItemType>(
_ entries: [WriteEntry<AttributesType, ItemType>], constraints: [TransactionConstraintEntry<AttributesType, ItemType>],
retriesRemaining: Int) async throws
{
let entryCount = entries.count + constraints.count

if entryCount > AWSDynamoDBLimits.maximumUpdatesPerTransactionStatement {
throw DynamoDBTableError.transactionSizeExceeded(attemptedSize: entryCount,
maximumSize: AWSDynamoDBLimits.maximumUpdatesPerTransactionStatement)
}

let result: Swift.Result<Void, DynamoDBTableError>
do {
try await self.writeTransactionItems(entries, constraints: constraints)

result = .success(())
} catch let exception as TransactionCanceledException {
guard let cancellationReasons = exception.properties.cancellationReasons else {
throw DynamoDBTableError.transactionCanceled(reasons: [])
}

let keys = entries.map(\.compositePrimaryKey) + constraints.map(\.compositePrimaryKey)

var isTransactionConflict = false
let reasons = try zip(cancellationReasons, keys).compactMap { cancellationReason, entryKey -> DynamoDBTableError? in
let key: StandardCompositePrimaryKey?
if let item = cancellationReason.item {
key = try DynamoDBDecoder().decode(.m(item))
} else {
key = nil
}

let partitionKey = key?.partitionKey ?? entryKey.partitionKey
let sortKey = key?.sortKey ?? entryKey.sortKey

// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ExecuteTransaction.html
switch cancellationReason.code {
case "None":
return nil
case "ConditionalCheckFailed":
return DynamoDBTableError.transactionConditionalCheckFailed(partitionKey: partitionKey,
sortKey: sortKey,
message: cancellationReason.message)
case "DuplicateItem":
return DynamoDBTableError.duplicateItem(partitionKey: partitionKey, sortKey: sortKey,
message: cancellationReason.message)
case "ItemCollectionSizeLimitExceeded":
return DynamoDBTableError.transactionSizeExceeded(attemptedSize: entryCount,
maximumSize: AWSDynamoDBLimits.maximumUpdatesPerTransactionStatement)
case "TransactionConflict":
isTransactionConflict = true

return DynamoDBTableError.transactionConflict(message: cancellationReason.message)
case "ProvisionedThroughputExceeded":
return DynamoDBTableError.transactionProvisionedThroughputExceeded(message: cancellationReason.message)
case "ThrottlingError":
return DynamoDBTableError.transactionThrottling(message: cancellationReason.message)
case "ValidationError":
return DynamoDBTableError.transactionValidation(partitionKey: partitionKey, sortKey: sortKey,
message: cancellationReason.message)
default:
return DynamoDBTableError.transactionUnknown(code: cancellationReason.code, partitionKey: partitionKey,
sortKey: sortKey, message: cancellationReason.message)
}
}

if isTransactionConflict, retriesRemaining > 0 {
return try await retryTransactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining)
}

result = .failure(DynamoDBTableError.transactionCanceled(reasons: reasons))
} catch let exception as TransactionConflictException {
if retriesRemaining > 0 {
return try await retryTransactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining)
}

let reason = DynamoDBTableError.transactionConflict(message: exception.message)

result = .failure(DynamoDBTableError.transactionCanceled(reasons: [reason]))
}

let retryCount = self.retryConfiguration.numRetries - retriesRemaining
self.tableMetrics.transactWriteRetryCountRecorder?.record(retryCount)

switch result {
case .success:
return
case let .failure(failure):
throw failure
}
}

private func retryTransactWrite<AttributesType, ItemType>(
_ entries: [WriteEntry<AttributesType, ItemType>], constraints: [TransactionConstraintEntry<AttributesType, ItemType>],
retriesRemaining: Int) async throws
{
// determine the required interval
let retryInterval = Int(self.retryConfiguration.getRetryInterval(retriesRemaining: retriesRemaining))

logger.warning(
"Transaction retried due to conflict. Remaining retries: \(retriesRemaining). Retrying in \(retryInterval) ms.")
try await Task.sleep(nanoseconds: UInt64(retryInterval) * millisecondsToNanoSeconds)

logger.trace("Reattempting request due to remaining retries: \(retryInterval)")
return try await self.transactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining - 1)
}

private func polymorphicTransactWrite(
_ entries: [some PolymorphicWriteEntry], constraints: [some PolymorphicTransactionConstraintEntry],
retriesRemaining: Int) async throws
Expand Down Expand Up @@ -259,13 +403,13 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
}

if isTransactionConflict, retriesRemaining > 0 {
return try await retryTransactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining)
return try await retryPolymorphicTransactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining)
}

result = .failure(DynamoDBTableError.transactionCanceled(reasons: reasons))
} catch let exception as TransactionConflictException {
if retriesRemaining > 0 {
return try await retryTransactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining)
return try await retryPolymorphicTransactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining)
}

let reason = DynamoDBTableError.transactionConflict(message: exception.message)
Expand All @@ -284,7 +428,7 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
}
}

private func retryTransactWrite(
private func retryPolymorphicTransactWrite(
_ entries: [some PolymorphicWriteEntry], constraints: [some PolymorphicTransactionConstraintEntry],
retriesRemaining: Int) async throws
{
Expand Down
5 changes: 5 additions & 0 deletions Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ public protocol DynamoDBCompositePrimaryKeyTable {
* The transaction will comprise of the write entries specified in `entries`.
* The transaction will fail if the number of entries is greater than 100.
*/
func transactWrite<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) async throws

func polymorphicTransactWrite<WriteEntryType: PolymorphicWriteEntry>(
_ entries: [WriteEntryType]) async throws

Expand All @@ -147,6 +149,9 @@ public protocol DynamoDBCompositePrimaryKeyTable {
* with a specified version must exist regardless of if it will be written to by the transaction).
* The transaction will fail if the number of entries and constraints combined is greater than 100.
*/
func transactWrite<AttributesType, ItemType>(
_ entries: [WriteEntry<AttributesType, ItemType>], constraints: [TransactionConstraintEntry<AttributesType, ItemType>]) async throws

func polymorphicTransactWrite<WriteEntryType: PolymorphicWriteEntry, TransactionConstraintEntryType: PolymorphicTransactionConstraintEntry>(
_ entries: [WriteEntryType], constraints: [TransactionConstraintEntryType]) async throws

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ public typealias ExecuteItemFilterType = @Sendable (String, String, String, Poly
-> Bool

public protocol InMemoryTransactionDelegate {
/**
Inject errors into a `transactWrite` call.
*/
func injectErrors<AttributesType, ItemType>(
_ entries: [WriteEntry<AttributesType, ItemType>], constraints: [TransactionConstraintEntry<AttributesType, ItemType>],
table: InMemoryDynamoDBCompositePrimaryKeyTable) async throws -> [DynamoDBTableError]

/**
Inject errors into a `polymorphicTransactWrite` call.
*/
func injectErrors<WriteEntryType: PolymorphicWriteEntry,
TransactionConstraintEntryType: PolymorphicTransactionConstraintEntry>(
_ entries: [WriteEntryType], constraints: [TransactionConstraintEntryType],
Expand Down Expand Up @@ -98,6 +108,21 @@ public struct InMemoryDynamoDBCompositePrimaryKeyTable: DynamoDBCompositePrimary
try await self.storeWrapper.updateItem(newItem: newItem, existingItem: existingItem)
}

public func transactWrite(_ entries: [WriteEntry<some Any, some Any>]) async throws {
try await self.transactWrite(entries, constraints: [])
}

public func transactWrite<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>],
constraints: [TransactionConstraintEntry<AttributesType, ItemType>]) async throws
{
// if there is a transaction delegate and it wants to inject errors
if let errors = try await transactionDelegate?.injectErrors(entries, constraints: constraints, table: self), !errors.isEmpty {
throw DynamoDBTableError.transactionCanceled(reasons: errors)
}

return try await self.storeWrapper.bulkWrite(entries, constraints: constraints, isTransaction: true)
}

public func polymorphicTransactWrite(_ entries: [some PolymorphicWriteEntry]) async throws {
let noConstraints: [EmptyPolymorphicTransactionConstraintEntry] = []
return try await self.polymorphicTransactWrite(entries, constraints: noConstraints)
Expand All @@ -120,7 +145,7 @@ public struct InMemoryDynamoDBCompositePrimaryKeyTable: DynamoDBCompositePrimary
}

public func bulkWrite(_ entries: [WriteEntry<some Any, some Any>]) async throws {
try await self.storeWrapper.bulkWrite(entries)
try await self.storeWrapper.bulkWrite(entries, constraints: [], isTransaction: false)
}

public func bulkWriteWithFallback(_ entries: [WriteEntry<some Any, some Any>]) async throws {
Expand Down
Loading

0 comments on commit 3501167

Please sign in to comment.