Skip to content

Commit

Permalink
Fix for #101
Browse files Browse the repository at this point in the history
- New parameter options/include_collection can be used to insert the
collection name in the document indexed by ES
  • Loading branch information
richardwilly98 committed Jul 16, 2013
1 parent 3d90733 commit 94ed033
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 0 deletions.
1 change: 1 addition & 0 deletions resources/issues/101/01_create-river.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
curl -XPUT "http://localhost:9200/_river/river101/_meta" -d @mongodb-river-simple.json
3 changes: 3 additions & 0 deletions resources/issues/101/02_test-issue-101.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
%MONGO_HOME%\bin\mongo < test-issue-101.js
pause
curl -XGET localhost:9200/mydb101/_search?q=collection:mycollec101
14 changes: 14 additions & 0 deletions resources/issues/101/mongodb-river-simple.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"type": "mongodb",
"mongodb": {
"db": "mydb101",
"collection": "mycollec101",
"options": {
"include_collection": "collection"
}
},
"index": {
"name": "mydb101",
"type": "type101"
}
}
5 changes: 5 additions & 0 deletions resources/issues/101/test-issue-101.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use mydb101
var o = {
'name': 'issue101'
}
db.mycollec101.save(o)
10 changes: 10 additions & 0 deletions src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
public final static String SSL_VERIFY_CERT_FIELD = "ssl_verify_certificate";
public final static String DROP_COLLECTION_FIELD = "drop_collection";
public final static String EXCLUDE_FIELDS_FIELD = "exclude_fields";
public final static String INCLUDE_COLLECTION_FIELD = "include_collection";
public final static String FILTER_FIELD = "filter";
public final static String CREDENTIALS_FIELD = "credentials";
public final static String USER_FIELD = "user";
Expand Down Expand Up @@ -187,6 +188,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
protected final int throttleSize;
protected final boolean dropCollection;
protected final Set<String> excludeFields;
protected final String includeCollection;

private final ExecutableScript script;

Expand Down Expand Up @@ -271,6 +273,8 @@ public MongoDBRiver(final RiverName riverName,
mongoSSLVerifyCertificate = XContentMapValues.nodeBooleanValue(
mongoOptionsSettings.get(SSL_VERIFY_CERT_FIELD), true);

includeCollection = XContentMapValues.nodeStringValue(
mongoOptionsSettings.get(INCLUDE_COLLECTION_FIELD), "");
if (mongoOptionsSettings.containsKey(EXCLUDE_FIELDS_FIELD)) {
excludeFields = new HashSet<String>();
Object excludeFieldsSettings = mongoOptionsSettings
Expand All @@ -293,6 +297,7 @@ public MongoDBRiver(final RiverName riverName,
} else {
mongoSecondaryReadPreference = false;
dropCollection = false;
includeCollection = "";
excludeFields = null;
mongoUseSSL = false;
mongoSSLVerifyCertificate = false;
Expand Down Expand Up @@ -398,6 +403,7 @@ public MongoDBRiver(final RiverName riverName,
// mongoDbPassword = "";
script = null;
dropCollection = false;
includeCollection = "";
excludeFields = null;
mongoUseSSL = false;
mongoSSLVerifyCertificate = false;
Expand Down Expand Up @@ -755,6 +761,10 @@ private BSONTimestamp updateBulkRequest(final BulkRequestBuilder bulk,
logger.debug("updateBulkRequest for id: [{}], operation: [{}]",
objectId, operation);
}

if (!includeCollection.isEmpty()) {
data.put(includeCollection, mongoCollection);
}

Map<String, Object> ctx = null;
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package test.elasticsearch.plugin.river.mongodb.simple;

import static org.elasticsearch.client.Requests.countRequest;
import static org.elasticsearch.common.io.Streams.copyToStringFromClasspath;
import static org.elasticsearch.index.query.QueryBuilders.fieldQuery;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.count.CountResponse;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import test.elasticsearch.plugin.river.mongodb.RiverMongoDBTestAsbtract;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import com.mongodb.util.JSON;

@Test
public class RiverMongoIncludeCollectionTest extends RiverMongoDBTestAsbtract {

private static final String TEST_SIMPLE_MONGODB_RIVER_INCLUDE_COLLECTION_JSON = "/test/elasticsearch/plugin/river/mongodb/simple/test-simple-mongodb-river-include-collection.json";
private DB mongoDB;
private DBCollection mongoCollection;
private String includeCollectionOption = "mycollection";

protected RiverMongoIncludeCollectionTest() {
super("include-river-" + System.currentTimeMillis(), "include-river-"
+ System.currentTimeMillis(), "include-collection"
+ System.currentTimeMillis(), "include-index-"
+ System.currentTimeMillis());
}

protected RiverMongoIncludeCollectionTest(String river, String database,
String collection, String index) {
super(river, database, collection, index);
}

@BeforeClass
public void createDatabase() {
logger.debug("createDatabase {}", getDatabase());
try {
mongoDB = getMongo().getDB(getDatabase());
mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE);
super.createRiver(
TEST_SIMPLE_MONGODB_RIVER_INCLUDE_COLLECTION_JSON,
getRiver(), (Object) String.valueOf(getMongoPort1()),
(Object) String.valueOf(getMongoPort2()),
(Object) String.valueOf(getMongoPort3()),
(Object) includeCollectionOption, (Object) getDatabase(),
(Object) getCollection(), (Object) getIndex(),
(Object) getDatabase());
logger.info("Start createCollection");
mongoCollection = mongoDB.createCollection(getCollection(), null);
Assert.assertNotNull(mongoCollection);
} catch (Throwable t) {
logger.error("createDatabase failed.", t);
}
}

@AfterClass
public void cleanUp() {
super.deleteRiver();
logger.info("Drop database " + mongoDB.getName());
mongoDB.dropDatabase();
}

@Test
public void testIncludeCollection() throws Throwable {
logger.debug("Start testIncludeCollection");
try {
String mongoDocument = copyToStringFromClasspath(TEST_SIMPLE_MONGODB_DOCUMENT_JSON);
DBObject dbObject = (DBObject) JSON.parse(mongoDocument);
mongoCollection.insert(dbObject);
Thread.sleep(wait);

assertThat(
getNode().client().admin().indices()
.exists(new IndicesExistsRequest(getIndex()))
.actionGet().isExists(), equalTo(true));
assertThat(
getNode().client().admin().indices()
.prepareTypesExists(getIndex())
.setTypes(getDatabase()).execute().actionGet()
.isExists(), equalTo(true));

String collectionName = mongoCollection.getName();

refreshIndex();

CountResponse countResponse = getNode()
.client()
.count(countRequest(getIndex())
.query(fieldQuery(includeCollectionOption, collectionName))).actionGet();
assertThat(countResponse.getCount(), equalTo(1L));
} catch (Throwable t) {
logger.error("testIncludeCollection failed.", t);
t.printStackTrace();
throw t;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"type": "mongodb",
"mongodb": {
"servers": [{
"host": "localhost",
"port": %s
},
{
"host": "localhost",
"port": %s
},
{
"host": "localhost",
"port": %s
}],
"options": {
"secondary_read_preference": true,
"include_collection": "%s"
},
"db": "%s",
"collection": "%s",
"gridfs": false
},
"index": {
"name": "%s",
"type": "%s",
"throttle_size": 2000
}
}

1 comment on commit 94ed033

@buildhive
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Richard Louapre » elasticsearch-river-mongodb #41 UNSTABLE
Looks like this commit caused a build failure
(what's this?)

Please sign in to comment.