Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions external/storm-solr/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<id>hmcl</id>
<name>Hugo Louro</name>
<email>hmclouro@gmail.com</email>
</developer>
</developer>
</developers>

<dependencies>
Expand All @@ -46,13 +46,13 @@
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>5.2.1</version>
<version>5.5.0</version>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5.5.0 version is affected by a security problem and is also more than 1.5 years old.
http://lucene.apache.org/solr/news.html
Please consider moving to Solr 6.5.1 or 6.6.0

<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
<version>1.4</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
Expand All @@ -67,13 +67,13 @@
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
<version>5.2.1</version>
<version>5.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-test-framework</artifactId>
<version>5.2.1</version>
<version>5.5.0</version>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoiding adding version everywhere, this should be made a property

<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@

package org.apache.storm.solr.bolt;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.storm.solr.config.CommitCallback;
import org.apache.storm.solr.config.CountBasedCommit;
import org.apache.storm.solr.config.SolrCommitStrategy;
import org.apache.storm.solr.config.SolrConfig;
import org.apache.storm.solr.mapper.SolrMapper;
import org.apache.storm.solr.mapper.SolrMapperException;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,6 +56,7 @@ public class SolrUpdateBolt extends BaseTickTupleAwareRichBolt {
private SolrClient solrClient;
private OutputCollector collector;
private List<Tuple> toCommitTuples;
private CommitCallback commitCallback;
private int tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS;

public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper) {
Expand All @@ -66,6 +67,7 @@ public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper, SolrCommitSt
this.solrConfig = solrConfig;
this.solrMapper = solrMapper;
this.commitStgy = commitStgy;
this.commitCallback = solrConfig.getCommitCallback();
LOG.debug("Created {} with the following configuration: " +
"[SolrConfig = {}], [SolrMapper = {}], [CommitStgy = {}]",
this.getClass().getSimpleName(), solrConfig, solrMapper, commitStgy);
Expand All @@ -74,7 +76,7 @@ public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper, SolrCommitSt
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.solrClient = new CloudSolrClient(solrConfig.getZkHostString());
this.solrClient = solrConfig.getSolrClientFactory().getSolrClient();
this.toCommitTuples = new ArrayList<>(capacity());
}

Expand All @@ -96,17 +98,24 @@ protected void process(Tuple tuple) {
}
}

