Skip to content

Commit

Permalink
Implement include_fields (#119)
Browse files Browse the repository at this point in the history
- include_fields works the same way as exclude_fields.
- These 2 settings are mutually exclusive (exclude_fields has the
priority).
  • Loading branch information
richardwilly98 committed Sep 10, 2013
1 parent 1eb8f55 commit 2611238
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 5 deletions.
21 changes: 17 additions & 4 deletions src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ public MongoDBRiver(final RiverName riverName,
for (String key : definition.getExcludeFields()) {
findKeys.put(key, 0);
}
} else if (definition.getIncludeFields() != null) {
// if (! definition.getIncludeCollection().contains(MONGODB_ID_FIELD)) {
// findKeys.put(MONGODB_ID_FIELD, 1);
// }
for (String key : definition.getIncludeFields()) {
findKeys.put(key, 1);
}
}
mongoOplogNamespace = definition.getMongoDb() + "."
+ definition.getMongoCollection();
Expand Down Expand Up @@ -1306,8 +1313,7 @@ private void processOplogEntry(final DBObject entry)
throw new NullPointerException(MONGODB_ID_FIELD);
}
logger.info("Add attachment: {}", objectId);
object = MongoDBHelper.applyExcludeFields(object,
definition.getExcludeFields());
object = applyFieldFilter(object);
HashMap<String, Object> data = new HashMap<String, Object>();
data.put(IS_MONGODB_ATTACHMENT, true);
data.put(MONGODB_ATTACHMENT, object);
Expand All @@ -1319,13 +1325,20 @@ private void processOplogEntry(final DBObject entry)
logger.debug("Updated item: {}", update);
addQueryToStream(operation, oplogTimestamp, update);
} else {
object = MongoDBHelper.applyExcludeFields(object,
definition.getExcludeFields());
object = applyFieldFilter(object);
addToStream(operation, oplogTimestamp, object.toMap());
}
}
}

private DBObject applyFieldFilter(DBObject object) {
object = MongoDBHelper.applyExcludeFields(object,
definition.getExcludeFields());
object = MongoDBHelper.applyIncludeFields(object,
definition.getIncludeFields());
return object;
}

