toCommitTuples;
+ private CommitCallback commitCallback;
private int tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS;
public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper) {
@@ -66,6 +67,7 @@ public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper, SolrCommitSt
this.solrConfig = solrConfig;
this.solrMapper = solrMapper;
this.commitStgy = commitStgy;
+ this.commitCallback = solrConfig.getCommitCallback();
LOG.debug("Created {} with the following configuration: " +
"[SolrConfig = {}], [SolrMapper = {}], [CommitStgy = {}]",
this.getClass().getSimpleName(), solrConfig, solrMapper, commitStgy);
@@ -74,7 +76,7 @@ public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper, SolrCommitSt
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
- this.solrClient = new CloudSolrClient(solrConfig.getZkHostString());
+ this.solrClient = solrConfig.getSolrClientFactory().getSolrClient();
this.toCommitTuples = new ArrayList<>(capacity());
}
@@ -96,17 +98,24 @@ protected void process(Tuple tuple) {
}
}
- private void ack(Tuple tuple) throws SolrServerException, IOException {
+ @Override
+ protected void onTickTuple(final Tuple tuple) {
+ try {
+ // commit ignore strategy when tick
+ ackCommittedTuples();
+ } catch (Exception e) {
+ fail(tuple, e);
+ }
+ }
+
+ private void ack(Tuple tuple) throws SolrServerException, IOException, SolrMapperException {
if (commitStgy == null) {
collector.ack(tuple);
} else {
- final boolean isTickTuple = TupleUtils.isTick(tuple);
- if (!isTickTuple) { // Don't ack tick tuples
- toCommitTuples.add(tuple);
- commitStgy.update();
- }
- if (isTickTuple || commitStgy.commit()) {
- solrClient.commit(solrMapper.getCollection());
+ toCommitTuples.add(tuple);
+ commitStgy.update();
+ if (commitStgy.commit()) {
+ commitCallback.process(solrClient, solrMapper.getCollection());
ackCommittedTuples();
}
}
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/client/DefaultSolrClientFactory.java b/external/storm-solr/src/main/java/org/apache/storm/solr/client/DefaultSolrClientFactory.java
new file mode 100644
index 00000000000..6879ac64861
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/client/DefaultSolrClientFactory.java
@@ -0,0 +1,30 @@
+/*
+ * DefaultCloudSolrClientFactory.java
+ * Copyright 2017 Qunhe Tech, all rights reserved.
+ * Qunhe PROPRIETARY/CONFIDENTIAL, any form of usage is subject to approval.
+ */
+
+package org.apache.storm.solr.client;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+
+/**
+ * default implementation of {@link SolrClientFactory}, which takes a zookeeper host
+ * and produce {@link org.apache.solr.client.solrj.impl.CloudSolrClient} with the host.
+ *
+ * @author alei
+ */
+public class DefaultSolrClientFactory implements SolrClientFactory {
+
+ private String zkHostString;
+
+ public DefaultSolrClientFactory(String zkHostString) {
+ this.zkHostString = zkHostString;
+ }
+
+ @Override
+ public SolrClient getSolrClient() {
+ return new CloudSolrClient(zkHostString);
+ }
+}
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/client/SolrClientFactory.java b/external/storm-solr/src/main/java/org/apache/storm/solr/client/SolrClientFactory.java
new file mode 100644
index 00000000000..25b1674ec9d
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/client/SolrClientFactory.java
@@ -0,0 +1,21 @@
+/*
+ * SerializableSolrClientFactory.java
+ * Copyright 2017 Qunhe Tech, all rights reserved.
+ * Qunhe PROPRIETARY/CONFIDENTIAL, any form of usage is subject to approval.
+ */
+
+package org.apache.storm.solr.client;
+
+import org.apache.solr.client.solrj.SolrClient;
+
+import java.io.Serializable;
+
+/**
+ * this interface provide SolrClient for {@link org.apache.storm.solr.mapper.SolrMapper}
+ *
+ * @author alei
+ */
+public interface SolrClientFactory extends Serializable {
+
+ SolrClient getSolrClient();
+}
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/config/CommitCallback.java b/external/storm-solr/src/main/java/org/apache/storm/solr/config/CommitCallback.java
new file mode 100644
index 00000000000..9e439fac58f
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/config/CommitCallback.java
@@ -0,0 +1,23 @@
+/*
+ * OnCommit.java
+ * Copyright 2017 Qunhe Tech, all rights reserved.
+ * Qunhe PROPRIETARY/CONFIDENTIAL, any form of usage is subject to approval.
+ */
+
+package org.apache.storm.solr.config;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * This callback interface is invoked on {@link SolrCommitStrategy} triggers or tick. Typically fire
+ * a "commit" request to solr, you can customize commit details by implementing this interface.
+ * @author alei
+ */
+public interface CommitCallback extends Serializable {
+
+ void process(SolrClient solrClient, String collection) throws SolrServerException, IOException;
+}
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/config/DefaultCommitCallback.java b/external/storm-solr/src/main/java/org/apache/storm/solr/config/DefaultCommitCallback.java
new file mode 100644
index 00000000000..d8d0a7291f8
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/config/DefaultCommitCallback.java
@@ -0,0 +1,23 @@
+/*
+ * DefaultCommitOperation.java
+ * Copyright 2017 Qunhe Tech, all rights reserved.
+ * Qunhe PROPRIETARY/CONFIDENTIAL, any form of usage is subject to approval.
+ */
+
+package org.apache.storm.solr.config;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+
+import java.io.IOException;
+
+/**
+ * @author alei
+ */
+public class DefaultCommitCallback implements CommitCallback {
+
+ @Override
+ public void process(final SolrClient solrClient, final String collection) throws SolrServerException, IOException {
+ solrClient.commit(collection);
+ }
+}
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java b/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java
index 1803a968f2e..ac22f0357d1 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java
@@ -19,6 +19,8 @@
package org.apache.storm.solr.config;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.storm.solr.client.DefaultSolrClientFactory;
+import org.apache.storm.solr.client.SolrClientFactory;
import java.io.Serializable;
@@ -27,9 +29,14 @@
* the bolts should be put in this class.
*/
public class SolrConfig implements Serializable {
- private final String zkHostString;
+
private final int tickTupleInterval;
+ private final SolrClientFactory solrClientFactory;
+
+ private final CommitCallback commitCallback;
+
+
/**
* @param zkHostString Zookeeper host string as defined in the {@link CloudSolrClient} constructor
* */
@@ -42,16 +49,35 @@ public SolrConfig(String zkHostString) {
* @param tickTupleInterval interval for tick tuples
* */
public SolrConfig(String zkHostString, int tickTupleInterval) {
- this.zkHostString = zkHostString;
+ this(new DefaultSolrClientFactory(zkHostString), tickTupleInterval);
+ }
+
+ /**
+ * @param solrClientFactory factory that provide solrClient
+ * @param tickTupleInterval interval for tick tuples
+ * */
+ public SolrConfig(SolrClientFactory solrClientFactory, int tickTupleInterval) {
+ this(solrClientFactory, tickTupleInterval, new DefaultCommitCallback());
+ }
+
+ public SolrConfig(SolrClientFactory solrClientFactory,
+ int tickTupleInterval,
+ CommitCallback commitCallback) {
+ this.solrClientFactory = solrClientFactory;
this.tickTupleInterval = tickTupleInterval;
+ this.commitCallback = commitCallback;
}
- public String getZkHostString() {
- return zkHostString;
+ public SolrClientFactory getSolrClientFactory() {
+ return solrClientFactory;
}
public int getTickTupleInterval() {
return tickTupleInterval;
}
+ public CommitCallback getCommitCallback() {
+ return commitCallback;
+ }
+
}
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/SolrClientSchemaBuilder.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/SolrClientSchemaBuilder.java
new file mode 100644
index 00000000000..0971b9d727f
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/SolrClientSchemaBuilder.java
@@ -0,0 +1,95 @@
+/*
+ * SolrClientSchemaBuilder.java
+ * Copyright 2017 Qunhe Tech, all rights reserved.
+ * Qunhe PROPRIETARY/CONFIDENTIAL, any form of usage is subject to approval.
+ */
+
+package org.apache.storm.solr.schema.builder;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.schema.FieldTypeDefinition;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest;
+import org.apache.solr.client.solrj.response.schema.SchemaRepresentation;
+import org.apache.solr.client.solrj.response.schema.SchemaResponse;
+import org.apache.storm.solr.client.SolrClientFactory;
+import org.apache.storm.solr.schema.CopyField;
+import org.apache.storm.solr.schema.FieldType;
+import org.apache.storm.solr.schema.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * this Class build the {@link Schema} by Solr Schema API.
+ *
+ * @see Solr Schema API
+ * @author alei
+ */
+public class SolrClientSchemaBuilder implements SchemaBuilder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SolrClientSchemaBuilder.class);
+
+ private Schema schema;
+
+ public SolrClientSchemaBuilder(String collection, SolrClientFactory factory) throws IOException, SolrServerException {
+ SchemaRequest request = new SchemaRequest();
+ SchemaResponse response = request.process(factory.getSolrClient(), collection);
+ SchemaRepresentation representation = response.getSchemaRepresentation();
+ schema = convert(representation);
+ }
+
+ @Override
+ public Schema getSchema() {
+ return schema;
+ }
+
+ private Schema convert(final SchemaRepresentation representation) {
+ Schema converted = new Schema();
+ converted.setUniqueKey(representation.getUniqueKey());
+ converted.setName(representation.getName());
+ converted.setVersion(String.valueOf(representation.getVersion()));
+ List fieldTypeList = new ArrayList<>(representation.getFieldTypes().size());
+ for (FieldTypeDefinition type : representation.getFieldTypes()) {
+ FieldType fieldType = new FieldType();
+ fieldType.setClazz(type.getAttributes().get("class").toString());
+ if (type.getAttributes().get("multiValued") != null) {
+ fieldType.setMultiValued(Boolean.valueOf(type.getAttributes().get("multiValued").toString()));
+ }
+ fieldType.setName(type.getAttributes().get("name").toString());
+ fieldTypeList.add(fieldType);
+ }
+ converted.setFieldTypes(fieldTypeList);
+ List fieldList =
+ new ArrayList<>(representation.getFields().size());
+ for (Map field : representation.getFields()) {
+ org.apache.storm.solr.schema.Field schemaField = new org.apache.storm.solr.schema.Field();
+ schemaField.setName(field.get("name").toString());
+ schemaField.setType(field.get("type").toString());
+ fieldList.add(schemaField);
+ }
+ converted.setFields(fieldList);
+ List dynamicFieldList =
+ new ArrayList<>(representation.getDynamicFields().size());
+ for (Map field : representation.getDynamicFields()) {
+ org.apache.storm.solr.schema.Field schemaField = new org.apache.storm.solr.schema.Field();
+ schemaField.setName(field.get("name").toString());
+ schemaField.setType(field.get("type").toString());
+ dynamicFieldList.add(schemaField);
+ }
+ converted.setDynamicFields(dynamicFieldList);
+ List copyFieldList = new ArrayList<>(representation.getCopyFields().size());
+ for (Map field : representation.getDynamicFields()) {
+ CopyField copyField = new CopyField();
+ copyField.setDest(field.get("dest").toString());
+ copyField.setSource(field.get("source").toString());
+ copyFieldList.add(copyField);
+ }
+ converted.setCopyFields(copyFieldList);
+ return converted;
+ }
+
+}
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java
index d84d140ae0b..cf8dda483f5 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java
@@ -18,16 +18,15 @@
package org.apache.storm.solr.trident;
-import org.apache.storm.topology.FailedException;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.storm.solr.config.SolrConfig;
import org.apache.storm.solr.mapper.SolrMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
@@ -44,7 +43,7 @@ public SolrState(SolrConfig solrConfig, SolrMapper solrMapper) {
}
protected void prepare() {
- solrClient = new CloudSolrClient(solrConfig.getZkHostString());
+ solrClient = solrConfig.getSolrClientFactory().getSolrClient();
}
@Override