-
Notifications
You must be signed in to change notification settings - Fork 4.1k
STORM-2497: Let Supervisor enforce memory and add in support for shared memory regions #2113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
7c22715 to
d45c562
Compare
|
cool feature |
kishorvpatil
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 LGTM.
|
@kishorvpatil Could you look again. I had to rebase becase of a minor conflict in the DefaultResourceAwareStrategy. It ended up resulting in only some comment changes to this code, but I wanted to be sure you took a look. |
srdo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good as far as I can tell. It was pretty big for 1 commit though :)
conf/defaults.yaml
Outdated
| storm.cgroup.memory.limit.tolerance.margin.mb: 0.0 | ||
| storm.supervisor.memory.limit.tolerance.margin.mb: 128.0 | ||
| storm.supervisor.hard.memory.limit.multiplier: 2.0 | ||
| storm.supervisor.hard.memory.limit.overage: 2024 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: Some of these properties are missing the unit name
| ### Shared Memory | ||
|
|
||
| In some cases you may have memory that is shared between components. It may be a cache shared within a worker between instances of a bolt, or it might be static data that is memory mapped into a bolt and is shared accross a worker. In any case you can specify your share memory request by | ||
| creating one of `SharedOffHeapWithinNode`, `SharedOffHeapWithinWorker`, or `SharedOnHeap` and adding it to bolts and spouts that use that shared memory. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a difference between SharedOnHeap and just declaring a static field?
| // The size of a worker is limited by the amount of heap assigned to it and can be overridden by | ||
| conf.setTopologyWorkerMaxHeapSize(1024.0); | ||
| // This is to try and balance the time needed to devote to GC against not needing to | ||
| // serialize/deserialize tuples |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand this. Are more executors ending up in a worker if the max heap size is larger, or how does this work?
| if (args != null && args.length > 0) { | ||
| topoName = args[0]; | ||
| } | ||
| //Not needed on RAS |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was already mentioned
| @NotNull | ||
| @isImplementationOfClass(implementsClass = IStrategy.class) | ||
| @isString | ||
| //TODO @isImplementationOfClass(implementsClass = IStrategy.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO
| /** | ||
| * Get the topology id this is a part of. | ||
| */ | ||
| public String getTopoogyId() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
topoogy -> topology
| import org.junit.Test; | ||
|
|
||
| public class NimbusTest { | ||
| @Test(expected=IllegalArgumentException.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Could reduce the scope of this assertion more local with the ExpectedException rule
| defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0); | ||
| defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0); | ||
| defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0); | ||
| //TODO clean this up some more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Todo
| Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId); | ||
| _fsOps.setupStormCodeDir(topoConf, _stormRoot); | ||
| if (_assignment.is_has_node_shared_memory()) { | ||
| File sharedMemoryDir = new File(_stormRoot, "shared_by_topology"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might have missed it somewhere, but how does the topology code access the off heap shared memory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is up to the topology. It could be JNI code or even something that a ShellBolt or ShellSpout does. The point is to provide a place for them to store the data if they need it. We have a group doing this in /tmp right now and causing all kinds of issues if things crash/get rescheduled leaking things.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. I was just thinking it might be helpful if the topology could get the directory path (or _stormRoot), so users don't have to figure out how to path to this directory themselves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can add it to the documentation. But I didn't spend a lot of time documenting it on purpose. shared memory off heap is hard to get right and I would rather discourage people from using it.
| (int) | ||
| (Math.ceil( | ||
| ObjectReader.getDouble( | ||
| this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if having defaults here is necessary? Isn't defaults.yaml always expected to be present?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Except in some unit tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense
|
I think I addressed all of the review comments |
srdo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for addressing the comments.
| /** | ||
| * get the topology-id this assignment is for. | ||
| * @return | ||
| * Return the ID of the topolgoy. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
topolgoy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I even typo my fixes.
|
|
||
| private Map<String, Number> resources = new HashMap<>(); | ||
| private static Map<String, Object> conf = Utils.readStormConfig(); | ||
| //@{link org.apache.storm.trident.planner.Node} and several other tirdent classes inherit from DefaultResourceDeclarer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tirdent
| private static Map<String, Object> conf = Utils.readStormConfig(); | ||
| //@{link org.apache.storm.trident.planner.Node} and several other tirdent classes inherit from DefaultResourceDeclarer | ||
| // These classes are serialized out as part of the bolts and spouts of a topology, often for each bolt/spout in the topology. | ||
| // The following are marked as transiant because they are never used after the topology is created so keeping them around just wasts |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wasts
| import org.apache.storm.topology.TopologyBuilder; | ||
| import org.junit.Test; | ||
|
|
||
| import static org.junit.Assert.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think checkstyle bans this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checkstyle does not check tests for some reason, but if you care about it I can change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, don't worry about it. Hadn't noticed this was in a test
| Nimbus.validateTopologyWorkerMaxHeapSizeConfigs(config1, stormTopology1); | ||
| try { | ||
| Nimbus.validateTopologyWorkerMaxHeapSizeConfigs(config1, stormTopology1); | ||
| fail("Expected exeption not thrown"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exeption
|
@srdo I think I got the rest of them @kishorvpatil could you take a look at let me know if the rework is good for you? |
…ed memory regions
|
@srdo @kishorvpatil @jerrypeng I just rebased this on the latest master and added in a few caches in some of the internal data structures to help with performance issues we had seen in production. Please take a look again, I really would like to get this in sooner rather then later as we have some other work we want to do on the RAS scheduler to add in generic resource support (we have a goal to officially support GPUs) |
kishorvpatil
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still +1
|
+1, though I'll admit to just skimming it this time. It would be easier to tell what changed if the non-merge-conflict-resolution changes were in separate commits. It might make sense to drop a line to @roshannaik before merging, I think he asked on the mailing list to be made aware of changes to storm-client due to #2241 |
HeartSaVioR
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
This is based off of #2112 so that the formatting is simpler.