/*
* Extract "_id" from "o" if it fails try to extract from "o2"
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class MongoDBRiverDefinition {
public final static String SSL_VERIFY_CERT_FIELD = "ssl_verify_certificate";
public final static String DROP_COLLECTION_FIELD = "drop_collection";
public final static String EXCLUDE_FIELDS_FIELD = "exclude_fields";
public final static String INCLUDE_FIELDS_FIELD = "include_fields";
public final static String INCLUDE_COLLECTION_FIELD = "include_collection";
public final static String INITIAL_TIMESTAMP_FIELD = "initial_timestamp";
public final static String INITIAL_TIMESTAMP_SCRIPT_TYPE_FIELD = "script_type";
Expand Down Expand Up @@ -83,6 +84,7 @@ public class MongoDBRiverDefinition {
private final boolean mongoSSLVerifyCertificate;
private final boolean dropCollection;
private final Set<String> excludeFields;
private final Set<String> includeFields;
private final String includeCollection;
private final BSONTimestamp initialTimestamp;
private final String script;
Expand Down Expand Up @@ -115,6 +117,7 @@ public static class Builder {
private boolean mongoSSLVerifyCertificate = false;
private boolean dropCollection = false;
private Set<String> excludeFields = null;
private Set<String> includeFields = null;
private String includeCollection = "";
private BSONTimestamp initialTimestamp = null;
private String script = null;
Expand Down Expand Up @@ -201,6 +204,11 @@ public Builder excludeFields(Set<String> excludeFields) {
return this;
}

public Builder includeFields(Set<String> includeFields) {
this.includeFields = includeFields;
return this;
}

public Builder includeCollection(String includeCollection) {
this.includeCollection = includeCollection;
return this;
Expand Down Expand Up @@ -352,7 +360,29 @@ public static MongoDBRiverDefinition parseSettings(

builder.includeCollection(XContentMapValues.nodeStringValue(
mongoOptionsSettings.get(INCLUDE_COLLECTION_FIELD), ""));
if (mongoOptionsSettings.containsKey(EXCLUDE_FIELDS_FIELD)) {

if (mongoOptionsSettings.containsKey(INCLUDE_FIELDS_FIELD)) {
Set<String> includeFields = new HashSet<String>();
Object includeFieldsSettings = mongoOptionsSettings
.get(INCLUDE_FIELDS_FIELD);
logger.info("includeFieldsSettings: "
+ includeFieldsSettings);
boolean array = XContentMapValues
.isArray(includeFieldsSettings);

if (array) {
ArrayList<String> fields = (ArrayList<String>) includeFieldsSettings;
for (String field : fields) {
logger.info("Field: " + field);
includeFields.add(field);
}
}

if (! includeFields.contains(MongoDBRiver.MONGODB_ID_FIELD)) {
includeFields.add(MongoDBRiver.MONGODB_ID_FIELD);
}
builder.includeFields(includeFields);
} else if (mongoOptionsSettings.containsKey(EXCLUDE_FIELDS_FIELD)) {
Set<String> excludeFields = new HashSet<String>();
Object excludeFieldsSettings = mongoOptionsSettings
.get(EXCLUDE_FIELDS_FIELD);
Expand All @@ -371,6 +401,7 @@ public static MongoDBRiverDefinition parseSettings(

builder.excludeFields(excludeFields);
}

if (mongoOptionsSettings.containsKey(INITIAL_TIMESTAMP_FIELD)) {
BSONTimestamp timeStamp = null;
try {
Expand Down Expand Up @@ -542,6 +573,7 @@ private MongoDBRiverDefinition(final Builder builder) {
this.mongoSSLVerifyCertificate = builder.mongoSSLVerifyCertificate;
this.dropCollection = builder.dropCollection;
this.excludeFields = builder.excludeFields;
this.includeFields = builder.includeFields;
this.includeCollection = builder.includeCollection;
this.initialTimestamp = builder.initialTimestamp;
this.script = builder.script;
Expand Down Expand Up @@ -614,6 +646,10 @@ public Set<String> getExcludeFields() {
return excludeFields;
}

public Set<String> getIncludeFields() {
return includeFields;
}

public String getIncludeCollection() {
return includeCollection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.gridfs.GridFSDBFile;

Expand Down Expand Up @@ -114,6 +115,36 @@ public static DBObject applyExcludeFields(DBObject bsonObject,
return filteredObject;
}

public static DBObject applyIncludeFields(DBObject bsonObject,
final Set<String> includeFields) {
if (includeFields == null) {
return bsonObject;
}

DBObject filteredObject = new BasicDBObject();

for (String field : bsonObject.keySet()) {
if (includeFields.contains(field)) {
filteredObject.put(field, bsonObject.get(field));
}
}

for (String field : includeFields) {
if (field.contains(".")) {
String rootObject = field.substring(0, field.indexOf("."));
String childObject = field.substring(field.indexOf(".") + 1);
Object object = bsonObject.get(rootObject);
if (object instanceof DBObject) {
DBObject object2 = (DBObject) object;
object2 = applyIncludeFields(object2, new HashSet<String>(
Arrays.asList(childObject)));
filteredObject.put(rootObject, object2);
}
}
}
return filteredObject;
}

public static String getRiverVersion() {
String version = "Undefined";
if (MongoDBHelper.class.getPackage() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,29 @@ public void testExcludeFields() {
}
}

public void testIncludeFields() {
try {
Set<String> includeFields = new HashSet<String>(Arrays.asList(
"lastName", "hobbies", "address.street"));
// test-exclude-fields-document.json
String mongoDocument = copyToStringFromClasspath("/test/elasticsearch/plugin/river/mongodb/test-exclude-fields-document.json");
DBObject dbObject = (DBObject) JSON.parse(mongoDocument);
logger.debug("Initial BSON object: {}", dbObject);
DBObject filteredObject = MongoDBHelper.applyIncludeFields(dbObject,
includeFields);
logger.debug("Filtered BSON object: {}", filteredObject);
Assert.assertNotNull(filteredObject);
Assert.assertFalse(filteredObject.containsField("firstName"));
Assert.assertTrue(filteredObject.containsField("lastName"));
Assert.assertTrue(filteredObject.containsField("hobbies"));
Assert.assertTrue(filteredObject.containsField("address"));
Assert.assertTrue(((DBObject) filteredObject.get("address"))
.containsField("street"));
Assert.assertFalse(((DBObject) filteredObject.get("address"))
.containsField("apartment"));
} catch (Throwable t) {
logger.error("testIncludeFields failed", t);
Assert.fail();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public abstract class RiverMongoDBTestAsbtract {
public static final String TEST_MONGODB_RIVER_GRIDFS_JSON = "/test/elasticsearch/plugin/river/mongodb/gridfs/test-gridfs-mongodb-river.json";
public static final String TEST_MONGODB_RIVER_WITH_SCRIPT_JSON = "/test/elasticsearch/plugin/river/mongodb/script/test-mongodb-river-with-script.json";
public static final String TEST_MONGODB_RIVER_EXCLUDE_FIELDS_JSON = "/test/elasticsearch/plugin/river/mongodb/simple/test-simple-mongodb-river-exclude-fields.json";
public static final String TEST_MONGODB_RIVER_INCLUDE_FIELDS_JSON = "/test/elasticsearch/plugin/river/mongodb/simple/test-simple-mongodb-river-include-fields.json";
public static final String TEST_SIMPLE_MONGODB_DOCUMENT_JSON = "/test/elasticsearch/plugin/river/mongodb/script/test-simple-mongodb-document.json";

protected final ESLogger logger = Loggers.getLogger(getClass());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package test.elasticsearch.plugin.river.mongodb.simple;

import static org.elasticsearch.index.query.QueryBuilders.fieldQuery;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.util.Map;

import org.bson.types.ObjectId;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import test.elasticsearch.plugin.river.mongodb.RiverMongoDBTestAsbtract;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;

@Test
public class RiverMongoIncludeFieldsTest extends RiverMongoDBTestAsbtract {

private DB mongoDB;
private DBCollection mongoCollection;
protected boolean dropCollectionOption = true;

protected RiverMongoIncludeFieldsTest() {
super("include-fields-river-" + System.currentTimeMillis(),
"include-fields-db-" + System.currentTimeMillis(),
"include-fields-collection-" + System.currentTimeMillis(),
"include-fields-index-" + System.currentTimeMillis());
}

protected RiverMongoIncludeFieldsTest(String river, String database,
String collection, String index) {
super(river, database, collection, index);
}

@BeforeClass
public void createDatabase() {
logger.debug("createDatabase {}", getDatabase());
try {
mongoDB = getMongo().getDB(getDatabase());
mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE);
super.createRiver(TEST_MONGODB_RIVER_INCLUDE_FIELDS_JSON,
getRiver(), (Object) String.valueOf(getMongoPort1()),
(Object) String.valueOf(getMongoPort2()),
(Object) String.valueOf(getMongoPort3()),
(Object) "[\"include-field-1\", \"include-field-2\"]",
(Object) getDatabase(), (Object) getCollection(),
(Object) getIndex(), (Object) getDatabase());
logger.info("Start createCollection");
mongoCollection = mongoDB.createCollection(getCollection(), null);
Assert.assertNotNull(mongoCollection);
} catch (Throwable t) {
logger.error("createDatabase failed.", t);
}
}

@AfterClass
public void cleanUp() {
super.deleteRiver();
logger.info("Drop database " + mongoDB.getName());
mongoDB.dropDatabase();
}

@Test
public void testIncludeFields() throws Throwable {
logger.debug("Start testIncludeFields");
try {
DBObject dbObject = new BasicDBObject();
dbObject.put("include-field-1", System.currentTimeMillis());
dbObject.put("include-field-2", System.currentTimeMillis());
dbObject.put("field-3", System.currentTimeMillis());
mongoCollection.insert(dbObject);
Thread.sleep(wait);
String id = dbObject.get("_id").toString();
assertThat(
getNode().client().admin().indices()
.exists(new IndicesExistsRequest(getIndex()))
.actionGet().isExists(), equalTo(true));
refreshIndex();

SearchResponse sr = getNode().client().prepareSearch(getIndex())
.setQuery(fieldQuery("_id", id)).execute().actionGet();
logger.debug("SearchResponse {}", sr.toString());
long totalHits = sr.getHits().getTotalHits();
logger.debug("TotalHits: {}", totalHits);
assertThat(totalHits, equalTo(1l));

Map<String, Object> object = sr.getHits().getHits()[0]
.sourceAsMap();
assertThat(object.containsKey("include-field-1"), equalTo(true));
assertThat(object.containsKey("include-field-2"), equalTo(true));
assertThat(object.containsKey("field-3"), equalTo(false));

// Update Mongo object
dbObject = mongoCollection.findOne(new BasicDBObject("_id", new ObjectId(id)));
dbObject.put("field-4", System.currentTimeMillis());
mongoCollection.save(dbObject);
Thread.sleep(wait);

sr = getNode().client().prepareSearch(getIndex())
.setQuery(fieldQuery("_id", id)).execute().actionGet();
logger.debug("SearchResponse {}", sr.toString());
totalHits = sr.getHits().getTotalHits();
logger.debug("TotalHits: {}", totalHits);
assertThat(totalHits, equalTo(1l));

object = sr.getHits().getHits()[0].sourceAsMap();
assertThat(object.containsKey("include-field-1"), equalTo(true));
assertThat(object.containsKey("include-field-2"), equalTo(true));
assertThat(object.containsKey("field-3"), equalTo(false));
assertThat(object.containsKey("field-4"), equalTo(false));
} catch (Throwable t) {
logger.error("testIncludeFields failed.", t);
t.printStackTrace();
throw t;
}
}

}
Loading

0 comments on commit 2611238

Please sign in to comment.