diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
index a6f1e014..31a24eba 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
@@ -441,8 +441,7 @@ public void run() {
             BSONTimestamp lastTimestamp = null;
             BulkRequestBuilder bulk = client.prepareBulk();

-            // 1. Attempt to fill as much of the bulk request as
-            // possible
+            // 1. Attempt to fill as much of the bulk request as possible
             QueueEntry entry = stream.take();
             lastTimestamp = processBlockingQueue(bulk, entry);
             while ((entry = stream.poll(definition.getBulkTimeout()
@@ -455,8 +454,7 @@ public void run() {

             // 2. Update the timestamp
             if (lastTimestamp != null) {
-                updateLastTimestamp(mongoOplogNamespace, lastTimestamp,
-                        bulk);
+                updateLastTimestamp(mongoOplogNamespace, lastTimestamp, bulk);
             }

             // 3. Execute the bulk requests
@@ -491,7 +489,7 @@ public void run() {
     private BSONTimestamp processBlockingQueue(
             final BulkRequestBuilder bulk, QueueEntry entry) {
         if (entry.getData().get(MONGODB_ID_FIELD) == null
-                && !entry.getOplogOperation().equals(
+                && !entry.getOperation().equals(
                         OPLOG_COMMAND_OPERATION)) {
             logger.warn(
                     "Cannot get object id. Skip the current item: [{}]",
@@ -500,7 +498,7 @@ private BSONTimestamp processBlockingQueue(
         }

         BSONTimestamp lastTimestamp = entry.getOplogTimestamp();
-        String operation = entry.getOplogOperation();
+        String operation = entry.getOperation();
         if (OPLOG_COMMAND_OPERATION.equals(operation)) {
             try {
                 updateBulkRequest(bulk, entry.getData(), null, operation,
@@ -731,7 +729,7 @@ private BSONTimestamp applyAdvancedTransformation(
             final BulkRequestBuilder bulk, QueueEntry entry) {

         BSONTimestamp lastTimestamp = entry.getOplogTimestamp();
-        String operation = entry.getOplogOperation();
+        String operation = entry.getOperation();
         String objectId = "";
         if (entry.getData().get(MONGODB_ID_FIELD) != null) {
             objectId = entry.getData().get(MONGODB_ID_FIELD).toString();
@@ -949,22 +947,51 @@ public void run() {
                 // slurpedCollection
             }

-            DBCursor oplogCursor = null;
-            try {
-                oplogCursor = oplogCursor(null);
-                if (oplogCursor == null) {
-                    oplogCursor = processFullCollection();
+            DBCursor cursor = null;
+
+            BSONTimestamp startTimestamp = null;
+
+            // Do an initial sync the same way MongoDB does
+            // https://groups.google.com/forum/?fromgroups=#!topic/mongodb-user/sOKlhD_E2ns
+            // TODO: support gridfs
+            if (!definition.isMongoGridFS()) {
+                BSONTimestamp lastIndexedTimestamp = getLastTimestamp(mongoOplogNamespace);
+                if (lastIndexedTimestamp == null) {
+                    // TODO: ensure the index type is empty
+                    logger.info("MongoDBRiver is beginning initial import of "
+                            + slurpedCollection.getFullName());
+                    startTimestamp = getCurrentOplogTimestamp();
+                    try {
+                        cursor = slurpedCollection.find();
+                        while (cursor.hasNext()) {
+                            DBObject object = cursor.next();
+                            Map map = applyFieldFilter(object).toMap();
+                            addToStream(OPLOG_INSERT_OPERATION, null, map);
+                        }
+                    } finally {
+                        if (cursor != null) {
+                            logger.trace("Closing initial import cursor");
+                            cursor.close();
+                        }
+                    }
                 }
+            }

-                while (oplogCursor.hasNext()) {
-                    DBObject item = oplogCursor.next();
+            // Slurp from oplog
+            try {
+                cursor = oplogCursor(startTimestamp);
+                if (cursor == null) {
+                    cursor = processFullOplog();
+                }
+                while (cursor.hasNext()) {
+                    DBObject item = cursor.next();
                     processOplogEntry(item);
                 }
                 Thread.sleep(500);
             } finally {
-                if (oplogCursor != null) {
-                    logger.trace("Closing oplogCursor cursor");
-                    oplogCursor.close();
+                if (cursor != null) {
+                    logger.trace("Closing oplog cursor");
+                    cursor.close();
                 }
             }
         } catch (MongoInterruptedException mIEx) {
@@ -1058,19 +1085,18 @@ private boolean assignCollections() {
         return true;
     }

-
-    /*
-     * Remove fscynlock and unlock -
-     * https://github.com/richardwilly98/elasticsearch
-     * -river-mongodb/issues/17
-     */
-    private DBCursor processFullCollection() throws InterruptedException {
-        BSONTimestamp currentTimestamp = (BSONTimestamp) oplogCollection
-                .find()
-                .sort(new BasicDBObject(OPLOG_TIMESTAMP, -1))
-                .limit(1)
-                .next()
-                .get(OPLOG_TIMESTAMP);
+
+    private BSONTimestamp getCurrentOplogTimestamp() {
+        return (BSONTimestamp) oplogCollection
+                .find()
+                .sort(new BasicDBObject(OPLOG_TIMESTAMP, -1))
+                .limit(1)
+                .next()
+                .get(OPLOG_TIMESTAMP);
+    }
+
+    private DBCursor processFullOplog() throws InterruptedException {
+        BSONTimestamp currentTimestamp = getCurrentOplogTimestamp();
         addQueryToStream(OPLOG_INSERT_OPERATION, currentTimestamp, null);
         return oplogCursor(currentTimestamp);
     }
@@ -1181,9 +1207,7 @@ private String getObjectIdFromOplogEntry(DBObject entry) {
         return null;
     }

-    private DBObject getIndexFilter(final BSONTimestamp timestampOverride) {
-        BSONTimestamp time = timestampOverride == null ? getLastTimestamp(mongoOplogNamespace)
-                : timestampOverride;
+    private DBObject getOplogFilter(final BSONTimestamp time) {
         BasicDBObject filter = new BasicDBObject();
         List values2 = new ArrayList();

@@ -1239,7 +1263,9 @@ private DBObject getMongoFilter() {
     }

     private DBCursor oplogCursor(final BSONTimestamp timestampOverride) {
-        DBObject indexFilter = getIndexFilter(timestampOverride);
+        BSONTimestamp time = timestampOverride == null
+                ? getLastTimestamp(mongoOplogNamespace) : timestampOverride;
+        DBObject indexFilter = getOplogFilter(time);
         if (indexFilter == null) {
             return null;
         }
@@ -1392,31 +1418,41 @@ private void updateLastTimestamp(final String namespace,

     protected static class QueueEntry {

-        private BSONTimestamp oplogTimestamp;
-        private String oplogOperation;
         private Map data;
+        private String operation;
+        private BSONTimestamp oplogTimestamp;

         public QueueEntry(
-                BSONTimestamp oplogTimestamp,
-                String oplogOperation,
                 Map data) {
-            this.oplogTimestamp = oplogTimestamp;
-            this.oplogOperation = oplogOperation;
             this.data = data;
+            this.operation = OPLOG_INSERT_OPERATION;
         }

-        public BSONTimestamp getOplogTimestamp() {
-            return oplogTimestamp;
+        public QueueEntry(
+                BSONTimestamp oplogTimestamp,
+                String oplogOperation,
+                Map data) {
+            this.data = data;
+            this.operation = oplogOperation;
+            this.oplogTimestamp = oplogTimestamp;
         }

-        public String getOplogOperation() {
-            return oplogOperation;
+        public boolean isOplogEntry() {
+            return oplogTimestamp != null;
         }

         public Map getData() {
             return data;
         }

+        public String getOperation() {
+            return operation;
+        }
+
+        public BSONTimestamp getOplogTimestamp() {
+            return oplogTimestamp;
+        }
+
     }

 }
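Note on the change above: the new Slurper body reduces to a two-phase pattern, copy the collection first, then tail the oplog from a timestamp captured before the copy starts, so writes that land during the copy are replayed rather than lost. Below is a minimal standalone sketch of that pattern only, not river code: it assumes the mongo-java-driver 2.x API, a replica-set oplog at local.oplog.rs, illustrative database/collection names, and a hypothetical index() sink standing in for the river's addToStream().

// OplogTailSketch.java -- illustrative only, not part of this patch.
import org.bson.types.BSONTimestamp;

import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class OplogTailSketch {

    public static void main(String[] args) throws Exception {
        Mongo mongo = new Mongo("localhost", 27017);
        DBCollection collection = mongo.getDB("test").getCollection("person");
        DBCollection oplog = mongo.getDB("local").getCollection("oplog.rs");

        // 1. Record the current oplog head *before* copying the collection,
        // so no write that happens during the copy is missed.
        BSONTimestamp start = (BSONTimestamp) oplog.find()
                .sort(new BasicDBObject("ts", -1)).limit(1).next().get("ts");

        // 2. Initial import: replay every existing document as an insert.
        // Seeing a few of them again from the oplog afterwards is harmless
        // because indexing by _id is idempotent.
        DBCursor all = collection.find();
        try {
            while (all.hasNext()) {
                index(all.next());
            }
        } finally {
            all.close();
        }

        // 3. Tail the oplog from the recorded timestamp onwards.
        DBCursor tail = oplog.find(new BasicDBObject("ts",
                        new BasicDBObject("$gt", start)))
                .addOption(Bytes.QUERYOPTION_TAILABLE)
                .addOption(Bytes.QUERYOPTION_AWAITDATA);
        try {
            while (tail.hasNext()) {
                index(tail.next());
            }
        } finally {
            tail.close();
        }
    }

    // Hypothetical sink standing in for the river's addToStream().
    private static void index(DBObject doc) {
        System.out.println(doc);
    }
}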
cursor"); - oplogCursor.close(); + if (cursor != null) { + logger.trace("Closing oplog cursor"); + cursor.close(); } } } catch (MongoInterruptedException mIEx) { @@ -1058,19 +1085,18 @@ private boolean assignCollections() { return true; } - - /* - * Remove fscynlock and unlock - - * https://github.com/richardwilly98/elasticsearch - * -river-mongodb/issues/17 - */ - private DBCursor processFullCollection() throws InterruptedException { - BSONTimestamp currentTimestamp = (BSONTimestamp) oplogCollection - .find() - .sort(new BasicDBObject(OPLOG_TIMESTAMP, -1)) - .limit(1) - .next() - .get(OPLOG_TIMESTAMP); + + private BSONTimestamp getCurrentOplogTimestamp() { + return (BSONTimestamp) oplogCollection + .find() + .sort(new BasicDBObject(OPLOG_TIMESTAMP, -1)) + .limit(1) + .next() + .get(OPLOG_TIMESTAMP); + } + + private DBCursor processFullOplog() throws InterruptedException { + BSONTimestamp currentTimestamp = getCurrentOplogTimestamp(); addQueryToStream(OPLOG_INSERT_OPERATION, currentTimestamp, null); return oplogCursor(currentTimestamp); } @@ -1181,9 +1207,7 @@ private String getObjectIdFromOplogEntry(DBObject entry) { return null; } - private DBObject getIndexFilter(final BSONTimestamp timestampOverride) { - BSONTimestamp time = timestampOverride == null ? getLastTimestamp(mongoOplogNamespace) - : timestampOverride; + private DBObject getOplogFilter(final BSONTimestamp time) { BasicDBObject filter = new BasicDBObject(); List values2 = new ArrayList(); @@ -1239,7 +1263,9 @@ private DBObject getMongoFilter() { } private DBCursor oplogCursor(final BSONTimestamp timestampOverride) { - DBObject indexFilter = getIndexFilter(timestampOverride); + BSONTimestamp time = timestampOverride == null + ? getLastTimestamp(mongoOplogNamespace) : timestampOverride; + DBObject indexFilter = getOplogFilter(time); if (indexFilter == null) { return null; } @@ -1392,31 +1418,41 @@ private void updateLastTimestamp(final String namespace, protected static class QueueEntry { - private BSONTimestamp oplogTimestamp; - private String oplogOperation; private Map data; + private String operation; + private BSONTimestamp oplogTimestamp; public QueueEntry( - BSONTimestamp oplogTimestamp, - String oplogOperation, Map data) { - this.oplogTimestamp = oplogTimestamp; - this.oplogOperation = oplogOperation; this.data = data; + this.operation = OPLOG_INSERT_OPERATION; } - public BSONTimestamp getOplogTimestamp() { - return oplogTimestamp; + public QueueEntry( + BSONTimestamp oplogTimestamp, + String oplogOperation, + Map data) { + this.data = data; + this.operation = oplogOperation; + this.oplogTimestamp = oplogTimestamp; } - public String getOplogOperation() { - return oplogOperation; + public boolean isOplogEntry() { + return oplogTimestamp != null; } public Map getData() { return data; } + public String getOperation() { + return operation; + } + + public BSONTimestamp getOplogTimestamp() { + return oplogTimestamp; + } + } } diff --git a/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSTest.java b/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSTest.java index c90bb1cc..e0d9f140 100644 --- a/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSTest.java +++ b/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSTest.java @@ -51,9 +51,10 @@ public class RiverMongoWithGridFSTest extends RiverMongoDBTestAbstract { private DBCollection mongoCollection; protected RiverMongoWithGridFSTest() { - super("testgridfs-" + System.currentTimeMillis(), 
"testgridfs-" - + System.currentTimeMillis(), "fs", "testattachmentindex-" - + System.currentTimeMillis()); + super("testgridfs-" + System.currentTimeMillis(), + "testgridfs-" + System.currentTimeMillis(), + "fs", + "testattachmentindex-" + System.currentTimeMillis()); } @BeforeClass diff --git a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoInitialImportTest.java b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoInitialImportTest.java new file mode 100644 index 00000000..d0d2ad7b --- /dev/null +++ b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoInitialImportTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.river.mongodb.simple; + +import static org.elasticsearch.client.Requests.countRequest; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableMap; +import com.mongodb.BasicDBObject; +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.WriteConcern; +import com.mongodb.WriteResult; + +@Test +public class RiverMongoInitialImportTest extends RiverMongoDBTestAbstract { + + private DB mongoDB; + private DBCollection mongoCollection; + + protected RiverMongoInitialImportTest() { + super("testmongodb-" + System.currentTimeMillis(), + "testriver-" + System.currentTimeMillis(), + "person-" + System.currentTimeMillis(), + "personindex-" + System.currentTimeMillis()); + } + + private void createDatabase() { + logger.debug("createDatabase {}", getDatabase()); + try { + mongoDB = getMongo().getDB(getDatabase()); + mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE); + logger.info("Start createCollection"); + mongoCollection = mongoDB.createCollection(getCollection(), null); + Assert.assertNotNull(mongoCollection); + } catch (Throwable t) { + logger.error("createDatabase failed.", t); + } + } + + private void createRiver() throws Exception { + super.createRiver(TEST_MONGODB_RIVER_SIMPLE_JSON); + } + + private void cleanUp() { + super.deleteRiver(); + logger.info("Drop database " + mongoDB.getName()); + mongoDB.dropDatabase(); + } + + @Test + public void InitialImport() throws Throwable { + logger.debug("Start InitialImport"); + try { + createDatabase(); + + DBObject dbObject1 = new 
+            WriteResult result1 = mongoCollection.insert(dbObject1);
+            logger.info("WriteResult: {}", result1.toString());
+            Thread.sleep(wait);
+
+            createRiver();
+            Thread.sleep(wait);
+
+            ActionFuture<IndicesExistsResponse> response = getNode().client()
+                    .admin().indices()
+                    .exists(new IndicesExistsRequest(getIndex()));
+            assertThat(response.actionGet().isExists(), equalTo(true));
+            refreshIndex();
+            CountResponse countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())).actionGet();
+            assertThat(countResponse.getCount(), equalTo(1L));
+
+            DBObject dbObject2 = new BasicDBObject(ImmutableMap.of("name", "Ben"));
+            WriteResult result2 = mongoCollection.insert(dbObject2);
+            logger.info("WriteResult: {}", result2.toString());
+            Thread.sleep(wait);
+
+            refreshIndex();
+            CountResponse countResponse2 = getNode()
+                    .client()
+                    .count(countRequest(getIndex())).actionGet();
+            assertThat(countResponse2.getCount(), equalTo(2L));
+
+            mongoCollection.remove(dbObject1, WriteConcern.REPLICAS_SAFE);
+
+            Thread.sleep(wait);
+            refreshIndex();
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())).actionGet();
+            logger.debug("Count after delete request: {}",
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1L));
+
+        } catch (Throwable t) {
+            logger.error("InitialImport failed.", t);
+            t.printStackTrace();
+            throw t;
+        } finally {
+            cleanUp();
+        }
+    }
+
+}
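For reference, the QueueEntry rework in MongoDBRiver.java separates the two producer paths: entries created by the initial import carry no oplog timestamp and default their operation to OPLOG_INSERT_OPERATION, while entries read off the oplog carry both. A minimal usage sketch follows; the call sites are illustrative, not taken from the patch, and assume the river's stream queue, its OPLOG_* constants, and a ts/operation/map pulled from an oplog entry are in scope.

// Initial import path: the new single-argument constructor defaults the
// operation to OPLOG_INSERT_OPERATION and leaves the timestamp null.
stream.add(new QueueEntry(map));

// Oplog-tailing path: timestamp and operation come from the oplog entry.
stream.add(new QueueEntry(ts, operation, map));

// Consumers can now tell the two apart, e.g. to avoid persisting a
// "last timestamp" for documents that did not come from the oplog:
QueueEntry entry = stream.take();
if (entry.isOplogEntry()) {
    lastTimestamp = entry.getOplogTimestamp();
}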