Skip to content

Commit

Permalink
Let Result.getRowsUpdated() return Long.
Browse files Browse the repository at this point in the history
[resolves #484]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>
  • Loading branch information
mp911de committed Jan 27, 2022
1 parent 775b1aa commit 1277a52
Show file tree
Hide file tree
Showing 12 changed files with 25 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<postgresql.version>42.3.1</postgresql.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<r2dbc-spi.version>0.9.0.RELEASE</r2dbc-spi.version>
<r2dbc-spi.version>1.0.0.BUILD-SNAPSHOT</r2dbc-spi.version>
<reactor.version>2020.0.15</reactor.version>
<scram-client.version>2.1</scram-client.version>
<spring-framework.version>5.3.14</spring-framework.version>
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlCo

private final Codecs codecs;

private final Flux<Integer> validationQuery;
private final Flux<Long> validationQuery;

private final AtomicReference<NotificationAdapter> notificationAdapter = new AtomicReference<>();

Expand Down Expand Up @@ -380,15 +380,15 @@ public Mono<Boolean> validate(ValidationDepth depth) {
return;
}

this.validationQuery.subscribe(new CoreSubscriber<Integer>() {
this.validationQuery.subscribe(new CoreSubscriber<Long>() {

@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}

@Override
public void onNext(Integer integer) {
public void onNext(Long integer) {

}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/r2dbc/postgresql/PostgresqlResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ final class PostgresqlResult extends AbstractReferenceCounted implements io.r2db

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public Mono<Integer> getRowsUpdated() {
public Mono<Long> getRowsUpdated() {

return this.messages
.<Integer>handle((message, sink) -> {
Expand All @@ -88,7 +88,7 @@ public Mono<Integer> getRowsUpdated() {
return;
}

int sum = 0;
long sum = 0;

for (Integer integer : list) {
sum += integer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private PostgresqlSegmentResult(Flux<Segment> segments) {
}

@Override
public Mono<Integer> getRowsUpdated() {
public Mono<Long> getRowsUpdated() {
return this.segments
.<Integer>handle((segment, sink) -> {

Expand All @@ -137,7 +137,7 @@ public Mono<Integer> getRowsUpdated() {
return;
}

int sum = 0;
long sum = 0;

for (Integer integer : list) {
sum += integer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface PostgresqlResult extends Result {
* {@inheritDoc}
*/
@Override
Mono<Integer> getRowsUpdated();
Mono<Long> getRowsUpdated();

/**
* {@inheritDoc}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ private <IN, OUT> void testCodec(Class<IN> javaType, IN value, Class<OUT> outTyp

.concatWith(close(connection)))
.as(StepVerifier::create)
.expectNext(1)
.expectNext(1L)
.verifyComplete();

SERVER.getJdbcOperations().execute("DELETE FROM test");
Expand All @@ -754,7 +754,7 @@ private <IN, OUT> void testCodec(Class<IN> javaType, IN value, Class<OUT> outTyp

.concatWith(close(connection)))
.as(StepVerifier::create)
.expectNext(1)
.expectNext(1L)
.verifyComplete();
} else {

Expand All @@ -769,7 +769,7 @@ private <IN, OUT> void testCodec(Class<IN> javaType, IN value, Class<OUT> outTyp

.concatWith(close(connection)))
.as(StepVerifier::create)
.expectNext(1)
.expectNext(1L)
.verifyComplete();

SERVER.getJdbcOperations().execute("DELETE FROM test");
Expand All @@ -785,7 +785,7 @@ private <IN, OUT> void testCodec(Class<IN> javaType, IN value, Class<OUT> outTyp

.concatWith(close(connection)))
.as(StepVerifier::create)
.expectNext(1)
.expectNext(1L)
.verifyComplete();
}

Expand Down Expand Up @@ -856,7 +856,7 @@ private <W, R> void testCodecReadAs(W toWrite, Class<R> javaTypeToRead, Consumer

.concatWith(close(connection)))
.as(StepVerifier::create)
.expectNext(1)
.expectNext(1L)
.verifyComplete();

this.connectionFactory.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void toResultCommandComplete() {

result.getRowsUpdated()
.as(StepVerifier::create)
.expectNext(1)
.expectNext(1L)
.verifyComplete();
}

Expand All @@ -57,7 +57,7 @@ void toResultCommandCompleteUsingSegments() {

result.getRowsUpdated()
.as(StepVerifier::create)
.expectNext(1)
.expectNext(1L)
.verifyComplete();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void shouldConsumeRowsUpdated() {

result.getRowsUpdated()
.as(StepVerifier::create)
.expectNext(42)
.expectNext(42L)
.verifyComplete();
}

Expand All @@ -105,7 +105,7 @@ void filterShouldRetainUpdateCount() {

result.filter(Result.UpdateCount.class::isInstance).getRowsUpdated()
.as(StepVerifier::create)
.expectNext(42)
.expectNext(42L)
.verifyComplete();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void shouldReportDataIntegrityViolationUsingSimpleFlow() {

SERVER.getJdbcOperations().execute("CREATE TABLE test (id SERIAL PRIMARY KEY)");

Flux<Integer> insert = Flux.from(this.connection.createStatement("INSERT INTO test (id) VALUES (1) RETURNING *").execute()).flatMap(Result::getRowsUpdated);
Flux<?> insert = Flux.from(this.connection.createStatement("INSERT INTO test (id) VALUES (1) RETURNING *").execute()).flatMap(Result::getRowsUpdated);

insert.thenMany(insert).as(StepVerifier::create).verifyError(R2dbcDataIntegrityViolationException.class);
}
Expand All @@ -57,7 +57,7 @@ void shouldReportDataIntegrityViolationUsingExtendedFlow() {

SERVER.getJdbcOperations().execute("CREATE TABLE test (id SERIAL PRIMARY KEY)");

Flux<Integer> insert = Flux.from(this.connection.createStatement("INSERT INTO test (id) VALUES ($1) RETURNING *").bind("$1", 1).execute()).flatMap(Result::getRowsUpdated);
Flux<?> insert = Flux.from(this.connection.createStatement("INSERT INTO test (id) VALUES ($1) RETURNING *").bind("$1", 1).execute()).flatMap(Result::getRowsUpdated);

insert.thenMany(insert).as(StepVerifier::create).verifyError(R2dbcDataIntegrityViolationException.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static Builder builder() {
}

@Override
public Mono<Integer> getRowsUpdated() {
public Mono<Long> getRowsUpdated() {
return Mono.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void shouldBindEnumTypeAsString() {
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.as(StepVerifier::create)
.expectNext(1)
.expectNext(1L)
.verifyComplete();

String result = SERVER.getJdbcOperations().queryForObject("SELECT the_value FROM enum_test", String.class);
Expand All @@ -107,7 +107,7 @@ void shouldBindEnumArrayTypeAsString() {
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.as(StepVerifier::create)
.expectNext(1)
.expectNext(1L)
.verifyComplete();

String result = SERVER.getJdbcOperations().queryForObject("SELECT the_value FROM enum_test", String.class);
Expand All @@ -125,7 +125,7 @@ void shouldBindEnumArrayType() {
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.as(StepVerifier::create)
.expectNext(1)
.expectNext(1L)
.verifyComplete();

this.connection.createStatement("SELECT * FROM enum_test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void shouldReceiveReplication() {

ReplicationStream replicationStream = replicationConnection.startReplication(replicationRequest).block(Duration.ofSeconds(10));

connection.createStatement("INSERT INTO logical_decode_test VALUES('Hello World')").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).expectNext(1).verifyComplete();
connection.createStatement("INSERT INTO logical_decode_test VALUES('Hello World')").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).expectNext(1L).verifyComplete();

replicationStream.map(byteBuf -> byteBuf.toString(StandardCharsets.UTF_8))
.as(StepVerifier::create)
Expand Down

0 comments on commit 1277a52

Please sign in to comment.