Skip to content

Commit

Permalink
Fix ThreadLocal Issue with Repository Save. (#1840)
Browse files Browse the repository at this point in the history
The issue was introduced when the Mono.deferContextual() was added to
determine if the save() is in a transaction. It may be executing in a
different thread when the PseudoArgs (scope, collection, and options)
are retrieved ThreadLocal. This change ensures scope and collection
are retrieved, but options are ignored and discarded.

Closes #1838.
  • Loading branch information
mikereiche authored Oct 10, 2023
1 parent 98d72ef commit 0abf846
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ReactiveCouchbaseTemplate implements ReactiveCouchbaseOperations, A
private final CouchbaseConverter converter;
private final PersistenceExceptionTranslator exceptionTranslator;
private final ReactiveCouchbaseTemplateSupport templateSupport;
private ThreadLocal<PseudoArgs<?>> threadLocalArgs = new ThreadLocal<>();
private final ThreadLocal<PseudoArgs<?>> threadLocalArgs = new ThreadLocal<>();
private final QueryScanConsistency scanConsistency;

public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, final CouchbaseConverter converter) {
Expand Down Expand Up @@ -257,14 +257,6 @@ public PseudoArgs<?> getPseudoArgs() {
* set the ThreadLocal field
*/
public void setPseudoArgs(PseudoArgs<?> threadLocalArgs) {
if (this.threadLocalArgs == null) {
synchronized (this) {
if (this.threadLocalArgs == null) {
this.threadLocalArgs = new ThreadLocal<>();
}
}
}

this.threadLocalArgs.set(threadLocalArgs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.lang.reflect.AnnotatedElement;

import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
import org.springframework.data.couchbase.core.query.OptionsBuilder;
import org.springframework.data.couchbase.core.support.PseudoArgs;
import org.springframework.data.couchbase.repository.Collection;
import org.springframework.data.couchbase.repository.ScanConsistency;
import org.springframework.data.couchbase.repository.Scope;
Expand All @@ -35,7 +37,7 @@
*
* @author Michael Reiche
*/
public class CouchbaseRepositoryBase<T, ID> {
public abstract class CouchbaseRepositoryBase<T, ID> {

/**
* Contains information about the entity being used in this repository.
Expand Down Expand Up @@ -82,9 +84,11 @@ <S extends T> String getId(S entity) {

protected String getScope() {
String fromAnnotation = OptionsBuilder.annotationString(Scope.class, CollectionIdentifier.DEFAULT_SCOPE,
new AnnotatedElement[] { getJavaType(), repositoryInterface });
new AnnotatedElement[] { getJavaType(), getRepositoryInterface() });
String fromMetadata = crudMethodMetadata != null ? crudMethodMetadata.getScope() : null;
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_SCOPE, fromMetadata, fromAnnotation);
PseudoArgs<?> pa = getReactiveTemplate().getPseudoArgs();
String fromThreadLocal = pa != null ? pa.getScope() : null;
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_SCOPE, fromThreadLocal, fromMetadata, fromAnnotation);
}

/**
Expand All @@ -96,12 +100,18 @@ protected String getScope() {
* 1. repository.withCollection()
*/
protected String getCollection() {
String fromAnnotation = OptionsBuilder.annotationString(Collection.class, CollectionIdentifier.DEFAULT_COLLECTION,
new AnnotatedElement[] { getJavaType(), repositoryInterface });
String fromAnnotation = OptionsBuilder.annotationString(Collection.class,
CollectionIdentifier.DEFAULT_COLLECTION,
new AnnotatedElement[] { getJavaType(), getRepositoryInterface() });
String fromMetadata = crudMethodMetadata != null ? crudMethodMetadata.getCollection() : null;
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_COLLECTION, fromMetadata, fromAnnotation);
PseudoArgs<?> pa = getReactiveTemplate().getPseudoArgs();
String fromThreadLocal = pa != null ? pa.getCollection() : null;
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_COLLECTION, fromThreadLocal, fromMetadata,
fromAnnotation);
}

protected abstract ReactiveCouchbaseTemplate getReactiveTemplate();

/**
* Get the QueryScanConsistency from <br>
* 1. The method annotation (method *could* be available from crudMethodMetadata)<br>
Expand Down Expand Up @@ -132,4 +142,5 @@ QueryScanConsistency getQueryScanConsistency() {
void setRepositoryMethodMetadata(CrudMethodMetadata crudMethodMetadata) {
this.crudMethodMetadata = crudMethodMetadata;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import java.util.stream.Collectors;

import org.springframework.data.couchbase.core.CouchbaseOperations;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentEntity;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentProperty;
import org.springframework.data.couchbase.core.CouchbaseTemplate;
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
import org.springframework.data.couchbase.core.query.Query;
import org.springframework.data.couchbase.repository.CouchbaseRepository;
import org.springframework.data.couchbase.repository.query.CouchbaseEntityInformation;
Expand All @@ -37,7 +37,6 @@
import org.springframework.util.Assert;

import com.couchbase.client.java.query.QueryScanConsistency;
import org.springframework.util.ReflectionUtils;

/**
* Repository base implementation for Couchbase.
Expand Down Expand Up @@ -71,7 +70,13 @@ public SimpleCouchbaseRepository(CouchbaseEntityInformation<T, String> entityInf
@Override
@SuppressWarnings("unchecked")
public <S extends T> S save(S entity) {
return operations.save(entity, getScope(), getCollection());
String scopeName = getScope();
String collectionName = getCollection();
// clear out the PseudoArgs here as whatever is called by operations.save() could be in a different thread.
// not that this will also clear out Options, but that's ok as any options would not work
// with all of insert/upsert/replace. If Options are needed, use template.insertById/upsertById/replaceById
getReactiveTemplate().setPseudoArgs(null);
return operations.save(entity, scopeName, collectionName);
}

@Override
Expand Down Expand Up @@ -177,4 +182,8 @@ public CouchbaseOperations getOperations() {
return operations;
}

@Override
protected ReactiveCouchbaseTemplate getReactiveTemplate() {
return ((CouchbaseTemplate) getOperations()).reactive();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@
import org.reactivestreams.Publisher;
import org.springframework.data.couchbase.core.CouchbaseOperations;
import org.springframework.data.couchbase.core.ReactiveCouchbaseOperations;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentEntity;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentProperty;
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
import org.springframework.data.couchbase.core.query.Query;
import org.springframework.data.couchbase.repository.ReactiveCouchbaseRepository;
import org.springframework.data.couchbase.repository.query.CouchbaseEntityInformation;
import org.springframework.data.domain.Sort;
import org.springframework.data.util.Streamable;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

/**
* Reactive repository base implementation for Couchbase.
Expand Down Expand Up @@ -76,7 +74,13 @@ public Flux<T> findAll(Sort sort) {
@SuppressWarnings("unchecked")
@Override
public <S extends T> Mono<S> save(S entity) {
return save(entity, getScope(), getCollection());
String scopeName = getScope();
String collectionName = getCollection();
// clear out the PseudoArgs here as whatever is called by operations.save() could be in a different thread.
// not that this will also clear out Options, but that's ok as any options would not work
// with all of insert/upsert/replace. If Options are needed, use template.insertById/upsertById/replaceById
getReactiveTemplate().setPseudoArgs(null);
return operations.save(entity, scopeName, collectionName);
}

@Override
Expand Down Expand Up @@ -227,4 +231,9 @@ public ReactiveCouchbaseOperations getOperations() {
return operations;
}

@Override
protected ReactiveCouchbaseTemplate getReactiveTemplate() {
return (ReactiveCouchbaseTemplate) getOperations();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.springframework.data.couchbase.domain;

import org.springframework.data.couchbase.repository.Collection;
import org.springframework.data.couchbase.repository.Scope;

@Scope("must set scope name")
@Collection("my_collection")
public interface ReactiveAirportMustScopeRepository extends ReactiveAirportRepository {
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import reactor.core.Disposable;

import java.util.List;
import java.util.Random;
import java.util.UUID;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -31,6 +35,7 @@
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
import org.springframework.data.couchbase.domain.Airport;
import org.springframework.data.couchbase.domain.ConfigScoped;
import org.springframework.data.couchbase.domain.ReactiveAirportMustScopeRepository;
import org.springframework.data.couchbase.domain.ReactiveAirportRepository;
import org.springframework.data.couchbase.domain.ReactiveAirportRepositoryAnnotated;
import org.springframework.data.couchbase.domain.ReactiveUserColRepository;
Expand Down Expand Up @@ -61,6 +66,7 @@ public class ReactiveCouchbaseRepositoryQueryCollectionIntegrationTests extends

@Autowired ReactiveAirportRepository reactiveAirportRepository;
@Autowired ReactiveAirportRepositoryAnnotated reactiveAirportRepositoryAnnotated;
@Autowired ReactiveAirportMustScopeRepository reactiveAirportMustScopeRepository;
@Autowired ReactiveUserColRepository userColRepository;
@Autowired public CouchbaseTemplate couchbaseTemplate;
@Autowired public ReactiveCouchbaseTemplate reactiveCouchbaseTemplate;
Expand Down Expand Up @@ -116,6 +122,21 @@ public void myTest() {

}

@Test
void testThreadLocal() throws InterruptedException {

String scopeName = "my_scope";
String id = UUID.randomUUID().toString();

Airport airport = new Airport(id, "testThreadLocal", "icao");
Disposable s = reactiveAirportMustScopeRepository.withScope(scopeName).findById(airport.getId()).doOnNext(u -> {
throw new RuntimeException("User already Exists! " + u);
}).then(reactiveAirportMustScopeRepository.withScope(scopeName).save(airport))
.subscribe(u -> LOGGER.info("User Persisted Successfully! {}", u));

reactiveAirportMustScopeRepository.withScope(scopeName).deleteById(id).block();
}

/**
* can test against _default._default without setting up additional scope/collection and also test for collections and
* scopes that do not exist These same tests should be repeated on non-default scope and collection in a test that
Expand Down

0 comments on commit 0abf846

Please sign in to comment.