DynamicProxyable for ReactiveCouchbaseRepository using ThreadLocal #1838

Closed
RittikeGhosh opened this issue Oct 8, 2023 · 4 comments · Fixed by #1840
Labels
type: bug A general bug

Comments

@RittikeGhosh

RittikeGhosh commented Oct 8, 2023

Spring Boot Version: 3.1.4
Spring Dependency Management Version: 1.1.3
Couchbase Enterprise: 7.2.2
Project Link: couchbasedynamicproxyable-demo.zip

DynamicProxyable

Support for Couchbase-specific options, scope and collections The three "with" methods will return a new proxy
instance with the specified options, scope, or collections set. The setters are called with the corresponding
options, scope and collection to set the ThreadLocal fields on the CouchbaseOperations of the repository just
before the call is made to the repository, and called again with 'null' just after the call is made. The repository
method will fetch those values to use in the call.
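
The set-call-clear pattern described above can be sketched with a plain JDK dynamic proxy. This is only an illustration under stated assumptions: `ScopedProxyDemo`, `UserStore`, `DefaultUserStore`, and the `SCOPE` holder are hypothetical names, not Spring Data Couchbase classes, and `remove()` stands in for the "called again with 'null'" step.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for the proxy mechanism; not the library's implementation.
final class ScopedProxyDemo {

    // Hypothetical repository-like interface.
    interface UserStore {
        String describeSave(String id);
    }

    // Per-thread holder for the scope, playing the role of the ThreadLocal fields.
    private static final ThreadLocal<String> SCOPE = new ThreadLocal<>();

    // The target reads the scope from the ThreadLocal when it is called.
    static final class DefaultUserStore implements UserStore {
        @Override
        public String describeSave(String id) {
            String scope = SCOPE.get();
            return "save " + id + " in scope " + (scope != null ? scope : "_default");
        }
    }

    // Returns a new proxy that sets the scope just before each call and clears it just after.
    static UserStore withScope(UserStore target, String scope) {
        InvocationHandler handler = (proxy, method, args) -> {
            SCOPE.set(scope);
            try {
                return method.invoke(target, args);
            } finally {
                SCOPE.remove(); // equivalent in effect to setting it back to null
            }
        };
        return (UserStore) Proxy.newProxyInstance(
                UserStore.class.getClassLoader(), new Class<?>[] {UserStore.class}, handler);
    }

    public static void main(String[] args) {
        UserStore store = new DefaultUserStore();
        System.out.println(withScope(store, "tenant").describeSave("hw49"));
        System.out.println(store.describeSave("hw49"));
    }
}
```

The pattern relies on the target reading the ThreadLocal synchronously, on the same thread, before the handler's `finally` block runs.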

The DynamicProxyable documentation says that it uses ThreadLocal. Is that correct in the reactive paradigm?

When I try this with reactive code, the behavior is inconsistent; in effect, it is not supported.

I have created a demo project that reproduces the issue.

 var user = new User("hw" + random.nextInt(0, 100), "Hello World");
 userRepository.withScope(TENANT_SCOPE).findById(user.id())
   .doOnNext(u -> {
       throw new RuntimeException("User already Exists!");
   })
   .then(userRepository.withScope(TENANT_SCOPE).save(user))
   .subscribe(u -> log.info("User Persisted Successfully! {}", u));

This code makes two separate calls to the same repository, each through withScope().

If both repository calls happen on the same thread, there is no issue. If they run on different threads, the second call fails because it cannot pick up the correct scope from the ThreadLocal, which was set on a different thread.
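
The thread dependence described above can be reproduced with the JDK alone. The following is a minimal sketch, assuming a hypothetical `SCOPE` holder and a fallback to `_default` when nothing is set; it does not use any Couchbase or Reactor API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a per-thread scope holder; not Spring Data Couchbase code.
final class ScopeThreadLocalDemo {

    private static final ThreadLocal<String> SCOPE = new ThreadLocal<>();

    // Reads the scope on whatever thread calls it, falling back to "_default" when unset.
    static String currentScope() {
        String scope = SCOPE.get();
        return scope != null ? scope : "_default";
    }

    // Sets the scope on the calling thread and reads it on that same thread.
    static String readOnSameThread(String scope) {
        SCOPE.set(scope);
        try {
            return currentScope();
        } finally {
            SCOPE.remove();
        }
    }

    // Sets the scope on the calling thread but reads it on a worker thread.
    static String readOnOtherThread(String scope) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        SCOPE.set(scope);
        try {
            return CompletableFuture.supplyAsync(ScopeThreadLocalDemo::currentScope, worker).join();
        } finally {
            SCOPE.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(readOnSameThread("tenant"));
        System.out.println(readOnOtherThread("tenant"));
    }
}
```

A value stored in a plain `ThreadLocal` is visible only to the thread that stored it, so the worker thread sees nothing and falls back to the default.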

This is the log:

2023-10-08T11:59:51.850+05:30 DEBUG 1050611 --- [           main] s.d.c.c.ReactiveFindByIdOperationSupport : findById key=hw49 scope: tenant collection: user options: com.couchbase.client.java.kv.GetOptions@7db40fd5
2023-10-08T11:59:51.920+05:30 DEBUG 1050611 --- [   cb-io-kv-5-2] d.c.c.ReactiveUpsertByIdOperationSupport : upsertById object=User[id=hw49, name=Hello World] scope: _default collection: user options: null
2023-10-08T11:59:51.956+05:30 DEBUG 1050611 --- [   cb-io-kv-5-2] o.s.d.c.core.query.OptionsBuilder        : upsert options: {}{durabilityLevel: Optional.empty, persistTo: NONE, replicateTo: NONE, timeout: Optional.empty, retryStrategy: Optional.empty, clientContext: null, parentSpan: Optional.empty}
reactor.core.Exceptions$ErrorCallbackNotImplemented: com.couchbase.client.core.error.AmbiguousTimeoutException: UpsertRequest, Reason: TIMEOUT {"cancelled":true,"completed":true,"coreId":"0x548ece300000001","idempotent":false,"lastChannelId":"0548ECE300000001/000000006D07D16B","lastDispatchedFrom":"127.0.0.1:55462","lastDispatchedTo":"localhost:11210","reason":"TIMEOUT","requestId":7,"requestType":"UpsertRequest","retried":14,"retryReasons":["COLLECTION_MAP_REFRESH_IN_PROGRESS"],"service":{"bucket":"myService","collection":"user","documentId":"hw49","opaque":"0x10","scope":"_default","type":"kv","vbucket":583},"timeoutMs":2500,"timings":{"encodingMicros":4405,"totalMicros":2513777}}
...Stacktrace skipped
com.couchbase.client.core.error.UnambiguousTimeoutException: GetCollectionIdRequest, Reason: TIMEOUT {"cancelled":true,"completed":true,"coreId":"0x548ece300000001","idempotent":true,"lastChannelId":"0548ECE300000001/000000006D07D16B","lastDispatchedFrom":"127.0.0.1:55462","lastDispatchedTo":"localhost:11210","reason":"TIMEOUT","requestId":6,"requestType":"GetCollectionIdRequest","retried":14,"retryReasons":["KV_COLLECTION_OUTDATED"],"service":{"bucket":"myService","collection":"user","errorCode":{"description":"Operation specified an unknown collection.","name":"UNKNOWN_COLLECTION"},"opaque":"0xf","scope":"_default","type":"kv","vbucket":0},"timeoutMs":2500,"timings":{"dispatchMicros":1345,"totalDispatchMicros":23755,"totalServerMicros":0,"serverMicros":0}}
...Stacktrace skipped

As the second line of the log shows, the scope is set to _default, which is incorrect; it should be tenant.

If I defer the call instead of passing it directly to then(), it works:
.then(Mono.defer(() -> userRepository.withScope(TENANT_SCOPE).save(user).log("2")))
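
One difference between the two forms is when, and on which thread, the repository call expression is evaluated: an argument passed directly to then() is evaluated while the chain is being built, whereas a deferred supplier is evaluated later, when that step actually runs. The sketch below is a JDK-only analogy using CompletableFuture rather than Reactor; `EagerVersusDeferredDemo` and the thread name "worker" are hypothetical, and it does not claim to show why the deferred form works inside the library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical JDK-only analogy for an eagerly evaluated argument versus a deferred one.
final class EagerVersusDeferredDemo {

    // Stands in for "make the repository call": reports which thread evaluated it.
    static String evaluatingThread() {
        return Thread.currentThread().getName();
    }

    // Eager: the value is computed by the caller while the chain is being built.
    static String eager() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            String value = evaluatingThread();
            return CompletableFuture.supplyAsync(() -> "first step", worker)
                    .thenApplyAsync(ignored -> value, worker)
                    .join();
        } finally {
            worker.shutdown();
        }
    }

    // Deferred: the supplier is only invoked when the second step runs on the worker.
    static String deferred() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            Supplier<String> lazy = EagerVersusDeferredDemo::evaluatingThread;
            return CompletableFuture.supplyAsync(() -> "first step", worker)
                    .thenApplyAsync(ignored -> lazy.get(), worker)
                    .join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("eager evaluated on: " + eager());
        System.out.println("deferred evaluated on: " + deferred());
    }
}
```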

Is there an appropriate solution? This is important for a multi-tenant implementation.

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Oct 8, 2023
@mikereiche
Collaborator

mikereiche commented Oct 9, 2023

The two calls are independent of each other - it doesn't matter that they are in different threads.

DynamicProxyable.invoke() intercepts the save() call, stores the scope in a ThreadLocal, and then calls the actual repository.save() method, which retrieves the ThreadLocal; it all happens on the same thread. I'll check whether one execution could be clearing the ThreadLocal before another execution reads it.

It would be helpful if you could provide your complete example so I can be sure I'm running the same thing that you are.

@RittikeGhosh
Author

RittikeGhosh commented Oct 9, 2023

Okay! Please refer to the example in src/main/java/com/example/couchbasedynamicproxyable/ApplicationInit.java in the project: couchbasedynamicproxyable-demo.zip

@mikereiche
Collaborator

mikereiche commented Oct 9, 2023

Thanks.

The issue was introduced with transactions support and is in the repository.save() method. No other repository methods will have the issue. To determine whether it is running in a transaction, and so choose one of insert, upsert or replace, save() starts the publisher before the ThreadLocal information is retrieved. So when the ThreadLocal information is retrieved in ReactiveInsertByIdOperationSupport.one(), execution may be on a different thread.
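
The ordering problem described above, and one general way to avoid it, can be sketched with the JDK alone: read the ThreadLocal on the calling thread and carry the captured value into the asynchronous step, rather than reading it after execution has moved to another thread. This is an illustration under stated assumptions, not the change made in the library; `CaptureBeforeDeferDemo`, `SCOPE`, and both save methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of "read late" versus "capture early"; not Spring Data Couchbase code.
final class CaptureBeforeDeferDemo {

    private static final ThreadLocal<String> SCOPE = new ThreadLocal<>();

    static String scopeOrDefault(String scope) {
        return scope != null ? scope : "_default";
    }

    // Reads the ThreadLocal inside the asynchronous step, i.e. on the worker thread.
    static CompletableFuture<String> saveReadingLate(ExecutorService worker) {
        return CompletableFuture.supplyAsync(
                () -> "save in scope " + scopeOrDefault(SCOPE.get()), worker);
    }

    // Reads the ThreadLocal on the calling thread and passes the value along.
    static CompletableFuture<String> saveCapturingEarly(ExecutorService worker) {
        String captured = SCOPE.get();
        return CompletableFuture.supplyAsync(
                () -> "save in scope " + scopeOrDefault(captured), worker);
    }

    // Mimics the proxy: set the scope, make the call, clear the scope, then await the result.
    static String callWithScope(String scope, boolean captureEarly) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> result;
            SCOPE.set(scope);
            try {
                result = captureEarly ? saveCapturingEarly(worker) : saveReadingLate(worker);
            } finally {
                SCOPE.remove();
            }
            return result.join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithScope("tenant", false));
        System.out.println(callWithScope("tenant", true));
    }
}
```

In the read-late variant the worker thread never had the value set, so the scope falls back to the default regardless of when the calling thread clears it.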

The workaround is to use reactiveTemplate.withScope(scopeName).withCollection(collectionName).insertById(User.class).one(user), or upsert() or replace() as required. Note that the collection name is also needed: since this call does not go through the repository, the repository annotation will not be used.

mikereiche added a commit that referenced this issue Oct 10, 2023
The issue was introduced when the Mono.deferContextual() was added to
determine if the save() is in a transaction. It may be executing in a
different thread when the PseudoArgs (scope, collection, and options)
are retrieved from the ThreadLocal. This change ensures scope and
collection are retrieved, but options are ignored and discarded.

Closes #1838.
@mikereiche mikereiche added type: bug A general bug and removed status: waiting-for-triage An issue we've not yet triaged labels Oct 10, 2023
@mikereiche mikereiche added this to the 5.1.5 (2023.0.5) milestone Oct 10, 2023
mikereiche added a commit that referenced this issue Oct 11, 2023
@RittikeGhosh
Author

Thanks!
