Skip to content

Commit

Permalink
Backport compound statement result splitting.
Browse files Browse the repository at this point in the history
[closes #446]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>
  • Loading branch information
mp911de committed Sep 24, 2021
1 parent 0be701d commit 0c7b4ce
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
*/
final class SimpleQueryPostgresqlStatement implements PostgresqlStatement {

private static final boolean REACTOR_3_4_AVAILABLE = isPresent("reactor.util.context.ContextView", SimpleQueryPostgresqlStatement.class.getClassLoader());

private static final Predicate<BackendMessage> WINDOW_UNTIL = or(CommandComplete.class::isInstance, EmptyQueryResponse.class::isInstance, ErrorResponse.class::isInstance);

private final ConnectionResources resources;
Expand Down Expand Up @@ -143,56 +141,16 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {

if (this.fetchSize != NO_LIMIT) {

Flux<BackendMessage> messages = ExtendedFlowDelegate.runQuery(this.resources, factory, sql, Binding.EMPTY, Collections.emptyList(), this.fetchSize);
return REACTOR_3_4_AVAILABLE ? messages.windowUntil(WINDOW_UNTIL).map(msg -> new PostgresqlResult(this.resources, msg, factory)).as(Operators::discardOnCancel) :
Flux.just(new PostgresqlResult(this.resources, messages, factory));
return ExtendedFlowDelegate.runQuery(this.resources, factory, sql, Binding.EMPTY, Collections.emptyList(), this.fetchSize)
.windowUntil(WINDOW_UNTIL)
.map(messages -> PostgresqlResult.toResult(this.resources, messages, factory))
.as(Operators::discardOnCancel);
}

Flux<BackendMessage> messages = SimpleQueryMessageFlow.exchange(this.resources.getClient(), sql);
return REACTOR_3_4_AVAILABLE ? messages.windowUntil(WINDOW_UNTIL).map(msg -> new PostgresqlResult(this.resources, msg, factory)).as(Operators::discardOnCancel) :
Flux.just(PostgresqlResult.toResult(this.resources, messages, factory));
}

/**
* Determine whether the {@link Class} identified by the supplied name is present
* and can be loaded. Will return {@code false} if either the class or
* one of its dependencies is not present or cannot be loaded.
*
* @param className the name of the class to check
* @param classLoader the class loader to use
* (may be {@code null} which indicates the default class loader)
* @return whether the specified class is present (including all of its
* superclasses and interfaces)
* @throws IllegalStateException if the corresponding class is resolvable but
* there was a readability mismatch in the inheritance hierarchy of the class
* (typically a missing dependency declaration in a Jigsaw module definition
* for a superclass or interface implemented by the class to be checked here)
*/
private static boolean isPresent(String className, @Nullable ClassLoader classLoader) {
try {
try {
Class.forName(className, false, classLoader);
} catch (ClassNotFoundException ex) {
int lastDotIndex = className.lastIndexOf(".");
if (lastDotIndex != -1) {
String innerClassName =
className.substring(0, lastDotIndex) + "$" + className.substring(lastDotIndex + 1);
try {
Class.forName(innerClassName, false, classLoader);
} catch (ClassNotFoundException ex2) {
// Swallow - let original exception get through
}
}
throw ex;
}
return true;
} catch (IllegalAccessError err) {
throw new IllegalStateException("Readability mismatch in inheritance hierarchy of class [" +
className + "]: " + err.getMessage(), err);
} catch (Throwable ex) {
// Typically ClassNotFoundException or NoClassDefFoundError...
return false;
}
return SimpleQueryMessageFlow.exchange(this.resources.getClient(), sql)
.windowUntil(WINDOW_UNTIL)
.map(messages -> PostgresqlResult.toResult(this.resources, messages, factory))
.as(Operators::discardOnCancel);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void add() {
.add("test-query-2")
.execute()
.as(StepVerifier::create)
.expectNextCount(1)
.expectNextCount(2)
.verifyComplete();
}

Expand Down
7 changes: 0 additions & 7 deletions src/test/java/io/r2dbc/postgresql/PostgresqlTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.test.TestKit;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.jdbc.core.JdbcOperations;

Expand Down Expand Up @@ -97,9 +95,4 @@ public String getPlaceholder(int index) {
return String.format("$%d", index + 1);
}

@Test
@Disabled("Disabled until we get a fixed windowUntil(…) or we have a better way to split result streams into multiple Result objects. https://github.com/pgjdbc/r2dbc-postgresql/issues/401")
public void compoundStatement() {
}

}

0 comments on commit 0c7b4ce

Please sign in to comment.