Merge pull request #141 from benmccann/initial-import
First pass at support for initial import
richardwilly98 committed Sep 26, 2013
2 parents 0faf1dd + 8b25cf1 commit 326bbeb
Showing 3 changed files with 216 additions and 47 deletions.
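
The change follows MongoDB's own initial-sync recipe (see the Google Groups thread referenced in the new code below): record the current oplog position before copying anything, stream every existing document into the indexing queue as a synthetic insert, then tail the oplog from the recorded position so writes made during the copy are replayed rather than lost. The following standalone sketch illustrates that pattern with the 2.x mongo-java-driver; it is not the river's code, and the host, database and collection names and the index() helper are purely illustrative.

import java.util.Map;

import org.bson.types.BSONTimestamp;

import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;

// Standalone sketch of the initial-import-then-tail pattern added by this commit.
// Host, database, collection names and the index() helper are illustrative only.
public class InitialImportSketch {

    public static void main(String[] args) throws Exception {
        MongoClient mongo = new MongoClient("localhost", 27017);
        DBCollection collection = mongo.getDB("mydb").getCollection("person");
        DBCollection oplog = mongo.getDB("local").getCollection("oplog.rs");

        // 1. Record where the oplog currently ends, before copying any data,
        //    so writes that happen during the copy are replayed afterwards.
        BSONTimestamp start = (BSONTimestamp) oplog.find()
                .sort(new BasicDBObject("ts", -1)).limit(1).next().get("ts");

        // 2. Initial import: stream every existing document as a synthetic insert.
        DBCursor all = collection.find();
        try {
            while (all.hasNext()) {
                index("i", all.next().toMap());
            }
        } finally {
            all.close();
        }

        // 3. Tail the oplog from the recorded position for subsequent changes.
        DBObject filter = new BasicDBObject("ts", new BasicDBObject("$gt", start));
        DBCursor tail = oplog.find(filter)
                .addOption(Bytes.QUERYOPTION_TAILABLE)
                .addOption(Bytes.QUERYOPTION_AWAITDATA);
        try {
            while (tail.hasNext()) {
                DBObject entry = tail.next();
                index((String) entry.get("op"), entry.toMap());
            }
        } finally {
            tail.close();
        }
    }

    // Stand-in for handing a document to the Elasticsearch bulk indexer.
    private static void index(String operation, Map<?, ?> document) {
        System.out.println(operation + " -> " + document);
    }
}

In the river itself the same three steps appear in the run() method changed below: getCurrentOplogTimestamp(), the slurpedCollection.find() loop feeding addToStream(OPLOG_INSERT_OPERATION, null, map), and oplogCursor(startTimestamp).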
124 changes: 80 additions & 44 deletions src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
@@ -441,8 +441,7 @@ public void run() {
BSONTimestamp lastTimestamp = null;
BulkRequestBuilder bulk = client.prepareBulk();

- // 1. Attempt to fill as much of the bulk request as
- // possible
+ // 1. Attempt to fill as much of the bulk request as possible
QueueEntry entry = stream.take();
lastTimestamp = processBlockingQueue(bulk, entry);
while ((entry = stream.poll(definition.getBulkTimeout()
@@ -455,8 +454,7 @@ public void run() {

// 2. Update the timestamp
if (lastTimestamp != null) {
- updateLastTimestamp(mongoOplogNamespace, lastTimestamp,
- bulk);
+ updateLastTimestamp(mongoOplogNamespace, lastTimestamp, bulk);
}

// 3. Execute the bulk requests
@@ -491,7 +489,7 @@ public void run() {
private BSONTimestamp processBlockingQueue(
final BulkRequestBuilder bulk, QueueEntry entry) {
if (entry.getData().get(MONGODB_ID_FIELD) == null
- && !entry.getOplogOperation().equals(
+ && !entry.getOperation().equals(
OPLOG_COMMAND_OPERATION)) {
logger.warn(
"Cannot get object id. Skip the current item: [{}]",
@@ -500,7 +498,7 @@ private BSONTimestamp processBlockingQueue(
}

BSONTimestamp lastTimestamp = entry.getOplogTimestamp();
- String operation = entry.getOplogOperation();
+ String operation = entry.getOperation();
if (OPLOG_COMMAND_OPERATION.equals(operation)) {
try {
updateBulkRequest(bulk, entry.getData(), null, operation,
@@ -731,7 +729,7 @@ private BSONTimestamp applyAdvancedTransformation(
final BulkRequestBuilder bulk, QueueEntry entry) {

BSONTimestamp lastTimestamp = entry.getOplogTimestamp();
- String operation = entry.getOplogOperation();
+ String operation = entry.getOperation();
String objectId = "";
if (entry.getData().get(MONGODB_ID_FIELD) != null) {
objectId = entry.getData().get(MONGODB_ID_FIELD).toString();
@@ -949,22 +947,51 @@ public void run() {
// slurpedCollection
}

- DBCursor oplogCursor = null;
- try {
- oplogCursor = oplogCursor(null);
- if (oplogCursor == null) {
- oplogCursor = processFullCollection();
+ DBCursor cursor = null;
+
+ BSONTimestamp startTimestamp = null;
+
+ // Do an initial sync the same way MongoDB does
+ // https://groups.google.com/forum/?fromgroups=#!topic/mongodb-user/sOKlhD_E2ns
+ // TODO: support gridfs
+ if (!definition.isMongoGridFS()) {
+ BSONTimestamp lastIndexedTimestamp = getLastTimestamp(mongoOplogNamespace);
+ if (lastIndexedTimestamp == null) {
+ // TODO: ensure the index type is empty
+ logger.info("MongoDBRiver is beginning initial import of "
+ + slurpedCollection.getFullName());
+ startTimestamp = getCurrentOplogTimestamp();
+ try {
+ cursor = slurpedCollection.find();
+ while (cursor.hasNext()) {
+ DBObject object = cursor.next();
+ Map<String, Object> map = applyFieldFilter(object).toMap();
+ addToStream(OPLOG_INSERT_OPERATION, null, map);
+ }
+ } finally {
+ if (cursor != null) {
+ logger.trace("Closing initial import cursor");
+ cursor.close();
+ }
+ }
+ }
+ }
}

- while (oplogCursor.hasNext()) {
- DBObject item = oplogCursor.next();
+ // Slurp from oplog
+ try {
+ cursor = oplogCursor(startTimestamp);
+ if (cursor == null) {
+ cursor = processFullOplog();
+ }
+ while (cursor.hasNext()) {
+ DBObject item = cursor.next();
processOplogEntry(item);
}
Thread.sleep(500);
} finally {
- if (oplogCursor != null) {
- logger.trace("Closing oplogCursor cursor");
- oplogCursor.close();
+ if (cursor != null) {
+ logger.trace("Closing oplog cursor");
+ cursor.close();
}
}
} catch (MongoInterruptedException mIEx) {
@@ -1058,19 +1085,18 @@ private boolean assignCollections() {

return true;
}

- /*
- * Remove fscynlock and unlock -
- * https://github.com/richardwilly98/elasticsearch
- * -river-mongodb/issues/17
- */
- private DBCursor processFullCollection() throws InterruptedException {
- BSONTimestamp currentTimestamp = (BSONTimestamp) oplogCollection
- .find()
- .sort(new BasicDBObject(OPLOG_TIMESTAMP, -1))
- .limit(1)
- .next()
- .get(OPLOG_TIMESTAMP);

+ private BSONTimestamp getCurrentOplogTimestamp() {
+ return (BSONTimestamp) oplogCollection
+ .find()
+ .sort(new BasicDBObject(OPLOG_TIMESTAMP, -1))
+ .limit(1)
+ .next()
+ .get(OPLOG_TIMESTAMP);
+ }
+
+ private DBCursor processFullOplog() throws InterruptedException {
+ BSONTimestamp currentTimestamp = getCurrentOplogTimestamp();
addQueryToStream(OPLOG_INSERT_OPERATION, currentTimestamp, null);
return oplogCursor(currentTimestamp);
}
@@ -1181,9 +1207,7 @@ private String getObjectIdFromOplogEntry(DBObject entry) {
return null;
}

- private DBObject getIndexFilter(final BSONTimestamp timestampOverride) {
- BSONTimestamp time = timestampOverride == null ? getLastTimestamp(mongoOplogNamespace)
- : timestampOverride;
+ private DBObject getOplogFilter(final BSONTimestamp time) {
BasicDBObject filter = new BasicDBObject();
List<DBObject> values2 = new ArrayList<DBObject>();

@@ -1239,7 +1263,9 @@ private DBObject getMongoFilter() {
}

private DBCursor oplogCursor(final BSONTimestamp timestampOverride) {
- DBObject indexFilter = getIndexFilter(timestampOverride);
+ BSONTimestamp time = timestampOverride == null
+ ? getLastTimestamp(mongoOplogNamespace) : timestampOverride;
+ DBObject indexFilter = getOplogFilter(time);
if (indexFilter == null) {
return null;
}
@@ -1392,31 +1418,41 @@ private void updateLastTimestamp(final String namespace,

protected static class QueueEntry {

- private BSONTimestamp oplogTimestamp;
- private String oplogOperation;
private Map<String, Object> data;
+ private String operation;
+ private BSONTimestamp oplogTimestamp;

public QueueEntry(
- BSONTimestamp oplogTimestamp,
- String oplogOperation,
Map<String, Object> data) {
- this.oplogTimestamp = oplogTimestamp;
- this.oplogOperation = oplogOperation;
this.data = data;
+ this.operation = OPLOG_INSERT_OPERATION;
}

- public BSONTimestamp getOplogTimestamp() {
- return oplogTimestamp;
+ public QueueEntry(
+ BSONTimestamp oplogTimestamp,
+ String oplogOperation,
+ Map<String, Object> data) {
+ this.data = data;
+ this.operation = oplogOperation;
+ this.oplogTimestamp = oplogTimestamp;
}

- public String getOplogOperation() {
- return oplogOperation;
+ public boolean isOplogEntry() {
+ return oplogTimestamp != null;
}

public Map<String, Object> getData() {
return data;
}

+ public String getOperation() {
+ return operation;
+ }
+
+ public BSONTimestamp getOplogTimestamp() {
+ return oplogTimestamp;
+ }
+
}

}
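
The reworked QueueEntry above gives every queued item an operation plus an optional oplog timestamp: initial-import documents are queued via addToStream(OPLOG_INSERT_OPERATION, null, map) and therefore carry no timestamp, while real oplog events do, and isOplogEntry() is what tells them apart. The miniature below restates those semantics as a self-contained, runnable demo; it is not code from this commit, and a plain Long stands in for org.bson.types.BSONTimestamp to keep it dependency-free.

import java.util.Collections;
import java.util.Map;

// Illustrative miniature of the QueueEntry semantics introduced by this commit.
public class QueueEntryDemo {

    static final String OPLOG_INSERT_OPERATION = "i";

    static class Entry {
        final Map<String, Object> data;
        final String operation;
        final Long oplogTimestamp; // stand-in for org.bson.types.BSONTimestamp

        // Initial-import documents: no oplog position, always treated as inserts.
        Entry(Map<String, Object> data) {
            this(null, OPLOG_INSERT_OPERATION, data);
        }

        // Oplog events: operation and position come from the oplog record.
        Entry(Long oplogTimestamp, String operation, Map<String, Object> data) {
            this.data = data;
            this.operation = operation;
            this.oplogTimestamp = oplogTimestamp;
        }

        boolean isOplogEntry() {
            return oplogTimestamp != null;
        }
    }

    public static void main(String[] args) {
        Entry imported = new Entry(Collections.<String, Object>singletonMap("name", "Richard"));
        Entry oplogEvent = new Entry(1380153600L, "u", Collections.<String, Object>singletonMap("name", "Ben"));
        System.out.println(imported.isOplogEntry());   // false: nothing to checkpoint
        System.out.println(oplogEvent.isOplogEntry()); // true: position can be saved
    }
}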
@@ -51,9 +51,10 @@ public class RiverMongoWithGridFSTest extends RiverMongoDBTestAbstract {
private DBCollection mongoCollection;

protected RiverMongoWithGridFSTest() {
super("testgridfs-" + System.currentTimeMillis(), "testgridfs-"
+ System.currentTimeMillis(), "fs", "testattachmentindex-"
+ System.currentTimeMillis());
super("testgridfs-" + System.currentTimeMillis(),
"testgridfs-" + System.currentTimeMillis(),
"fs",
"testattachmentindex-" + System.currentTimeMillis());
}

@BeforeClass
@@ -0,0 +1,132 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.mongodb.simple;

import static org.elasticsearch.client.Requests.countRequest;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract;
import org.testng.Assert;
import org.testng.annotations.Test;

import com.google.common.collect.ImmutableMap;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;

@Test
public class RiverMongoInitialImportTest extends RiverMongoDBTestAbstract {

private DB mongoDB;
private DBCollection mongoCollection;

protected RiverMongoInitialImportTest() {
super("testmongodb-" + System.currentTimeMillis(),
"testriver-" + System.currentTimeMillis(),
"person-" + System.currentTimeMillis(),
"personindex-" + System.currentTimeMillis());
}

private void createDatabase() {
logger.debug("createDatabase {}", getDatabase());
try {
mongoDB = getMongo().getDB(getDatabase());
mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE);
logger.info("Start createCollection");
mongoCollection = mongoDB.createCollection(getCollection(), null);
Assert.assertNotNull(mongoCollection);
} catch (Throwable t) {
logger.error("createDatabase failed.", t);
}
}

private void createRiver() throws Exception {
super.createRiver(TEST_MONGODB_RIVER_SIMPLE_JSON);
}

private void cleanUp() {
super.deleteRiver();
logger.info("Drop database " + mongoDB.getName());
mongoDB.dropDatabase();
}

@Test
public void InitialImport() throws Throwable {
logger.debug("Start InitialImport");
try {
createDatabase();

DBObject dbObject1 = new BasicDBObject(ImmutableMap.of("name", "Richard"));
WriteResult result1 = mongoCollection.insert(dbObject1);
logger.info("WriteResult: {}", result1.toString());
Thread.sleep(wait);

createRiver();
Thread.sleep(wait);

ActionFuture<IndicesExistsResponse> response = getNode().client()
.admin().indices()
.exists(new IndicesExistsRequest(getIndex()));
assertThat(response.actionGet().isExists(), equalTo(true));
refreshIndex();
CountResponse countResponse = getNode()
.client()
.count(countRequest(getIndex())).actionGet();
assertThat(countResponse.getCount(), equalTo(1l));

DBObject dbObject2 = new BasicDBObject(ImmutableMap.of("name", "Ben"));
WriteResult result2 = mongoCollection.insert(dbObject2);
logger.info("WriteResult: {}", result2.toString());
Thread.sleep(wait);

refreshIndex();
CountResponse countResponse2 = getNode()
.client()
.count(countRequest(getIndex())).actionGet();
assertThat(countResponse2.getCount(), equalTo(2l));

mongoCollection.remove(dbObject1, WriteConcern.REPLICAS_SAFE);

Thread.sleep(wait);
refreshIndex();
countResponse = getNode()
.client()
.count(countRequest(getIndex())).actionGet();
logger.debug("Count after delete request: {}",
countResponse.getCount());
assertThat(countResponse.getCount(), equalTo(1L));

} catch (Throwable t) {
logger.error("InitialImport failed.", t);
t.printStackTrace();
throw t;
} finally {
cleanUp();
}
}

}
