Commit 4542419

Add memory sample support.
1 parent bda1baa commit 4542419

13 files changed: +345 -10 lines changed

.rubocop.yml

Lines changed: 4 additions & 0 deletions
@@ -34,6 +34,9 @@ Layout/BeginEndAlignment:
   Enabled: true
   EnforcedStyleAlignWith: start_of_line
 
+Layout/RescueEnsureAlignment:
+  Enabled: true
+
 Layout/ElseAlignment:
   Enabled: true
 
@@ -42,6 +45,7 @@ Layout/DefEndAlignment:
 
 Layout/CaseIndentation:
   Enabled: true
+  EnforcedStyle: end
 
 Layout/CommentIndentation:
   Enabled: true

async-container-supervisor.gemspec

Lines changed: 1 addition & 0 deletions
@@ -26,5 +26,6 @@ Gem::Specification.new do |spec|
 
   spec.add_dependency "async-service"
   spec.add_dependency "io-endpoint"
+  spec.add_dependency "memory", "~> 0.6"
   spec.add_dependency "memory-leak", "~> 0.5"
 end

bake/async/container/supervisor.rb

Lines changed: 19 additions & 0 deletions
@@ -29,6 +29,25 @@ def status
   end
 end
 
+# Sample memory allocations from a worker over a time period.
+#
+# This is useful for identifying memory leaks by tracking allocations
+# that are retained after garbage collection.
+#
+# @parameter duration [Integer] The duration in seconds to sample for (default: 10).
+# @parameter connection_id [String] The connection ID to target a specific worker.
+def memory_sample(duration: 10, connection_id:)
+  client do |connection|
+    Console.info(self, "Sampling memory from worker...", duration: duration, connection_id: connection_id)
+
+    # Build the operation request:
+    operation = {do: :memory_sample, duration: duration}
+
+    # Use the forward operation to proxy the request to a worker:
+    return connection.call(do: :forward, operation: operation, connection_id: connection_id)
+  end
+end
+
 private
 
 def endpoint
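
Note on the request shape: the bake task does not send `memory_sample` directly. It wraps it in a `forward` envelope that the supervisor resolves to a worker, and `connection_id:` is a required keyword. A minimal sketch of that nesting in plain Ruby follows; `build_forward_request` and the example ID are hypothetical names used only for illustration and are not part of the gem.

```ruby
# Hypothetical helper (not part of the gem): builds the same nested request
# that the bake task passes to `connection.call`.
def build_forward_request(connection_id:, duration: 10)
  # The inner operation is what the targeted worker is asked to perform:
  operation = {do: :memory_sample, duration: duration}

  # The outer envelope is handled by the supervisor, which looks up the worker by ID:
  {do: :forward, operation: operation, connection_id: connection_id}
end

# "example-connection-id" is a made-up placeholder value:
request = build_forward_request(connection_id: "example-connection-id", duration: 30)
puts request.inspect
```

Because `connection_id:` has no default, omitting it raises `ArgumentError`, which matches the signature of the task above.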

context/getting-started.md

Lines changed: 35 additions & 2 deletions
@@ -139,13 +139,46 @@ The {ruby Async::Container::Supervisor::MemoryMonitor} will periodically check w
 
 The supervisor can collect various diagnostics from workers on demand:
 
-- **Memory dumps**: Full heap dumps for memory analysis
-- **Thread dumps**: Stack traces of all threads
+- **Memory dumps**: Full heap dumps for memory analysis via `ObjectSpace.dump_all`.
+- **Memory samples**: Lightweight sampling to identify memory leaks.
+- **Thread dumps**: Stack traces of all threads.
 - **Scheduler dumps**: Async fiber hierarchy
 - **Garbage collection profiles**: GC performance data
 
 These can be triggered programmatically or via command-line tools (when available).
 
+#### Memory Leak Diagnosis
+
+To identify memory leaks, you can use the memory sampling feature, which is much lighter than a full memory dump. It tracks allocations over a time period and focuses on retained objects.
+
+**Using the bake task:**
+
+```bash
+# Sample for 30 seconds and print the report to the console (connection_id is required):
+$ bake async:container:supervisor:memory_sample duration=30 connection_id=<connection_id>
+```
+
+**Programmatically:**
+
+```ruby
+# Assuming you have a connection to a worker:
+result = connection.call(do: :memory_sample, duration: 30)
+puts result[:data]
+```
+
+This will sample memory allocations for the specified duration, then force a garbage collection and return a JSON report showing which objects were allocated during that period and retained after GC. Late-lifecycle allocations that are retained are likely memory leaks.
+
+The JSON report includes:
+- `total_allocated`: Total allocated memory and count
+- `total_retained`: Total retained memory and count
+- `by_gem`: Breakdown by gem/library
+- `by_file`: Breakdown by source file
+- `by_location`: Breakdown by specific file:line locations
+- `by_class`: Breakdown by object class
+- `strings`: String allocation analysis
+
+This is much more efficient than `do: :memory_dump`, which uses `ObjectSpace.dump_all` and can be slow and blocking on large heaps. The JSON format also makes it easy to integrate with monitoring and analysis tools.
+
 ## Advanced Usage
 
 ### Custom Monitors
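
Note on consuming the report: the guide lists the report's top-level sections but not their nested shape. The sketch below only checks which documented sections are present after parsing. It assumes `result[:data]` is a JSON string with those section names as top-level keys; `summarize_report` and the sample payload are made up for illustration.

```ruby
require "json"

# The section names documented for the report:
REPORT_SECTIONS = %w[total_allocated total_retained by_gem by_file by_location by_class strings]

# Hypothetical helper: parse the report and split the documented sections
# into those present in the payload and those missing from it.
def summarize_report(json)
  report = JSON.parse(json)
  present, missing = REPORT_SECTIONS.partition {|name| report.key?(name)}
  {present: present, missing: missing}
end

# Made-up stand-in for `result[:data]`; a real report carries actual values:
sample = JSON.generate({"total_allocated" => {}, "total_retained" => {}, "by_class" => {}})

summary = summarize_report(sample)
puts "Present: #{summary[:present].join(', ')}"
puts "Missing: #{summary[:missing].join(', ')}"
```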

examples/memory-leak/service.rb

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ def setup(container)
       # The interval at which to check for memory leaks.
       interval: 1,
       # The total size limit of all processes:
-      maximum_size_limit: 1024 * 1024 * 40, # 40 MB
+      maximum_size_limit: 1024 * 1024 * 1000, # 1000 MB
     )]
   end
 end

examples/simple/simple.rb

Lines changed: 4 additions & 1 deletion
@@ -46,6 +46,9 @@ def setup(container)
   include Async::Container::Supervisor::Environment
 
   monitors do
-    [Async::Container::Supervisor::MemoryMonitor.new(interval: 1, limit: 1024 * 1024 * 400)]
+    [Async::Container::Supervisor::MemoryMonitor.new(
+      interval: 1,
+      limit: 1024 * 1024 * 400
+    )]
   end
 end

guides/getting-started/readme.md

Lines changed: 35 additions & 2 deletions
@@ -139,13 +139,46 @@ The {ruby Async::Container::Supervisor::MemoryMonitor} will periodically check w
 
 The supervisor can collect various diagnostics from workers on demand:
 
-- **Memory dumps**: Full heap dumps for memory analysis
-- **Thread dumps**: Stack traces of all threads
+- **Memory dumps**: Full heap dumps for memory analysis via `ObjectSpace.dump_all`.
+- **Memory samples**: Lightweight sampling to identify memory leaks.
+- **Thread dumps**: Stack traces of all threads.
 - **Scheduler dumps**: Async fiber hierarchy
 - **Garbage collection profiles**: GC performance data
 
 These can be triggered programmatically or via command-line tools (when available).
 
+#### Memory Leak Diagnosis
+
+To identify memory leaks, you can use the memory sampling feature, which is much lighter than a full memory dump. It tracks allocations over a time period and focuses on retained objects.
+
+**Using the bake task:**
+
+```bash
+# Sample for 30 seconds and print the report to the console (connection_id is required):
+$ bake async:container:supervisor:memory_sample duration=30 connection_id=<connection_id>
+```
+
+**Programmatically:**
+
+```ruby
+# Assuming you have a connection to a worker:
+result = connection.call(do: :memory_sample, duration: 30)
+puts result[:data]
+```
+
+This will sample memory allocations for the specified duration, then force a garbage collection and return a JSON report showing which objects were allocated during that period and retained after GC. Late-lifecycle allocations that are retained are likely memory leaks.
+
+The JSON report includes:
+- `total_allocated`: Total allocated memory and count
+- `total_retained`: Total retained memory and count
+- `by_gem`: Breakdown by gem/library
+- `by_file`: Breakdown by source file
+- `by_location`: Breakdown by specific file:line locations
+- `by_class`: Breakdown by object class
+- `strings`: String allocation analysis
+
+This is much more efficient than `do: :memory_dump`, which uses `ObjectSpace.dump_all` and can be slow and blocking on large heaps. The JSON format also makes it easy to integrate with monitoring and analysis tools.
+
 ## Advanced Usage
 
 ### Custom Monitors

lib/async/container/supervisor/connection.rb

Lines changed: 21 additions & 0 deletions
@@ -71,6 +71,27 @@ def closed?
             @queue.closed?
           end
 
+          # Forward this call to another connection, proxying all responses back.
+          #
+          # This provides true streaming forwarding - intermediate responses flow through
+          # in real-time rather than being buffered.
+          #
+          # @parameter target_connection [Connection] The connection to forward the call to.
+          # @parameter operation [Hash] The operation request to forward (must include :do key).
+          def forward(target_connection, operation)
+            # Forward the operation in an async task to avoid blocking
+            Async do
+              # Make the call to the target connection and stream responses back:
+              Call.call(target_connection, **operation) do |response|
+                # Push each response through our queue:
+                self.push(**response)
+              end
+            ensure
+              # Close our queue to signal completion:
+              @queue.close
+            end
+          end
+
           def self.dispatch(connection, target, id, message)
             Async do
               call = self.new(connection, id, message)
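
Note on the streaming pattern: `forward` relays each response as it arrives and closes the queue in an `ensure` block so the reader always sees completion. The sketch below shows the same idea with only the standard library, using `Thread` and `Thread::Queue` in place of Async tasks. `SketchCall` is a hypothetical stand-in, not the gem's `Call` class.

```ruby
# Hypothetical stand-in for a call's response queue (the real class uses Async primitives).
class SketchCall
  def initialize
    @queue = Thread::Queue.new
  end

  # Push one response to whoever is reading this call:
  def push(**response)
    @queue.push(response)
  end

  # Relay each response from `source` as it arrives, then close the queue.
  def forward(source)
    Thread.new do
      source.each {|response| push(**response)}
    ensure
      # Closing signals completion even if the source raised:
      @queue.close
    end
  end

  # Yield responses until the queue is closed and drained:
  def each
    while response = @queue.pop
      yield response
    end
  end
end

call = SketchCall.new
call.forward([{progress: 1}, {progress: 2}, {finished: true}])

# The reader blocks on `pop` and receives responses as they are relayed:
responses = []
call.each {|response| responses << response}
puts responses.inspect
```

Without the `ensure`, a failure in the source would leave the reader blocked on `pop` indefinitely, which is presumably why the diff closes `@queue` unconditionally.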

lib/async/container/supervisor/memory_monitor.rb

Lines changed: 22 additions & 1 deletion
@@ -10,15 +10,19 @@ module Async
   module Container
     module Supervisor
       class MemoryMonitor
+        MEMORY_SAMPLE = {duration: 60, timeout: 60+20}
+
         # Create a new memory monitor.
         #
         # @parameter interval [Integer] The interval at which to check for memory leaks.
         # @parameter total_size_limit [Integer] The total size limit of all processes, or nil for no limit.
         # @parameter options [Hash] Options to pass to the cluster when adding processes.
-        def initialize(interval: 10, total_size_limit: nil, **options)
+        def initialize(interval: 10, total_size_limit: nil, memory_sample: MEMORY_SAMPLE, **options)
           @interval = interval
           @cluster = Memory::Leak::Cluster.new(total_size_limit: total_size_limit)
 
+          @memory_sample = memory_sample
+
           # We use these options when adding processes to the cluster:
           @options = options
 
@@ -74,6 +78,23 @@ def status(call)
         # @parameter monitor [Memory::Leak::Monitor] The monitor that detected the memory leak.
         # @returns [Boolean] True if the process was killed.
         def memory_leak_detected(process_id, monitor)
+          Console.info(self, "Memory leak detected!", child: {process_id: process_id}, monitor: monitor)
+
+          if @memory_sample
+            Console.info(self, "Capturing memory sample...", child: {process_id: process_id}, memory_sample: @memory_sample)
+
+            # We are tracking multiple connections to the same process:
+            connections = @processes[process_id]
+
+            # Try to capture a memory sample:
+            connections.each do |connection|
+              result = connection.call(do: :memory_sample, **@memory_sample)
+
+              Console.info(self, "Memory sample completed:", child: {process_id: process_id}, result: result)
+            end
+          end
+
+          # Kill the process gently:
           Console.info(self, "Killing process!", child: {process_id: process_id})
           Process.kill(:INT, process_id)
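
Note on the sampling options: the `MEMORY_SAMPLE` hash is splatted into the request, so `duration` and `timeout` travel alongside `do: :memory_sample`, and passing `memory_sample: nil` skips sampling entirely. The sketch below illustrates both paths with a recording stub. `RecordingConnection` and `capture_samples` are hypothetical, and how the real `call` interprets `timeout` is not shown in this diff.

```ruby
# Same default as the diff: a 60 second sample with a timeout of 60 + 20 seconds.
MEMORY_SAMPLE = {duration: 60, timeout: 60 + 20}

# Hypothetical stub that records requests instead of talking to a worker:
class RecordingConnection
  attr :requests

  def initialize
    @requests = []
  end

  def call(**message)
    @requests << message
    {data: "{}"}
  end
end

# Mirrors the guard in `memory_leak_detected`: skip sampling when it is disabled.
def capture_samples(connections, memory_sample)
  return [] unless memory_sample

  connections.map {|connection| connection.call(do: :memory_sample, **memory_sample)}
end

connection = RecordingConnection.new
capture_samples([connection], MEMORY_SAMPLE)

# Passing nil disables sampling, so no further request is recorded:
disabled = capture_samples([connection], nil)
puts connection.requests.inspect
```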

lib/async/container/supervisor/server.rb

Lines changed: 48 additions & 1 deletion
@@ -3,6 +3,8 @@
 # Released under the MIT License.
 # Copyright, 2025, by Samuel Williams.
 
+require "securerandom"
+
 require_relative "connection"
 require_relative "endpoint"
 require_relative "dispatchable"
@@ -17,15 +19,23 @@ class Server
         def initialize(monitors: [], endpoint: Supervisor.endpoint)
           @monitors = monitors
           @endpoint = endpoint
+
+          @connections = {}
         end
 
         attr :monitors
+        attr :connections
 
         include Dispatchable
 
         def do_register(call)
           call.connection.state.merge!(call.message[:state])
 
+          connection_id = SecureRandom.uuid
+          call.connection.state[:connection_id] = connection_id
+
+          @connections[connection_id] = call.connection
+
           @monitors.each do |monitor|
             monitor.register(call.connection)
           rescue => error
@@ -35,6 +45,31 @@ def do_register(call)
           call.finish
         end
 
+        # Forward an operation to a worker connection.
+        #
+        # @parameter call [Connection::Call] The call to handle.
+        # @parameter operation [Hash] The operation to forward, must include :do key.
+        # @parameter connection_id [String] The connection ID to target.
+        def do_forward(call)
+          operation = call[:operation]
+          connection_id = call[:connection_id]
+
+          unless connection_id
+            call.fail(error: "Missing 'connection_id' parameter")
+            return
+          end
+
+          connection = @connections[connection_id]
+
+          unless connection
+            call.fail(error: "Connection not found", connection_id: connection_id)
+            return
+          end
+
+          # Forward the call to the target connection
+          call.forward(connection, operation)
+        end
+
         # Restart the current process group, usually including the supervisor and any other processes.
         #
         # @parameter signal [Symbol] The signal to send to the process group.
@@ -48,14 +83,26 @@ def do_restart(call)
         end
 
         def do_status(call)
+          connections = @connections.map do |connection_id, connection|
+            {
+              connection_id: connection_id,
+              process_id: connection.state[:process_id],
+              state: connection.state,
+            }
+          end
+
           @monitors.each do |monitor|
             monitor.status(call)
           end
 
-          call.finish
+          call.finish(connections: connections)
         end
 
         def remove(connection)
+          if connection_id = connection.state[:connection_id]
+            @connections.delete(connection_id)
+          end
+
           @monitors.each do |monitor|
             monitor.remove(connection)
           rescue => error
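
Note on the connection registry: the server assigns each registering worker a UUID, looks it up when forwarding, and deletes it on removal. The sketch below reproduces that lifecycle and the two failure cases from `do_forward` in plain Ruby. `SketchRegistry` and `resolve` are hypothetical simplifications: the real server stores `Connection` objects and reports errors through `call.fail`.

```ruby
require "securerandom"

# Hypothetical, simplified registry; the real server stores Connection objects.
class SketchRegistry
  def initialize
    @connections = {}
  end

  # Assign a fresh UUID to a connection and remember it:
  def register(connection)
    connection_id = SecureRandom.uuid
    @connections[connection_id] = connection
    connection_id
  end

  # Resolve a forward request to a connection, or to an error hash:
  def resolve(connection_id)
    return {error: "Missing 'connection_id' parameter"} unless connection_id

    connection = @connections[connection_id]
    return {error: "Connection not found", connection_id: connection_id} unless connection

    {connection: connection}
  end

  # Forget a connection once it has disconnected:
  def remove(connection_id)
    @connections.delete(connection_id)
  end
end

registry = SketchRegistry.new
worker_id = registry.register(:worker_connection)

found = registry.resolve(worker_id)
missing = registry.resolve(nil)
unknown = registry.resolve("no-such-id")

registry.remove(worker_id)
removed = registry.resolve(worker_id)
puts [found, missing, unknown, removed].inspect
```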
