Skip to content

Commit

Permalink
es: task cli
Browse files Browse the repository at this point in the history
* Adds `es task` cli commands.
* Adds `es index` `move` and `info` commands.

Co-Authored-by: Peter Weber <peter.weber@rero.ch>
  • Loading branch information
rerowep committed Feb 19, 2023
1 parent 404060b commit 44af8cd
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "rero-invenio-base"
version = "0.2.0"
version = "0.2.1"
description = "Generic backend libraries for RERO Invenio instances."
readme = "README.rst"
authors = ["RERO <software@rero.ch>"]
Expand Down
2 changes: 2 additions & 0 deletions rero_invenio_base/cli/es/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .index import index
from .slm import slm
from .snapshot import snapshot
from .task import task


@click.group()
Expand All @@ -34,5 +35,6 @@ def es():
es.add_command(alias)
es.add_command(slm)
es.add_command(snapshot)
es.add_command(task)

__all__ = ('index')
131 changes: 128 additions & 3 deletions rero_invenio_base/cli/es/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

"""Click elasticsearch index command-line utilities."""
import json
import sys
from pprint import pformat
from time import sleep

import click
from elasticsearch_dsl import Index
Expand Down Expand Up @@ -95,7 +98,7 @@ def switch_index(old, new):
for alias in aliases:
current_search_client.indices.put_alias(new, alias)
current_search_client.indices.delete_alias(old, alias)
click.secho('Sucessfully switched.', fg='green')
click.secho('Successfully switched.', fg='green')


@index.command('create')
Expand All @@ -113,7 +116,7 @@ def create_index(resource, index, verbose, templates):
:param resource: the resource such as documents.
:param index: the index name such as documents-document-v0.0.1-20211014
:param verbose: display addtional message.
:param verbose: display additional message.
:param templates: update also the es templates.
"""
if templates:
Expand Down Expand Up @@ -162,8 +165,130 @@ def update_mapping(aliases, settings):
f'error: {excep}', fg='red')
if res.get('acknowledged'):
click.secho(
f'index: {index} has been sucessfully updated',
f'index: {index} has been successfully updated',
fg='green')
else:
click.secho(
f'error: {res}', fg='red')


@index.command('move')
@with_appcontext
@es_version_check
@click.argument('resource')
@click.argument('old')
@click.argument('new')
@click.option('-t', '--templates/--no-templates', 'templates', is_flag=True,
default=True)
@click.option('-v', '--verbose/--no-verbose', 'verbose',
is_flag=True, default=False)
@click.option('-n', '--interval', default=1, type=int,
help='seconds to wait between updates')
def move_index(resource, old, new, templates, verbose, interval):
"""Move index using the elasticsearch aliases.
:param resource: the resource such as documents.
:param old: full name of the old index
:param new: full name of the fresh created index
:param verbose: display additional message.
:param templates: update also the es templates.
"""
# TODO: use create_index once.
# create_index(resource, index, verbose, templates)
try:
if templates:
tbody = current_search_client.indices.get_template()
for tmpl in current_search.put_templates():
click.secho(f'file:{tmpl[0]}, ok: {tmpl[1]}', fg='green')
new_tbody = current_search_client.indices.get_template()
if patch := make_patch(new_tbody, tbody):
click.secho('Templates are updated.', fg='green')
if verbose:
click.secho('Diff in templates', fg='green')
click.echo(patch)
else:
click.secho('Templates did not changed.', fg='yellow')

f_mapping = list(current_search.aliases.get(resource).values()).pop()
mapping = json.load(open(f'{f_mapping}'))
current_search_client.indices.create(new, mapping)
click.secho(f'Index {new} has been created.', fg='green')
except Exception as err:
click.secho(f'ERROR CREATE: {err}', fg='red')
sys.exit(1)

res = current_search_client.reindex(
body=dict(
source=dict(
index=old
),
dest=dict(
index=new,
version_type='external_gte'
)
),
wait_for_completion=False
)
task = res["task"]
click.secho(f'Task: {task}', fg='green')
count = 0
# wait for task
res = current_search_client.tasks.get(task)
while not res.get('completed'):
task_info = res.get('task')
if verbose and task_info:
click.secho(
f'Watching task: {task} {count} seconds ...', fg='green')
click.secho(f'{task_info.get("description")}', fg='green')
click.secho(f'{pformat( task_info.get("status"))}', fg='green')
sleep(interval)
count += interval
res = current_search_client.tasks.get(task)
if verbose:
click.secho(f'Finished task: {task} {count} seconds ...', fg='yellow')
click.secho(f'{pformat(res.get("response"))}', fg='yellow')
if failures := res.get('failures'):
click.secho(f'ERROR REINDEX: {failures}', fg='red')
sys.exit(2)

# switch index
try:
aliases = current_search_client.indices.get_alias().get(old)\
.get('aliases').keys()
for alias in aliases:
current_search_client.indices.put_alias(new, alias)
current_search_client.indices.delete_alias(old, alias)
click.secho('Successfully switched.', fg='green')
except Exception as err:
click.secho(f'ERROR SWITCH: {err}', fg='red')
sys.exit(3)
sys.exit(0)


@index.command()
@click.option('-i', '--index', default='',
help='all if not specified')
@click.option('-a', '--aliases', is_flag=True, default=False,
help='Display aliases.')
@click.option('-m', '--mappings', is_flag=True, default=False,
help='Display mappings.')
@click.option('-s', '--settings', is_flag=True, default=False,
help='Display settings.')
@with_appcontext
def info(index, aliases, mappings, settings):
"""List indices of given alias."""
def print_info(name, data):
"""Display additional info."""
msg = pformat(data.get(name))
click.secho(f'{name}:', fg='yellow')
click.secho(f'{msg}', fg='yellow')

indices = current_search_client.indices.get(f'{index}*')
for index, data in indices.items():
click.secho(f'{index}', fg='green')
if aliases:
print_info('aliases', data)
if mappings:
print_info('mappings', data)
if settings:
print_info('settings', data)
128 changes: 128 additions & 0 deletions rero_invenio_base/cli/es/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
#
# RERO Invenio Base
# Copyright (C) 2023 RERO.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Click elasticsearch tasks command-line utilities."""
import sys
from pprint import pformat
from time import sleep

