Skip to content

Commit

Permalink
Invalidate statement cache even if the statement is not reprepared.
Browse files Browse the repository at this point in the history
This helps with cache hygiene. Also, reduce allocations.

[#382]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>
  • Loading branch information
mp911de committed Oct 27, 2021
1 parent 7f57957 commit 6a5fc09
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions src/main/java/io/r2dbc/postgresql/ExtendedFlowDelegate.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,28 +253,31 @@ private static Flux<BackendMessage> fetchCursoredWithFlush(ExtendedFlowOperator
}

private static BiConsumer<BackendMessage, SynchronousSink<BackendMessage>> handleReprepare(Sinks.Many<FrontendMessage> requests, ExtendedFlowOperator operator, MessageFactory messageFactory) {

AtomicBoolean reprepared = new AtomicBoolean();

return (message, sink) -> {

if (message instanceof ErrorResponse && requiresReprepare((ErrorResponse) message) && reprepared.compareAndSet(false, true)) {
if (message instanceof ErrorResponse && requiresReprepare((ErrorResponse) message)) {

operator.evictCachedStatement();

List<FrontendMessage.DirectEncoder> messages = messageFactory.createMessages();
if (!messages.contains(Sync.INSTANCE)) {
messages.add(0, Sync.INSTANCE);
if (reprepared.compareAndSet(false, true)) {

List<FrontendMessage.DirectEncoder> messages = messageFactory.createMessages();
if (!messages.contains(Sync.INSTANCE)) {
messages.add(0, Sync.INSTANCE);
}
requests.emitNext(new CompositeFrontendMessage(messages), Sinks.EmitFailureHandler.FAIL_FAST);

return;
}
requests.emitNext(new CompositeFrontendMessage(messages), Sinks.EmitFailureHandler.FAIL_FAST);
} else {
sink.next(message);
}

sink.next(message);
};
}

private static boolean requiresReprepare(ErrorResponse errorResponse) {

ErrorDetails details = new ErrorDetails(errorResponse.getFields());
String code = details.getCode();

Expand Down Expand Up @@ -304,7 +307,7 @@ interface MessageFactory {
/**
* Operator to encapsulate common activity around the extended flow. Subclasses {@link AtomicInteger} to capture the number of ReadyForQuery frames.
*/
static class ExtendedFlowOperator extends AtomicInteger {
static class ExtendedFlowOperator extends AtomicInteger implements Predicate<BackendMessage> {

private final String sql;

Expand All @@ -328,7 +331,6 @@ public ExtendedFlowOperator(String sql, Binding binding, StatementCache cache, L
this.values = values;
this.portal = portal;
this.forceBinary = forceBinary;
set(1);
}

public void close(Sinks.Many<FrontendMessage> requests) {
Expand All @@ -337,9 +339,6 @@ public void close(Sinks.Many<FrontendMessage> requests) {
}

public void evictCachedStatement() {

incrementAndGet();

synchronized (this) {
this.name = null;
}
Expand All @@ -351,14 +350,16 @@ public void hydrateStatementCache() {
}

public Predicate<BackendMessage> takeUntil() {
return m -> {
return this;
}

if (m instanceof ReadyForQuery) {
return decrementAndGet() <= 0;
}
@Override
public boolean test(BackendMessage backendMessage) {
if (backendMessage instanceof ReadyForQuery) {
return decrementAndGet() <= 0;
}

return false;
};
return false;
}

private boolean isPrepareRequired() {
Expand All @@ -376,6 +377,7 @@ public String getStatementName() {
}

public List<FrontendMessage.DirectEncoder> getMessages(Collection<FrontendMessage.DirectEncoder> append) {
incrementAndGet();
List<FrontendMessage.DirectEncoder> messagesToSend = new ArrayList<>(6);

if (isPrepareRequired()) {
Expand Down

0 comments on commit 6a5fc09

Please sign in to comment.