Skip to content
Francisco Zamora-Martinez edited this page May 12, 2014 · 8 revisions

Welcome to the Lua-MapReduce wiki!

Before starting with this tool you need to be confident with the Map-Reduce model.

Lua-MapReduce allows the description of distributed computations following the Map-Reduce model. It is based in Lua 5.2 and the MongoDB driver luamongo 0.4.

The basis

A MongoDB database is used in order to communicate all the machines and for persistence purposes, allowing fault tolerancy. In the basic, the tool needs to know:

  • A connection string which indicates the destination of all MongoDB queries.

  • A database name (denoted as dbname) which indicates the name of the collection available for the Map-Reduce task. Reusing this collection is dangerous because Lua-MapReduce could delete your data.

  • A Map-Reduce task given in the form of multiple Lua modules. It is possible to write all the task in one Lua script, but you need to provide the same Lua module for every Map-Reduce function.

  • Optionally the authentication credentials (denoted as auth) if you need to authorize the connection for the given dbname.

The connection string, dbname and auth are needed by both programs, the server and the workers. The purpose of them is:

  • The server creates the dbname if needed and creates all the needed collections. The most important collections are:

    - `dbname.task` stores information about the current task and its status. At the end
      of the execution statistics about time needed for Map and Reduce jobs will be written
      in this collection.
    - `dbname.map_jobs` stores as many elements as Map jobs you have defined in your **task**
      function. The jobs are
      taken by the workers as soon as possible, and in a random fashion. It is not possible
      (currently) to define how jobs will be distributed. Mappers take one job and execute
      your **map** function.
    - `dbname.red_jobs` is the equivalent but with the Reduce jobs defined by your task.
      This size depends on the keys returned by Mappers and how this keys are groupped by
      your **partition** function. Reducers take one job and execute your **reduce** function.
    - `dbname.fs.files` and `dbname.fs.chunks` are the GridFS collections. The result of
      your task will be stored there with filenames named `result.P???` where ??? is the
      number of the partition. So, will be as many GridFS files as partitions, and all
      the keys corresponding to one partition will be stored together. This data would be
      removed before the end of the execution depending on the result value of your
      **final** function.
    
  • The workers, which read from the dbname.map_jobs or dbname.red_jobs and execute the first available job. Mappers input depends in your task scripts, they use the (key,value) pairs returned by the task function, and produce several (key',value') pairs. These result pairs will be stored in in an intermediate storage sorted by keys. This storage could be the GridFS collection (which could be sharded if you do that using MongoShell), a shared file system path or a local file system path with scp+public_keys for synchronization. Mappers result will be partitioned following your partition function. Reducers will take Mappers result. At most, a reducer could read its input from as many intermediate files as Map jobs are defined in your task. All this input files will be merged in order to combine together all the values available for one key. Reducers result is directly stored at the GridFS collection in the definitive result files.

The easy way

Two Lua scripts have been prepared for fast running of the software.

  • execute_server.lua runs the master server for your map-reduce operation. Only one instance of this script is needed. Note that this software receives the map-reduce task splitted into several Lua modules. These modules had to be visible in the LUA_PATH of the server and all the workers that you execute. This script receives 7 mandatory arguments:

    1. The connection string, normally localhost or localhost:21707.
    2. The name of the database where the work will be done.
    3. A Lua module which contains the task function data.
    4. A Lua module which contains the map function data.
    5. A Lua module which contains the partition function data.
    6. A Lua module which contains the reduce function data.
    7. A Lua module which contains the final function data.
  • execute_worker.lua runs the worker, which is configured by default to execute one map-reduce task and finish its operation. One task doesn't mean one job. A map-reduce task is performed as several individual map/reduce jobs. A worker waits until all the possible map or reduce jobs are completed to consider a task as finished. This script receives two arguments:

    1. The connection string, as above.
    2. The name of the database where the work will be done, as above.

A simple word-count example is available in the repository. There are two shell-scripts: execute_server_example.sh and execute_worker_example.sh; which are ready to run the word-count example in only one machine, with one or more worker instances. The execution of the example looks like this:

SERVER execute only one.

$ ./execute_example_server.sh > output
# Preparing MAP
# MAP execution
 100.0 % 
# Preparing REDUCE
# 	 MERGE AND PARTITIONING
	 100.0 % 
