Initial import with GridFS enabled #47
- TODO: Initial import with GridFS still needs to be optimized
- Pass DBObject around directly instead of Map (see the sketch below)
- Clean up the logic when GridFS is enabled
- New unit test for initial import with GridFS
- Reduce unit test wait time from 6 sec to 2 sec
- Script filter is no longer used when GridFS is enabled
richardwilly98 committed Sep 27, 2013
2 parents 3ca5723 + c520223 commit 220e51c
Showing 6 changed files with 499 additions and 226 deletions.
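At the core of this commit, a queue entry now carries the raw DBObject end to end, and a GridFS file is recognized by its concrete driver type instead of the removed is_mongodb_attachment / mongodb_attachment marker fields. The following is a minimal sketch of that idea using the mongo-java-driver 2.x types the river already depends on; the names mirror the diff below, but the sketch itself is illustrative rather than the committed code.

import org.bson.types.BSONTimestamp;

import com.mongodb.DBObject;
import com.mongodb.gridfs.GridFSDBFile;

class QueueEntrySketch {

    private final DBObject data;
    private final String operation;
    private final BSONTimestamp oplogTimestamp;

    QueueEntrySketch(BSONTimestamp oplogTimestamp, String operation, DBObject data) {
        this.data = data;
        this.operation = operation;
        this.oplogTimestamp = oplogTimestamp;
    }

    // A GridFSDBFile keeps its concrete type all the way through the queue,
    // so no marker field is needed to flag an attachment.
    boolean isAttachment() {
        return data instanceof GridFSDBFile;
    }

    // Entries created during the initial import have no oplog timestamp.
    boolean isOplogEntry() {
        return oplogTimestamp != null;
    }

    DBObject getData() {
        return data;
    }

    String getOperation() {
        return operation;
    }
}

On the consumer side, this type check is what lets the indexer short-circuit attachments before the script filter and advanced transformation are applied, which is why the script filter no longer runs when GridFS is enabled.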
62 changes: 41 additions & 21 deletions src/main/java/org/elasticsearch/river/mongodb/Indexer.java
@@ -33,6 +33,8 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.gridfs.GridFSDBFile;

class Indexer implements Runnable {
@@ -149,16 +151,29 @@ private BSONTimestamp processBlockingQueue(
return lastTimestamp;
}

if (scriptExecutable != null
&& definition.isAdvancedTransformation()) {
return applyAdvancedTransformation(bulk, entry);
}

String objectId = "";
if (entry.getData().get(MongoDBRiver.MONGODB_ID_FIELD) != null) {
objectId = entry.getData().get(MongoDBRiver.MONGODB_ID_FIELD).toString();
}

// TODO: Should the river support script filter,
// advanced_transformation, include_collection for GridFS?
if (entry.isAttachment()) {
try {
updateBulkRequest(bulk, entry.getData(), objectId,
operation, definition.getIndexName(),
definition.getTypeName(), null, null);
} catch (IOException ioEx) {
logger.error("Update bulk failed.", ioEx);
}
return lastTimestamp;
}

if (scriptExecutable != null
&& definition.isAdvancedTransformation()) {
return applyAdvancedTransformation(bulk, entry);
}

if (logger.isDebugEnabled()) {
logger.debug("updateBulkRequest for id: [{}], operation: [{}]",
objectId, operation);
Expand All @@ -180,7 +195,7 @@ private BSONTimestamp processBlockingQueue(
} catch (IOException e) {
logger.warn("failed to parse {}", e);
}
Map<String, Object> data = entry.getData();
Map<String, Object> data = entry.getData().toMap();
if (scriptExecutable != null) {
if (ctx != null) {
ctx.put("document", entry.getData());
@@ -234,7 +249,7 @@ private BSONTimestamp processBlockingQueue(
String parent = extractParent(ctx);
String routing = extractRouting(ctx);
objectId = extractObjectId(ctx, objectId);
updateBulkRequest(bulk, data, objectId, operation, index, type,
updateBulkRequest(bulk, new BasicDBObject(data), objectId, operation, index, type,
routing, parent);
} catch (IOException e) {
logger.warn("failed to parse {}", e, entry.getData());
@@ -243,20 +258,25 @@
}

private void updateBulkRequest(final BulkRequestBuilder bulk,
Map<String, Object> data, String objectId, String operation,
DBObject data, String objectId, String operation,
String index, String type, String routing, String parent)
throws IOException {
if (logger.isDebugEnabled()) {
logger.debug(
"Operation: {} - index: {} - type: {} - routing: {} - parent: {}",
operation, index, type, routing, parent);
}
boolean isAttachment = false;

if (logger.isDebugEnabled()) {
isAttachment = (data instanceof GridFSDBFile);
}
if (MongoDBRiver.OPLOG_INSERT_OPERATION.equals(operation)) {
if (logger.isDebugEnabled()) {
logger.debug(
"Insert operation - id: {} - contains attachment: {}",
operation, objectId,
data.containsKey(MongoDBRiver.IS_MONGODB_ATTACHMENT));
isAttachment);
}
bulk.add(indexRequest(index).type(type).id(objectId)
.source(build(data, objectId)).routing(routing)
@@ -267,7 +287,7 @@ private void updateBulkRequest(final BulkRequestBuilder bulk,
if (logger.isDebugEnabled()) {
logger.debug(
"Update operation - id: {} - contains attachment: {}",
objectId, data.containsKey(MongoDBRiver.IS_MONGODB_ATTACHMENT));
objectId, isAttachment);
}
deleteBulkRequest(bulk, objectId, index, type, routing, parent);
bulk.add(indexRequest(index).type(type).id(objectId)
@@ -283,7 +303,7 @@ private void updateBulkRequest(final BulkRequestBuilder bulk,
}
if (MongoDBRiver.OPLOG_COMMAND_OPERATION.equals(operation)) {
if (definition.isDropCollection()) {
if (data.containsKey(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION)
if (data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION) != null
&& data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION).equals(
definition.getMongoCollection())) {
logger.info("Drop collection request [{}], [{}]",
@@ -400,7 +420,7 @@ private BSONTimestamp applyAdvancedTransformation(
if (scriptExecutable != null) {
if (ctx != null && documents != null) {

document.put("data", entry.getData());
document.put("data", entry.getData().toMap());
if (!objectId.isEmpty()) {
document.put("id", objectId);
}
@@ -449,9 +469,9 @@ private BSONTimestamp applyAdvancedTransformation(
String routing = extractRouting(item);
String action = extractOperation(item);
boolean ignore = isDocumentIgnored(item);
Map<String, Object> _data = (Map<String, Object>) item
Map<String, Object> data = (Map<String, Object>) item
.get("data");
objectId = extractObjectId(_data, objectId);
objectId = extractObjectId(data, objectId);
if (logger.isDebugEnabled()) {
logger.debug(
"Id: {} - operation: {} - ignore: {} - index: {} - type: {} - routing: {} - parent: {}",
@@ -462,7 +482,7 @@ private BSONTimestamp applyAdvancedTransformation(
continue;
}
try {
updateBulkRequest(bulk, _data, objectId,
updateBulkRequest(bulk, new BasicDBObject(data), objectId,
operation, index, type, routing,
parent);
} catch (IOException ioEx) {
@@ -477,16 +497,16 @@ private BSONTimestamp applyAdvancedTransformation(
return lastTimestamp;
}

private XContentBuilder build(final Map<String, Object> data,
final String objectId) throws IOException {
if (data.containsKey(MongoDBRiver.IS_MONGODB_ATTACHMENT)) {
@SuppressWarnings("unchecked")
private XContentBuilder build(final DBObject data, final String objectId)
throws IOException {
if (data instanceof GridFSDBFile) {
logger.info("Add Attachment: {} to index {} / type {}",
objectId, definition.getIndexName(),
definition.getTypeName());
return MongoDBHelper.serialize((GridFSDBFile) data
.get(MongoDBRiver.MONGODB_ATTACHMENT));
return MongoDBHelper.serialize((GridFSDBFile) data);
} else {
return XContentFactory.jsonBuilder().map(data);
return XContentFactory.jsonBuilder().map(data.toMap());
}
}

30 changes: 15 additions & 15 deletions src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
@@ -60,18 +60,18 @@
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.ServerAddress;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.util.JSON;

/**
* @author richardwilly98 (Richard Louapre)
* @author flaper87 (Flavio Percoco Premoli)
* @author aparo (Alberto Paro)
* @author kryptt (Rodolfo Hansen)
* @author benmccann (Ben McCann)
*/
public class MongoDBRiver extends AbstractRiverComponent implements River {

public final static String IS_MONGODB_ATTACHMENT = "is_mongodb_attachment";
public final static String MONGODB_ATTACHMENT = "mongodb_attachment";
public final static String TYPE = "mongodb";
public final static String NAME = "mongodb-river";
public final static String STATUS = "_mongodbstatus";
@@ -450,30 +450,30 @@ static void updateLastTimestamp(final MongoDBRiverDefinition definition,

protected static class QueueEntry {

private Map<String, Object> data;
private String operation;
private BSONTimestamp oplogTimestamp;
private final DBObject data;
private final String operation;
private final BSONTimestamp oplogTimestamp;

public QueueEntry(
Map<String, Object> data) {
this.data = data;
this.operation = OPLOG_INSERT_OPERATION;
public QueueEntry(DBObject data) {
this(null, OPLOG_INSERT_OPERATION, data);
}

public QueueEntry(
BSONTimestamp oplogTimestamp,
String oplogOperation,
Map<String, Object> data) {
public QueueEntry(BSONTimestamp oplogTimestamp, String oplogOperation,
DBObject data) {
this.data = data;
this.operation = oplogOperation;
this.oplogTimestamp = oplogTimestamp;
}

public boolean isOplogEntry() {
return oplogTimestamp != null;
return oplogTimestamp != null;
}

public boolean isAttachment() {
return (data instanceof GridFSDBFile);
}

public Map<String, Object> getData() {
public DBObject getData() {
return data;
}

60 changes: 36 additions & 24 deletions src/main/java/org/elasticsearch/river/mongodb/Slurper.java
@@ -1,9 +1,7 @@
package org.elasticsearch.river.mongodb;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

@@ -30,6 +28,7 @@
import com.mongodb.ServerAddress;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.gridfs.GridFSFile;
import com.mongodb.util.JSON;

class Slurper implements Runnable {
@@ -137,11 +136,25 @@ protected BSONTimestamp doInitialImport() throws InterruptedException {
BSONTimestamp startTimestamp = getCurrentOplogTimestamp();
DBCursor cursor = null;
try {
cursor = slurpedCollection.find();
while (cursor.hasNext()) {
DBObject object = cursor.next();
Map<String, Object> map = applyFieldFilter(object).toMap();
addToStream(MongoDBRiver.OPLOG_INSERT_OPERATION, null, map);
if (!definition.isMongoGridFS()) {
cursor = slurpedCollection.find();
while (cursor.hasNext()) {
DBObject object = cursor.next();
addToStream(MongoDBRiver.OPLOG_INSERT_OPERATION, null, applyFieldFilter(object));
}
} else {
// TODO: To be optimized. https://github.com/mongodb/mongo-java-driver/pull/48#issuecomment-25241988
// possible option: Get the object id list from .fs collection then call GriDFS.findOne
GridFS grid = new GridFS(mongo.getDB(definition.getMongoDb()),
definition.getMongoCollection());
cursor = grid.getFileList();
while (cursor.hasNext()) {
DBObject object = cursor.next();
if (object instanceof GridFSDBFile) {
GridFSDBFile file = grid.findOne(new ObjectId(object.get(MongoDBRiver.MONGODB_ID_FIELD).toString()));
addToStream(MongoDBRiver.OPLOG_INSERT_OPERATION, null, file);
}
}
}
} finally {
if (cursor != null) {
Expand Down Expand Up @@ -240,7 +253,6 @@ private DBCursor processFullOplog() throws InterruptedException {
return oplogCursor(currentTimestamp);
}

@SuppressWarnings("unchecked")
private void processOplogEntry(final DBObject entry)
throws InterruptedException {
String operation = entry.get(MongoDBRiver.OPLOG_OPERATION).toString();
@@ -300,29 +312,31 @@ private void processOplogEntry(final DBObject entry)
throw new NullPointerException(MongoDBRiver.MONGODB_ID_FIELD);
}
logger.info("Add attachment: {}", objectId);
object = applyFieldFilter(object);
HashMap<String, Object> data = new HashMap<String, Object>();
data.put(MongoDBRiver.IS_MONGODB_ATTACHMENT, true);
data.put(MongoDBRiver.MONGODB_ATTACHMENT, object);
data.put(MongoDBRiver.MONGODB_ID_FIELD, objectId);
addToStream(operation, oplogTimestamp, data);
addToStream(operation, oplogTimestamp, applyFieldFilter(object));
} else {
if (MongoDBRiver.OPLOG_UPDATE_OPERATION.equals(operation)) {
DBObject update = (DBObject) entry.get(MongoDBRiver.OPLOG_UPDATE);
logger.debug("Updated item: {}", update);
addQueryToStream(operation, oplogTimestamp, update);
} else {
Map<String, Object> map = applyFieldFilter(object).toMap();
addToStream(operation, oplogTimestamp, map);
addToStream(operation, oplogTimestamp, applyFieldFilter(object));
}
}
}

private DBObject applyFieldFilter(DBObject object) {
object = MongoDBHelper.applyExcludeFields(object,
definition.getExcludeFields());
object = MongoDBHelper.applyIncludeFields(object,
definition.getIncludeFields());
if (object instanceof GridFSFile) {
GridFSFile file = (GridFSFile) object;
DBObject metadata = file.getMetaData();
if (metadata != null) {
file.setMetaData(applyFieldFilter(metadata));
}
} else {
object = MongoDBHelper.applyExcludeFields(object,
definition.getExcludeFields());
object = MongoDBHelper.applyIncludeFields(object,
definition.getIncludeFields());
}
return object;
}

Expand Down Expand Up @@ -422,7 +436,6 @@ private DBCursor oplogCursor(final BSONTimestamp timestampOverride) {
.setOptions(options);
}

@SuppressWarnings("unchecked")
private void addQueryToStream(final String operation,
final BSONTimestamp currentTimestamp, final DBObject update)
throws InterruptedException {
@@ -433,13 +446,12 @@ private void addQueryToStream(final String operation,
}

for (DBObject item : slurpedCollection.find(update, findKeys)) {
addToStream(operation, currentTimestamp, item.toMap());
addToStream(operation, currentTimestamp, item);
}
}

private void addToStream(final String operation,
final BSONTimestamp currentTimestamp,
final Map<String, Object> data) throws InterruptedException {
final BSONTimestamp currentTimestamp, final DBObject data) throws InterruptedException {
if (logger.isDebugEnabled()) {
logger.debug(
"addToStream - operation [{}], currentTimestamp [{}], data [{}]",
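The TODO left in doInitialImport() above notes that the GridFS pass is still unoptimized: grid.getFileList() materializes the full file documents, and each one is then re-fetched through findOne. Below is a hedged sketch of the option the comment hints at, reading only the _id values from the <bucket>.files collection and resolving each file lazily. The driver calls are standard mongo-java-driver 2.x API, but the helper class and variable names are illustrative and not part of this commit.

import org.bson.types.ObjectId;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;

class GridFSImportSketch {

    // Scan only the _id field of the <bucket>.files collection, then resolve each
    // file through GridFS.findOne, instead of iterating grid.getFileList().
    void importGridFSFiles(DB db, String bucket) {
        GridFS grid = new GridFS(db, bucket);
        DBCollection files = db.getCollection(bucket + ".files");
        DBCursor cursor = files.find(new BasicDBObject(), new BasicDBObject("_id", 1));
        try {
            while (cursor.hasNext()) {
                DBObject doc = cursor.next();
                GridFSDBFile file = grid.findOne((ObjectId) doc.get("_id"));
                if (file != null) {
                    handle(file);
                }
            }
        } finally {
            cursor.close();
        }
    }

    // Stand-in for addToStream(MongoDBRiver.OPLOG_INSERT_OPERATION, null, file) in the Slurper.
    void handle(GridFSDBFile file) {
    }
}

Whether this is measurably faster than the committed getFileList() loop would need to be verified against a real GridFS bucket.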
@@ -80,7 +80,7 @@ public abstract class RiverMongoDBTestAbstract {
public static final String TEST_SIMPLE_MONGODB_DOCUMENT_JSON = "/org/elasticsearch/river/mongodb/script/test-simple-mongodb-document.json";

protected final ESLogger logger = Loggers.getLogger(getClass());
protected final static long wait = 6000;
protected final static long wait = 2000;

public static final String ADMIN_DATABASE_NAME = "admin";
public static final String LOCAL_DATABASE_NAME = "local";
(2 more changed files not shown)
