From de3b9021b51f304abe91264f112ada82d2d6d812 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Thu, 1 Dec 2022 13:31:42 -0500 Subject: [PATCH 01/12] KAFKA-13709: Add docs for exactly-once support in Connect --- .../distributed/DistributedConfig.java | 9 +- docs/connect.html | 209 ++++++++++++++++++ 2 files changed, 215 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java index 9544b79411476..343151caf6724 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java @@ -232,9 +232,12 @@ public String toString() { public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support"; public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster " - + "by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones. "; - // TODO: https://issues.apache.org/jira/browse/KAFKA-13709 - // + "See the exactly-once source support documentation at [add docs link here] for more information on this feature."; + + "by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones.\n" + + "To enable exactly-once source support on a new cluster, set this property to '" + ExactlyOnceSourceSupport.ENABLED + "'. " + + "To enable support on an existing cluster, first set to '" + ExactlyOnceSourceSupport.PREPARING + "' on every worker in the cluster, " + + "then set to '" + ExactlyOnceSourceSupport.ENABLED + "'. A rolling upgrade may be used for both changes. 
" + + "See the exactly-once source support documentation " + + "for more information on this feature."; public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT = ExactlyOnceSourceSupport.DISABLED.toString(); private static Object defaultKeyGenerationAlgorithm(Crypto crypto) { diff --git a/docs/connect.html b/docs/connect.html index ba960ed5c222b..c1d44fe2aa3e4 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -369,6 +369,114 @@

Error Reportin # Tolerate all errors. errors.tolerance=all +

Exactly-once support

+ +

Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that support for exactly-once delivery is highly dependent on the type of connector being run. Even if all the correct worker properties are set in the config for each node in a cluster, if a connector is not designed to or simply cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.

+ +

Sink connectors
+ +

If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery at the Connect worker level is ensuring that its consumer group is configured to ignore records in aborted transactions. This can be done by setting the worker property consumer.isolation.level to read_committed or, if running a version of Kafka Connect that supports it, using a connector client config override policy that allows the consumer.override.isolation.level property to be set to read_committed in individual connector configs. There are no additional ACL requirements.

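As a sketch of the two options described above (the override-policy property name is the standard Kafka Connect worker setting; treat the exact layout as illustrative):

```properties
# Option 1, worker-level: every sink connector's consumer on this worker
# ignores records from aborted transactions
consumer.isolation.level=read_committed

# Option 2, opt-in per connector: permit client config overrides at the
# worker level, then set the override in individual connector configs
connector.client.config.override.policy=All
```

With the override policy in place, an individual sink connector config can then set consumer.override.isolation.level=read_committed on its own.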
+ +
Source connectors
+ +

If a source connector supports exactly-once delivery, your Connect cluster must also be configured to enable framework-level support for exactly-once delivery for source connectors, and additional ACLs will be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.

+ +
Worker configuration
+ +

For new Connect clusters, set the exactly.once.source.support property to enabled in the worker config for each node in the cluster. For existing clusters, two rolling upgrades are necessary. During the first upgrade, the exactly.once.source.support property should be set to preparing, and during the second, it should be set to enabled.

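The two paths above can be sketched as worker configuration fragments (illustrative only):

```properties
# New cluster: enable exactly-once source support directly
exactly.once.source.support=enabled

# Existing cluster, first rolling upgrade (every worker):
#   exactly.once.source.support=preparing
# Existing cluster, second rolling upgrade (every worker):
#   exactly.once.source.support=enabled
```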
+ +
ACL requirements
+ +

With exactly-once source support enabled, the principal for each Connect worker will require these ACLs:

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Operation | Resource Type | Resource Name | Note
Write | TransactionalId | connect-cluster-${groupId}, where ${groupId} is the group.id of the cluster |
Describe | TransactionalId | connect-cluster-${groupId}, where ${groupId} is the group.id of the cluster |
IdempotentWrite | Cluster | ID of the Kafka cluster that hosts the worker's config topic | The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters
+ +
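As an illustration only, the transactional-ID ACLs above might be granted with the kafka-acls.sh tool that ships with Kafka; the principal name, bootstrap server, and group.id below are hypothetical and should be replaced with your cluster's values:

```shell
# Grant Write and Describe on the worker's transactional ID,
# connect-cluster-${groupId} (here assuming group.id=my-connect)
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add \
  --allow-principal User:connect-worker \
  --operation Write --operation Describe \
  --transactional-id connect-cluster-my-connect
```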

And the principal for each individual connector will require these ACLs:

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Operation | Resource Type | Resource Name | Note
Write | TransactionalId | ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group.id of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero) | A wildcard prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
Describe | TransactionalId | ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group.id of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero) | A wildcard prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
Write | Topic | Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not. |
Read | Topic | Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not. |
Describe | Topic | Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not. |
Create | Topic | Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not. | Only necessary if the offsets topic for the connector does not exist yet
IdempotentWrite | Cluster | ID of the Kafka cluster that the source connector writes to | The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters
+
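Since the offsets topic is resolved per connector as described above, a connector can pin its own offsets topic so that the Topic ACLs can be scoped narrowly. A hypothetical source connector config (names are placeholders):

```properties
name=my-source-connector
# Dedicated offsets topic for this connector; if omitted, the worker's
# offsets.storage.topic is used instead
offsets.storage.topic=my-source-connector-offsets
```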

8.3 Connector Development Guide

This guide describes how developers can write new connectors for Kafka Connect to move data between Kafka and other systems. It briefly reviews a few key concepts and then describes how to create a simple connector.

@@ -593,6 +701,107 @@
Resuming from Previous Off

Of course, you might need to read many keys for each of the input streams. The OffsetStorageReader interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.

+
Exactly-once source connectors
+ +
Supporting exactly-once
+ +

With the passing of KIP-618, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.

+ +
Defining transaction boundaries
+ +

By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its poll method. However, connectors can also define their own transaction boundaries, which can be enabled by users by setting the transaction.boundary property to connector in the config for the connector.

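A minimal connector config fragment enabling this behavior (the connector name is hypothetical; the default boundary is per-poll batch, as described above):

```properties
name=my-source-connector
# Hand transaction boundaries over to the connector's tasks, which can then
# use the TransactionContext API to commit and abort transactions
transaction.boundary=connector
```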
+ +

If enabled, the connector's tasks will have access to a TransactionContext from their SourceTaskContext, which they can use to control when transactions are aborted and committed.

+ +

For example, to commit a transaction at least every ten records:

+ +
+private int recordsSent;
+
+@Override
+public void start(Map<String, String> props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List<SourceRecord> poll() {
+    List<SourceRecord> records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }
+}
+
+ +

Or to commit a transaction for exactly every tenth record:

+ +
+private int recordsSent;
+
+@Override
+public void start(Map<String, String> props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List<SourceRecord> poll() {
+    List<SourceRecord> records = fetchRecords();
+    for (SourceRecord record : records) {
+        if (++this.recordsSent % 10 == 0) {
+            this.context.transactionContext().commitTransaction(record);
+        }
+    }
+}
+
+ +

Most connectors do not need to define their own transaction boundaries. However, it may be useful if files or objects in the source system are broken up into multiple source records, but should be delivered atomically. Additionally, it may be useful if it is impossible to give each source record a unique source offset, as long as every record with a given offset is delivered within a single transaction.

+ +

Note that if the user has not enabled connector-defined transaction boundaries in the connector config, the TransactionContext returned by context.transactionContext() will be null.

+ +
Validation APIs
+ +

A few additional preflight validation APIs can be implemented by source connector developers.

+ +

Some users may require exactly-once delivery guarantees from a connector. In this case, they may set the exactly.once.support property to required in the configuration for the connector. When this happens, the Kafka Connect framework will ask the connector whether it can provide exactly-once delivery guarantees with the specified configuration. This is done by invoking the exactlyOnceSupport method on the connector.

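A hypothetical connector config making that requirement explicit; with this value set, preflight validation fails unless the connector reports that it supports exactly-once delivery:

```properties
name=my-source-connector
# Reject this configuration unless the connector guarantees
# exactly-once delivery
exactly.once.support=required
```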
+ +

If a connector doesn't support exactly-once, it should still implement this method, to let users know for certain that it cannot provide exactly-once delivery guarantees:

+ +
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+    // This connector cannot provide exactly-once delivery guarantees under any conditions
+    return ExactlyOnceSupport.UNSUPPORTED;
+}
+
+ +

Otherwise, a connector should examine the configuration, and return ExactlyOnceSupport.SUPPORTED if it can provide exactly-once delivery guarantees:

+ +
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+    // This connector can always provide exactly-once delivery guarantees
+    return ExactlyOnceSupport.SUPPORTED;
+}
+
+ +

Additionally, if the user has configured the connector to define its own transaction boundaries, the Kafka Connect framework will ask the connector whether it can define its own transaction boundaries with the specified configuration, via the canDefineTransactionBoundaries method:

+ +
+@Override
+public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> props) {
+    // This connector can always define its own transaction boundaries
+    return ConnectorTransactionBoundaries.SUPPORTED;
+}
+
+ +

This method need only be implemented for connectors that can define their own transaction boundaries in some cases. If a connector is never able to define its own transaction boundaries, it does not need to implement this method.

+

Dynamic Input/Output Streams

Kafka Connect is intended to define bulk data copying jobs, such as copying an entire database rather than creating many jobs to copy each table individually. One consequence of this design is that the set of input or output streams for a connector can vary over time.

From df33fc62a5304635aeea5cac550a3634adad7832 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Fri, 9 Dec 2022 12:11:29 -0500 Subject: [PATCH 02/12] Update docs/connect.html Co-authored-by: tikimims <39631000+tikimims@users.noreply.github.com> --- docs/connect.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connect.html b/docs/connect.html index c1d44fe2aa3e4..7606a9122a8fa 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -371,7 +371,7 @@

Error Reportin

Exactly-once support

-

Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that support for exactly-once delivery is highly dependent on the type of connector being run. Even if all the correct worker properties are set in the config for each node in a cluster, if a connector is not designed to or simply cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.

+

Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that support for exactly-once delivery is highly dependent on the type of connector you run. Even if you set all the correct worker properties in the configuration for each node in a cluster, if a connector is not designed to, or cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.

Sink connectors
From aaaf452e4dcd363e49fd491d54e499d6a128fa1a Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Fri, 9 Dec 2022 12:16:28 -0500 Subject: [PATCH 03/12] Update docs/connect.html Co-authored-by: tikimims <39631000+tikimims@users.noreply.github.com> --- docs/connect.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connect.html b/docs/connect.html index 7606a9122a8fa..a2c287318d4ea 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -379,7 +379,7 @@
Sink connect
Source connectors
-

If a source connector supports exactly-once delivery, your Connect cluster must also be configured to enable framework-level support for exactly-once delivery for source connectors, and additional ACLs will be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.

+

If a source connector supports exactly-once delivery, you must configure your Connect cluster to enable framework-level support for exactly-once delivery for source connectors. Additional ACLs may be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.

Worker configuration
From 0450e6ad4d79c279fe24dc769b553b1a79dfc0d9 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Fri, 9 Dec 2022 12:17:03 -0500 Subject: [PATCH 04/12] Update docs/connect.html Co-authored-by: tikimims <39631000+tikimims@users.noreply.github.com> --- docs/connect.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connect.html b/docs/connect.html index a2c287318d4ea..576462fffbaec 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -387,7 +387,7 @@
Worker configuration
ACL requirements
-

With exactly-once source support enabled, the principal for each Connect worker will require these ACLs:

+

With exactly-once source support enabled, the principal for each Connect worker will require the following ACLs:

From 870dff7f062875a1510c77397343c3659551d018 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Fri, 9 Dec 2022 12:21:01 -0500 Subject: [PATCH 05/12] Update docs/connect.html Co-authored-by: tikimims <39631000+tikimims@users.noreply.github.com> --- docs/connect.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connect.html b/docs/connect.html index 576462fffbaec..944533059ea7b 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -420,7 +420,7 @@
ACL requirements
-

And the principal for each individual connector will require these ACLs:

+

And the principal for each individual connector will require the following ACLs:

From 8d6c7d98722e4fc883e61327c49db1c37258aab6 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Fri, 9 Dec 2022 12:21:09 -0500 Subject: [PATCH 06/12] Update docs/connect.html Co-authored-by: tikimims <39631000+tikimims@users.noreply.github.com> --- docs/connect.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connect.html b/docs/connect.html index 944533059ea7b..f3112e8345884 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -790,7 +790,7 @@
Validation APIs
} -

Additionally, if the user has configured the connector to define its own transaction boundaries, the Kafka Connect framework will ask the connector whether it can define its own transaction boundaries with the specified configuration, via the canDefineTransactionBoundaries method:

+

Additionally, if the user has configured the connector to define its own transaction boundaries, the Kafka Connect framework will ask the connector whether it can define its own transaction boundaries with the specified configuration, using the canDefineTransactionBoundaries method:

 @Override

From 754493789b8267d14058ae291ca6a2facd4d1a5e Mon Sep 17 00:00:00 2001
From: Chris Egerton 
Date: Fri, 9 Dec 2022 12:21:24 -0500
Subject: [PATCH 07/12] Update docs/connect.html

Co-authored-by: tikimims <39631000+tikimims@users.noreply.github.com>
---
 docs/connect.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/connect.html b/docs/connect.html
index f3112e8345884..90d1145ce8281 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -800,7 +800,7 @@ 
Validation APIs
}
-

This method need only be implemented for connectors that can define their own transaction boundaries in some cases. If a connector is never able to define its own transaction boundaries, it does not need to implement this method.

+

This method should only be implemented for connectors that can define their own transaction boundaries in some cases. If a connector is never able to define its own transaction boundaries, it does not need to implement this method.

Dynamic Input/Output Streams

From b56efa58c7c5dc40d25d3f28301fce7427403962 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Fri, 9 Dec 2022 12:21:43 -0500 Subject: [PATCH 08/12] Update docs/connect.html Co-authored-by: tikimims <39631000+tikimims@users.noreply.github.com> --- docs/connect.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connect.html b/docs/connect.html index 90d1145ce8281..023e0f24b8b83 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -770,7 +770,7 @@
Validation APIs

Some users may require exactly-once delivery guarantees from a connector. In this case, they may set the exactly.once.support property to required in the configuration for the connector. When this happens, the Kafka Connect framework will ask the connector whether it can provide exactly-once delivery guarantees with the specified configuration. This is done by invoking the exactlyOnceSupport method on the connector.

-

If a connector doesn't support exactly-once, it should still implement this method, to let users know for certain that it cannot provide exactly-once delivery guarantees:

+

If a connector doesn't support exactly-once delivery, it should still implement this method to let users know for certain that it cannot provide exactly-once delivery guarantees:

 @Override

From f6089eeee8cae0553aa600f7e1bf91780134fa85 Mon Sep 17 00:00:00 2001
From: Chris Egerton 
Date: Fri, 9 Dec 2022 12:21:53 -0500
Subject: [PATCH 09/12] Update docs/connect.html

Co-authored-by: tikimims <39631000+tikimims@users.noreply.github.com>
---
 docs/connect.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/connect.html b/docs/connect.html
index 023e0f24b8b83..0a17ba58f2149 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -762,7 +762,7 @@ 
Defining transaction boundaries

Most connectors do not need to define their own transaction boundaries. However, it may be useful if files or objects in the source system are broken up into multiple source records, but should be delivered atomically. Additionally, it may be useful if it is impossible to give each source record a unique source offset, as long as every record with a given offset is delivered within a single transaction.

-

Note that if the user has not enabled connector-defined transaction boundaries in the connector config, the TransactionContext returned by context.transactionContext() will be null.

+

Note that if the user has not enabled connector-defined transaction boundaries in the connector configuration, the TransactionContext returned by context.transactionContext() will be null.

Validation APIs
From 44e349b9aeaffeeb4ec8de7661ba8373a675435f Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Fri, 9 Dec 2022 12:23:19 -0500 Subject: [PATCH 10/12] Tweak description for exactly.once.source.support property in DistributedConfig --- .../kafka/connect/runtime/distributed/DistributedConfig.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java index 343151caf6724..8c75f3a4076dc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java @@ -236,8 +236,8 @@ public String toString() { + "To enable exactly-once source support on a new cluster, set this property to '" + ExactlyOnceSourceSupport.ENABLED + "'. " + "To enable support on an existing cluster, first set to '" + ExactlyOnceSourceSupport.PREPARING + "' on every worker in the cluster, " + "then set to '" + ExactlyOnceSourceSupport.ENABLED + "'. A rolling upgrade may be used for both changes. 
" - + "See the exactly-once source support documentation " - + "for more information on this feature."; + + "For more information on this feature, see the " + + "exactly-once source support documentation."; public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT = ExactlyOnceSourceSupport.DISABLED.toString(); private static Object defaultKeyGenerationAlgorithm(Crypto crypto) { From 1d8df8253917e30e4997f8e9380105f2d3d8f09a Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Fri, 9 Dec 2022 12:24:50 -0500 Subject: [PATCH 11/12] Tweak sink connector exactly-once support docs --- docs/connect.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connect.html b/docs/connect.html index 0a17ba58f2149..29fd4637e9dc9 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -375,7 +375,7 @@

Exactly-once support
Sink connectors
-

If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery at the Connect worker level is ensuring that its consumer group is configured to ignore records in aborted transactions. This can be done by setting the worker property consumer.isolation.level to read_committed or, if running a version of Kafka Connect that supports it, using a connector client config override policy that allows the consumer.override.isolation.level property to be set to read_committed in individual connector configs. There are no additional ACL requirements.

+

If a sink connector supports exactly-once delivery, to enable exactly-once delivery at the Connect worker level, you must ensure its consumer group is configured to ignore records in aborted transactions. You can do this by setting the worker property consumer.isolation.level to read_committed or, if running a version of Kafka Connect that supports it, using a connector client config override policy that allows the consumer.override.isolation.level property to be set to read_committed in individual connector configs. There are no additional ACL requirements.

Source connectors
From 841720ba397c7f43e3f3ef6b6dfb54cc9372277c Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 14 Dec 2022 14:02:16 -0500 Subject: [PATCH 12/12] Add missing word, add return statements to code examples for SourceTask::poll --- docs/connect.html | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/connect.html b/docs/connect.html index 29fd4637e9dc9..fca18ddf7d135 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -705,7 +705,7 @@
KIP-618, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.

+

With the passing of KIP-618, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.

Defining transaction boundaries
@@ -736,6 +736,7 @@
Defining transaction boundaries
this.recordsSent = 0; this.context.transactionContext().commitTransaction(); } + return records; }
@@ -757,6 +758,7 @@
Defining transaction boundaries
this.context.transactionContext().commitTransaction(record); } } + return records; }