Skip to content

Commit

Permalink
Merge pull request #598 from liquibase/DAT-19473
Browse files Browse the repository at this point in the history
Fix rows affected reporting for MongoDB operations
  • Loading branch information
CharlesQueiroz authored Feb 24, 2025
2 parents 60e3824 + 4bd1e07 commit 86ad244
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,21 @@

import com.mongodb.MongoException;
import com.mongodb.client.MongoDatabase;
import liquibase.Scope;
import liquibase.executor.jvm.JdbcExecutor;
import liquibase.ext.mongodb.database.MongoLiquibaseDatabase;
import liquibase.nosql.executor.NoSqlExecutor;
import liquibase.nosql.statement.NoSqlExecuteStatement;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.bson.Document;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Objects.nonNull;
import static liquibase.executor.jvm.JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY;

@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
Expand All @@ -43,12 +48,17 @@ public abstract class AbstractRunCommandStatement extends AbstractMongoStatement
public static final String OK = "ok";
public static final String WRITE_ERRORS = "writeErrors";

public static final String N = "n";
public static final String N_MODIFIED = "nModified";
public static final String N_REMOVED = "nRemoved";

@Getter
protected final Document command;

@Override
public void execute(final MongoLiquibaseDatabase database) {
run(database);
Document response = run(database);
updateRowsAffected(response);
}

