A Crystal library for interfacing with the Beanstalk queue.
-
Add the dependency to your
shard.yml
:dependencies: beanstalk-cr: github: free-beer/beanstalk-cr
-
Run
shards install
require "beanstalk-cr"
All code for the library is contained within a module called Beanstalk
.
To connect to a Beanstalk server instance you can call the open()
method
on the Beanstalk::Connection
class like this...
connection = Beanstalk::Connection.open()
This will attempt to connect to a server running on localhost
at the
default Beanstalk port number (11300). If you want to change the server that
gets connected to you can specify that in the call to open()
like
this...
connection = Beanstalk::Connection.open("my.host.name", 12345)
Once you have a connection to a Beanstalk instance you can use it to obtain a
Tube
instance. Tubes are used by Beanstalk as a primary interaction
interface. There are two primary mechanisms for obtaining a Tube
from a
Connection
and these are shown below...
tube1 = connection.default_tube
tube2 = connection["my_q"]
The differences between these requires an explanation of the Beanstalk concepts
of 'using' and 'watching'. These concepts refer to actual queues on the Beanstalk
server and how your Tube
instance interacts with them. The queue that a
Tube
is using will be the queue that it inserts content into. A Tube
can only be 'using' a single queue. On the other hand, a Tube
can be
watching
multiple queues and these are the queues that the Tube
will
check when it's looking to receive content.
In the code above, using connection.default_tube
will retrieve a Tube
instance that is using and watching only the default queues (i.e. the queue
with the name 'default'). Using connection["my_q"]
will retrieve a
Tube
that is using and watching a queue called "my_q". Note, that if a
queue does not exist when you request to use or watch it, Beanstalk will
automatically create it.
The queues that a Tube
instance is using or watching can be changed after
the Tube
instance has been obtained using calls like...
tube.use "other_q"
tube.watch "a_third_q"
Beanstalk refers to the content of it's queues as jobs. In general terms a job
is a collection of data that has been inserted into a queue or can be retrieved
from a queue. This is modelled in the library with the Job
class. To create
a new Job
you can simply construct it. Once constructed you can add data to
the Job
instance. The code below shows some examples of this...
# Create an empty job.
job1 = Beanstalk::Job.new
# Create a job populated with data from a String.
job2 = Beanstalk::Job.new("The data content for the job.")
# Create a job from an Array(UInt8).
array = [1_u8, 2_u8, 3_u8, 4_u8, 5_u8]
job3 = new Beanstalk::Job(array)
# Create a job from a Slice(UInt8)
slice = Slice.new(array.to_unsafe, array.size)
job4 = new Beanstalk::Job(slice)
# Create a job from mixed data sources.
job5 = new Beanstalk::Job("Some text.", array, slice)
Data can be added to an existing Job
instance by calling one of it's
append()
methods. You can check the size of a Job
(i.e. the number of
bytes of data it contains) by calling the size()
method and fetch the actual
data by calling the bytes()
method. A call to the to_s()
method will
attempt to convert the Job
data to a String
so don't call this unless
you're sure that data actually does represent a String
.
Once you have a Job
you can add it to a Beanstalk queue by calling the
put()
method on a Tube
instance. The Job
will be added to the
queue that the Tube
is currently set to use. Prior to being added to a
queue a Job
instance will have a nil
value for it's id
attribute. After being added the id
will be updated to match the id value
assigned to the Job
by Beanstalk. Examples of adding jobs to a Beanstalk
queue are shown below...
tube = Beanstalk::Connection.open.default_tube
# Adding a job with default settings for priority, delay and TTR.
job1 = Beanstalk::Job.new("Some content for my first job.")
tube.put(job1)
# Adding a job with explicit settings for priority, delay and TTR.
settings = Beanstalk::Job::Settings.new(0, 120, 600)
job2 = Beanstalk::Job.new("Some different content for my second job.")
tube.put(job2, settings)
When adding a Job
to a queue Beanstalk has three associated concepts called
priority, delay and time to run (TTR). Priority is a mechanism saying something
about the relative importance of different jobs, with lower priority settings being
considered more important and therefore getting delivered sooner. A jobs delay
setting indicates to Beanstalk that it should hold off on making the job available
for retrieval for a specific number of seconds. Finally a jobs TTR indicates how
much time a client has (in seconds) to do something with a job before Beanstalk will
release their hold on the job and make it available for retrieval again.
Another way to obtain a Job
instance is to retrieve one from the Beanstalk
server. The primary mechanism for doing this is referred to by Beanstalk as
'reserving' the job. When you reserve a job from Beanstalk you are taking
temporary ownership of the job. At some future point it is expected that you
will either delete the job, release ownership of it or 'bury' it. The TTR setting
for a job when it is created indicates how long (in seconds) you have to act on
a job before the Beanstalk server will return it to the population of jobs that
are available to be reserved. Examples of fetching jobs from Beanstalk are shown
below...
# Returns immediately with either a Job instance or nil.
job1 = tube.reserve?
# Waits a most the specified time span for a job to become available or
# returns nil if the time expires and a job is not available.
job2 = tube.reserve(Time::Span.new(seconds: 10))
# Block until a job becomes available.
job3 = tube.reserve
Once you have a Job
instance you can perform whatever associating processing
that you want. Once this is finished you must do something to indicate to the
Beanstalk server that you are finished with the Job
. To do any of these things
you must have already have reserved the Job
and the TTR for the Job
must
not have expired. Examples of what you can do include...
# Delete the job from the Beanstalk queue.
tube.delete(job)
# Release the job back to Beanstalk so that it can be reserved again.
tube.release(job)
# Release the job giving it different priority and delay settings.
settings = Beanstalk::Job::Settings.new(250, 600)
tube.release(job, settings)
# Touch the job, resetting the TTR for your reservation.
tube.touch(job)
# Bury the job (i.e. make it unavailable for reservation until it is 'kicked' back
# into the ready pool).
tube.bury(job)
This covers the primary usage for the library. There are other capabilities provided by the library but you should consult the API documentation for further details.
The library takes note of a number of enviroment settings that can alter how it operates when these are set. These are detailed below...
BEANSTALK_CONNECT_TIMEOUT - Is used to set the time out on obtaining an initial connection to a Beanstalk server. Defaults to 10 seconds and should be an integer value.
BEANSTALK_DEFAULT_JOB_DELAY - Is used to set the default delay assigned to jobs added to Beanstalk without an explicit delay. Default to zero and should be an integer value.
BEANSTALK_DEFAULT_JOB_PRIORITY - Is used to set the default priority assigned to jobs added to Beanstalk without an explicit priority. Defaults to 1000 and should be an integer value.
BEANSTALK_DEFAULT_JOB_TTR - Is used to set the default time to run assigned to jobs added to Beanstsalk without an explicit TTR. Defaults to 3600 (1 hour) and should be an integer value (in seconds).
BEANSTALK_READ_BUFFER_SIZE - Is used to set the buffer sized for reading Beanstalk messages from the server. Might be useful to bump in size if you're using very large jobs but probably not. Should be an integer value.
Code is freely available, so familiarize yourself with that. Note that the unit tests require an actual instance of Beanstalk to run and assume that they will be using an instance on localhost at the default port number.
- Fork it (https://github.com/your-github-user/beanstalk-cr/fork)
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Add some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create a new Pull Request
Pull requests will be reviewed as soon as is possible, though no timeline is guaranteed. Ultimate decision on whether a PR gets merged or not remains my perogative.
- Peter Wood - creator and maintainer