Skip to content

Commit

Permalink
Fix #167
Browse files Browse the repository at this point in the history
- New parameter `options/skip_initial_import` is available (default is
`false`).
  • Loading branch information
richardwilly98 committed Nov 13, 2013
1 parent 68366b0 commit 161f18c
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class MongoDBRiverDefinition {
public final static String INITIAL_TIMESTAMP_SCRIPT_TYPE_FIELD = "script_type";
public final static String INITIAL_TIMESTAMP_SCRIPT_FIELD = "script";
public final static String ADVANCED_TRANSFORMATION_FIELD = "advanced_transformation";
public final static String SKIP_INITIAL_IMPORT_FIELD = "skip_initial_import";
public final static String PARENT_TYPES_FIELD = "parent_types";
public final static String FILTER_FIELD = "filter";
public final static String CREDENTIALS_FIELD = "credentials";
Expand Down Expand Up @@ -126,6 +127,7 @@ public class MongoDBRiverDefinition {
private final String script;
private final String scriptType;
private final boolean advancedTransformation;
private final boolean skipInitialImport;
private final Set<String> parentTypes;
// index
private final String indexName;
Expand Down Expand Up @@ -168,6 +170,7 @@ public static class Builder {
private String script = null;
private String scriptType = null;
private boolean advancedTransformation = false;
private boolean skipInitialImport;
private Set<String> parentTypes = null;

// index
Expand Down Expand Up @@ -297,6 +300,11 @@ public Builder advancedTransformation(boolean advancedTransformation) {
return this;
}

/**
 * Sets whether the initial import from the MongoDB collection is skipped
 * when the river starts (maps to the {@code options/skip_initial_import}
 * river setting; parsed with a default of {@code false}).
 *
 * @param skipInitialImport {@code true} to skip the initial import
 * @return this builder, for call chaining
 */
public Builder skipInitialImport(boolean skipInitialImport) {
this.skipInitialImport = skipInitialImport;
return this;
}

public Builder parentTypes(Set<String> parentTypes) {
this.parentTypes = parentTypes;
return this;
Expand Down Expand Up @@ -467,7 +475,8 @@ public synchronized static MongoDBRiverDefinition parseSettings(String riverName
builder.mongoSSLVerifyCertificate(XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(SSL_VERIFY_CERT_FIELD), true));
builder.advancedTransformation(XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(ADVANCED_TRANSFORMATION_FIELD),
false));

builder.skipInitialImport(XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(SKIP_INITIAL_IMPORT_FIELD), false));

mongoClientOptionsBuilder.connectTimeout(builder.connectTimeout).socketTimeout(builder.socketTimeout);

if (builder.mongoSecondaryReadPreference) {
Expand Down Expand Up @@ -774,6 +783,7 @@ private MongoDBRiverDefinition(final Builder builder) {
this.script = builder.script;
this.scriptType = builder.scriptType;
this.advancedTransformation = builder.advancedTransformation;
this.skipInitialImport = builder.skipInitialImport;
this.parentTypes = builder.parentTypes;

// index
Expand Down Expand Up @@ -889,6 +899,10 @@ public boolean isAdvancedTransformation() {
return advancedTransformation;
}

/**
 * @return {@code true} if the initial import phase should be skipped for
 *         this river ({@code options/skip_initial_import}, default {@code false})
 */
public boolean isSkipInitialImport() {
return skipInitialImport;
}

/**
 * @return the configured parent types; may be {@code null} when none were
 *         set on the builder (its default is {@code null})
 */
public Set<String> getParentTypes() {
return parentTypes;
}
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/org/elasticsearch/river/mongodb/Slurper.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ public void run() {
}

BSONTimestamp startTimestamp = null;
if (!riverHasIndexedFromOplog() && definition.getInitialTimestamp() == null) {
if (!isIndexEmpty()) {
MongoDBRiverHelper.setRiverStatus(client, definition.getRiverName(), Status.INITIAL_IMPORT_FAILED);
break;
if (!definition.isSkipInitialImport()) {
if (!riverHasIndexedFromOplog() && definition.getInitialTimestamp() == null) {
if (!isIndexEmpty()) {
MongoDBRiverHelper.setRiverStatus(client, definition.getRiverName(), Status.INITIAL_IMPORT_FAILED);
break;
}
startTimestamp = doInitialImport();
}
startTimestamp = doInitialImport();
} else {
logger.info("Skip initial import from collection {}", definition.getMongoCollection());
}

// Slurp from oplog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.script.ScriptService;
Expand All @@ -27,8 +28,8 @@ public void testLoadMongoDBRiverSimpleDefinition() {
RiverSettings riverSettings = new RiverSettings(ImmutableSettings.settingsBuilder().build(), XContentHelper.convertToMap(
Streams.copyToByteArray(in), false).v2());
ScriptService scriptService = null;
MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(), "my-river-index", riverSettings,
scriptService);
MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(),
RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverSettings, scriptService);
Assert.assertNotNull(definition);
Assert.assertEquals("mydb", definition.getMongoDb());
Assert.assertEquals("mycollection", definition.getMongoCollection());
Expand All @@ -39,7 +40,8 @@ public void testLoadMongoDBRiverSimpleDefinition() {
Assert.assertEquals(MongoDBRiverDefinition.DEFAULT_CONCURRENT_REQUESTS, definition.getBulk().getConcurrentRequests());
Assert.assertEquals(MongoDBRiverDefinition.DEFAULT_BULK_SIZE, definition.getBulk().getBulkSize());
Assert.assertEquals(MongoDBRiverDefinition.DEFAULT_FLUSH_INTERVAL, definition.getBulk().getFlushInterval());

Assert.assertFalse(definition.isSkipInitialImport());

} catch (Throwable t) {
Assert.fail("testLoadMongoDBRiverSimpleDefinition failed", t);
}
Expand All @@ -53,8 +55,8 @@ public void testLoadMongoDBRiverDefinition() {
RiverSettings riverSettings = new RiverSettings(ImmutableSettings.settingsBuilder().build(), XContentHelper.convertToMap(
Streams.copyToByteArray(in), false).v2());
ScriptService scriptService = null;
MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(), "my-river-index", riverSettings,
scriptService);
MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(),
RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverSettings, scriptService);
Assert.assertNotNull(definition);
Assert.assertEquals("mycollection", definition.getIncludeCollection());
Assert.assertTrue(definition.getParentTypes().contains("parent1"));
Expand Down Expand Up @@ -85,8 +87,8 @@ public void testLoadMongoDBRiverNewDefinition() {
RiverSettings riverSettings = new RiverSettings(ImmutableSettings.settingsBuilder().build(), XContentHelper.convertToMap(
Streams.copyToByteArray(in), false).v2());
ScriptService scriptService = null;
MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(), "my-river-index", riverSettings,
scriptService);
MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(),
RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverSettings, scriptService);
Assert.assertNotNull(definition);
Assert.assertEquals("mycollection", definition.getIncludeCollection());
Assert.assertTrue(definition.getParentTypes().contains("parent1"));
Expand All @@ -100,10 +102,10 @@ public void testLoadMongoDBRiverNewDefinition() {
Assert.assertEquals(11000, definition.getConnectTimeout());
Assert.assertEquals(riverName.getName(), definition.getRiverName());

// actions: 500
// size: "20mb",
// concurrent_requests: 40,
// flush_interval: "50ms"
// actions: 500
// size: "20mb",
// concurrent_requests: 40,
// flush_interval: "50ms"

// Test bulk
Assert.assertEquals(500, definition.getBulk().getBulkActions());
Expand All @@ -124,8 +126,8 @@ public void testLoadMongoDBRiverDefinitionIssue159() {
RiverSettings riverSettings = new RiverSettings(ImmutableSettings.settingsBuilder().build(), XContentHelper.convertToMap(
Streams.copyToByteArray(in), false).v2());
ScriptService scriptService = null;
MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(), "my-river-index", riverSettings,
scriptService);
MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(),
RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverSettings, scriptService);
Assert.assertNotNull(definition);

Assert.assertEquals(2, definition.getMongoServers().size());
Expand All @@ -141,6 +143,23 @@ public void testLoadMongoDBRiverDefinitionIssue159() {
}
}

@Test
public void testLoadMongoDBRiverDefinitionIssue167() {
    try {
        RiverName riverName = new RiverName("mongodb", "mongodb-" + System.currentTimeMillis());
        // try-with-resources: the stream was previously never closed (resource leak).
        try (InputStream in = getClass().getResourceAsStream(
                "/org/elasticsearch/river/mongodb/test-mongodb-river-definition-167.json")) {
            RiverSettings riverSettings = new RiverSettings(ImmutableSettings.settingsBuilder().build(),
                    XContentHelper.convertToMap(Streams.copyToByteArray(in), false).v2());
            ScriptService scriptService = null;
            MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(),
                    RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverSettings, scriptService);
            Assert.assertNotNull(definition);
            // The issue-167 fixture sets skip_initial_import, so it must parse as true.
            Assert.assertTrue(definition.isSkipInitialImport());
        }
    } catch (Throwable t) {
        Assert.fail("testLoadMongoDBRiverDefinitionIssue167 failed", t);
    }
}

@Test
public void parseFilter() {
String filter = "{\"o.lang\":\"de\"}";
Expand All @@ -156,7 +175,7 @@ public void parseFilter() {
Assert.assertNotNull(filterNoPrefix2);
BasicDBObject bsonFilterNoPrefix2 = (BasicDBObject) JSON.parse(filterNoPrefix2);
Assert.assertEquals(bsonFilterNoPrefix, bsonFilterNoPrefix2);

String filterWithPrefix = MongoDBRiverDefinition.addPrefix("o.", filterNoPrefix);
BasicDBObject bsonFilterWithPrefix = (BasicDBObject) JSON.parse(filterWithPrefix);
Assert.assertNotNull(bsonFilterWithPrefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -34,17 +36,25 @@
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.PluginManager;
import org.elasticsearch.plugins.PluginManager.OutputMode;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper;
import org.elasticsearch.script.ScriptService;
import org.testng.Assert;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
Expand Down Expand Up @@ -289,9 +299,9 @@ protected void refreshIndex(String index) {

protected void createRiver(String jsonDefinition, String river, Object... args) throws Exception {
logger.info("Create river [{}]", river);
String setting = getJsonSettings(jsonDefinition, args);
logger.info("River setting [{}]", setting);
node.client().prepareIndex("_river", river, "_meta").setSource(setting).execute().actionGet();
String settings = getJsonSettings(jsonDefinition, args);
logger.info("River setting [{}]", settings);
node.client().prepareIndex("_river", river, "_meta").setSource(settings).execute().actionGet();
logger.debug("Running Cluster Health");
ClusterHealthResponse clusterHealth = node.client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus())
.actionGet();
Expand Down Expand Up @@ -331,8 +341,13 @@ protected void deleteIndex(String name) {
int max = 5;
int count = 0;
logger.info("Delete index [{}]", name);
IndicesExistsResponse response = node.client().admin().indices().prepareExists(name).get();
if (!response.isExists()) {
logger.info("Index {} does not exist", name);
return;
}
if (!node.client().admin().indices().prepareDelete(name).execute().actionGet().isAcknowledged()) {
IndicesExistsResponse response = node.client().admin().indices().prepareExists(name).get();
response = node.client().admin().indices().prepareExists(name).get();
while (response.isExists()) {
logger.debug("Index {} not deleted. Try waiting 1 sec...", name);
try {
Expand All @@ -346,10 +361,6 @@ protected void deleteIndex(String name) {
}
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
logger.debug("Running Cluster Health");
ClusterHealthResponse clusterHealth = node.client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus())
.actionGet();
Expand Down Expand Up @@ -391,6 +402,26 @@ protected void deleteRiver(String name) {
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
}

/**
 * Renders the given JSON river definition template (substituting the three
 * mongod ports, database, collection and index) and parses it into a
 * {@link MongoDBRiverDefinition}, failing the current test on any error.
 *
 * @param jsonDefinition classpath template for the river definition
 * @param database MongoDB database name substituted into the template
 * @param collection MongoDB collection name substituted into the template
 * @param index Elasticsearch index name substituted into the template
 * @return the parsed, non-null river definition
 * @throws Throwable if rendering or parsing fails (the test is also failed)
 */
protected MongoDBRiverDefinition getMongoDBRiverDefinition(String jsonDefinition, String database, String collection, String index) throws Throwable {
    try {
        RiverName riverName = new RiverName("mongodb", river);
        String settings = getJsonSettings(jsonDefinition, String.valueOf(getMongoPort1()), String.valueOf(getMongoPort2()),
                String.valueOf(getMongoPort3()), database, collection, index);
        // try-with-resources: the stream was previously never closed.
        try (InputStream in = new ByteArrayInputStream(settings.getBytes())) {
            RiverSettings riverSettings = new RiverSettings(ImmutableSettings.settingsBuilder().build(), XContentHelper.convertToMap(
                    Streams.copyToByteArray(in), false).v2());
            ScriptService scriptService = null;
            MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(),
                    RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverSettings, scriptService);
            Assert.assertNotNull(definition);
            return definition;
        }
    } catch (Throwable t) {
        // Report this helper's name (the old message was copy/pasted from
        // testLoadMongoDBRiverSimpleDefinition and pointed at the wrong test).
        Assert.fail("getMongoDBRiverDefinition failed", t);
        throw t;
    }
}

@AfterSuite
public void afterSuite() {
logger.debug("*** afterSuite ***");
Expand Down Expand Up @@ -431,6 +462,10 @@ protected static Node getNode() {
return node;
}

/**
 * @return the indices admin client of the embedded test node
 */
protected static IndicesAdminClient getIndicesAdminClient() {
return node.client().admin().indices();
}

/**
 * @return the port of the first mongod instance started for the test suite
 */
protected static int getMongoPort1() {
return mongoPort1;
}
Expand Down
Loading

0 comments on commit 161f18c

Please sign in to comment.