
First steps

Two Lua scripts are provided to quickly run the software.

  • execute_server.lua runs the master server for your MapReduce operation. Only one instance of this script is needed. Note that this software receives the MapReduce task split into several Lua modules. These modules must be visible in the LUA_PATH of the server and of all the workers that you execute. This script receives up to 9 arguments; the first 6 are mandatory:

    1. The connection string, normally localhost or localhost:27017.
    2. The name of the database where the work will be done.
    3. A Lua module which contains the task function.
    4. A Lua module which contains the map function.
    5. A Lua module which contains the partition function.
    6. A Lua module which contains the reduce function.
    7. A Lua module which contains the final function (optional; it can be the string nil).
    8. A Lua module which contains the combiner function (optional; it can be nil).
    9. A string indicating the intermediate storage (optional). The valid values are gridfs, shared and sshfs. The default is gridfs.
  • execute_worker.lua runs the worker, which by default is configured to execute one MapReduce task and then finish. One task doesn't mean one job: a MapReduce task is performed as several individual map/reduce jobs. A worker waits until all the map or reduce jobs are completed before considering a task finished. This script receives two arguments:

    1. The connection string, as above.
    2. The name of the database where the work will be done, as above.

A simple word-count example is available in the repository at the mapreduce/examples/WordCount path. There are two shell scripts, execute_example_server.sh and execute_example_worker.sh, which are ready to run the word-count example on a single machine, with one or more worker instances. The execution of the example looks like this:

SERVER (execute only one):

$ ./execute_example_server.sh > output
# Preparing MAP
# MAP execution
 100.0 % 
# Preparing REDUCE
# 	 MERGE AND PARTITIONING
	 100.0 % 
# 	 CREATING JOBS
# 	 STARTING REDUCE
# REDUCE execution
 100.0 % 
# FINAL execution

WORKER (execute as many workers as you want and your network supports):

$ ./execute_example_worker.sh 
# NEW TASK READY
# 	 EXECUTING MAP JOB _id: "1"
# 		 FINISHED
# 	 EXECUTING MAP JOB _id: "2"
# 		 FINISHED
# 	 EXECUTING MAP JOB _id: "3"
# 		 FINISHED
# 	 EXECUTING MAP JOB _id: "4"
# 		 FINISHED
# 	 EXECUTING REDUCE JOB _id: "121"
# 		 FINISHED
# 	 EXECUTING REDUCE JOB _id: "37"
# 		 FINISHED
...

MapReduce task example: word-count

The example is composed of one Lua module for each of the MapReduce functions, which are available in the directory mapreduce/examples/WordCount/. All the modules have the same structure: they return a Lua table with two fields:

  • init function, which receives a table of arguments and allows you to configure your module options, in case you need any.

  • A function which implements the necessary Lua code for the operation. The name of the function is different for each operation.

The word-count MapReduce task is divided into the following modules:

  • taskfn.lua is the script which defines how the data is divided in order to create map jobs. Every map job is created by calling emit(key,value). A parametrized variant is sketched after the code below.
-- arg is for configuration purposes, it is allowed in any of the scripts
local init = function(arg)
  -- do whatever you need for initialization parametrized by arg table
end
return {
  init = init,
  taskfn = function(emit)
    emit(1,"mapreduce/server.lua")
    emit(2,"mapreduce/worker.lua")
    emit(3,"mapreduce/test.lua")
    emit(4,"mapreduce/utils.lua")
  end
}
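The init function of this example does nothing, but it is the hook for parametrizing the task. As a hypothetical sketch (not part of the repository), the list of input files could be taken from the arg table instead of being hard-coded:

local files
local init = function(arg)
  -- hypothetical: take the list of input files from the arg table
  files = arg.files or {}
end
return {
  init = init,
  taskfn = function(emit)
    -- create one map job per input file, keyed by its position
    for i,filename in ipairs(files) do emit(i,filename) end
  end
}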
  • mapfn.lua is the script where the map function is implemented. The mapfn field is executed as a standard Lua function, and receives three arguments (key,value,emit). The first two are generated by one of the emit calls in your taskfn script. The third argument is a function. Map results are produced by calling emit(key,value). A variant is sketched after the code below.
return {
  init = function() end,
  mapfn = function(key,value,emit)
    for line in io.lines(value) do
      for w in line:gmatch("[^%s]+") do
        emit(w,1)
      end
    end
  end
}
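The pattern [^%s]+ matches any run of non-whitespace characters, so punctuation stays attached to the words. A slightly more forgiving variant (a sketch, not the repository's code) could lower-case the words and keep only alphanumeric runs:

return {
  init = function() end,
  mapfn = function(key,value,emit)
    for line in io.lines(value) do
      -- %w+ keeps only alphanumeric runs; lower-casing merges "The" and "the"
      for w in line:gmatch("%w+") do
        emit(w:lower(),1)
      end
    end
  end
}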
  • partitionfn.lua is the script which describes how the map results are grouped and partitioned in order to create reduce jobs. The partitionfn field is a hash function which receives an emitted key and returns an integer number. Depending on your hash function, more or fewer reducers will be needed. A simpler alternative is sketched after the code below.
-- string hash function: http://isthe.com/chongo/tech/comp/fnv/
local NUM_REDUCERS = 10
local FNV_prime    = 16777619
local offset_basis = 2166136261
local MAX          = 2^32
return {
  init = function() end,
  partitionfn = function(key)
    -- compute hash
    local h = offset_basis
    for i=1,#key do
      h = (h * FNV_prime) % MAX
      h = bit32.bxor(h, key:byte(i))
    end
    return h % NUM_REDUCERS
  end
}
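Note that bit32 is part of Lua 5.2; with other Lua versions a hash without bit operations is needed. A simpler sketch (with a weaker key distribution than FNV) could be:

local NUM_REDUCERS = 10
return {
  init = function() end,
  partitionfn = function(key)
    -- sum the bytes of the key; weaker than FNV but needs no bit32
    local h = 0
    for i=1,#key do h = (h + key:byte(i)) % 2^31 end
    return h % NUM_REDUCERS
  end
}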
  • reducefn.lua is the script which implements the reduce and the combiner functions. The function receives a pair (key,values) and the emit function. The key is one of the keys emitted by the map function, and values is a Lua array (a table with integer and sequential keys starting at 1) with all the available map values for the given key. The reduce results are grouped following the partition function. For each possible partition, an intermediate file will be created in the given storage option. In this case, the reduce function is associative, commutative and idempotent, so these properties are indicated in the returned table. A combiner function must always fulfill these properties, so in this case it is possible to use the same function as combiner and reducer. A counter-example is sketched after the code below.
local reducefn = function(key,values,emit)
  local count=0
  for _,v in ipairs(values) do count = count + v end
  emit(count)
end
return {
  init = function() end,
  reducefn = reducefn,
  combinerfn = reducefn,
  -- These three properties are true for this reduce function.
  -- Combiners must always fulfill these properties.
  associative_reducer = true,
  commutative_reducer = true,
  idempotent_reducer  = true,
}
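Not every reduce function has these properties. As a hypothetical counter-example, a reducer which concatenates its values depends on their order and on how many times it is applied, so the property flags must be left unset and it must not be reused as a combiner:

return {
  init = function() end,
  reducefn = function(key,values,emit)
    -- order-dependent: concatenation is neither commutative nor idempotent
    emit(table.concat(values," "))
  end,
}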
  • finalfn.lua is the script which implements what to do with the results produced by the system. The finalfn field is a function which receives a Lua pairs iterator, and returns a boolean or a string indicating what to do next. If the returned value is true, the result data will be deleted from the GridFS collection. If the returned value is false or nil, the results will remain available after the execution of your MapReduce task in the corresponding GridFS collection. If the returned value is the string "loop", the system will restart the computation from the beginning, allowing you to implement Machine Learning algorithms following the Iterative MapReduce model, as sketched after the code below.
return {
  init = function() end,
  finalfn = function(it)
    for key,value in it do
      print(value,key)
    end
    return true -- indicates to remove mongo gridfs result files
  end
}
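The "loop" return value is what makes iterative algorithms possible. A sketch of a finalfn which repeats the computation a fixed number of times (assuming the module stays loaded between iterations; the stopping criterion is hypothetical) could be:

local iterations = 0
local MAX_ITERATIONS = 10
return {
  init = function() end,
  finalfn = function(it)
    iterations = iterations + 1
    for key,value in it do print(value,key) end
    -- hypothetical stopping criterion: a fixed number of iterations
    if iterations < MAX_ITERATIONS then return "loop" end
    return true -- converged: remove the result files
  end
}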

Execution of WordCount

Example shell scripts which execute the WordCount task are available in the repository. Their content is:

execute_example_worker.sh

#!/bin/bash
lua execute_worker.lua localhost wordcount

execute_example_server.sh

#!/bin/bash
lua execute_server.lua localhost wordcount \
    mapreduce.examples.WordCount.taskfn \
    mapreduce.examples.WordCount.mapfn \
    mapreduce.examples.WordCount.partitionfn \
    mapreduce.examples.WordCount.reducefn \
    mapreduce.examples.WordCount.finalfn \
    mapreduce.examples.WordCount.reducefn $@
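Note that the reducefn module is given twice: once as the reduce function (argument 6) and once as the combiner (argument 8). This is valid here because, as explained above, this reduce function fulfills the properties required of combiners.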

The whole task in one script

It is possible to put all these modules together in one script. The examples/WordCount/init.lua script is an example of how to do that. In this case, the execution of the server will be done by the following shell script:

lua execute_server.lua localhost wordcount \
    mapreduce.examples.WordCount \
    mapreduce.examples.WordCount \
    mapreduce.examples.WordCount \
    mapreduce.examples.WordCount \
    mapreduce.examples.WordCount \
    mapreduce.examples.WordCount $@
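Based on the individual modules above, such a combined module simply returns all the fields in a single table. The following is a sketch of how it might look; the actual examples/WordCount/init.lua in the repository may differ:

local NUM_REDUCERS = 10
local FNV_prime    = 16777619
local offset_basis = 2166136261
local MAX          = 2^32
local reducefn = function(key,values,emit)
  local count=0
  for _,v in ipairs(values) do count = count + v end
  emit(count)
end
return {
  init = function(arg) end,
  taskfn = function(emit)
    emit(1,"mapreduce/server.lua")
    emit(2,"mapreduce/worker.lua")
    emit(3,"mapreduce/test.lua")
    emit(4,"mapreduce/utils.lua")
  end,
  mapfn = function(key,value,emit)
    for line in io.lines(value) do
      for w in line:gmatch("[^%s]+") do emit(w,1) end
    end
  end,
  partitionfn = function(key)
    local h = offset_basis
    for i=1,#key do
      h = (h * FNV_prime) % MAX
      h = bit32.bxor(h, key:byte(i))
    end
    return h % NUM_REDUCERS
  end,
  reducefn = reducefn,
  combinerfn = reducefn,
  associative_reducer = true,
  commutative_reducer = true,
  idempotent_reducer  = true,
  finalfn = function(it)
    for key,value in it do print(value,key) end
    return true
  end,
}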