019 minor fixes #292

Merged
merged 33 commits into from
Apr 14, 2020
de8d014 Fix a typo. (Evildoor, Oct 28, 2019)
3568321 State what config is expected (Evildoor, Oct 28, 2019)
685c710 Add an option to get help with --help/-h keys. (Evildoor, Oct 28, 2019)
e19300f Add a message for default config being used. (Evildoor, Oct 28, 2019)
d11b45b Add missing numeration to README. (Evildoor, Oct 28, 2019)
7b5f0f6 Add description for check_input(). (Evildoor, Oct 29, 2019)
f893b6a Add description for convertIndexToLowerCase(). (Evildoor, Oct 29, 2019)
9e921cb Add description for constructActionJson(). (Evildoor, Oct 31, 2019)
3b22230 Add description for constructDataJson(). (Evildoor, Oct 31, 2019)
3327c73 Update readability for list of definitions. (Evildoor, Nov 6, 2019)
4aa44ec Add comments to actions in main logic. (Evildoor, Nov 6, 2019)
2bfbf7c Add help message and options to show it. (Evildoor, Nov 18, 2019)
c6977b4 Always exit after usage(). (Evildoor, Nov 18, 2019)
345d441 Call esFormat.php's help from run.sh's usage(). (Evildoor, Nov 18, 2019)
d6e3734 Update run.sh's usage() style. (Evildoor, Nov 18, 2019)
808fbb1 Move start of message to separate line. (Evildoor, Nov 21, 2019)
8d253b9 Add info about usage of stdin to help message. (Evildoor, Nov 21, 2019)
002d24f Shorten run.sh's help. (Evildoor, Dec 27, 2019)
05303e5 Clarify the config description. (Evildoor, Dec 27, 2019)
0048d97 Simplify the description. (Evildoor, Dec 27, 2019)
bb2deb7 Add description for getAction(). (Evildoor, Dec 27, 2019)
2963b3b Remove index initialization with NULL. (Evildoor, Dec 30, 2019)
339a687 Move UPDATE_RETRIES definition. (Evildoor, Dec 30, 2019)
be42f92 Add ACTION variable. (Evildoor, Dec 30, 2019)
68110d1 Expand DEFAULT_ACTION description. (Evildoor, Dec 30, 2019)
7d3ed8b Add a note about help messages' dependance. (Evildoor, Jan 24, 2020)
71b0b39 Add explanation for '--' argument. (Evildoor, Mar 23, 2020)
651b5e9 Add link to ES docs. (Evildoor, Apr 7, 2020)
2dde924 Change exit status to 0. (Evildoor, Apr 7, 2020)
76dfa03 Remove unnecessary brackets. (Evildoor, Apr 11, 2020)
90b5003 Update optionality of arguments. (Evildoor, Apr 11, 2020)
0d91916 Merge branch 'master' into 019-minor-fixes (Evildoor, Apr 13, 2020)
3a078d4 Adapt stage to incomplete data handling changes. (Evildoor, Apr 13, 2020)
8 changes: 4 additions & 4 deletions Utils/Dataflow/019_esFormat/README
@@ -2,7 +2,7 @@
* Stage 19 *
============

Description
1. Description
-----------
Prepare data before uploading to ElasticSearch:
* turn JSON keys to lower case
@@ -14,7 +14,7 @@ Use ./run.sh to run stage with common configuration from
Use `--update` option to produce records intended to *update* data in ES,
not replace.

Input
2. Input
-----
Expects data in JSON format with special fields.
Required:
@@ -26,7 +26,7 @@ Optional:
* '_update' (corresponding ES record should be "updated", not
"inserted"/"indexed").

Output
3. Output
------
JSON documents, one document per line:
{{{
@@ -37,7 +37,7 @@ JSON documents, one document per line:
...
}}}
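
Illustration (not part of the README): a minimal shell sketch of what one output pair might look like, assuming the stage emits the action line and data line pairs of the Elasticsearch bulk API. The helper name bulk_index_pair and the sample values are hypothetical; the field names _index, _type and _id are taken from the bulk API and the function descriptions in this PR, not from the collapsed part of the README.

```shell
# Hypothetical helper, not part of the stage: print one action/data pair
# for an "index" action. The values passed below are made up.
bulk_index_pair() {
    # $1 index name, $2 document type, $3 document id, $4 document body (JSON)
    printf '{"index":{"_index":"%s","_type":"%s","_id":"%s"}}\n' "$1" "$2" "$3"
    printf '%s\n' "$4"
}

bulk_index_pair tasks_production task 12345 '{"taskname":"example"}'
```

Each document therefore occupies two lines: the first tells Elasticsearch what to do and where, the second carries the data with the service fields removed.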

Samples
4. Samples
-------
To produce regular (seamless NDJSON) samples, run:

131 changes: 124 additions & 7 deletions Utils/Dataflow/019_esFormat/esFormat.php
@@ -5,14 +5,61 @@ function exception_error_handler($errno, $errstr, $errfile, $errline ) {
}
set_error_handler("exception_error_handler");

# Default values.
# ES index where the documents should be indexed/updated.
$DEFAULT_INDEX = 'tasks_production';
$DEFAULT_ACTION = 'index';
$UPDATE_RETRIES = 3;
$ES_INDEX = NULL;
# End-of-process marker, depending on mode.
$EOP_DEFAULTS = Array("stream" => chr(0), "file" => "");
# End-of-message marker, depending on mode.
$EOM_DEFAULTS = Array("stream" => chr(30), "file" => chr(30));
# How many times the update should be retried in case of conflict.
$UPDATE_RETRIES = 3;
# Action.
# Possible values:
# * 'index' - insert new record or overwrite existing one
# * 'update' - update existing record or insert data as a new one
$DEFAULT_ACTION = 'index';

function usage() {
/* Display information on how to use the script.

Note: this help message is used in run.sh's one. If it is changed,
run.sh may also need change. Another option is to make run.sh's help
more adaptable.
*/

$f = basename(__FILE__);
$msg =
"usage: $f [-h] [-e EOM] [-E EOP] [--update] [FILE]

optional arguments:
-h, --help show this help message and exit

-e EOM, --end-of-message EOM custom end of message marker

-E EOP, --end-of-process EOP custom end of process marker

--update use 'update' action for all records

FILE source file
note: if no FILE is specified, the data
will be acquired from standard input
";
fwrite(STDERR, $msg);
}

function check_input($row) {
/* Check the provided input's correctness.

The input must:
- be an array;
- contain non-empty fields '_id' and '_type'.

:param row: input to check

:return: TRUE if input is correct, FALSE if it is not
:rtype: bool
*/
$required_fields = array('_id', '_type');

if (!is_array($row)) {
@@ -31,6 +78,11 @@ function check_input($row) {
}

function convertIndexToLowerCase(&$a) {
/* Convert array's keys to lowercase, in place.

:param a: array to process
:type a: array
*/
$result = array();

foreach (array_keys($a) as $i) {
@@ -41,20 +93,56 @@ function convertIndexToLowerCase(&$a) {
}

function getAction($row) {
global $DEFAULT_ACTION;
/* Determine action for the document.

Action is 'update' if either {'_update': true} or {'_incomplete': true}
key-value pair is present in the document. Otherwise, default value
is returned.

:param row: document for which the action should be determined
:type row: array

:return: action
:rtype: str
*/
global $ACTION;

if (isset($row['_update']) and $row['_update'] === true) {
$action = 'update';
} elseif (isset($row['_incomplete']) and $row['_incomplete'] === true) {
$action = 'update';
} else {
$action = $DEFAULT_ACTION;
$action = $ACTION;
}

return $action;
}

function constructActionJson($row) {
/* Generate a json with ES bulk API action information for a given document.

Action json is generated for each document and includes directions about how
exactly the document must be processed. It contains a single key-value pair.
The key is the action to be taken, and is determined by the presence of
'_update': true or '_incomplete': true in the document (see getAction()).
The value is an array that contains the remaining information:
- index name
- number of retries if the action is update
- document id
- document type
- document's parent, if it is specified.

The latter three values are taken from the document.

For additional information please refer to the Elasticsearch documentation:
www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

:param row: document for which the action json should be generated
:type row: array

:return: generated action json
:rtype: array
*/
global $ES_INDEX;
global $UPDATE_RETRIES;

@@ -80,6 +168,23 @@ function constructActionJson($row) {
}

function constructDataJson($row) {
/* Prepare the document for bulk operation.

- Remove fields starting with an underscore. These are service fields
that are processed separately and should not be included into the
resulting data.
- If action is 'update' then 'doc_as_upsert' is set to 'true' if the data
is complete. This means that the same document should be used as a new
document to be indexed if there is no existing document to update.
For incomplete data it is set to 'false', and different copies of data
are provided for indexing and updating.

:param row: document to be prepared
:type row: array

:return: prepared document
:rtype: array
*/
$data = $row;

if (isset($data['_incomplete'])) {
@@ -136,7 +241,8 @@ function decode_escaped($string) {
'stripcslashes("$0")', $string);
}

$opts = getopt("e:E:", Array("end-of-message:", "end-of-process:",
# Process command line arguments.
$opts = getopt("he:E:", Array("help", "end-of-message:", "end-of-process:",
"update"));
$args = $argv;

@@ -153,6 +259,11 @@ function decode_escaped($string) {
unset($args[$mkey]);
}
switch ($key) {
case "h":
case "help":
usage();
exit(0);
break;
case "e":
case "end-of-message":
$EOM_MARKER = decode_escaped($val);
@@ -162,13 +273,14 @@ function decode_escaped($string) {
$EOP_MARKER = decode_escaped($val);
break;
case "update":
$DEFAULT_ACTION = "update";
$ACTION = "update";
break;
}
}

$args = array_values($args);

# Determine mode depending on whether the input file was supplied or not.
if (isset($args[1])) {
$h = fopen($args[1], "r");
$mode = "file";
@@ -177,12 +289,16 @@ function decode_escaped($string) {
$mode = "stream";
}

if (!(isset($ACTION))) $ACTION = $DEFAULT_ACTION;

# Set markers.
if (!(isset($EOM_MARKER))) $EOM_MARKER = $EOM_DEFAULTS[$mode];
if (!(isset($EOP_MARKER))) $EOP_MARKER = $EOP_DEFAULTS[$mode];

$EOM_HEX = implode(unpack("H*", $EOM_MARKER));
$EOP_HEX = implode(unpack("H*", $EOP_MARKER));

# Check that markers are valid.
if ($EOM_MARKER == '') {
fwrite(STDERR, "(ERROR) EOM marker can not be empty string.\n");
exit(1);
@@ -203,6 +319,7 @@ function decode_escaped($string) {
$ES_INDEX = $DEFAULT_INDEX;
}

# Process data.
if ($h) {
while (($line = stream_get_line($h, 0, $EOM_MARKER)) !== false) {
$row = json_decode($line,true);
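
Illustration (not part of the diff): the loop above reads records delimited by the EOM marker. A minimal shell sketch of producing such input for stream mode follows, assuming the default EOM of chr(30) (octal 036) from $EOM_DEFAULTS. The helper name emit_records and the sample documents are hypothetical.

```shell
# Hypothetical helper: write each argument as one record followed by the
# default EOM marker (record separator, octal 036, i.e. chr(30)).
emit_records() {
    for rec in "$@"; do
        printf '%s\036' "$rec"
    done
}

# Count the markers in the stream: there is one per record.
emit_records '{"_id":"1","_type":"task"}' '{"_id":"2","_type":"task"}' \
    | tr -cd '\036' | wc -c
```

A stream built this way could be piped to the script's standard input; a different marker would have to be passed with -e on the script side as well.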
36 changes: 26 additions & 10 deletions Utils/Dataflow/019_esFormat/run.sh
@@ -1,23 +1,36 @@
#/usr//bin/env bash
#!/usr/bin/env bash

base_dir=$(cd "$(dirname "$(readlink -f "$0")")"; pwd)

ES_CONFIG=$base_dir/../../Elasticsearch/config/es
CONFIG_DEFAULT=TRUE

usage() {
echo "USAGE:
$(basename "$0") [-c CONFIG] [--] [ARGS]
echo "usage: $(basename "$0") [-h] [-c CONFIG] [-- [ARGS]]

PARAMETERS:
CONFIG -- configuration file
ARGS -- arguments to be passed to the PHP script
"
}
optional arguments:
-h, --help show this help message and exit

ES_CONFIG=$base_dir/../../Elasticsearch/config/es
-c CONFIG configuration file with parameters required to
prepare data for indexing in Elasticsearch
Default value: $ES_CONFIG

-- separator for explicit division of arguments to
be passed further to PHP script (after) and
ones intended for this script (before).

ARGS, arguments to be passed to the PHP script:"
# Display part of esFormat.php's help describing its arguments.
"$base_dir"/esFormat.php -h 2>&1 | sed 1,5d >&2
}
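
Illustration (not part of the diff): why usage() depends on the layout of the PHP help message. The sketch below uses a stand-in function, php_help, in place of esFormat.php -h; the stand-in and its shortened text are assumptions made for the example.

```shell
# Stand-in for `esFormat.php -h`: writes the help text to stderr,
# as the PHP usage() does with fwrite(STDERR, ...).
php_help() {
    cat >&2 <<'EOF'
usage: esFormat.php [-h] [-e EOM] [-E EOP] [--update] [FILE]

optional arguments:
  -h, --help    show this help message and exit

  -e EOM, --end-of-message EOM    custom end of message marker
EOF
}

# Merge stderr into stdout, then drop the first five lines (usage line,
# blank line, section title, -h entry, blank line) so that only the
# remaining option descriptions are printed.
php_help 2>&1 | sed 1,5d
```

If the PHP help gains or loses a line before the first option that should be shown, the `1,5d` range has to change with it, which is what the note in esFormat.php's usage() warns about.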

while [ -n "$1" ]; do
case "$1" in
--help|-h)
usage >&2; exit 0;;
--config|-c)
[ -n "$2" ] && ES_CONFIG="$2" || { usage >&2 && exit 1; }
[ -n "$2" ] && ES_CONFIG="$2" && CONFIG_DEFAULT="" \
|| { usage >&2; exit 1; }
shift;;
--)
shift
@@ -28,6 +41,9 @@ while [ -n "$1" ]; do
shift
done

[ $CONFIG_DEFAULT ] && echo No config file specified, using \
the default value: $ES_CONFIG >&2

[ -r "$ES_CONFIG" ] \
|| { echo "Can't access configuration file: $ES_CONFIG" >&2 \
&& exit 1; }
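
Illustration (not part of the diff): a minimal sketch of the argument handling described in the new help text, where options before `--` belong to the wrapper and everything after it is passed on. Part of the real loop is collapsed in this view, so the handling of `--` and of unknown arguments here is an assumption. The names parse_args and default.cfg are hypothetical.

```shell
# Sketch of the wrapper's option parsing. Prints the chosen config, whether
# the default is in use, and the arguments left over for the PHP script.
parse_args() {
    config="default.cfg"      # hypothetical default path
    config_default=TRUE
    while [ -n "$1" ]; do
        case "$1" in
            -c|--config)
                # A missing value is an error, as in run.sh.
                [ -n "$2" ] || return 1
                config="$2"
                config_default=""
                shift;;
            --)
                # Explicit separator: the rest is passed through.
                shift
                break;;
            *)
                # Assumption: any other argument also starts the pass-through part.
                break;;
        esac
        shift
    done
    echo "config=$config default=${config_default:-no} passthrough=$*"
}

parse_args -c my.cfg -- --update data.json
parse_args -- -h
```

Tracking a separate "default in use" flag, as the PR does with CONFIG_DEFAULT, lets the script report the fallback without comparing paths.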