public Document run(final MongoLiquibaseDatabase database) {
Expand All @@ -74,16 +84,50 @@ public Document run(final MongoDatabase mongoDatabase) {
* Check the response and throw an appropriate exception if the command was not successful
*/
protected void checkResponse(final Document responseDocument) throws MongoException {
final Double ok = responseDocument.get(OK) instanceof Integer ? (double) responseDocument.getInteger(OK) :
responseDocument.getDouble(OK);
final double ok = responseDocument.get(OK) instanceof Integer
? (double) responseDocument.getInteger(OK)
: responseDocument.getDouble(OK);

final List<Document> writeErrors = responseDocument.getList(WRITE_ERRORS, Document.class);

if (nonNull(ok) && !ok.equals(1.0d)
|| nonNull(writeErrors) && !writeErrors.isEmpty()) {
if (ok != 1.0d || nonNull(writeErrors) && !writeErrors.isEmpty()) {
throw new MongoException("Command failed. The full response is " + responseDocument.toJson());
}
}

protected void updateRowsAffected(Document response) {
int affectedCount = extractAffectedCount(response);
if (affectedCount <= 0) {
return;
}

AtomicInteger scopeRowsAffected = Scope.getCurrentScope().get(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, AtomicInteger.class);
if (scopeRowsAffected == null) {
scopeRowsAffected = NoSqlExecutor.GLOBAL_ROWS_AFFECTED;
}
Boolean shouldUpdate = Scope.getCurrentScope().get(SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, Boolean.TRUE);
if (Boolean.TRUE.equals(shouldUpdate)) {
scopeRowsAffected.addAndGet(affectedCount);
Scope.getCurrentScope().getLog(getClass()).fine("Added " + affectedCount + " to ROWS_AFFECTED_SCOPE_KEY; new total=" + scopeRowsAffected.get());
}
}

protected int extractAffectedCount(Document response) {
if (response.containsKey(N)) {
return response.getInteger(N, 0);
}
// Then 'nModified' (some updates)
if (response.containsKey(N_MODIFIED)) {
return response.getInteger(N_MODIFIED, 0);
}
// Then 'nRemoved'
if (response.containsKey(N_REMOVED)) {
return response.getInteger(N_REMOVED, 0);
}
// or 0 for commands that do not return anything about affected docs
return 0;
}

@Override
public String getCommandName() {
return COMMAND_NAME;
Expand All @@ -105,4 +149,4 @@ public String toJs() {
+ BsonUtils.toJson(command)
+ ");";
}
}
}
26 changes: 22 additions & 4 deletions src/main/java/liquibase/nosql/executor/NoSqlExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import liquibase.database.Database;
import liquibase.exception.DatabaseException;
import liquibase.executor.AbstractExecutor;
import liquibase.executor.jvm.JdbcExecutor;
import liquibase.ext.mongodb.changelog.MongoHistoryService;
import liquibase.ext.mongodb.database.MongoLiquibaseDatabase;
import liquibase.logging.Logger;
Expand All @@ -40,6 +41,7 @@
import liquibase.statement.core.UpdateStatement;
import lombok.NoArgsConstructor;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -51,6 +53,8 @@
@NoArgsConstructor
public class NoSqlExecutor extends AbstractExecutor {

public static final AtomicInteger GLOBAL_ROWS_AFFECTED = new AtomicInteger(0);

public static final String EXECUTOR_NAME = "jdbc";
private final Logger log = Scope.getCurrentScope().getLog(getClass());

Expand Down Expand Up @@ -166,7 +170,7 @@ public List<Object> queryForList(final SqlStatement sql, final Class elementType
* @throws DatabaseException in case of a failure
*/
public void execute(final UpdateStatement updateStatement) throws DatabaseException {
if(updateStatement.getNewColumnValues().containsKey("MD5SUM")
if (updateStatement.getNewColumnValues().containsKey("MD5SUM")
&& updateStatement.getNewColumnValues().get("MD5SUM") == null) {
try {
Scope.getCurrentScope().getSingleton(ChangeLogHistoryServiceFactory.class)
Expand All @@ -181,8 +185,22 @@ public void execute(final UpdateStatement updateStatement) throws DatabaseExcept

@Override
public void execute(final SqlStatement sql) throws DatabaseException {
disableRowAffectedMessage();
this.execute(sql, emptyList());
try {
Map<String, Object> scopeValues = new HashMap<>();
scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, GLOBAL_ROWS_AFFECTED);
scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);

Scope.child(scopeValues, () -> {
this.execute(sql, emptyList());
return null;
});

} catch (Exception e) {
if (e instanceof DatabaseException) {
throw (DatabaseException) e;
}
throw new DatabaseException(e);
}
}

@Override
Expand All @@ -199,7 +217,7 @@ public void execute(final SqlStatement sql, final List<SqlVisitor> sqlVisitors)
ChangeLogHistoryService changeLogHistoryService = Scope.getCurrentScope().getSingleton(ChangeLogHistoryServiceFactory.class)
.getChangeLogService(getDatabase());
if (changeLogHistoryService instanceof MongoHistoryService) {
((MongoHistoryService)changeLogHistoryService).updateCheckSum(((UpdateChangeSetChecksumStatement) sql).getChangeSet());
((MongoHistoryService) changeLogHistoryService).updateCheckSum(((UpdateChangeSetChecksumStatement) sql).getChangeSet());
} else {
throw new DatabaseException("Could not execute as we are not in a MongoDB");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package liquibase.ext.mongodb.statement;

import liquibase.Scope;
import liquibase.executor.jvm.JdbcExecutor;
import liquibase.ext.AbstractMongoIntegrationTest;
import lombok.SneakyThrows;
import org.bson.Document;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static liquibase.ext.mongodb.TestUtils.COLLECTION_NAME_1;
import static org.assertj.core.api.Assertions.assertThat;

class RowsAffectedStatementIT extends AbstractMongoIntegrationTest {

private String collectionName;
private AtomicInteger rowsAffected;

@BeforeEach
public void setUp() throws Exception {
collectionName = COLLECTION_NAME_1 + System.nanoTime();
rowsAffected = new AtomicInteger(0);

// Setup scope with rowsAffected counter
Map<String, Object> scopeValues = new HashMap<>();
scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);

// The test methods will run within this scope
Scope.child(scopeValues, () -> {
super.setUpEach();
collectionName = COLLECTION_NAME_1 + System.nanoTime();
});
}

@Override
@SneakyThrows
protected void tearDownEach() {
Map<String, Object> scopeValues = new HashMap<>();
scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);

Scope.child(scopeValues, () -> {
executor.execute(new DropAllCollectionsStatement());
connection.close();
});
}

@Test
@SneakyThrows
void shouldTrackRowsAffectedForCreateCollection() {
Map<String, Object> scopeValues = new HashMap<>();
scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);

Scope.child(scopeValues, () -> {
final CreateCollectionStatement createStatement = new CreateCollectionStatement(collectionName);
createStatement.execute(database);
assertThat(rowsAffected.get()).isEqualTo(1);
});
}

@Test
@SneakyThrows
void shouldTrackRowsAffectedForInsertOne() {
Map<String, Object> scopeValues = new HashMap<>();
scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);

Scope.child(scopeValues, () -> {
final CreateCollectionStatement createStatement = new CreateCollectionStatement(collectionName);
createStatement.execute(database);
rowsAffected.set(0);

Document doc = new Document("test", "value");
final InsertOneStatement insertStatement = new InsertOneStatement(collectionName, doc);
insertStatement.execute(database);

assertThat(rowsAffected.get()).isEqualTo(1);
});
}

@Test
@SneakyThrows
void shouldTrackRowsAffectedForInsertMany() {
Map<String, Object> scopeValues = new HashMap<>();
scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);

Scope.child(scopeValues, () -> {
final CreateCollectionStatement createStatement = new CreateCollectionStatement(collectionName);
createStatement.execute(database);
rowsAffected.set(0);

Document doc1 = new Document("test", "value1");
Document doc2 = new Document("test", "value2");
final InsertManyStatement insertStatement = new InsertManyStatement(collectionName, Arrays.asList(doc1, doc2));
insertStatement.execute(database);

assertThat(rowsAffected.get()).isEqualTo(2);
});
}

@Test
@SneakyThrows
void shouldTrackRowsAffectedForDropCollection() {
Map<String, Object> scopeValues = new HashMap<>();
scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);

Scope.child(scopeValues, () -> {
final CreateCollectionStatement createStatement = new CreateCollectionStatement(collectionName);
createStatement.execute(database);
rowsAffected.set(0);

final DropCollectionStatement dropStatement = new DropCollectionStatement(collectionName);
dropStatement.execute(database);

assertThat(rowsAffected.get()).isEqualTo(1);
});
}

@Test
@SneakyThrows
void shouldTrackTotalRowsAffectedForMultipleOperations() {
Map<String, Object> scopeValues = new HashMap<>();
scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);

Scope.child(scopeValues, () -> {
final CreateCollectionStatement createStatement = new CreateCollectionStatement(collectionName);
createStatement.execute(database);

Document doc1 = new Document("test", "value1");
Document doc2 = new Document("test", "value2");
final InsertManyStatement insertStatement = new InsertManyStatement(collectionName, Arrays.asList(doc1, doc2));
insertStatement.execute(database);
assertThat(rowsAffected.get()).isEqualTo(3); // 1 to create + 2 to insert
});
}
}

0 comments on commit 86ad244

Please sign in to comment.