|
15 | 15 | */
|
16 | 16 | package org.springframework.data.r2dbc.repository.support;
|
17 | 17 |
|
18 |
| -import org.springframework.transaction.annotation.Transactional; |
19 | 18 | import reactor.core.publisher.Flux;
|
20 | 19 | import reactor.core.publisher.Mono;
|
21 | 20 |
|
|
24 | 23 |
|
25 | 24 | import org.reactivestreams.Publisher;
|
26 | 25 |
|
| 26 | +import org.springframework.dao.TransientDataAccessResourceException; |
27 | 27 | import org.springframework.data.r2dbc.convert.R2dbcConverter;
|
28 | 28 | import org.springframework.data.r2dbc.core.DatabaseClient;
|
29 | 29 | import org.springframework.data.r2dbc.core.PreparedOperation;
|
|
37 | 37 | import org.springframework.data.relational.core.sql.render.SqlRenderer;
|
38 | 38 | import org.springframework.data.relational.repository.query.RelationalEntityInformation;
|
39 | 39 | import org.springframework.data.repository.reactive.ReactiveCrudRepository;
|
| 40 | +import org.springframework.transaction.annotation.Transactional; |
40 | 41 | import org.springframework.util.Assert;
|
41 | 42 |
|
42 | 43 | /**
|
@@ -83,8 +84,16 @@ public <S extends T> Mono<S> save(S objectToSave) {
|
83 | 84 | return this.databaseClient.update() //
|
84 | 85 | .table(this.entity.getJavaType()) //
|
85 | 86 | .table(this.entity.getTableName()).using(objectToSave) //
|
86 |
| - .then() // |
87 |
| - .thenReturn(objectToSave); |
| 87 | + .fetch().rowsUpdated().handle((rowsUpdated, sink) -> { |
| 88 | + |
| 89 | + if (rowsUpdated == 0) { |
| 90 | + sink.error(new TransientDataAccessResourceException( |
| 91 | + String.format("Failed to update table [%s]. Row with Id [%s] does not exist.", |
| 92 | + this.entity.getTableName(), this.entity.getId(objectToSave)))); |
| 93 | + } else { |
| 94 | + sink.next(objectToSave); |
| 95 | + } |
| 96 | + }); |
88 | 97 | }
|
89 | 98 |
|
90 | 99 | /* (non-Javadoc)
|
|
0 commit comments