Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,15 @@ zmq.threads: 1
zmq.linger.millis: 5000
zmq.hwm: 0


storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 #5MB buffer
# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period need also be bigger than storm.zookeeper.session.timeout(default is 20s), so that we can abort the reconnection when the target worker is dead.
storm.messaging.netty.max_retries: 300
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
io.netty.noPreferDirect: false
io.netty.allocator.type: "pooled"

# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
storm.messaging.netty.transfer.batch.size: 262144
Expand Down Expand Up @@ -280,6 +281,7 @@ pacemaker.host: "localhost"
pacemaker.port: 6699
pacemaker.base.threads: 10
pacemaker.max.threads: 50
pacemaker.client.max.threads: 2
pacemaker.thread.timeout: 10
pacemaker.childopts: "-Xmx1024m"
pacemaker.auth.method: "NONE"
Expand Down
9 changes: 3 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@
<disruptor.version>3.3.2</disruptor.version>
<jgrapht.version>0.9.0</jgrapht.version>
<guava.version>16.0.1</guava.version>
<netty.version>3.9.0.Final</netty.version>
<netty.version>4.1.5.Final</netty.version>
<sysout-over-slf4j.version>1.0.2</sysout-over-slf4j.version>
<log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
<log4j.version>2.1</log4j.version>
<slf4j.version>1.7.7</slf4j.version>
Expand Down Expand Up @@ -648,10 +649,6 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down Expand Up @@ -819,7 +816,7 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
Expand Down
14 changes: 7 additions & 7 deletions storm-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
Expand Down Expand Up @@ -490,7 +490,7 @@
<include>compojure:compojure</include>
<include>clj-time:clj-time</include>
<include>org.apache.thrift:*</include>
<include>io.netty:netty</include>
<include>io.netty:netty-all</include>
<include>com.google.guava:guava</include>
<include>org.apache.httpcomponents:http*</include>
<include>org.apache.zookeeper:zookeeper</include>
Expand Down Expand Up @@ -538,6 +538,10 @@
<pattern>clojure.core.incubator</pattern>
<shadedPattern>org.apache.storm.shade.clojure.core.incubator</shadedPattern>
</relocation>
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.storm.netty</shadedPattern>
</relocation>
<relocation>
<pattern>clojure.tools.namespace</pattern>
<shadedPattern>org.apache.storm.shade.clojure.tools.namespace</shadedPattern>
Expand Down Expand Up @@ -583,10 +587,6 @@
<!-- This pattern is inconsistent for backwards compatibility purposes. -->
<shadedPattern>org.apache.storm.thrift</shadedPattern>
</relocation>
<relocation>
<pattern>org.jboss.netty</pattern>
<shadedPattern>org.apache.storm.shade.org.jboss.netty</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>org.apache.storm.shade.com.google.common</shadedPattern>
Expand Down Expand Up @@ -760,7 +760,7 @@
</excludes>
</filter>
<filter>
<artifact>io.netty:netty</artifact>
<artifact>io.netty:netty-all</artifact>
<excludes>
<exclude>META-INF/LICENSE.txt</exclude>
<exclude>META-INF/NOTICE.txt</exclude>
Expand Down
29 changes: 29 additions & 0 deletions storm-core/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,20 @@ public class Config extends HashMap<String, Object> {
@isPositiveNumber
public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";

/**
* Netty based messaging: The write buffer high water mark for write buffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These config descriptions are not descriptive. Without reading the code, I have no idea what this does.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will take a look at how to improve. Any suggestions?

-thanks

*/
@isInteger
@isPositiveNumber
public static final String STORM_MESSAGING_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "storm.messaging.netty.write.buffer.high.water.mark";

/**
* Netty based messaging: The write buffer low water mark for write buffer
*/
@isInteger
@isPositiveNumber
public static final String STORM_MESSAGING_NETTY_WRITE_BUFFER_LOW_WATER_MARK = "storm.messaging.netty.write.buffer.low.water.mark";

/**
* Netty based messaging: Sets the backlog value to specify when the channel binds to a local address
*/
Expand Down Expand Up @@ -120,6 +134,12 @@ public class Config extends HashMap<String, Object> {
@isInteger
public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "storm.messaging.netty.transfer.batch.size";

/**
* The Netty message decoder will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_DECODE_BATCH_SIZE
*/
@isInteger
public static final String STORM_NETTY_MESSAGE_DECODE_BATCH_SIZE = "storm.messaging.netty.decode.batch.size";

/**
* We check with this interval that whether the Netty channel is writable and try to write pending messages
*/
Expand Down Expand Up @@ -954,6 +974,15 @@ public class Config extends HashMap<String, Object> {
@isPositiveNumber
public static final String PACEMAKER_MAX_THREADS = "pacemaker.max.threads";

/**
* The maximum number of threads that should be used by the Pacemaker client.
* When Pacemaker gets loaded it will spawn new threads, up to
* this many total, to handle the load.
*/
@isNumber
@isPositiveNumber
public static final String PACEMAKER_CLIENT_MAX_THREADS = "pacemaker.client.max.threads";

/**
* This parameter is used by the storm-deploy project to configure the
* jvm options for the pacemaker daemon.
Expand Down
11 changes: 5 additions & 6 deletions storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
*/
package org.apache.storm.messaging.local;

import org.apache.storm.grouping.Load;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IConnectionCallback;
import org.apache.storm.messaging.IContext;
import org.apache.storm.messaging.TaskMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,16 +33,10 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.storm.grouping.Load;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.messaging.IConnectionCallback;
import org.apache.storm.messaging.IContext;

public class Context implements IContext {
private static final Logger LOG = LoggerFactory.getLogger(Context.class);
Expand Down
Loading