Skip to content

Commit

Permalink
Fix ThreadLocal Issue with Repository Save.
Browse files Browse the repository at this point in the history
The issue was introduced when the Mono.deferContextual() was added to
determine if the save() is in a transaction. It may be executing in a
different thread when the PseudoArgs (scope, collection, and options)
are retrieved ThreadLocal. This change ensures scope and collection
are retrieved, but options are ignored and discarded.

Closes #1838.
  • Loading branch information
mikereiche committed Oct 10, 2023
1 parent 98d72ef commit cef273d
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ReactiveCouchbaseTemplate implements ReactiveCouchbaseOperations, A
private final CouchbaseConverter converter;
private final PersistenceExceptionTranslator exceptionTranslator;
private final ReactiveCouchbaseTemplateSupport templateSupport;
private ThreadLocal<PseudoArgs<?>> threadLocalArgs = new ThreadLocal<>();
private final ThreadLocal<PseudoArgs<?>> threadLocalArgs = new ThreadLocal<>();
private final QueryScanConsistency scanConsistency;

public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, final CouchbaseConverter converter) {
Expand Down Expand Up @@ -257,14 +257,6 @@ public PseudoArgs<?> getPseudoArgs() {
* set the ThreadLocal field
*/
public void setPseudoArgs(PseudoArgs<?> threadLocalArgs) {
if (this.threadLocalArgs == null) {
synchronized (this) {
if (this.threadLocalArgs == null) {
this.threadLocalArgs = new ThreadLocal<>();
}
}
}

this.threadLocalArgs.set(threadLocalArgs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.lang.reflect.AnnotatedElement;

import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
import org.springframework.data.couchbase.core.query.OptionsBuilder;
import org.springframework.data.couchbase.core.support.PseudoArgs;
import org.springframework.data.couchbase.repository.Collection;
import org.springframework.data.couchbase.repository.ScanConsistency;
import org.springframework.data.couchbase.repository.Scope;
Expand All @@ -35,7 +37,7 @@
*
* @author Michael Reiche
*/
public class CouchbaseRepositoryBase<T, ID> {
public abstract class CouchbaseRepositoryBase<T, ID> {

/**
* Contains information about the entity being used in this repository.
Expand Down Expand Up @@ -82,9 +84,11 @@ <S extends T> String getId(S entity) {

protected String getScope() {
String fromAnnotation = OptionsBuilder.annotationString(Scope.class, CollectionIdentifier.DEFAULT_SCOPE,
new AnnotatedElement[] { getJavaType(), repositoryInterface });
new AnnotatedElement[] { getJavaType(), getRepositoryInterface() });
String fromMetadata = crudMethodMetadata != null ? crudMethodMetadata.getScope() : null;
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_SCOPE, fromMetadata, fromAnnotation);
PseudoArgs<?> pa = getReactiveTemplate().getPseudoArgs();
String fromThreadLocal = pa != null ? pa.getScope() : null;
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_SCOPE, fromThreadLocal, fromMetadata, fromAnnotation);
}

/**
Expand All @@ -96,12 +100,18 @@ protected String getScope() {
* 1. repository.withCollection()
*/
protected String getCollection() {
String fromAnnotation = OptionsBuilder.annotationString(Collection.class, CollectionIdentifier.DEFAULT_COLLECTION,
new AnnotatedElement[] { getJavaType(), repositoryInterface });
String fromAnnotation = OptionsBuilder.annotationString(Collection.class,
CollectionIdentifier.DEFAULT_COLLECTION,
new AnnotatedElement[] { getJavaType(), getRepositoryInterface() });
String fromMetadata = crudMethodMetadata != null ? crudMethodMetadata.getCollection() : null;
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_COLLECTION, fromMetadata, fromAnnotation);
PseudoArgs<?> pa = getReactiveTemplate().getPseudoArgs();
String fromThreadLocal = pa != null ? pa.getCollection() : null;
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_COLLECTION, fromThreadLocal, fromMetadata,
fromAnnotation);
}

protected abstract ReactiveCouchbaseTemplate getReactiveTemplate();

/**
* Get the QueryScanConsistency from <br>
* 1. The method annotation (method *could* be available from crudMethodMetadata)<br>
Expand Down Expand Up @@ -132,4 +142,5 @@ QueryScanConsistency getQueryScanConsistency() {
void setRepositoryMethodMetadata(CrudMethodMetadata crudMethodMetadata) {
this.crudMethodMetadata = crudMethodMetadata;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import java.util.stream.Collectors;

import org.springframework.data.couchbase.core.CouchbaseOperations;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentEntity;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentProperty;
import org.springframework.data.couchbase.core.CouchbaseTemplate;
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
import org.springframework.data.couchbase.core.query.Query;
import org.springframework.data.couchbase.repository.CouchbaseRepository;
import org.springframework.data.couchbase.repository.query.CouchbaseEntityInformation;
Expand All @@ -37,7 +37,6 @@
import org.springframework.util.Assert;

import com.couchbase.client.java.query.QueryScanConsistency;
import org.springframework.util.ReflectionUtils;

/**
* Repository base implementation for Couchbase.
Expand Down Expand Up @@ -71,7 +70,13 @@ public SimpleCouchbaseRepository(CouchbaseEntityInformation<T, String> entityInf
@Override
@SuppressWarnings("unchecked")
public <S extends T> S save(S entity) {
return operations.save(entity, getScope(), getCollection());
String scopeName = getScope();
String collectionName = getCollection();
// clear out the PseudoArgs here as whatever is called by operations.save() could be in a different thread.
// not that this will also clear out Options, but that's ok as any options would not work
// with all of insert/upsert/replace. If Options are needed, use template.insertById/upsertById/replaceById
getReactiveTemplate().setPseudoArgs(null);
return operations.save(entity, scopeName, collectionName);
}

@Override
Expand Down Expand Up @@ -177,4 +182,8 @@ public CouchbaseOperations getOperations() {
return operations;
}

@Override
protected ReactiveCouchbaseTemplate getReactiveTemplate() {
return ((CouchbaseTemplate) getOperations()).reactive();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@
import org.reactivestreams.Publisher;
import org.springframework.data.couchbase.core.CouchbaseOperations;
import org.springframework.data.couchbase.core.ReactiveCouchbaseOperations;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentEntity;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentProperty;
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
import org.springframework.data.couchbase.core.query.Query;
import org.springframework.data.couchbase.repository.ReactiveCouchbaseRepository;
import org.springframework.data.couchbase.repository.query.CouchbaseEntityInformation;
import org.springframework.data.domain.Sort;
import org.springframework.data.util.Streamable;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

/**
* Reactive repository base implementation for Couchbase.
Expand Down Expand Up @@ -76,7 +74,13 @@ public Flux<T> findAll(Sort sort) {
@SuppressWarnings("unchecked")
@Override
public <S extends T> Mono<S> save(S entity) {
return save(entity, getScope(), getCollection());
String scopeName = getScope();
String collectionName = getCollection();
// clear out the PseudoArgs here as whatever is called by operations.save() could be in a different thread.
// not that this will also clear out Options, but that's ok as any options would not work
// with all of insert/upsert/replace. If Options are needed, use template.insertById/upsertById/replaceById
getReactiveTemplate().setPseudoArgs(null);
return operations.save(entity, scopeName, collectionName);
}

@Override
Expand Down Expand Up @@ -227,4 +231,9 @@ public ReactiveCouchbaseOperations getOperations() {
return operations;
}

@Override
protected ReactiveCouchbaseTemplate getReactiveTemplate() {
return (ReactiveCouchbaseTemplate) getOperations();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.springframework.data.couchbase.domain;

import org.springframework.data.couchbase.repository.Collection;
import org.springframework.data.couchbase.repository.Scope;

@Scope("must set scope name")
@Collection("my_collection")
public interface ReactiveAirportMustScopeRepository extends ReactiveAirportRepository {
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import reactor.core.Disposable;

import java.util.List;
import java.util.Random;
import java.util.UUID;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -31,6 +35,7 @@
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
import org.springframework.data.couchbase.domain.Airport;
import org.springframework.data.couchbase.domain.ConfigScoped;
import org.springframework.data.couchbase.domain.ReactiveAirportMustScopeRepository;
import org.springframework.data.couchbase.domain.ReactiveAirportRepository;
import org.springframework.data.couchbase.domain.ReactiveAirportRepositoryAnnotated;
import org.springframework.data.couchbase.domain.ReactiveUserColRepository;
Expand Down Expand Up @@ -61,6 +66,7 @@ public class ReactiveCouchbaseRepositoryQueryCollectionIntegrationTests extends

@Autowired ReactiveAirportRepository reactiveAirportRepository;
@Autowired ReactiveAirportRepositoryAnnotated reactiveAirportRepositoryAnnotated;
@Autowired ReactiveAirportMustScopeRepository reactiveAirportMustScopeRepository;
@Autowired ReactiveUserColRepository userColRepository;
@Autowired public CouchbaseTemplate couchbaseTemplate;
@Autowired public ReactiveCouchbaseTemplate reactiveCouchbaseTemplate;
Expand Down Expand Up @@ -116,6 +122,21 @@ public void myTest() {

}

@Test
void testThreadLocal() throws InterruptedException {

String scopeName = "my_scope";
String id = UUID.randomUUID().toString();

Airport airport = new Airport(id, "testThreadLocal", "icao");
Disposable s = reactiveAirportMustScopeRepository.withScope(scopeName).findById(airport.getId()).doOnNext(u -> {
throw new RuntimeException("User already Exists! " + u);
}).then(reactiveAirportMustScopeRepository.withScope(scopeName).save(airport))
.subscribe(u -> LOGGER.info("User Persisted Successfully! {}", u));

reactiveAirportMustScopeRepository.withScope(scopeName).deleteById(id).block();
}

/**
* can test against _default._default without setting up additional scope/collection and also test for collections and
* scopes that do not exist These same tests should be repeated on non-default scope and collection in a test that
Expand Down

0 comments on commit cef273d

Please sign in to comment.