From c52022380ff99f393e2d9cf6fe3fa9f785e65792 Mon Sep 17 00:00:00 2001
From: Richard Louapre
Date: Fri, 27 Sep 2013 09:09:22 -0400
Subject: [PATCH] Initial implementation of initial import for GridFS

- Also refactor content handling when GridFS is enabled
---
 .../river/mongodb/MongoDBRiver.java           | 162 +++++---
 .../mongodb/RiverMongoDBTestAbstract.java     |   2 +-
 ...RiverMongoWithGridFSInitialImportTest.java | 200 ++++++++++
 .../gridfs/RiverMongoWithGridFSTest.java      | 371 ++++++++++--------
 4 files changed, 504 insertions(+), 231 deletions(-)
 create mode 100644 src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSInitialImportTest.java

diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
index 31a24eba..a346abc4 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
@@ -86,6 +86,7 @@
 import com.mongodb.ServerAddress;
 import com.mongodb.gridfs.GridFS;
 import com.mongodb.gridfs.GridFSDBFile;
+import com.mongodb.gridfs.GridFSFile;
 import com.mongodb.util.JSON;

 /**
@@ -93,11 +94,12 @@
  * @author flaper87 (Flavio Percoco Premoli)
  * @author aparo (Alberto Paro)
  * @author kryptt (Rodolfo Hansen)
+ * @author benmccann (Ben McCann)
  */
 public class MongoDBRiver extends AbstractRiverComponent implements River {

-    public final static String IS_MONGODB_ATTACHMENT = "is_mongodb_attachment";
-    public final static String MONGODB_ATTACHMENT = "mongodb_attachment";
+//    public final static String IS_MONGODB_ATTACHMENT = "is_mongodb_attachment";
+//    public final static String MONGODB_ATTACHMENT = "mongodb_attachment";
     public final static String TYPE = "mongodb";
     public final static String NAME = "mongodb-river";
     public final static String STATUS = "_mongodbstatus";
@@ -501,7 +503,7 @@ private BSONTimestamp processBlockingQueue(
             String operation = entry.getOperation();
             if (OPLOG_COMMAND_OPERATION.equals(operation)) {
                 try {
-                    updateBulkRequest(bulk, entry.getData(), null, operation,
+                    updateBulkRequest(bulk, entry.getData(), null, operation,
                             definition.getIndexName(), definition.getTypeName(),
                             null, null);
                 } catch (IOException ioEx) {
@@ -510,16 +512,29 @@ private BSONTimestamp processBlockingQueue(
                 return lastTimestamp;
             }

-            if (scriptExecutable != null
-                    && definition.isAdvancedTransformation()) {
-                return applyAdvancedTransformation(bulk, entry);
-            }
-
             String objectId = "";
             if (entry.getData().get(MONGODB_ID_FIELD) != null) {
                 objectId = entry.getData().get(MONGODB_ID_FIELD).toString();
             }

+            // TODO: Should the river support script filter,
+            // advanced_transformation, include_collection for GridFS?
+            if (entry.isAttachment()) {
+                try {
+                    updateBulkRequest(bulk, entry.getData(), objectId,
+                            operation, definition.getIndexName(),
+                            definition.getTypeName(), null, null);
+                } catch (IOException ioEx) {
+                    logger.error("Update bulk failed.", ioEx);
+                }
+                return lastTimestamp;
+            }
+
+            if (scriptExecutable != null
+                    && definition.isAdvancedTransformation()) {
+                return applyAdvancedTransformation(bulk, entry);
+            }
+
             if (logger.isDebugEnabled()) {
                 logger.debug("updateBulkRequest for id: [{}], operation: [{}]",
                         objectId, operation);
@@ -541,7 +556,7 @@ private BSONTimestamp processBlockingQueue(
                 } catch (IOException e) {
                     logger.warn("failed to parse {}", e);
                 }
-                Map data = entry.getData();
+                Map data = entry.getData().toMap();
                 if (scriptExecutable != null) {
                     if (ctx != null) {
                         ctx.put("document", entry.getData());
@@ -595,7 +610,7 @@ private BSONTimestamp processBlockingQueue(
                     String parent = extractParent(ctx);
                     String routing = extractRouting(ctx);
                     objectId = extractObjectId(ctx, objectId);
-                    updateBulkRequest(bulk, data, objectId, operation, index, type,
+                    updateBulkRequest(bulk, new BasicDBObject(data), objectId, operation, index, type,
                             routing, parent);
                 } catch (IOException e) {
                     logger.warn("failed to parse {}", e, entry.getData());
@@ -604,20 +619,23 @@ private BSONTimestamp processBlockingQueue(
     }

     private void updateBulkRequest(final BulkRequestBuilder bulk,
-            Map data, String objectId, String operation,
-            String index, String type, String routing, String parent)
-            throws IOException {
+            DBObject data, String objectId, String operation, String index,
+            String type, String routing, String parent) throws IOException {
         if (logger.isDebugEnabled()) {
             logger.debug(
                     "Operation: {} - index: {} - type: {} - routing: {} - parent: {}",
                     operation, index, type, routing, parent);
         }
+        boolean isAttachment = (data instanceof GridFSDBFile);
         if (OPLOG_INSERT_OPERATION.equals(operation)) {
             if (logger.isDebugEnabled()) {
                 logger.debug(
                         "Insert operation - id: {} - contains attachment: {}",
-                        operation, objectId,
-                        data.containsKey(IS_MONGODB_ATTACHMENT));
+                        objectId, isAttachment);
             }
             bulk.add(indexRequest(index).type(type).id(objectId)
                     .source(build(data, objectId)).routing(routing)
@@ -628,7 +646,7 @@ private void updateBulkRequest(final BulkRequestBuilder bulk,
             if (logger.isDebugEnabled()) {
                 logger.debug(
                         "Update operation - id: {} - contains attachment: {}",
-                        objectId, data.containsKey(IS_MONGODB_ATTACHMENT));
+                        objectId, isAttachment);
             }
             deleteBulkRequest(bulk, objectId, index, type, routing, parent);
             bulk.add(indexRequest(index).type(type).id(objectId)
@@ -644,7 +662,7 @@ private void updateBulkRequest(final BulkRequestBuilder bulk,
         }
         if (OPLOG_COMMAND_OPERATION.equals(operation)) {
             if (definition.isDropCollection()) {
-                if (data.containsKey(OPLOG_DROP_COMMAND_OPERATION)
+                if (data.get(OPLOG_DROP_COMMAND_OPERATION) != null
                         && data.get(OPLOG_DROP_COMMAND_OPERATION).equals(
                                 definition.getMongoCollection())) {
                     logger.info("Drop collection request [{}], [{}]",
@@ -761,7 +779,7 @@ private BSONTimestamp applyAdvancedTransformation(

                 if (scriptExecutable != null) {
                     if (ctx != null && documents != null) {
-                        document.put("data", entry.getData());
+                        document.put("data", entry.getData().toMap());
                         if (!objectId.isEmpty()) {
                             document.put("id", objectId);
                         }
@@ -823,7 +841,7 @@ private BSONTimestamp applyAdvancedTransformation(
                             continue;
                         }
                         try {
-                            updateBulkRequest(bulk, _data, objectId,
+                            updateBulkRequest(bulk, new BasicDBObject(_data),
+                                    objectId,
+                                    operation, index, type, routing, parent);
                         } catch (IOException ioEx) {
@@ -838,16 +856,15 @@ private BSONTimestamp applyAdvancedTransformation(
         return lastTimestamp;
     }

-    private XContentBuilder build(final Map data,
-            final String objectId) throws IOException {
-        if (data.containsKey(IS_MONGODB_ATTACHMENT)) {
+    private XContentBuilder build(final DBObject data, final String objectId)
+            throws IOException {
+        if (data instanceof GridFSDBFile) {
             logger.info("Add Attachment: {} to index {} / type {}",
                     objectId, definition.getIndexName(),
                     definition.getTypeName());
-            return MongoDBHelper.serialize((GridFSDBFile) data
-                    .get(MONGODB_ATTACHMENT));
+            return MongoDBHelper.serialize((GridFSDBFile) data);
         } else {
-            return XContentFactory.jsonBuilder().map(data);
+            return XContentFactory.jsonBuilder().map(data.toMap());
         }
     }

@@ -953,27 +970,39 @@ public void run() {
                     // Do an initial sync the same way MongoDB does
                     // https://groups.google.com/forum/?fromgroups=#!topic/mongodb-user/sOKlhD_E2ns
-                    // TODO: support gridfs
-                    if (!definition.isMongoGridFS()) {
-                        BSONTimestamp lastIndexedTimestamp = getLastTimestamp(mongoOplogNamespace);
-                        if (lastIndexedTimestamp == null) {
-                            // TODO: ensure the index type is empty
-                            logger.info("MongoDBRiver is beginning initial import of "
-                                    + slurpedCollection.getFullName());
-                            startTimestamp = getCurrentOplogTimestamp();
-                            try {
+                    BSONTimestamp lastIndexedTimestamp = getLastTimestamp(mongoOplogNamespace);
+                    if (lastIndexedTimestamp == null) {
+                        // TODO: ensure the index type is empty
+                        logger.info("MongoDBRiver is beginning initial import of "
+                                + slurpedCollection.getFullName());
+                        startTimestamp = getCurrentOplogTimestamp();
+                        try {
+                            if (!definition.isMongoGridFS()) {
                                 cursor = slurpedCollection.find();
                                 while (cursor.hasNext()) {
                                     DBObject object = cursor.next();
-                                    Map map = applyFieldFilter(object).toMap();
-                                    addToStream(OPLOG_INSERT_OPERATION, null, map);
+                                    addToStream(OPLOG_INSERT_OPERATION, null, applyFieldFilter(object));
                                 }
-                            } finally {
-                                if (cursor != null) {
-                                    logger.trace("Closing initial import cursor");
-                                    cursor.close();
+                            } else {
+                                // TODO: To be optimized.
+                                // https://github.com/mongodb/mongo-java-driver/pull/48#issuecomment-25241988
+                                // Possible option: get the object id list from the <bucket>.files
+                                // collection, then call GridFS.findOne for each id.
+                                GridFS grid = new GridFS(mongo.getDB(definition.getMongoDb()),
+                                        definition.getMongoCollection());
+                                cursor = grid.getFileList();
+                                while (cursor.hasNext()) {
+                                    DBObject object = cursor.next();
+                                    if (object instanceof GridFSDBFile) {
+                                        GridFSDBFile file = grid.findOne(new ObjectId(object.get(MONGODB_ID_FIELD).toString()));
+                                        addToStream(OPLOG_INSERT_OPERATION, null, file);
+                                    }
+                                }
                             }
+                        } finally {
+                            if (cursor != null) {
+                                logger.trace("Closing initial import cursor");
+                                cursor.close();
+                            }
                         }
                     }

@@ -1101,7 +1130,6 @@ private DBCursor processFullOplog() throws InterruptedException {
         return oplogCursor(currentTimestamp);
     }

-    @SuppressWarnings("unchecked")
     private void processOplogEntry(final DBObject entry)
             throws InterruptedException {
         String operation = entry.get(OPLOG_OPERATION).toString();
@@ -1161,29 +1189,31 @@ private void processOplogEntry(final DBObject entry)
                 throw new NullPointerException(MONGODB_ID_FIELD);
             }
             logger.info("Add attachment: {}", objectId);
-            object = applyFieldFilter(object);
-            HashMap data = new HashMap();
-            data.put(IS_MONGODB_ATTACHMENT, true);
-            data.put(MONGODB_ATTACHMENT, object);
-            data.put(MONGODB_ID_FIELD, objectId);
-            addToStream(operation, oplogTimestamp, data);
+            addToStream(operation, oplogTimestamp, applyFieldFilter(object));
         } else {
             if (OPLOG_UPDATE_OPERATION.equals(operation)) {
                 DBObject update = (DBObject) entry.get(OPLOG_UPDATE);
                 logger.debug("Updated item: {}", update);
                 addQueryToStream(operation, oplogTimestamp, update);
             } else {
-                Map map = applyFieldFilter(object).toMap();
-                addToStream(operation, oplogTimestamp, map);
+                addToStream(operation, oplogTimestamp, applyFieldFilter(object));
             }
         }
     }

     private DBObject applyFieldFilter(DBObject object) {
-        object = MongoDBHelper.applyExcludeFields(object,
-                definition.getExcludeFields());
-        object = MongoDBHelper.applyIncludeFields(object,
-                definition.getIncludeFields());
+        if (object instanceof GridFSFile) {
+            GridFSFile file = (GridFSFile) object;
+            DBObject metadata = file.getMetaData();
+            if (metadata != null) {
+                file.setMetaData(applyFieldFilter(metadata));
+            }
+        } else {
+            object = MongoDBHelper.applyExcludeFields(object,
+                    definition.getExcludeFields());
+            object = MongoDBHelper.applyIncludeFields(object,
+                    definition.getIncludeFields());
+        }
         return object;
     }

@@ -1283,7 +1313,6 @@ private DBCursor oplogCursor(final BSONTimestamp timestampOverride) {
                 .setOptions(options);
     }

-    @SuppressWarnings("unchecked")
     private void addQueryToStream(final String operation,
             final BSONTimestamp currentTimestamp, final DBObject update)
             throws InterruptedException {
@@ -1294,13 +1323,13 @@ private void addQueryToStream(final String operation,
         }

         for (DBObject item : slurpedCollection.find(update, findKeys)) {
-            addToStream(operation, currentTimestamp, item.toMap());
+            addToStream(operation, currentTimestamp, item);
         }
     }

     private void addToStream(final String operation,
-            final BSONTimestamp currentTimestamp,
-            final Map data) throws InterruptedException {
+            final BSONTimestamp currentTimestamp, final DBObject data)
+            throws InterruptedException {
         if (logger.isDebugEnabled()) {
             logger.debug(
                     "addToStream - operation [{}], currentTimestamp [{}], data [{}]",
@@ -1418,20 +1447,19 @@ private void updateLastTimestamp(final String namespace,

     protected static class QueueEntry {

-        private Map data;
-        private String operation;
-        private BSONTimestamp oplogTimestamp;
+        private final DBObject data;
+        private final String operation;
+        private final BSONTimestamp oplogTimestamp;

         public QueueEntry(
-                Map data) {
-            this.data = data;
-            this.operation = OPLOG_INSERT_OPERATION;
+                DBObject data) {
+            this(null, OPLOG_INSERT_OPERATION, data);
         }

         public QueueEntry(
                 BSONTimestamp oplogTimestamp,
                 String oplogOperation,
-                Map data) {
+                DBObject data) {
             this.data = data;
             this.operation = oplogOperation;
             this.oplogTimestamp = oplogTimestamp;
@@ -1441,7 +1469,11 @@ public boolean isOplogEntry() {
             return oplogTimestamp != null;
         }

-        public Map getData() {
+        public boolean isAttachment() {
+            return (data instanceof GridFSDBFile);
+        }
+
+        public DBObject getData() {
             return data;
         }

diff --git a/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java b/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java
index 68f29cee..8fb712a2 100644
--- a/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java
+++ b/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java
@@ -80,7 +80,7 @@ public abstract class RiverMongoDBTestAbstract {
     public static final String TEST_SIMPLE_MONGODB_DOCUMENT_JSON = "/org/elasticsearch/river/mongodb/script/test-simple-mongodb-document.json";

     protected final ESLogger logger = Loggers.getLogger(getClass());
-    protected final static long wait = 6000;
+    protected final static long wait = 2000;

     public static final String ADMIN_DATABASE_NAME = "admin";
     public static final String LOCAL_DATABASE_NAME = "local";
diff --git a/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSInitialImportTest.java b/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSInitialImportTest.java
new file mode 100644
index 00000000..9ef8f780
--- /dev/null
+++ b/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSInitialImportTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.elasticsearch.river.mongodb.gridfs; + +import static org.elasticsearch.client.Requests.countRequest; +import static org.elasticsearch.common.io.Streams.copyToBytesFromClasspath; +import static org.elasticsearch.index.query.QueryBuilders.fieldQuery; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import org.bson.types.ObjectId; +import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.WriteConcern; +import com.mongodb.gridfs.GridFS; +import com.mongodb.gridfs.GridFSDBFile; +import com.mongodb.gridfs.GridFSInputFile; + +public class RiverMongoWithGridFSInitialImportTest extends + RiverMongoDBTestAbstract { + + private DB mongoDB; + private DBCollection mongoCollection; + + protected RiverMongoWithGridFSInitialImportTest() { + super("testgridfs-initialimport-" + System.currentTimeMillis(), + "testgridfs-initialimport-" + System.currentTimeMillis(), "fs", + "testattachmentindex-initialimport-" + + System.currentTimeMillis()); + } + + // @BeforeClass + public void createDatabase() { + logger.debug("createDatabase {}", getDatabase()); + try { + mongoDB = getMongo().getDB(getDatabase()); + mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE); + logger.info("Start createCollection"); + mongoCollection = mongoDB.createCollection(getCollection(), null); + Assert.assertNotNull(mongoCollection); + Thread.sleep(wait); + } catch (Throwable t) { + logger.error("createDatabase failed.", t); + } + } + + private void createRiver() throws Exception { + super.createRiver(TEST_MONGODB_RIVER_GRIDFS_JSON); + Thread.sleep(wait); + } + + // @AfterClass + public void cleanUp() { + super.deleteRiver(); + logger.info("Drop database " + mongoDB.getName()); + mongoDB.dropDatabase(); + } + + @Test + public void testImportAttachmentInitialImport() throws Exception { + logger.debug("*** testImportAttachmentInitialImport ***"); + try { + createDatabase(); + byte[] content = copyToBytesFromClasspath(RiverMongoWithGridFSTest.TEST_ATTACHMENT_HTML); + logger.debug("Content in bytes: {}", content.length); + GridFS gridFS = new GridFS(mongoDB); + GridFSInputFile in = gridFS.createFile(content); + in.setFilename("test-attachment.html"); + in.setContentType("text/html"); + in.save(); + in.validate(); + + String id = in.getId().toString(); + logger.debug("GridFS in: {}", in); + logger.debug("Document created with id: {}", id); + + GridFSDBFile out = gridFS.findOne(in.getFilename()); + logger.debug("GridFS from findOne: {}", out); + out = gridFS.findOne(new ObjectId(id)); + logger.debug("GridFS from findOne: {}", out); + Assert.assertEquals(out.getId(), in.getId()); + + createRiver(); + Thread.sleep(wait); + refreshIndex(); + + CountResponse countResponse = getNode().client() + .count(countRequest(getIndex())).actionGet(); + logger.debug("Index total count: {}", countResponse.getCount()); + assertThat(countResponse.getCount(), equalTo(1l)); + + countResponse = getNode() + .client() + .count(countRequest(getIndex()) + .query(fieldQuery("_id", id))).actionGet(); + logger.debug("Index count for id {}: {}", id, + countResponse.getCount()); + assertThat(countResponse.getCount(), equalTo(1l)); + + 
+            SearchResponse response = getNode().client()
+                    .prepareSearch(getIndex())
+                    .setQuery(QueryBuilders.queryString("Aliquam")).execute()
+                    .actionGet();
+            logger.debug("SearchResponse {}", response.toString());
+            long totalHits = response.getHits().getTotalHits();
+            logger.debug("TotalHits: {}", totalHits);
+            assertThat(totalHits, equalTo(1l));
+
+            in = gridFS.createFile(content);
+            in.setFilename("test-attachment-2.html");
+            in.setContentType("text/html");
+            in.save();
+            in.validate();
+
+            id = in.getId().toString();
+
+            out = gridFS.findOne(in.getFilename());
+            logger.debug("GridFS from findOne: {}", out);
+            out = gridFS.findOne(new ObjectId(id));
+            logger.debug("GridFS from findOne: {}", out);
+            Assert.assertEquals(out.getId(), in.getId());
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            countResponse = getNode().client().count(countRequest(getIndex()))
+                    .actionGet();
+            logger.debug("Index total count: {}", countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(2l));
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Index count for id {}: {}", id,
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            response = getNode().client().prepareSearch(getIndex())
+                    .setQuery(QueryBuilders.queryString("Aliquam")).execute()
+                    .actionGet();
+            logger.debug("SearchResponse {}", response.toString());
+            totalHits = response.getHits().getTotalHits();
+            logger.debug("TotalHits: {}", totalHits);
+            assertThat(totalHits, equalTo(2l));
+
+            DBCursor cursor = gridFS.getFileList();
+            try {
+                while (cursor.hasNext()) {
+                    DBObject object = cursor.next();
+                    gridFS.remove(new ObjectId(object.get("_id").toString()));
+                }
+            } finally {
+                cursor.close();
+            }
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Count after delete request: {}",
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(0L));
+        } catch (Throwable t) {
+            logger.error("testImportAttachmentInitialImport failed.", t);
+            Assert.fail("testImportAttachmentInitialImport failed", t);
+        } finally {
+            cleanUp();
+        }
+    }
+
+}
diff --git a/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSTest.java b/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSTest.java
index e0d9f140..a0ee752b 100644
--- a/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSTest.java
+++ b/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSTest.java
@@ -42,19 +42,17 @@
 import com.mongodb.gridfs.GridFSDBFile;
 import com.mongodb.gridfs.GridFSInputFile;

-@Test
 public class RiverMongoWithGridFSTest extends RiverMongoDBTestAbstract {

-    private static final String TEST_ATTACHMENT_PDF = "/org/elasticsearch/river/mongodb/gridfs/lorem.pdf";
-    private static final String TEST_ATTACHMENT_HTML = "/org/elasticsearch/river/mongodb/gridfs/test-attachment.html";
+    public static final String TEST_ATTACHMENT_PDF = "/org/elasticsearch/river/mongodb/gridfs/lorem.pdf";
+    public static final String TEST_ATTACHMENT_HTML = "/org/elasticsearch/river/mongodb/gridfs/test-attachment.html";

     private DB mongoDB;
     private DBCollection mongoCollection;

     protected RiverMongoWithGridFSTest() {
-        super("testgridfs-" + System.currentTimeMillis(),
-                "testgridfs-" + System.currentTimeMillis(),
-                "fs",
-                "testattachmentindex-" + System.currentTimeMillis());
+        super("testgridfs-" + System.currentTimeMillis(), "testgridfs-"
+                + System.currentTimeMillis(), "fs", "testattachmentindex-"
+                + System.currentTimeMillis());
     }

     @BeforeClass
@@ -64,6 +62,7 @@ public void createDatabase() {
         mongoDB = getMongo().getDB(getDatabase());
         mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE);
         super.createRiver(TEST_MONGODB_RIVER_GRIDFS_JSON);
+        Thread.sleep(wait);
         logger.info("Start createCollection");
         mongoCollection = mongoDB.createCollection(getCollection(), null);
         Assert.assertNotNull(mongoCollection);
@@ -82,113 +81,139 @@ public void cleanUp() {

     @Test
     public void testImportAttachment() throws Exception {
         logger.debug("*** testImportAttachment ***");
-        byte[] content = copyToBytesFromClasspath(TEST_ATTACHMENT_HTML);
-        logger.debug("Content in bytes: {}", content.length);
-        GridFS gridFS = new GridFS(mongoDB);
-        GridFSInputFile in = gridFS.createFile(content);
-        in.setFilename("test-attachment.html");
-        in.setContentType("text/html");
-        in.save();
-        in.validate();
-
-        String id = in.getId().toString();
-        logger.debug("GridFS in: {}", in);
-        logger.debug("Document created with id: {}", id);
-
-        GridFSDBFile out = gridFS.findOne(in.getFilename());
-        logger.debug("GridFS from findOne: {}", out);
-        out = gridFS.findOne(new ObjectId(id));
-        logger.debug("GridFS from findOne: {}", out);
-        Assert.assertEquals(out.getId(), in.getId());
-
-        Thread.sleep(wait);
-        refreshIndex();
-
-        CountResponse countResponse = getNode().client()
-                .count(countRequest(getIndex())).actionGet();
-        logger.debug("Index total count: {}", countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(1l));
-
-        countResponse = getNode().client()
-                .count(countRequest(getIndex()).query(fieldQuery("_id", id)))
-                .actionGet();
-        logger.debug("Index count for id {}: {}", id, countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(1l));
-
-        SearchResponse response = getNode().client().prepareSearch(getIndex())
-                .setQuery(QueryBuilders.queryString("Aliquam")).execute()
-                .actionGet();
-        logger.debug("SearchResponse {}", response.toString());
-        long totalHits = response.getHits().getTotalHits();
-        logger.debug("TotalHits: {}", totalHits);
-        assertThat(totalHits, equalTo(1l));
-
-        gridFS.remove(new ObjectId(id));
-
-        Thread.sleep(wait);
-        refreshIndex();
-
-        countResponse = getNode().client()
-                .count(countRequest(getIndex()).query(fieldQuery("_id", id)))
-                .actionGet();
-        logger.debug("Count after delete request: {}", countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(0L));
+        try {
+//            createDatabase();
+            byte[] content = copyToBytesFromClasspath(TEST_ATTACHMENT_HTML);
+            logger.debug("Content in bytes: {}", content.length);
+            GridFS gridFS = new GridFS(mongoDB);
+            GridFSInputFile in = gridFS.createFile(content);
+            in.setFilename("test-attachment.html");
+            in.setContentType("text/html");
+            in.save();
+            in.validate();
+
+            String id = in.getId().toString();
+            logger.debug("GridFS in: {}", in);
+            logger.debug("Document created with id: {}", id);
+
+            GridFSDBFile out = gridFS.findOne(in.getFilename());
+            logger.debug("GridFS from findOne: {}", out);
+            out = gridFS.findOne(new ObjectId(id));
+            logger.debug("GridFS from findOne: {}", out);
+            Assert.assertEquals(out.getId(), in.getId());
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            CountResponse countResponse = getNode().client()
+                    .count(countRequest(getIndex())).actionGet();
+            logger.debug("Index total count: {}", countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Index count for id {}: {}", id,
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            SearchResponse response = getNode().client()
+                    .prepareSearch(getIndex())
+                    .setQuery(QueryBuilders.queryString("Aliquam")).execute()
+                    .actionGet();
+            logger.debug("SearchResponse {}", response.toString());
+            long totalHits = response.getHits().getTotalHits();
+            logger.debug("TotalHits: {}", totalHits);
+            assertThat(totalHits, equalTo(1l));
+
+            gridFS.remove(new ObjectId(id));
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Count after delete request: {}",
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(0L));
+        } catch (Throwable t) {
+            logger.error("testImportAttachment failed.", t);
+            Assert.fail("testImportAttachment failed", t);
+        } finally {
+//            cleanUp();
+        }
     }

     @Test
     public void testImportPDFAttachment() throws Exception {
         logger.debug("*** testImportPDFAttachment ***");
-        byte[] content = copyToBytesFromClasspath(TEST_ATTACHMENT_PDF);
-        logger.debug("Content in bytes: {}", content.length);
-        GridFS gridFS = new GridFS(mongoDB);
-        GridFSInputFile in = gridFS.createFile(content);
-        in.setFilename("lorem.pdf");
-        in.setContentType("application/pdf");
-        in.save();
-        in.validate();
-
-        String id = in.getId().toString();
-        logger.debug("GridFS in: {}", in);
-        logger.debug("Document created with id: {}", id);
-
-        GridFSDBFile out = gridFS.findOne(in.getFilename());
-        logger.debug("GridFS from findOne: {}", out);
-        out = gridFS.findOne(new ObjectId(id));
-        logger.debug("GridFS from findOne: {}", out);
-        Assert.assertEquals(out.getId(), in.getId());
-
-        Thread.sleep(wait);
-        refreshIndex();
-
-        CountResponse countResponse = getNode().client()
-                .count(countRequest(getIndex())).actionGet();
-        logger.debug("Index total count: {}", countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(1l));
-
-        countResponse = getNode().client()
-                .count(countRequest(getIndex()).query(fieldQuery("_id", id)))
-                .actionGet();
-        logger.debug("Index count for id {}: {}", id, countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(1l));
-
-        SearchResponse response = getNode().client().prepareSearch(getIndex())
-                .setQuery(QueryBuilders.queryString("Lorem ipsum dolor"))
-                .execute().actionGet();
-        logger.debug("SearchResponse {}", response.toString());
-        long totalHits = response.getHits().getTotalHits();
-        logger.debug("TotalHits: {}", totalHits);
-        assertThat(totalHits, equalTo(1l));
-
-        gridFS.remove(new ObjectId(id));
-
-        Thread.sleep(wait);
-        refreshIndex();
-
-        countResponse = getNode().client()
-                .count(countRequest(getIndex()).query(fieldQuery("_id", id)))
-                .actionGet();
-        logger.debug("Count after delete request: {}", countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(0L));
+        try {
+//            createDatabase();
+            byte[] content = copyToBytesFromClasspath(TEST_ATTACHMENT_PDF);
+            logger.debug("Content in bytes: {}", content.length);
+            GridFS gridFS = new GridFS(mongoDB);
+            GridFSInputFile in = gridFS.createFile(content);
+            in.setFilename("lorem.pdf");
+            in.setContentType("application/pdf");
+            in.save();
+            in.validate();
+
+            String id = in.getId().toString();
+            logger.debug("GridFS in: {}", in);
logger.debug("Document created with id: {}", id); + + GridFSDBFile out = gridFS.findOne(in.getFilename()); + logger.debug("GridFS from findOne: {}", out); + out = gridFS.findOne(new ObjectId(id)); + logger.debug("GridFS from findOne: {}", out); + Assert.assertEquals(out.getId(), in.getId()); + + Thread.sleep(wait); + refreshIndex(); + + CountResponse countResponse = getNode().client() + .count(countRequest(getIndex())).actionGet(); + logger.debug("Index total count: {}", countResponse.getCount()); + assertThat(countResponse.getCount(), equalTo(1l)); + + countResponse = getNode() + .client() + .count(countRequest(getIndex()) + .query(fieldQuery("_id", id))).actionGet(); + logger.debug("Index count for id {}: {}", id, + countResponse.getCount()); + assertThat(countResponse.getCount(), equalTo(1l)); + + SearchResponse response = getNode().client() + .prepareSearch(getIndex()) + .setQuery(QueryBuilders.queryString("Lorem ipsum dolor")) + .execute().actionGet(); + logger.debug("SearchResponse {}", response.toString()); + long totalHits = response.getHits().getTotalHits(); + logger.debug("TotalHits: {}", totalHits); + assertThat(totalHits, equalTo(1l)); + + gridFS.remove(new ObjectId(id)); + + Thread.sleep(wait); + refreshIndex(); + + countResponse = getNode() + .client() + .count(countRequest(getIndex()) + .query(fieldQuery("_id", id))).actionGet(); + logger.debug("Count after delete request: {}", + countResponse.getCount()); + assertThat(countResponse.getCount(), equalTo(0L)); + } catch (Throwable t) { + logger.error("testImportPDFAttachment failed.", t); + Assert.fail("testImportPDFAttachment failed", t); + } finally { +// cleanUp(); + } } /* @@ -196,62 +221,78 @@ public void testImportPDFAttachment() throws Exception { */ @Test public void testImportAttachmentWithCustomMetadata() throws Exception { - logger.debug("*** testImportAttachment ***"); - byte[] content = copyToBytesFromClasspath(TEST_ATTACHMENT_HTML); - logger.debug("Content in bytes: {}", content.length); - GridFS gridFS = new GridFS(mongoDB); - GridFSInputFile in = gridFS.createFile(content); - in.setFilename("test-attachment.html"); - in.setContentType("text/html"); - BasicDBObject metadata = new BasicDBObject(); - metadata.put("attribut1", "value1"); - metadata.put("attribut2", "value2"); - in.put("metadata", metadata); - in.save(); - in.validate(); - - String id = in.getId().toString(); - logger.debug("GridFS in: {}", in); - logger.debug("Document created with id: {}", id); - - GridFSDBFile out = gridFS.findOne(in.getFilename()); - logger.debug("GridFS from findOne: {}", out); - out = gridFS.findOne(new ObjectId(id)); - logger.debug("GridFS from findOne: {}", out); - Assert.assertEquals(out.getId(), in.getId()); - - Thread.sleep(wait); - refreshIndex(); - - CountResponse countResponse = getNode().client() - .count(countRequest(getIndex())).actionGet(); - logger.debug("Index total count: {}", countResponse.getCount()); - assertThat(countResponse.getCount(), equalTo(1l)); - - countResponse = getNode().client() - .count(countRequest(getIndex()).query(fieldQuery("_id", id))) - .actionGet(); - logger.debug("Index count for id {}: {}", id, countResponse.getCount()); - assertThat(countResponse.getCount(), equalTo(1l)); - - SearchResponse response = getNode().client().prepareSearch(getIndex()) - .setQuery(QueryBuilders.queryString("metadata.attribut1:value1")).execute() - .actionGet(); - logger.debug("SearchResponse {}", response.toString()); - long totalHits = response.getHits().getTotalHits(); - logger.debug("TotalHits: {}", 
-        assertThat(totalHits, equalTo(1l));
-
-        gridFS.remove(new ObjectId(id));
-
-        Thread.sleep(wait);
-        refreshIndex();
-
-        countResponse = getNode().client()
-                .count(countRequest(getIndex()).query(fieldQuery("_id", id)))
-                .actionGet();
-        logger.debug("Count after delete request: {}", countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(0L));
+        logger.debug("*** testImportAttachmentWithCustomMetadata ***");
+        try {
+//            createDatabase();
+            byte[] content = copyToBytesFromClasspath(TEST_ATTACHMENT_HTML);
+            logger.debug("Content in bytes: {}", content.length);
+            GridFS gridFS = new GridFS(mongoDB);
+            GridFSInputFile in = gridFS.createFile(content);
+            in.setFilename("test-attachment.html");
+            in.setContentType("text/html");
+            BasicDBObject metadata = new BasicDBObject();
+            metadata.put("attribut1", "value1");
+            metadata.put("attribut2", "value2");
+            in.put("metadata", metadata);
+            in.save();
+            in.validate();
+
+            String id = in.getId().toString();
+            logger.debug("GridFS in: {}", in);
+            logger.debug("Document created with id: {}", id);
+
+            GridFSDBFile out = gridFS.findOne(in.getFilename());
+            logger.debug("GridFS from findOne: {}", out);
+            out = gridFS.findOne(new ObjectId(id));
+            logger.debug("GridFS from findOne: {}", out);
+            Assert.assertEquals(out.getId(), in.getId());
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            CountResponse countResponse = getNode().client()
+                    .count(countRequest(getIndex())).actionGet();
+            logger.debug("Index total count: {}", countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Index count for id {}: {}", id,
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            SearchResponse response = getNode()
+                    .client()
+                    .prepareSearch(getIndex())
+                    .setQuery(
+                            QueryBuilders
+                                    .queryString("metadata.attribut1:value1"))
+                    .execute().actionGet();
+            logger.debug("SearchResponse {}", response.toString());
+            long totalHits = response.getHits().getTotalHits();
+            logger.debug("TotalHits: {}", totalHits);
+            assertThat(totalHits, equalTo(1l));
+
+            gridFS.remove(new ObjectId(id));
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Count after delete request: {}",
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(0L));
+        } catch (Throwable t) {
+            logger.error("testImportAttachmentWithCustomMetadata failed.", t);
+            Assert.fail("testImportAttachmentWithCustomMetadata failed", t);
+        } finally {
+//            cleanUp();
+        }
     }
 }
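
Note on the "To be optimized" TODO in the initial-import hunk: the sketch
below illustrates the option named in that comment (get the object id list
from the files collection, then call GridFS.findOne per id), so the import
does not depend on GridFS.getFileList() returning usable GridFSDBFile
instances (the driver issue linked above). This is illustrative only and
not part of the patch: the class name GridFSInitialImportSketch, the
importAll method, and the hand-off comment are hypothetical stand-ins for
the river's own plumbing.

import org.bson.types.ObjectId;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;

public final class GridFSInitialImportSketch {

    // Walk <bucket>.files with a projection on _id only, then load each
    // file through GridFS.findOne so the returned GridFSDBFile is fully
    // initialized. Assumes the default case where _id is an ObjectId.
    static void importAll(DB db, String bucket) {
        GridFS grid = new GridFS(db, bucket);
        DBCollection files = db.getCollection(bucket + ".files");
        DBCursor cursor = files.find(new BasicDBObject(),
                new BasicDBObject("_id", 1));
        try {
            while (cursor.hasNext()) {
                DBObject entry = cursor.next();
                GridFSDBFile file = grid.findOne((ObjectId) entry.get("_id"));
                if (file != null) {
                    // Hand the file to the indexing pipeline here, e.g.
                    // addToStream(OPLOG_INSERT_OPERATION, null, file);
                }
            }
        } finally {
            cursor.close();
        }
    }
}

Compared with iterating getFileList() directly, this trades one extra query
per file for a cursor that never carries broken or partially-populated file
objects, which matches the intent of the TODO.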