NoSuchTransaction in reactive MongoDB client when working with transactions #4804

Closed
fremarti opened this issue Oct 9, 2024 · 31 comments
type: bug A general bug

Comments

@fremarti

fremarti commented Oct 9, 2024

Setup

Spring Boot 3.3.4 with org.springframework.boot:spring-boot-starter-webflux and org.springframework.boot:spring-boot-starter-data-mongodb-reactive. MongoDB version is 6.0.18.

MongoDB Config

The reactive MongoDB client is set up in a configuration class that registers a transaction manager, which enables transactions for the Mongo templates:

@Configuration
@EnableConfigurationProperties(MongoProperties::class)
class MongoConfiguration(
    private val mongoProperties: MongoProperties,
) : AbstractReactiveMongoConfiguration() {
    ...
    @Bean
    fun transactionManager(
        factory: ReactiveMongoDatabaseFactory?,
        properties: MongoProperties,
    ): ReactiveMongoTransactionManager {
        return ReactiveMongoTransactionManager(
            factory!!,
            TransactionOptions.builder().readPreference(ReadPreference.valueOf(properties.readPreference)).build(),
        )
    }
    ...

Docker Setup

For local and integration testing, MongoDB runs via Docker Compose as a single-node replica set. The init script mounted below just runs rs.initiate(...) to register the replica set. The application properties set the corresponding connection string to mongodb://localhost:27017/?replicaSet=rs0.

services:
  mongo:
    image: mongo:6.0.18
    ports:
      - "27017:27017"
    volumes:
      - ./bin/mongodb-init-replica-set.sh:/docker-entrypoint-initdb.d/mongodb-init-replica-set.sh:ro
    command: ["mongod", "--replSet", "rs0", "--bind_ip_all"]
  ...

Application Code

I have an endpoint PUT /foo which should update multiple entries in a single collection. This update should be transactional. Before the update, all entries are fetched by ID and validated:

// FooController.kt
@RestController
class FooController(private val fooUseCase: FooUseCase) {
    ...
    @Transactional(label = ["mongo:readPreference=PRIMARY"])
    @PutMapping(
        value = ["/foo"],
        consumes = [MediaType.APPLICATION_JSON_VALUE],
    )
    fun foo(@RequestBody request: RequestDto): Mono<Void> {
        return fooUseCase
            .process(request)
            .doOnError { error ->
                logger.error("Failed", error)
            }
    }
}

// FooUseCase.kt
@Service
class FooUseCase(private val repo: FooRepository, private val factory: FooFactory) {
    fun process(request: RequestDto): Mono<Void> {
        return repo
            .findAllById(request.ids)
            .collectList()
            .flatMap { entries ->
                // Do some checks
                repo
                    .saveAll(entries.map { factory.from(request, it) })
                    .then()
            }
    }
}

Integration Tests

To test the transactional behavior I wrote a Spring Boot integration test. I leveraged coroutines to fire 100 requests concurrently against the endpoint using the web test client to make sure there are no side-effects.

@SpringBootTest(
    webEnvironment = RANDOM_PORT,
    properties = ["server.error.include-stacktrace=always"]
)
class FooIntegrationTest {

    @Autowired
    lateinit var webTestClient: WebTestClient

    @Autowired
    lateinit var fooUsecase: FooUseCase

    @Autowired
    lateinit var repo: FooRepository

    // Clean-up in @BeforeEach and @AfterEach

    @Test
    fun `should rollback`() = runTest {
        // 1. Store entries which should be updated in db
        // 2. Assert entries are there
        // 3. Run db command to set validator rule for specific id to enforce exception on db request without the need to mock something

        // 4. Run test using web client:
        val responseSpecs = (1..100).map {
            async {
                webTestClient
                    .put()
                    .uri {
                        it.path("/foo").build()
                    }
                    .body(Mono.just(request), RequestDto::class.java)
                    .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
                    .exchange()
                    .expectStatus().is5xxServerError()
                    .expectBody()
                    .jsonPath("$.trace").value<String> { stackTrace ->
                        stackTrace.shouldContain("DataIntegrityViolationException")
                        stackTrace.shouldNotContain("NoSuchTransaction")
                    }

            }
        }
        responseSpecs.awaitAll()

        // 5. Validate original entries in db are not altered
    }
}

Unfortunately, I see side effects in the transactional behavior. Intermittently, a MongoTransactionException with NoSuchTransaction is thrown instead of the expected DataIntegrityViolationException, so the test fails and I cannot explain why. Can anybody help?

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Oct 9, 2024
@fremarti
Author

fremarti commented Oct 9, 2024

What I forgot to mention:
Originally, I had the @Transactional annotation on the service method. That is when I first noticed the side effects, so I moved it to the controller.
Besides the @Transactional annotation, I also tried the TransactionalOperator and wrapped the corresponding chains in transactionOperator.execute { ... }, but didn't see any difference.
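
For readers unfamiliar with the programmatic variant mentioned above, here is a minimal sketch of wrapping a reactive chain with Spring's TransactionalOperator. The class name TransactionalRunner and the method inTransaction are hypothetical and not taken from the reporter's project; only TransactionalOperator.create(...) and transactional(...) are Spring APIs.

```kotlin
import org.springframework.stereotype.Service
import org.springframework.transaction.ReactiveTransactionManager
import org.springframework.transaction.reactive.TransactionalOperator
import reactor.core.publisher.Mono

// Hypothetical helper, not part of the original sample project.
@Service
class TransactionalRunner(transactionManager: ReactiveTransactionManager) {

    // The operator is bound to the given reactive transaction manager.
    private val operator = TransactionalOperator.create(transactionManager)

    // Runs `work` inside one transaction: commit on completion, rollback on error.
    fun <T : Any> inTransaction(work: Mono<T>): Mono<T> =
        operator.transactional(work)
}
```

Under these assumptions, a call such as runner.inTransaction(fooUseCase.process(request)) would take the place of the annotation on the controller method.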

@fremarti
Author

fremarti commented Oct 10, 2024

I either see that the transaction has already been aborted, which IMO means that requests are sharing a transaction:

org.springframework.data.mongodb.MongoTransactionException: Command failed with error 251 (NoSuchTransaction): 'Transaction with { txnNumber: 67 } has been aborted.' on server localhost:27017. The full response is {"errorLabels": ["TransientTransactionError"], "ok": 0.0, "errmsg": "Transaction with { txnNumber: 67 } has been aborted.", "code": 251, "codeName": "NoSuchTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1728556211, "i": 10}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1728556211, "i": 10}}}

Or I see NoSuchTransaction with the message that the txNumber does not match, which seems like it is not finding the correct transaction in the Reactor context.

The error disappears when I set the max connection pool size in the Mongo config to 1.
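
As a sketch of the pool-size workaround described above, the driver's client settings can cap the pool at one connection. The function name singleConnectionSettings is hypothetical, and how the reporter actually applied the limit is not shown in the thread; the connection string is the one quoted in the issue.

```kotlin
import com.mongodb.ConnectionString
import com.mongodb.MongoClientSettings

// Hypothetical helper: builds client settings with a pool of at most one connection.
fun singleConnectionSettings(): MongoClientSettings =
    MongoClientSettings.builder()
        .applyConnectionString(ConnectionString("mongodb://localhost:27017/?replicaSet=rs0"))
        // Limit the connection pool so all operations share a single connection.
        .applyToConnectionPoolSettings { pool -> pool.maxSize(1) }
        .build()
```

The same limit can presumably be expressed with the maxPoolSize=1 connection string option. Either way this serializes database access, so it is a diagnostic aid rather than a fix.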

@christophstrobl
Member

Thank you @fremarti for getting in touch. That's a hard problem to spot - we'll look into it - thanks already for the pool size hint.

@christophstrobl
Member

@fremarti can you please package things up and share a complete minimal sample (something that we can unzip or git clone, build, and deploy) that reproduces the problem. Thank you!

@DavidFischer1010

minimal-sample.zip

@christophstrobl we have created a minimal sample for you. The tests in the IntegrationTest class are flaky due to the aforementioned issue.

  • The test named 'should save AnyDocuments transactionally' is green when run separately, but sometimes red when run together in bulk.
  • The transaction rollback test always emits NoSuchTransaction messages in the test execution log.

If you have further questions, please don't hesitate to ask.
We would be happy to hear from you soon.

@DavidFischer1010

minimal-sample.zip

We have changed up the sample a little bit to simplify the demo even further. We now have a setup with one test updating different documents in separate requests and transactions. We are still getting NoSuchTransaction errors without any custom configuration.

Kind Regards
David

@LennardWolf2023

minimal-sample-template.zip

In order to analyze the error, David, Frederik and I have also replaced the Repository pattern with the direct usage of the reactive mongo template. Please see the modified minimal sample attached. We were not able to reproduce the NoSuchTransaction exception when using only reactive mongo template for injection. However we were not quite sure, if this is just the case due to avoiding spring-data-mongodb error handling by usage of the template or if the error does not occur.

Kind regards
Lennard (colleague of Frederik and David)

@christophstrobl
Member

👍 - thank you both - it's on this week's todo list.

@christophstrobl
Member

From a Spring Data point of view it looks like things are working as expected.
Judging from the log output, it seems as if the driver's internal transaction number counter gets out of sync at some point, leading to the encountered failure where the given transaction number (txNumber in the logs) does not match the one within the ServerSession.

o.s.d.m.ReactiveMongoTransactionManager  : About to start transaction for session 
[
    ClientSessionPublisherImpl@b55eb88 id = {"id": {"$binary": {"base64": "UYdiB/LBQW+xdCuRI/XQOw==", "subType": "04"}}}, 
    causallyConsistent = true, 
    txActive = false, 
    txNumber = 32, 
    closed = false, 
    clusterTime = null
]

o.s.d.m.ReactiveMongoTransactionManager  : Started transaction for session 
[
    ClientSessionPublisherImpl@b55eb88 id = {"id": {"$binary": {"base64": "UYdiB/LBQW+xdCuRI/XQOw==", "subType": "04"}}},
    causallyConsistent = true,
    txActive = true,
    txNumber = 33,
    closed = false,
    clusterTime = null
].
Command failed with error 251 (NoSuchTransaction): 'Given transaction number 33 does not match any in-progress transactions. 
The active transaction number is 32' on server 127.0.0.1:27017. 
The full response is {"errorLabels": ["TransientTransactionError"], "ok": 0.0, "errmsg": "Given transaction number 33 does not match any in-progress transactions. The active transaction number is 32", "code": 251, "codeName": "NoSuchTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730184660, "i": 11}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1730184660, "i": 11}}}
o.s.d.m.ReactiveMongoTransactionManager  : Initiating transaction rollback
o.s.d.m.ReactiveMongoTransactionManager  : About to abort transaction for session 
[
    ClientSessionPublisherImpl@b55eb88 id = {"id": {"$binary": {"base64": "UYdiB/LBQW+xdCuRI/XQOw==", "subType": "04"}}},
    causallyConsistent = true,
    txActive = true,
    txNumber = 33,
    closed = false,
    clusterTime = {"clusterTime": {"$timestamp": {"t": 1730184660, "i": 11}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}
]

@jyemin could you have a look and share some thoughts from the driver's perspective?

@jyemin
Contributor

jyemin commented Oct 29, 2024

@christophstrobl will have a look, but wanted to get your thoughts on this comment:

We were not able to reproduce the NoSuchTransaction exception when using only reactive mongo template for injection. However we were not quite sure, if this is just the case due to avoiding spring-data-mongodb error handling by usage of the template or if the error does not occur.

Does it suggest anything to you? I'm not clear how the two scenarios differ in practice.

@christophstrobl
Member

christophstrobl commented Oct 30, 2024

@jyemin For the ReactiveMongoTransactionManager the two scenarios should not really differ.
We've seen such issues, which only manifest in larger or more complex chains, in other projects as well. They might be caused by Reactor switching threads and ServerSessionImpl#transactionNumber not being volatile.
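
To illustrate what the visibility concern means in general terms, here is a toy Kotlin model. ToySession and its members are invented for illustration and are not the driver's ServerSessionImpl; the sketch only contrasts a plain field, a @Volatile field, and an AtomicLong.

```kotlin
import java.util.concurrent.atomic.AtomicLong

// Toy model only; this is NOT the driver's ServerSessionImpl.
class ToySession {
    // Plain field: a write made on one thread is not guaranteed to be
    // visible to a read on another thread without further synchronization.
    var plainTxnNumber: Long = 0

    // @Volatile guarantees that reads observe the most recent write.
    @Volatile
    var volatileTxnNumber: Long = 0

    // AtomicLong additionally makes the increment itself atomic.
    private val atomicTxnNumber = AtomicLong(0)

    fun advanceAtomic(): Long = atomicTxnNumber.incrementAndGet()
}
```

Whether this is what actually happens inside the driver is exactly the open question in this thread; the sketch does not answer it.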

@jyemin
Contributor

jyemin commented Oct 30, 2024

@DavidFischer1010 I tried reproducing the error using https://github.com/user-attachments/files/17540925/minimal-sample.zip and was unsuccessful. Here are the steps I followed:

  1. Downloaded MongoDB 7.0.1
  2. Started a single node replica set on port 27017
  3. Downloaded the project and loaded it into IntelliJ
  4. Configured IntelliJ to use Java 17 for both Gradle and the project itself
  5. Ran the com.example.demo.IntegrationTest#should process multiple save AnyDocument requests test 5 times from IntelliJ. All runs completed successfully.

Can you shed any light on what you are doing differently?

@fremarti
Author

@jyemin we used MongoDB 6.0.18 and Java 21, but I just tried your version setup and the problem occurs there as well. When I run the tests, approximately every second run fails. Weird. Can you run the test more often to verify that it is always green?

@jyemin
Contributor

jyemin commented Oct 30, 2024

I just tried it 10 times in a row and every run is green. Just to confirm, I'm using the zip file that contains this configuration:

@Configuration
@EnableReactiveMongoRepositories
class MongoConfiguration : AbstractReactiveMongoConfiguration() {

    override fun getDatabaseName(): String {
        return "anyname"
    }

    @Bean
    fun transactionManager(factory: ReactiveMongoDatabaseFactory): ReactiveMongoTransactionManager {
        return ReactiveMongoTransactionManager(factory)
    }
}

Is that the right one?

@fremarti
Author

Yes, that is correct. I did exactly the same. I downloaded the https://github.com/user-attachments/files/17540925/minimal-sample.zip demo project and ran it with the exact same versions as you in IntelliJ. When I replace @Test with @RepeatedTest(50) I get the following result:
Screenshot 2024-10-30 at 15 55 15

We also confirmed that on a second machine.
Did you use the docker-compose setup from the minimal example with the given init script for the replica set?
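
A minimal sketch of the @RepeatedTest(50) change mentioned above, using JUnit 5. The class RepeatedRunSketch and the stand-in callEndpoint() are hypothetical; in the sample project the test body would issue the real requests through the web test client.

```kotlin
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.RepeatedTest
import org.junit.jupiter.api.RepetitionInfo

class RepeatedRunSketch {

    // Hypothetical stand-in for the real request against the endpoint.
    private fun callEndpoint(): Int = 200

    // Runs the same test 50 times; each repetition is reported separately,
    // which makes intermittent failures visible in the test report.
    @RepeatedTest(50)
    fun `should process multiple save requests`(info: RepetitionInfo) {
        assertEquals(
            200,
            callEndpoint(),
            "repetition ${info.currentRepetition} of ${info.totalRepetitions}",
        )
    }
}
```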

@fremarti
Author

I used temurin-17.0.13+11 to run the test. The stack trace of a failed test looks like this:

2024-10-30T16:15:28.409+01:00 ERROR 92039 --- [      Thread-10] a.w.r.e.AbstractErrorWebExceptionHandler : [a6c7130e-5]  500 Server Error for HTTP POST "/foo"

org.springframework.data.mongodb.MongoTransactionException: Command failed with error 251 (NoSuchTransaction): 'Given transaction number 5 does not match any in-progress transactions. The active transaction number is 4' on server 127.0.0.1:27017. The full response is {"errorLabels": ["TransientTransactionError"], "ok": 0.0, "errmsg": "Given transaction number 5 does not match any in-progress transactions. The active transaction number is 4", "code": 251, "codeName": "NoSuchTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730301327, "i": 10}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1730301327, "i": 9}}}
	at org.springframework.data.mongodb.core.MongoExceptionTranslator.translateExceptionIfPossible(MongoExceptionTranslator.java:130) ~[spring-data-mongodb-4.3.4.jar:4.3.4]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ Handler com.example.demo.FooController#foo(AnyDocumentRequest) [DispatcherHandler]
	*__checkpoint ⇢ HTTP POST "/foo" [ExceptionHandlingWebHandler]
Original Stack Trace:
		at org.springframework.data.mongodb.core.MongoExceptionTranslator.translateExceptionIfPossible(MongoExceptionTranslator.java:130) ~[spring-data-mongodb-4.3.4.jar:4.3.4]
		at org.springframework.data.mongodb.core.ReactiveMongoTemplate.potentiallyConvertRuntimeException(ReactiveMongoTemplate.java:2768) ~[spring-data-mongodb-4.3.4.jar:4.3.4]
		at org.springframework.data.mongodb.core.ReactiveMongoTemplate.lambda$translateException$99(ReactiveMongoTemplate.java:2751) ~[spring-data-mongodb-4.3.4.jar:4.3.4]
		at reactor.core.publisher.Mono.lambda$onErrorMap$29(Mono.java:3862) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.Mono.subscribe(Mono.java:4576) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.6.10.jar:3.6.10]
		at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:205) ~[reactor-core-3.6.10.jar:3.6.10]
		at com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.lambda$sinkToCallback$33(MongoOperationPublisher.java:520) ~[mongodb-driver-reactivestreams-5.0.1.jar:na]
		at com.mongodb.reactivestreams.client.internal.OperationExecutorImpl.lambda$execute$9(OperationExecutorImpl.java:126) ~[mongodb-driver-reactivestreams-5.0.1.jar:na]
		at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.operation.AsyncOperationHelper.lambda$exceptionTransformingCallback$17(AsyncOperationHelper.java:330) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.async.function.AsyncCallbackSupplier.lambda$whenComplete$1(AsyncCallbackSupplier.java:97) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier$RetryingCallback.onResult(RetryingAsyncCallbackSupplier.java:118) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.async.function.AsyncCallbackSupplier.lambda$whenComplete$1(AsyncCallbackSupplier.java:97) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.async.function.AsyncCallbackSupplier.lambda$whenComplete$1(AsyncCallbackSupplier.java:97) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$executeBulkWriteBatchAsync$9(MixedBulkWriteOperation.java:339) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.async.function.AsyncCallbackLoop$LoopingCallback.onResult(AsyncCallbackLoop.java:73) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.async.function.AsyncCallbackLoop$LoopingCallback.onResult(AsyncCallbackLoop.java:62) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$executeBulkWriteBatchAsync$7(MixedBulkWriteOperation.java:333) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.lambda$executeAsync$0(DefaultServer.java:245) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.CommandProtocolImpl.lambda$executeAsync$0(CommandProtocolImpl.java:86) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.lambda$sendAndReceiveAsync$1(DefaultConnectionPool.java:774) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.UsageTrackingInternalConnection.lambda$sendAndReceiveAsync$1(UsageTrackingInternalConnection.java:150) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.InternalStreamConnection.lambda$sendCommandMessageAsync$0(InternalStreamConnection.java:534) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:826) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:789) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:648) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:645) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:245) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:228) ~[mongodb-driver-core-5.0.1.jar:na]
		at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129) ~[na:na]
		at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:160) ~[na:na]
		at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:573) ~[na:na]
		at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276) ~[na:na]
		at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297) ~[na:na]
		at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:144) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:122) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:111) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:645) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.InternalStreamConnection.access$600(InternalStreamConnection.java:93) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:779) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:763) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:648) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:645) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:245) ~[mongodb-driver-core-5.0.1.jar:na]
		at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:228) ~[mongodb-driver-core-5.0.1.jar:na]
		at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129) ~[na:na]
		at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:447) ~[na:na]
		at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:195) ~[na:na]
		at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:217) ~[na:na]
		at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312) ~[na:na]
		at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:113) ~[na:na]
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
		at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Given transaction number 5 does not match any in-progress transactions. The active transaction number is 4' on server 127.0.0.1:27017. The full response is {"errorLabels": ["TransientTransactionError"], "ok": 0.0, "errmsg": "Given transaction number 5 does not match any in-progress transactions. The active transaction number is 4", "code": 251, "codeName": "NoSuchTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730301327, "i": 10}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1730301327, "i": 9}}}
	at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection.lambda$sendCommandMessageAsync$0(InternalStreamConnection.java:523) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:826) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:789) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:648) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:645) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:245) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:228) ~[mongodb-driver-core-5.0.1.jar:na]
	at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129) ~[na:na]
	at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:160) ~[na:na]
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:573) ~[na:na]
	at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276) ~[na:na]
	at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297) ~[na:na]
	at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:144) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:122) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:111) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:645) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection.access$600(InternalStreamConnection.java:93) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:779) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:763) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:648) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:645) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:245) ~[mongodb-driver-core-5.0.1.jar:na]
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:228) ~[mongodb-driver-core-5.0.1.jar:na]
	at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129) ~[na:na]
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:447) ~[na:na]
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:195) ~[na:na]
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:217) ~[na:na]
	at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312) ~[na:na]
	at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:113) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]

2024-10-30T16:15:28.415+01:00 ERROR 92039 --- [    Test worker] o.s.t.w.reactive.server.ExchangeResult   : Request details for assertion failure:

> POST http://localhost:62932/foo
> accept-encoding: [gzip]
> user-agent: [ReactorNetty/1.1.22]
> host: [localhost:62932]
> WebTestClient-Request-Id: [5]
> Accept: [application/json]
> Content-Type: [application/json]
> Content-Length: [32]

{"list":[{"id":"3"},{"id":"4"}]}

< 500 INTERNAL_SERVER_ERROR Internal Server Error
< Content-Type: [application/json]
< Content-Length: [129]

{"timestamp":"2024-10-30T15:15:28.409+00:00","path":"/foo","status":500,"error":"Internal Server Error","requestId":"a6c7130e-5"}


Status
Expected :200 OK
Actual   :500 INTERNAL_SERVER_ERROR

java.lang.AssertionError: Status expected:<200 OK> but was:<500 INTERNAL_SERVER_ERROR>
	at org.springframework.test.util.AssertionErrors.fail(AssertionErrors.java:59)
	at org.springframework.test.util.AssertionErrors.assertEquals(AssertionErrors.java:122)
	at org.springframework.test.web.reactive.server.StatusAssertions.lambda$assertStatusAndReturn$4(StatusAssertions.java:236)
	at org.springframework.test.web.reactive.server.ExchangeResult.assertWithDiagnostics(ExchangeResult.java:232)
	at org.springframework.test.web.reactive.server.StatusAssertions.assertStatusAndReturn(StatusAssertions.java:236)
	at org.springframework.test.web.reactive.server.StatusAssertions.isOk(StatusAssertions.java:68)
	at com.example.demo.IntegrationTest.should process multiple save AnyDocument requests(IntegrationTest.kt:88)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180)
	at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
	at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:711)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
	at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)

@jyemin
Contributor

jyemin commented Oct 30, 2024

Thanks. Using @RepeatedTest(50) I can reproduce the issue. It fails 9 of the 50 executions.

@christophstrobl I also tried with a locally modified version of mongodb-driver-core where I changed com.mongodb.internal.session.ServerSessionPool.ServerSessionImpl#transactionNumber to volatile, but it had no effect. So something else must be going on. Will continue to investigate.

@jyemin
Contributor

jyemin commented Oct 30, 2024

I enabled debug logging for org.mongodb.driver.protocol.command and I think I can now explain what's going on.

The application is calling the method SimpleReactiveMongoRepository#saveAll(java.lang.Iterable<S>) to save the two entities. Because EntityInformation#isNew returns false, that method in turn executes this line:

Flux.fromIterable(entities).flatMap(this::save)

The interesting part here is that the save operations are all executed concurrently, and because the execution has been made transactional, they all execute in the same transaction context. This ultimately results in the following two update commands being sent to the server concurrently:

{"update": "any-document", "ordered": true, "$db": "anyname", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730310127, "i": 8}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "588FpS8YSsuL3Ev0Oizuww==", "subType": "04"}}}, "txnNumber": 50, "startTransaction": true, "autocommit": false, "updates": [{"q": {"_id": "1"}, "u": {"_id": "1", "_class": "com.example.demo.AnyDocument"}, "upsert": true}]}
{"update": "any-document", "ordered": true, "$db": "anyname", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730310127, "i": 8}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "588FpS8YSsuL3Ev0Oizuww==", "subType": "04"}}}, "txnNumber": 50, "autocommit": false, "updates": [{"q": {"_id": "2"}, "u": {"_id": "2", "_class": "com.example.demo.AnyDocument"}, "upsert": true}]}

Because the commands are both part of the same transaction, they have the same lsid and txnNumber, but only the first has startTransaction: true, which is required by the server for the first command in any transaction. But because these commands are executed concurrently, occasionally the second one will arrive before the first, and since the second one does not include startTransaction: true, the server will, correctly, reply with a NoSuchTransaction error.

That's why the documentation for com.mongodb.reactivestreams.client.MongoCluster#startSession() has the following admonition:

Note: A ClientSession instance can not be used concurrently in multiple operations.
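The ordering requirement described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToySession`, `Command`, and `replay` are hypothetical names invented for illustration, the previous transaction number 49 is assumed, and the real server tracks far more state than a single number.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TxnOrderingDemo {

    /** One command of a transaction, reduced to the two fields that matter here. */
    static final class Command {
        final long txnNumber;
        final boolean startTransaction;

        Command(long txnNumber, boolean startTransaction) {
            this.txnNumber = txnNumber;
            this.startTransaction = startTransaction;
        }
    }

    /** Toy stand-in (not the real server logic) for the state kept per session (lsid). */
    static final class ToySession {
        private long activeTxnNumber;

        ToySession(long activeTxnNumber) {
            this.activeTxnNumber = activeTxnNumber;
        }

        /** Returns "ok" or "NoSuchTransaction", depending on what arrived before. */
        String handle(Command command) {
            if (command.startTransaction) {
                activeTxnNumber = command.txnNumber; // a new transaction begins
                return "ok";
            }
            return command.txnNumber == activeTxnNumber ? "ok" : "NoSuchTransaction";
        }
    }

    /** Feeds the commands to a fresh session in the given arrival order. */
    static List<String> replay(List<Command> arrivalOrder) {
        ToySession session = new ToySession(49); // assumed: the previous transaction was 49
        List<String> replies = new ArrayList<>();
        for (Command command : arrivalOrder) {
            replies.add(session.handle(command));
        }
        return replies;
    }

    public static void main(String[] args) {
        Command first = new Command(50, true);   // carries startTransaction: true
        Command second = new Command(50, false); // same txnNumber, no start flag

        System.out.println(replay(Arrays.asList(first, second))); // [ok, ok]
        System.out.println(replay(Arrays.asList(second, first))); // [NoSuchTransaction, ok]
    }
}
```

In this model the second arrival order fails for the same reason as in the logs: the command without the start flag refers to a transaction number the session has not seen yet.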

@christophstrobl, given this, I think there's a good argument that the bug is in that saveAll method, because it's executing operations concurrently using the same session. Instead of using flatMap to update each document independently, could it perhaps use ReactiveBulkOperations to do all the updates at once, similar to what it does for inserts when EntityInformation#isNew returns true (by calling ReactiveMongoOperations#insert(Collection<? extends T>, String))?

@christophstrobl
Member

Thank you @jyemin for your findings. I'll update the save operation and get back to you.

@christophstrobl
Member

My bad - there was indeed a flatMap in that particular code path where it should have been flatMapSequential.
Thank you all for your time ❤️ !
#4824 will fix the issue - snapshot builds for 4.4.x-GH-4804-SNAPSHOT should be available soon if you want to give it a try.

@jyemin
Contributor

jyemin commented Oct 31, 2024

Thanks @christophstrobl. flatMapSequential seems correct, but the performance will be poor compared to the insert, which is done in bulk. Is there any way the code could take advantage of MongoCollection#bulkWrite here, so that there is just one database round trip for all the updates?

@christophstrobl
Member

There are differences in our event publication & callback handling. If one wants to do a true batched insert, it needs to be done via the template and ReactiveBulkOperations.

@fremarti
Author

fremarti commented Nov 1, 2024

Thanks for your help with this and the quick solution!

@fremarti
Author

fremarti commented Nov 1, 2024

I just tried to apply the fix and still get the same error:

// FooController.kt
@PostMapping(value = ["/foo"], consumes = [MediaType.APPLICATION_JSON_VALUE])
@Transactional
fun foo(@RequestBody request: AnyDocumentRequest): Mono<Void> {
    return Flux.fromIterable(request.list)
        .flatMapSequential { anyDocumentRepository.save(AnyDocument(it.id)) }
        .then()
}

I also enabled debug logging for org.mongodb.driver.protocol.command.
It seems that race conditions can still occur. In the log below, the commands are started in the correct order, but their timestamps show that they are sent at practically the same moment:

2024-11-01T10:39:35.317+01:00 DEBUG 26060 --- [tter-3-thread-1] org.mongodb.driver.protocol.command      : Command "update" started on database "anyname" using a connection with driver-generated ID 6 and server-generated ID 60 to 127.0.0.1:27017. The request ID is 56 and the operation ID is 54. Command: {"update": "any-document", "ordered": true, "$db": "anyname", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730453975, "i": 48}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "Ux/pvBjaRxK7kXC0EqqCNA==", "subType": "04"}}}, "txnNumber": 13, "startTransaction": true, "autocommit": false, "updates": [{"q": {"_id": "1"}, "u": {"_id": "1", "_class": "com.example.demo.AnyDocument"}, "upsert": true}]}
2024-11-01T10:39:35.317+01:00 DEBUG 26060 --- [tter-3-thread-1] org.mongodb.driver.protocol.command      : Command "update" started on database "anyname" using a connection with driver-generated ID 5 and server-generated ID 59 to 127.0.0.1:27017. The request ID is 57 and the operation ID is 55. Command: {"update": "any-document", "ordered": true, "$db": "anyname", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730453975, "i": 48}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "Ux/pvBjaRxK7kXC0EqqCNA==", "subType": "04"}}}, "txnNumber": 13, "autocommit": false, "updates": [{"q": {"_id": "2"}, "u": {"_id": "2", "_class": "com.example.demo.AnyDocument"}, "upsert": true}]}
2024-11-01T10:39:35.319+01:00 DEBUG 26060 --- [       Thread-5] org.mongodb.driver.protocol.command      : Command "update" succeeded on database "anyname" in 1.959125 ms using a connection with driver-generated ID 6 and server-generated ID 60 to 127.0.0.1:27017. The request ID is 56 and the operation ID is 54. Command reply: {"n": 1, "electionId": {"$oid": "7fffffff0000000000000002"}, "opTime": {"ts": {"$timestamp": {"t": 1730453975, "i": 41}}, "t": 2}, "upserted": [{"index": 0, "_id": "1"}], "nModified": 0, "ok": 1.0, "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730453975, "i": 48}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1730453975, "i": 41}}}
2024-11-01T10:39:35.320+01:00 DEBUG 26060 --- [      Thread-14] org.mongodb.driver.protocol.command      : Command "update" failed on database "anyname" in 2.354958 ms using a connection with driver-generated ID 5 and server-generated ID 59 to 127.0.0.1:27017. The request ID is 57 and the operation ID is 55.

com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Given transaction number 13 does not match any in-progress transactions. The active transaction number is 12' on server 127.0.0.1:27017. The full response is {"errorLabels": ["TransientTransactionError"], "ok": 0.0, "errmsg": "Given transaction number 13 does not match any in-progress transactions. The active transaction number is 12", "code": 251, "codeName": "NoSuchTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730453975, "i": 48}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1730453975, "i": 41}}}

What do you think?

@fremarti
Author

fremarti commented Nov 1, 2024

I can fix it by using concatMap instead of flatMapSequential. As I understand it, concatMap subscribes to the next inner publisher only after the previous one has completed, so the saves themselves run one after another, whereas flatMapSequential only preserves the order of the emitted results:

@PostMapping(value = ["/foo"], consumes = [MediaType.APPLICATION_JSON_VALUE])
@Transactional
fun foo(@RequestBody request: AnyDocumentRequest): Mono<Void> {
    return Flux.fromIterable(request.list)
        .concatMap { anyDocumentRepository.save(AnyDocument(it.id)) }
        .then()
}

@jyemin
Contributor

jyemin commented Nov 1, 2024

> There are differences in our event publication & callback handling. If one wants to do true batched insert it needs to be done via the template and ReactiveBulkOperations.

I'm not sure I understand this, @christophstrobl. The current code looks like this:

return source.stream().allMatch(entityInformation::isNew) ? //
		mongoOperations.insert(source.stream().collect(Collectors.toList()), entityInformation.getCollectionName()) : //
		Flux.fromIterable(entities).flatMap(this::save);

IIUC the insert is done in bulk, so couldn't the save be done in bulk as well? Is the issue that there is no method in ReactiveMongoOperations like:

<T> Flux<T> save(Collection<? extends T> batchToSave, String collectionName);

that translates to MongoCollection#bulkWrite?

I'm just concerned because it might surprise users of saveAll that inserts execute in bulk while upserts execute sequentially.

@jyemin
Contributor

jyemin commented Nov 1, 2024

I did a bit more digging, and now I see that the implementation of

<T> Flux<T> insert(Collection<? extends T> batchToSave, String collectionName);

in ReactiveMongoTemplate also inserts each document individually. However, it also uses flatMapSequential. I suspect these also need to use concatMap in order to be correct when executed within a transaction.

But this change is going to have potentially significant effects on performance, so it's worrisome.

@christophstrobl
Member

@jyemin for save operations on versioned entities, we need to make sure a replace can only happen if the DB version still matches, which is something we cannot do in bulk. Generally speaking, the idea of a Spring Data repository does not really fit bulk operations.

Maybe @violetagg & @chemicL can give some advice on flatMapSequential vs. concatMap for the given scenario.

@jyemin
Contributor

jyemin commented Nov 4, 2024

> for save operations of versioned entities we need to make sure a replace can only happen if the db version still matches which is something we cannot do in bulk.

Why can't this be done in bulk? Is the issue that you need to report, separately for each document, whether there actually was a version mismatch?

@mp911de mp911de added type: bug A general bug and removed status: waiting-for-triage An issue we've not yet triaged labels Nov 12, 2024
@mp911de mp911de added this to the 4.3.6 (2024.0.6) milestone Nov 12, 2024
@mp911de mp911de changed the title NoSuchTransaction in reactive mongodb client when working with transactions NoSuchTransaction in reactive MongoDB client when working with transactions Nov 12, 2024
mp911de pushed a commit that referenced this issue Nov 12, 2024
Original pull request: #4824
Closes #4804
mp911de pushed a commit that referenced this issue Nov 12, 2024
Ensure subscription order on multi document operations.

Original pull request: #4824
Closes #4804
mp911de pushed a commit that referenced this issue Nov 12, 2024
Original pull request: #4824
Closes #4804
@chemicL
Member

chemicL commented Nov 19, 2024

@christophstrobl Hey, I only just found the time to look into this, so it looks like I'm late to the party. However, from my vague understanding after reading the lengthy discussion, I believe this is what was settled upon:

  1. flatMap ran concurrently and the transaction start operation could have been issued after another operation that referred to its ID was issued -> error.
  2. flatMapSequential seemed to improve things but in the end after more testing had the same issue.
  3. concatMap issues requests sequentially so everything runs in the appropriate order.

What I was wondering is whether only the request containing "startTransaction": true needs to run first, while the rest can be parallelized. If that's the case, it's worth considering a sequence similar to the one below:

Flux.just(firstItem)
    .flatMap(this::save)
    .concatWith(Flux.fromIterable(remainderOfItems)
            .flatMapSequential(this::save)
    )

This would ensure the transaction beginning arrives at the target server before the others. But then the rest could run in parallel, speeding things up a bit, potentially. It's also worth noting that flatMapSequential subscribes eagerly just like flatMap, but the difference is that it produces results in the order of the upstream chain's emissions and requires a buffer for out-of-order arrivals.

Let me know if this helps or whether concurrent execution is undesired - if so, please disregard the above :)
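The difference between eager and sequential subscription described above can be modeled with plain JDK classes. This is an analogy rather than Reactor code: `FakeStore`, `eagerOrdered`, and `sequential` are hypothetical names, and `CompletableFuture` stands in for the reactive publishers. `eagerOrdered` mimics what is said about flatMapSequential (everything is started up front, only the results are ordered), while `sequential` mimics concatMap (the next operation starts only after the previous one has completed).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class SubscriptionOrderDemo {

    /** Hypothetical async store: save() starts an operation, complete() finishes it by hand. */
    static final class FakeStore {
        final List<String> log = new ArrayList<>();
        private final Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();

        CompletableFuture<String> save(String id) {
            log.add("start " + id);
            CompletableFuture<String> future = new CompletableFuture<>();
            pending.put(id, future);
            return future;
        }

        void complete(String id) {
            log.add("done " + id);
            pending.remove(id).complete(id);
        }
    }

    static List<String> append(List<String> list, String value) {
        list.add(value);
        return list;
    }

    /** Eager: every save is started up front; only the results are collected in input order. */
    static CompletableFuture<List<String>> eagerOrdered(FakeStore store, List<String> ids) {
        List<CompletableFuture<String>> started = new ArrayList<>();
        for (String id : ids) {
            started.add(store.save(id));
        }
        CompletableFuture<List<String>> result = CompletableFuture.completedFuture(new ArrayList<>());
        for (CompletableFuture<String> future : started) {
            result = result.thenCombine(future, SubscriptionOrderDemo::append);
        }
        return result;
    }

    /** Sequential: the next save is started only after the previous one has completed. */
    static CompletableFuture<List<String>> sequential(FakeStore store, List<String> ids) {
        CompletableFuture<List<String>> result = CompletableFuture.completedFuture(new ArrayList<>());
        for (String id : ids) {
            result = result.thenCompose(list -> store.save(id).thenApply(value -> append(list, value)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("1", "2", "3");

        FakeStore eagerStore = new FakeStore();
        CompletableFuture<List<String>> eager = eagerOrdered(eagerStore, ids);
        eagerStore.complete("3"); // completions arrive out of order
        eagerStore.complete("2");
        eagerStore.complete("1");
        System.out.println(eagerStore.log + " -> " + eager.join());

        FakeStore seqStore = new FakeStore();
        CompletableFuture<List<String>> seq = sequential(seqStore, ids);
        seqStore.complete("1");
        seqStore.complete("2");
        seqStore.complete("3");
        System.out.println(seqStore.log + " -> " + seq.join());
    }
}
```

In the eager run all three "start" entries are logged before any "done" entry, yet the result is still `[1, 2, 3]`. In the sequential run each "start" follows the previous "done". Only the second pattern guarantees that the first command has completed before the next one is sent.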

@christophstrobl
Member

christophstrobl commented Nov 19, 2024

Thank you @chemicL - yes, we've been wondering whether a construct like the one outlined would make sense in this case. Apparently it does. We already closed this issue using concatMap for the service & GA releases.
I'll open a new one (#4838) for the next iteration to bring in the suggested improvement.
