From 4c86a2f123f1904a08205278737d02b88cf759d9 Mon Sep 17 00:00:00 2001 From: Oscar Salvador Magallanes Date: Thu, 29 Dec 2022 15:08:58 +0100 Subject: [PATCH] feat: Transform Mongo document and arrays to json string --- .../org/replicadb/manager/MongoDBManager.java | 55 +++++++++++-------- .../replicadb/rowset/MongoDBRowSetImpl.java | 48 ++++++++++------ 2 files changed, 61 insertions(+), 42 deletions(-) diff --git a/src/main/java/org/replicadb/manager/MongoDBManager.java b/src/main/java/org/replicadb/manager/MongoDBManager.java index fc067f4..bfc160e 100644 --- a/src/main/java/org/replicadb/manager/MongoDBManager.java +++ b/src/main/java/org/replicadb/manager/MongoDBManager.java @@ -1,16 +1,16 @@ package org.replicadb.manager; - -import com.mongodb.*; +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoCompressor; +import com.mongodb.MongoException; import com.mongodb.client.*; import com.mongodb.client.model.BulkWriteOptions; -import com.mongodb.client.model.IndexOptions; import com.mongodb.client.model.InsertOneModel; import com.mongodb.client.model.WriteModel; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bson.BsonDocument; -import org.bson.BsonString; import org.bson.BsonValue; import org.bson.Document; import org.bson.codecs.BsonArrayCodec; @@ -22,9 +22,13 @@ import org.replicadb.manager.util.BandwidthThrottling; import org.replicadb.rowset.MongoDBRowSetImpl; -import java.sql.*; -import java.util.*; -import java.util.concurrent.TimeUnit; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import static org.replicadb.manager.SupportedManagers.MONGODB; @@ -33,8 +37,6 @@ public class MongoDBManager extends SqlManager { private static final Logger LOG = LogManager.getLogger(MongoDBManager.class.getName()); - private static final String MONGO_LIMIT = "limit"; - private static final String MONGO_SKIP = "skip"; private MongoClient sourceMongoClient; private MongoClient sinkMongoClient; @@ -112,6 +114,11 @@ protected Connection makeSinkConnection () throws SQLException { public ResultSet readTable (String tableName, String[] columns, int nThread) throws SQLException { // If table name parameter is null get it from options String collectionName = tableName == null ? this.options.getSourceTable() : tableName; + // if the chunk size is 0 and the current job is greater than 0, return null + if (chunkSize == 0 && nThread > 0) { + return null; + } + long skip = nThread * chunkSize; mongoDbResultSet = new MongoDBRowSetImpl(); @@ -130,14 +137,14 @@ public ResultSet readTable (String tableName, String[] columns, int nThread) thr if (this.options.getJobs() == nThread + 1) { // add skip to the pipeline - pipeline.add(BsonDocument.parse("{ $skip: "+skip+" }")); + pipeline.add(BsonDocument.parse("{ $skip: " + skip + " }")); } else { // add skip and limit to the pipeline - pipeline.add(BsonDocument.parse("{ $skip: "+skip+" }")); - pipeline.add(BsonDocument.parse("{ $limit: "+chunkSize+" }")); + pipeline.add(BsonDocument.parse("{ $skip: " + skip + " }")); + pipeline.add(BsonDocument.parse("{ $limit: " + chunkSize + " }")); } - LOG.info("Using this aggregation query to get data from MongoDB: {}", pipeline); + LOG.info("{}: Using this aggregation query to get data from MongoDB: {}",Thread.currentThread().getName(), pipeline); // create a MongoCursor to iterate over the results cursor = collection.aggregate(pipeline).allowDiskUse(true).cursor(); firstDocument = collection.aggregate(pipeline).allowDiskUse(true).first(); @@ -149,24 +156,24 @@ public ResultSet readTable (String tableName, String[] columns, int nThread) thr if (options.getSourceWhere() != null && !options.getSourceWhere().isEmpty()) { BsonDocument filter = BsonDocument.parse(options.getSourceWhere()); findIterable.filter(filter); - LOG.info("Using this clause to filter data from MongoDB: {}", filter.toJson()); + LOG.info("{}: Using this clause to filter data from MongoDB: {}",Thread.currentThread().getName(), filter.toJson()); } // Source Fields if (options.getSourceColumns() != null && !options.getSourceColumns().isEmpty()) { BsonDocument projection = BsonDocument.parse(options.getSourceColumns()); findIterable.projection(projection); - LOG.info("Using this clause to project data from MongoDB: {}", projection.toJson()); + LOG.info("{}: Using this clause to project data from MongoDB: {}", Thread.currentThread().getName(), projection.toJson()); } if (this.options.getJobs() == nThread + 1) { // add skip to the pipeline findIterable.skip((int) skip); - LOG.info("Using this clause to skip data from MongoDB: {}", skip); + LOG.info("{}: Skip {} data from source",Thread.currentThread().getName(), skip); } else { // add skip and limit to the pipeline findIterable.skip(Math.toIntExact(skip)); findIterable.limit(Math.toIntExact(chunkSize)); - LOG.info("Using this clause to skip and limit data from MongoDB: {} {}", skip, chunkSize); + LOG.info("{}: Skip {}, Limit {} data from source", Thread.currentThread().getName(), skip, chunkSize); } findIterable.batchSize(options.getFetchSize()); @@ -181,7 +188,7 @@ public ResultSet readTable (String tableName, String[] columns, int nThread) thr mongoDbResultSet.setMongoCursor(cursor); } catch (MongoException me) { - LOG.error("MongoDB error: {}", me.getMessage(), me); + LOG.error("{}: MongoDB error: {}", Thread.currentThread().getName(), me.getMessage(), me); } return mongoDbResultSet; } @@ -216,7 +223,7 @@ public int insertDataToTable (ResultSet resultSet, int taskId) throws Exception // unordered bulk write BulkWriteOptions bulkWriteOptions = new BulkWriteOptions().ordered(false); - if (resultSet.next()) { + if (resultSet != null && resultSet.next()) { // Create Bandwidth Throttling BandwidthThrottling bt = new BandwidthThrottling(options.getBandwidthThrottling(), options.getFetchSize(), resultSet); do { @@ -295,11 +302,9 @@ public void preSourceTasks () throws Exception { } // set chunk size for each task - this.chunkSize = Math.abs( totalRows / this.options.getJobs()); - LOG.info("Total rows: {}, chunk size: {}", totalRows, this.chunkSize); - + this.chunkSize = Math.abs(totalRows / this.options.getJobs()); + LOG.info("Source collection total rows: {}, chunk size per job: {}", totalRows, this.chunkSize); } - } @Override @@ -385,7 +390,9 @@ public void close () throws SQLException { mongoDbResultSet.setFetchSize(0); mongoDbResultSet.close(); MongoCursor cursor = mongoDbResultSet.getCursor(); - cursor.close(); + if (cursor != null) { + cursor.close(); + } } // Close connection, ignore exceptions diff --git a/src/main/java/org/replicadb/rowset/MongoDBRowSetImpl.java b/src/main/java/org/replicadb/rowset/MongoDBRowSetImpl.java index 1a9aa22..221f303 100644 --- a/src/main/java/org/replicadb/rowset/MongoDBRowSetImpl.java +++ b/src/main/java/org/replicadb/rowset/MongoDBRowSetImpl.java @@ -4,6 +4,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bson.Document; +import org.bson.json.Converter; +import org.bson.json.JsonWriterSettings; +import org.bson.json.StrictJsonWriter; import org.bson.types.Binary; import javax.sql.RowSetMetaData; @@ -11,8 +14,12 @@ import java.math.BigDecimal; import java.sql.SQLException; import java.sql.Types; +import java.time.Instant; +import java.time.ZoneId; import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -41,7 +48,7 @@ public void execute () throws SQLException { List fields = new ArrayList<>(); // if the sink database is mongodb, the document will be inserted as is - if (this.isSourceAndSinkMongo) { + if (Boolean.TRUE.equals(this.isSourceAndSinkMongo)) { rsmd.setColumnCount(1); rsmd.setColumnName(1, "document"); rsmd.setColumnType(1, java.sql.Types.OTHER); @@ -119,8 +126,10 @@ static int getSqlType (String typeString) { case "class org.bson.types.Binary": return java.sql.Types.BINARY; case "class java.util.List": + case "class java.util.ArrayList": return java.sql.Types.ARRAY; case "class org.bson.Document": + return Types.STRUCT; case "class org.bson.types.ObjectId": case "class java.lang.Object": default: @@ -202,7 +211,7 @@ private void readData () throws SQLException { break; case Types.TIMESTAMP_WITH_TIMEZONE: if (document.getDate(columnName) == null) updateNull(j + 1); - // convert to offsetDateTime + // convert to offsetDateTime else updateObject(j + 1, document.getDate(columnName).toInstant().atOffset(ZoneOffset.UTC)); break; case Types.BINARY: @@ -215,19 +224,16 @@ private void readData () throws SQLException { if (document.getBoolean(columnName) == null) updateNull(j + 1); else updateBoolean(j + 1, document.getBoolean(columnName)); break; - case Types.ARRAY: - if (document.get(columnName) == null) updateNull(j + 1); - // convert to java.sql.Array - else updateString(j + 1, document.get(columnName).toString()); - break; default: - if (document.getString(columnName) == null) updateNull(j + 1); - else updateString(j + 1, document.getString(columnName)); + String json = documentToJson(document.get(columnName, org.bson.Document.class)); + if (json == null) updateNull(j + 1); + else updateString(j + 1, json); break; } } } insertRow(); + document.clear(); } } catch (Exception e) { LOG.error("MongoDB error: {}", e.getMessage(), e); @@ -239,16 +245,22 @@ private void readData () throws SQLException { beforeFirst(); } + public static class JsonDateTimeConverter implements Converter { + static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ISO_INSTANT.withZone(ZoneId.of("UTC")); + @Override + public void convert (Long value, StrictJsonWriter writer) { + Instant instant = new Date(value).toInstant(); + String s = DATE_TIME_FORMATTER.format(instant); + writer.writeString(s); + } + } - /** - * Checks if the value is empty or null and return a null object - * - * @param value - * @return - */ - private String getStringOrNull (String value) { - if (value == null || value.isEmpty()) value = null; - return value; + private String documentToJson (Document document) { + if (document == null) return null; + return document.toJson(JsonWriterSettings + .builder() + .dateTimeConverter(new JsonDateTimeConverter()) + .build()); } public void setMongoCursor (MongoCursor cursor) {