# 	 CREATING JOBS
# 	 STARTING REDUCE
# REDUCE execution
 100.0 % 
# FINAL execution

WORKER as many workers as you want and your network supports.

$ ./execute_example_worker.sh 
# NEW TASK READY
# 	 EXECUTING MAP JOB _id: "1"
# 		 FINISHED
# 	 EXECUTING MAP JOB _id: "2"
# 		 FINISHED
# 	 EXECUTING MAP JOB _id: "3"
# 		 FINISHED
# 	 EXECUTING MAP JOB _id: "4"
# 		 FINISHED
# 	 EXECUTING REDUCE JOB _id: "121"
# 		 FINISHED
# 	 EXECUTING REDUCE JOB _id: "37"
# 		 FINISHED
...

Map-reduce task example: word-count

The example is composed by one Lua module for each of the map-reduce functions, and are available at the directory examples/WordCount/. All the modules has the same structure, they return a Lua table with two fields:

  • init function, which receives a table of arguments and allows to configure your module options, in case that you need any option.

  • A function which implements the necessary Lua code for the operation. The name of the function is different for each operation.

A map-reduce task is divided, at least, in the following modules:

  • taskfn.lua is the script which defines how the data is divided in order to create map jobs. The taskfn field is executed as a Lua coroutine, so, every map job will be created by calling corotuine.yield(key,value).
-- arg is for configuration purposes, it is allowed in any of the scripts
local init = function(arg)
  -- do whatever you need for initialization parametrized by arg table
end
return {
  init = init,
  taskfn = function()
    coroutine.yield(1,"mapreduce/server.lua")
    coroutine.yield(2,"mapreduce/worker.lua")
    coroutine.yield(3,"mapreduce/test.lua")
    coroutine.yield(4,"mapreduce/utils.lua")
  end
}
  • mapfn.lua is the script where the map function is implemented. The mapfn field is executed as a standard Lua function, and receives three arguments (key,value,emit). The first two are generated b one of the yields at your taskfn script. The third argument is a function. Map results are produced by calling the function emit(key,value).
return {
  init = function() end,
  mapfn = function(key,value,emit)
    for line in io.lines(value) do
      for w in line:gmatch("[^%s]+") do
        emit(w,1)
      end
    end
  end
}
  • partitionfn.lua is the script which describes how the map results are grouped and partitioned in order to create reduce jobs. The partitionfn field is a hash function which receives an emitted key and returns an integer number. Depending in your hash function, more or less reducers will be needed.
-- string hash function: http://isthe.com/chongo/tech/comp/fnv/
local NUM_REDUCERS = 10
local FNV_prime    = 16777619
local offset_basis = 2166136261
local MAX          = 2^32
return {
  init = function() end,
  partitionfn = function(key)
    -- compute hash
    local h = offset_basis
    for i=1,#key do
      h = (h * FNV_prime) % MAX
      h = bit32.bxor(h, key:byte(i))
    end
    return h % NUM_REDUCERS
  end
}
  • reducefn.lua is the script which implements the reduce function. The reducefn field is a function which receives a pair (key,values) where the key is one of the emitted keys, and the values is a Lua array (table with integer and sequential keys starting at 1) with all the available map values for the given key. The system could reuse the reduce function several times, so, it must be idempotent. The reduce results will be grouped following the partition function. For each possible partition, a GridFS file will be created in a collection called dbname_fs where dbname is the database name defined above.
return {
  init = function() end,
  reducefn = function(key,values)
    local count=0
    for _,v in ipairs(values) do count = count + v end
    return count
  end
}
  • finalfn.lua is the script which implements how to take the results produced by the system. The finalfn field is a function which receives a Lua pairs iterator, and returns a boolean or a string indicating what to do next. If the returned value is true, the result data will be deleted from the GridFS collection. If the returned value is false or nil, the results will be available after the execution of your Map-Reduce task at the corresponding GridFS collection. If the returned value is the string "loop", the system will restart the computation from the begin, allowing to implement Machine Learning algorithms following the Iterative Map-Reduce model.
return {
  init = function() end,
  finalfn = function(it)
    for key,value in it do
      print(value,key)
    end
    return true -- indicates to remove mongo gridfs result files
  end
}

Writting your worker and server scripts from scratch

Clone this wiki locally