Skip to content

Commit

Permalink
Late key generation for consistent evaluation of unless expressions
Browse files Browse the repository at this point in the history
Includes removal of evict step on pipeline exception, retaining a previous cache value and avoiding an incomplete key (for consistency with non-reactive caching).

Closes gh-31626
  • Loading branch information
jhoeller committed Nov 21, 2023
1 parent 1410c46 commit 5a3ad6b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -652,9 +652,8 @@ private void collectPutRequests(Collection<CacheOperationContext> contexts,
@Nullable Object result, Collection<CachePutRequest> putRequests) {

for (CacheOperationContext context : contexts) {
if (isConditionPassing(context, result) && context.canPutToCache(result)) {
Object key = generateKey(context, result);
putRequests.add(new CachePutRequest(context, key));
if (isConditionPassing(context, result)) {
putRequests.add(new CachePutRequest(context));
}
}
}
Expand Down Expand Up @@ -943,22 +942,16 @@ private class CachePutRequest {

private final CacheOperationContext context;

private final Object key;

public CachePutRequest(CacheOperationContext context, Object key) {
public CachePutRequest(CacheOperationContext context) {
this.context = context;
this.key = key;
}

@Nullable
public Object apply(@Nullable Object result) {
if (result instanceof CompletableFuture<?> future) {
return future.whenComplete((value, ex) -> {
if (ex != null) {
performEvict(ex);
}
else {
performPut(value);
if (ex == null) {
performCachePut(value);
}
});
}
Expand All @@ -968,27 +961,20 @@ public Object apply(@Nullable Object result) {
return returnValue;
}
}
performPut(result);
performCachePut(result);
return null;
}

void performPut(@Nullable Object value) {
if (logger.isTraceEnabled()) {
logger.trace("Creating cache entry for key '" + this.key + "' in cache(s) " +
this.context.getCacheNames());
}
for (Cache cache : this.context.getCaches()) {
doPut(cache, this.key, value);
}
}

void performEvict(Throwable cause) {
if (logger.isTraceEnabled()) {
logger.trace("Removing cache entry for key '" + this.key + "' from cache(s) " +
this.context.getCacheNames() + " due to exception: " + cause);
}
for (Cache cache : this.context.getCaches()) {
doEvict(cache, this.key, false);
public void performCachePut(@Nullable Object value) {
if (this.context.canPutToCache(value)) {
Object key = generateKey(this.context, value);
if (logger.isTraceEnabled()) {
logger.trace("Creating cache entry for key '" + key + "' in cache(s) " +
this.context.getCacheNames());
}
for (Cache cache : this.context.getCaches()) {
doPut(cache, key, value);
}
}
}
}
Expand Down Expand Up @@ -1017,11 +1003,10 @@ public void onNext(Object o) {
}
@Override
public void onError(Throwable t) {
this.request.performEvict(t);
}
@Override
public void onComplete() {
this.request.performPut(this.cacheValue);
this.request.performCachePut(this.cacheValue);
}
}

Expand Down Expand Up @@ -1107,7 +1092,7 @@ public Object processPutRequest(CachePutRequest request, @Nullable Object result
}
else {
return adapter.fromPublisher(Mono.from(adapter.toPublisher(result))
.doOnSuccess(request::performPut).doOnError(request::performEvict));
.doOnSuccess(request::performCachePut));
}
}
return NOT_HANDLED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ void spr14235AdaptsToCompletableFuture() {
assertThat(bean.findById("tb1").join()).isSameAs(tb);
assertThat(cache.get("tb1").get()).isSameAs(tb);

tb = bean.findById("tb2").join();
assertThat(tb).isNotNull();
assertThat(bean.findById("tb2").join()).isNotSameAs(tb);
assertThat(cache.get("tb2")).isNull();

context.close();
}

Expand Down Expand Up @@ -247,6 +252,11 @@ void spr14235AdaptsToReactorMono() {
assertThat(bean.findById("tb1").block()).isSameAs(tb);
assertThat(cache.get("tb1").get()).isSameAs(tb);

tb = bean.findById("tb2").block();
assertThat(tb).isNotNull();
assertThat(bean.findById("tb2").block()).isNotSameAs(tb);
assertThat(cache.get("tb2")).isNull();

context.close();
}

Expand Down Expand Up @@ -297,6 +307,11 @@ void spr14235AdaptsToReactorFlux() {
assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb);
assertThat(cache.get("tb1").get()).isEqualTo(tb);

tb = bean.findById("tb2").collectList().block();
assertThat(tb).isNotEmpty();
assertThat(bean.findById("tb2").collectList().block()).isNotEqualTo(tb);
assertThat(cache.get("tb2")).isNull();

context.close();
}

Expand Down Expand Up @@ -548,7 +563,7 @@ public Spr14230Service service() {

public static class Spr14235FutureService {

@Cacheable(value = "itemCache")
@Cacheable(value = "itemCache", unless = "#result.name == 'tb2'")
public CompletableFuture<TestBean> findById(String id) {
return CompletableFuture.completedFuture(new TestBean(id));
}
Expand Down Expand Up @@ -581,7 +596,7 @@ public TestBean insertItem(TestBean item) {

public static class Spr14235MonoService {

@Cacheable(value = "itemCache")
@Cacheable(value = "itemCache", unless = "#result.name == 'tb2'")
public Mono<TestBean> findById(String id) {
return Mono.just(new TestBean(id));
}
Expand Down Expand Up @@ -616,7 +631,7 @@ public static class Spr14235FluxService {

private int counter = 0;

@Cacheable(value = "itemCache")
@Cacheable(value = "itemCache", unless = "#result[0].name == 'tb2'")
public Flux<TestBean> findById(String id) {
return Flux.just(new TestBean(id), new TestBean(id + (counter++)));
}
Expand Down

0 comments on commit 5a3ad6b

Please sign in to comment.