import click
from invenio_search import current_search_client
from invenio_search.cli import es_version_check, with_appcontext


def abort_if_false(ctx, param, value):
"""Abort command is value is False."""
if not value:
ctx.abort()


@click.group()
def task():
"""Elasticsearch task commands.
See: https://www.elastic.co/guide/en/elasticsearch/reference/7.10/tasks.html # noqa
"""


@task.command('get')
@with_appcontext
@es_version_check
@click.argument('task')
def task_get(task):
"""Get task info.
:param task: task id.
"""
try:
res = current_search_client.tasks.get(task)
if info := res.get("response"):
click.secho(f'{pformat(info)}', fg='green')
elif info := res.get('task'):
click.secho(f'{info.get("description")}', fg='yellow')
click.secho(f'{pformat(info.get("status"))}', fg='yellow')
else:
click.secho(f'{pformat(res)}', fg='blue')
except Exception as err:
click.secho(f'Error: {err}', fg='red')
sys.exit(1)
sys.exit(0)


@task.command('list')
@with_appcontext
@es_version_check
def task_list():
"""Get task list."""
res = current_search_client.tasks.list()
click.secho(f'{pformat(res)}', fg='green')


@task.command('cancel')
@with_appcontext
@click.option('--yes-i-know', is_flag=True, callback=abort_if_false,
expose_value=False,
prompt='Do you really want to cancel the task?')
@click.argument('task')
@es_version_check
def task_cancel(task):
"""Cancel task.
:param task: task id.
"""
try:
res = current_search_client.tasks.cancel(task)
click.secho(f'{pformat(res)}', fg='yellow')
except Exception as err:
click.secho(f'Error: {err}', fg='red')
sys.exit(1)
sys.exit(0)


@task.command('watch')
@with_appcontext
@es_version_check
@click.argument('task')
@click.option('-n', '--interval', default=1, type=int,
help='seconds to wait between updates')
def task_watch(task, interval):
"""Watch task info.
:param task: task id.
"""
click.secho(f'Watching task: {task}', fg='green')
try:
seconds = 0
res = current_search_client.tasks.get(task)
while not res.get('completed'):
if info := res.get('task'):
click.secho(
f'Watching task: {task} {seconds} seconds ...',
fg='green'
)
click.secho(f'{info.get("description")}', fg='yellow')
click.secho(f'{pformat(info.get("status"))}', fg='yellow')
sleep(interval)
seconds += interval
res = current_search_client.tasks.get(task)

click.secho(f'Finished task: {task} {seconds} seconds ...', fg='green')
click.secho(f'{pformat(res.get("response"))}', fg='green')
except Exception as err:
click.secho(f'Error: {err}', fg='red')
sys.exit(1)
sys.exit(0)

0 comments on commit 44af8cd

Please sign in to comment.