private void ack(Tuple tuple) throws SolrServerException, IOException {
@Override
protected void onTickTuple(final Tuple tuple) {
try {
// commit ignore strategy when tick
ackCommittedTuples();
} catch (Exception e) {
fail(tuple, e);
}
}

private void ack(Tuple tuple) throws SolrServerException, IOException, SolrMapperException {
if (commitStgy == null) {
collector.ack(tuple);
} else {
final boolean isTickTuple = TupleUtils.isTick(tuple);
if (!isTickTuple) { // Don't ack tick tuples
toCommitTuples.add(tuple);
commitStgy.update();
}
if (isTickTuple || commitStgy.commit()) {
solrClient.commit(solrMapper.getCollection());
toCommitTuples.add(tuple);
commitStgy.update();
if (commitStgy.commit()) {
commitCallback.process(solrClient, solrMapper.getCollection());
ackCommittedTuples();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* DefaultCloudSolrClientFactory.java
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please replace this license header with the Apache header (copy it from one of the other files)

* Copyright 2017 Qunhe Tech, all rights reserved.
* Qunhe PROPRIETARY/CONFIDENTIAL, any form of usage is subject to approval.
*/

package org.apache.storm.solr.client;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;

/**
* default implementation of {@link SolrClientFactory}, which takes a zookeeper host
* and produce {@link org.apache.solr.client.solrj.impl.CloudSolrClient} with the host.
*
* @author alei
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the author.

*/
public class DefaultSolrClientFactory implements SolrClientFactory {

private String zkHostString;

public DefaultSolrClientFactory(String zkHostString) {
this.zkHostString = zkHostString;
}

@Override
public SolrClient getSolrClient() {
return new CloudSolrClient(zkHostString);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SerializableSolrClientFactory.java
* Copyright 2017 Qunhe Tech, all rights reserved.
* Qunhe PROPRIETARY/CONFIDENTIAL, any form of usage is subject to approval.
*/

package org.apache.storm.solr.client;

import org.apache.solr.client.solrj.SolrClient;

import java.io.Serializable;

/**
* this interface provide SolrClient for {@link org.apache.storm.solr.mapper.SolrMapper}
*
* @author alei
*/
public interface SolrClientFactory extends Serializable {

SolrClient getSolrClient();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* OnCommit.java
* Copyright 2017 Qunhe Tech, all rights reserved.
* Qunhe PROPRIETARY/CONFIDENTIAL, any form of usage is subject to approval.
*/

package org.apache.storm.solr.config;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;

import java.io.IOException;
import java.io.Serializable;

/**
* This callback interface is invoked on {@link SolrCommitStrategy} triggers or tick. Typically fire
* a "commit" request to solr, you can customize commit details by implementing this interface.
* @author alei
*/
public interface CommitCallback extends Serializable {

void process(SolrClient solrClient, String collection) throws SolrServerException, IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Shouldn't this be named "commit"?

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* DefaultCommitOperation.java
* Copyright 2017 Qunhe Tech, all rights reserved.
* Qunhe PROPRIETARY/CONFIDENTIAL, any form of usage is subject to approval.
*/

package org.apache.storm.solr.config;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;

import java.io.IOException;

/**
* @author alei
*/
public class DefaultCommitCallback implements CommitCallback {

@Override
public void process(final SolrClient solrClient, final String collection) throws SolrServerException, IOException {
solrClient.commit(collection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.storm.solr.config;

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.storm.solr.client.DefaultSolrClientFactory;
import org.apache.storm.solr.client.SolrClientFactory;

import java.io.Serializable;

Expand All @@ -27,9 +29,14 @@
* the bolts should be put in this class.
*/
public class SolrConfig implements Serializable {
private final String zkHostString;

private final int tickTupleInterval;

private final SolrClientFactory solrClientFactory;

private final CommitCallback commitCallback;


/**
* @param zkHostString Zookeeper host string as defined in the {@link CloudSolrClient} constructor
* */
Expand All @@ -42,16 +49,35 @@ public SolrConfig(String zkHostString) {
* @param tickTupleInterval interval for tick tuples
* */
public SolrConfig(String zkHostString, int tickTupleInterval) {
this.zkHostString = zkHostString;
this(new DefaultSolrClientFactory(zkHostString), tickTupleInterval);
}

/**
* @param solrClientFactory factory that provide solrClient
* @param tickTupleInterval interval for tick tuples
* */
public SolrConfig(SolrClientFactory solrClientFactory, int tickTupleInterval) {
this(solrClientFactory, tickTupleInterval, new DefaultCommitCallback());
}

public SolrConfig(SolrClientFactory solrClientFactory,
int tickTupleInterval,
CommitCallback commitCallback) {
this.solrClientFactory = solrClientFactory;
this.tickTupleInterval = tickTupleInterval;
this.commitCallback = commitCallback;
}

public String getZkHostString() {
return zkHostString;
public SolrClientFactory getSolrClientFactory() {
return solrClientFactory;
}

public int getTickTupleInterval() {
return tickTupleInterval;
}

public CommitCallback getCommitCallback() {
return commitCallback;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* SolrClientSchemaBuilder.java
* Copyright 2017 Qunhe Tech, all rights reserved.
* Qunhe PROPRIETARY/CONFIDENTIAL, any form of usage is subject to approval.
*/

package org.apache.storm.solr.schema.builder;

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.schema.FieldTypeDefinition;
import org.apache.solr.client.solrj.request.schema.SchemaRequest;
import org.apache.solr.client.solrj.response.schema.SchemaRepresentation;
import org.apache.solr.client.solrj.response.schema.SchemaResponse;
import org.apache.storm.solr.client.SolrClientFactory;
import org.apache.storm.solr.schema.CopyField;
import org.apache.storm.solr.schema.FieldType;
import org.apache.storm.solr.schema.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* <p>this Class build the {@link Schema} by Solr Schema API.</p>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason to prefer the REST schema builder over this one? Otherwise maybe we should remove the other one?

*
* @see <a href="https://cwiki.apache.org/confluence/display/solr/Schema+API">Solr Schema API</a>
* @author alei
*/
public class SolrClientSchemaBuilder implements SchemaBuilder {

private static final Logger LOG = LoggerFactory.getLogger(SolrClientSchemaBuilder.class);

private Schema schema;

public SolrClientSchemaBuilder(String collection, SolrClientFactory factory) throws IOException, SolrServerException {
SchemaRequest request = new SchemaRequest();
SchemaResponse response = request.process(factory.getSolrClient(), collection);
SchemaRepresentation representation = response.getSchemaRepresentation();
schema = convert(representation);
}

@Override
public Schema getSchema() {
return schema;
}

private Schema convert(final SchemaRepresentation representation) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It might be good to add a unit test for this, to verify that this conversion does the right thing.

Schema converted = new Schema();
converted.setUniqueKey(representation.getUniqueKey());
converted.setName(representation.getName());
converted.setVersion(String.valueOf(representation.getVersion()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: If this is a float, shouldn't we keep it that way in Schema?

List<FieldType> fieldTypeList = new ArrayList<>(representation.getFieldTypes().size());
for (FieldTypeDefinition type : representation.getFieldTypes()) {
FieldType fieldType = new FieldType();
fieldType.setClazz(type.getAttributes().get("class").toString());
if (type.getAttributes().get("multiValued") != null) {
fieldType.setMultiValued(Boolean.valueOf(type.getAttributes().get("multiValued").toString()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this attribute already a boolean? If so I don't think there's a reason to toString and then Boolean.valueOf it.

}
fieldType.setName(type.getAttributes().get("name").toString());
fieldTypeList.add(fieldType);
}
converted.setFieldTypes(fieldTypeList);
List<org.apache.storm.solr.schema.Field> fieldList =
new ArrayList<>(representation.getFields().size());
for (Map<String, Object> field : representation.getFields()) {
org.apache.storm.solr.schema.Field schemaField = new org.apache.storm.solr.schema.Field();
schemaField.setName(field.get("name").toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: An explicit cast to String might be nicer here, so you get an exception in case this isn't a String for some reason.

schemaField.setType(field.get("type").toString());
fieldList.add(schemaField);
}
converted.setFields(fieldList);
List<org.apache.storm.solr.schema.Field> dynamicFieldList =
new ArrayList<>(representation.getDynamicFields().size());
for (Map<String, Object> field : representation.getDynamicFields()) {
org.apache.storm.solr.schema.Field schemaField = new org.apache.storm.solr.schema.Field();
schemaField.setName(field.get("name").toString());
schemaField.setType(field.get("type").toString());
dynamicFieldList.add(schemaField);
}
converted.setDynamicFields(dynamicFieldList);
List<CopyField> copyFieldList = new ArrayList<>(representation.getCopyFields().size());
for (Map<String, Object> field : representation.getDynamicFields()) {
CopyField copyField = new CopyField();
copyField.setDest(field.get("dest").toString());
copyField.setSource(field.get("source").toString());
copyFieldList.add(copyField);
}
converted.setCopyFields(copyFieldList);
return converted;
}

}
Loading