-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy paththreaded.rb
111 lines (91 loc) · 2.25 KB
/
threaded.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
require 'thread'
require 'timeout'
require 'logger'
require 'stringio'
require 'threaded/version'
# Module-level facade over a lazily built thread pool master
# (Threaded::Master).
#
# The module extends itself, so every method below is called directly on
# `Threaded`. Settings live in module-level instance variables and are
# written under a single mutex.
module Threaded
  STOP_TIMEOUT = 10 # seconds

  extend self

  # Guards setting writes and the lazy construction of the master.
  @mutex           = Mutex.new
  @sync_promise_io = true

  # logger / size    - handed to Master.new when the master is first built.
  # inline           - when truthy, #enqueue runs jobs in the calling thread.
  # sync_promise_io  - when truthy, #later wraps the block (see #later).
  attr_reader :logger, :size, :inline, :sync_promise_io
  alias :sync_promise_io? :sync_promise_io
  alias :inline? :inline

  def inline=(inline)
    @mutex.synchronize { @inline = inline }
  end

  def logger=(logger)
    @mutex.synchronize { @logger = logger }
  end

  def size=(size)
    @mutex.synchronize { @size = size }
  end

  def sync_promise_io=(sync_promise_io)
    @mutex.synchronize { @sync_promise_io = sync_promise_io }
  end

  # Starts a threadpool master.
  # Threaded can still be used without explicitly starting, but threads will
  # be added to the pool lazily.
  #
  # Each option is applied through the matching writer (`size: 5` calls
  # `size = 5`); a key that already ends in "=" is used as given.
  #
  # @param options [Hash] setting name => value, applied before the master starts
  # @return [Threaded] self
  # @raise [RuntimeError] if options are given while an existing master is started
  # @raise [NoMethodError] if an option has no matching public writer
  # @see Threaded::Master#start
  def start(options = {})
    unless options.empty?
      raise "Queue is already started, must configure queue before starting" if master_started?

      options.each do |key, value|
        setter = key.to_s.end_with?("=") ? key : "#{key}="
        public_send(setter, value)
      end
    end

    master.start
    self
  end

  # Returns the master, building it on first use from the current logger and
  # size settings. Later changes to those settings do not rebuild it.
  #
  # @return [Threaded::Master]
  def master
    @mutex.synchronize do
      @master ||= Master.new(logger: logger, size: size)
    end
  end

  # Replaces the memoized master. Assigning nil makes the next call to
  # #master build a fresh one from the current settings.
  #
  # @param master [Threaded::Master, nil]
  def master=(master)
    @mutex.synchronize { @master = master }
  end

  # Yields the module so settings can be assigned in a block.
  #
  # @yieldparam config [Threaded] self
  # @raise [RuntimeError] if an existing master is started
  def configure(&block)
    raise "Queue is already started, must configure queue before starting" if master_started?

    yield self
  end
  alias :config :configure

  # @return [Boolean] negation of #stopped?
  def started?
    !stopped?
  end

  # Delegates to Master#stopping?. Note that this builds the master if none
  # exists yet.
  def stopped?
    master.stopping?
  end

  # Wraps the block in a Threaded::Promise and calls #later on it.
  #
  # When sync_promise_io? is truthy the block is wrapped so that
  # Thread.current[:stdout] is set to a new StringIO in the executing thread
  # before the block runs.
  # NOTE(review): presumably consumed by threaded/ext/stdout - confirm.
  #
  # @return whatever Threaded::Promise#later returns
  def later(&block)
    job =
      if sync_promise_io?
        proc do
          Thread.current[:stdout] = StringIO.new
          block.call
        end
      else
        block
      end

    Threaded::Promise.new(&job).later
  end

  # Runs +job+ immediately in the calling thread when inline? is truthy,
  # otherwise hands it to the master.
  #
  # @param job [#call]
  # @param args [Array] arguments forwarded to the job
  # @return [true]
  def enqueue(job, *args)
    if inline?
      job.call(*args)
    else
      master.enqueue(job, *args)
    end
    true
  end

  # Tells the threadpool master to begin cleaning up jobs and stop.
  #
  # @param timeout [Numeric] forwarded to Master#stop (defaults to STOP_TIMEOUT)
  # @return [true]
  # @see Threaded::Master#stop
  def stop(timeout = STOP_TIMEOUT)
    return true unless master

    master.stop(timeout)
    true
  end

  private

  # True only when a master has already been built and is not stopping.
  # Reads @master directly: going through #started? would build the master
  # as a side effect, freezing the old logger/size into it before the caller
  # had a chance to change them.
  def master_started?
    existing = @mutex.synchronize { @master }
    existing ? !existing.stopping? : false
  end
end
# Install the default logger: writes to STDOUT at INFO level.
Threaded.logger = Logger.new(STDOUT).tap { |log| log.level = Logger::INFO }
require 'threaded/errors'
require 'threaded/ext/stdout'
require 'threaded/worker'
require 'threaded/master'
require 'threaded/promise'