Add properties, requirements, and best practices from FTE blog
jhlodin authored and arhimondr committed May 9, 2022
1 parent 746b6d7 commit ffb09f9
Showing 2 changed files with 91 additions and 9 deletions.
80 changes: 71 additions & 9 deletions docs/src/main/sphinx/admin/fault-tolerant-execution.rst
@@ -49,9 +49,14 @@ execution on a Trino cluster:
* - ``exchange.deduplication-buffer-size``
- Size of the coordinator's in-memory buffer used by fault-tolerant
execution to store output of query :ref:`stages <trino-concept-stage>`.
If this buffer is filled during query execution, the query fails with a
"Task descriptor storage capacity has been exceeded" error message unless
an :ref:`exchange manager <fte-exchange-manager>` is configured.
- ``32MB``
* - ``exchange.compression-enabled``
- Enable compression of spooling data. Setting to ``true`` is recommended
when using an :ref:`exchange manager <fte-exchange-manager>`.
- ``false``
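
For example, the deduplication buffer can be enlarged in the cluster's
``config.properties`` file to delay this failure mode; the ``64MB`` value
here is illustrative, not a recommendation:

.. code-block:: properties

   exchange.deduplication-buffer-size=64MB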

.. _fte-retry-policy:

@@ -91,11 +96,28 @@
recommended when executing large batch queries, as the cluster can more
efficiently retry smaller tasks within the query rather than retry the whole
query.

A ``TASK`` retry policy requires a configured :ref:`exchange manager
<fte-exchange-manager>` to store spooled exchange data used for each task.

The following cluster configuration changes are recommended to improve
fault-tolerant execution with a ``TASK`` retry policy:

* Set the ``query.low-memory-killer.policy``
:doc:`query management property </admin/properties-query-management>` to
``total-reservation-on-blocked-nodes``, or queries may
need to be manually killed if the cluster runs out of memory.
* Set the ``query.low-memory-killer.delay``
:doc:`query management property </admin/properties-query-management>` to
``0s`` so the cluster immediately unblocks nodes that run out of memory.
* Modify the ``query.remote-task.max-error-duration``
:doc:`query management property </admin/properties-query-management>`
to adjust how long Trino allows a remote task to try reconnecting before
considering it lost and rescheduling.
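
The first two recommendations can be sketched in the cluster's
``config.properties`` file as:

.. code-block:: properties

   query.low-memory-killer.policy=total-reservation-on-blocked-nodes
   query.low-memory-killer.delay=0s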

.. note::

   A ``TASK`` retry policy is best suited for large batch queries, but it can
   result in higher latency for short-running queries executed in high volume.
   As a best practice, run a dedicated cluster with a ``TASK`` retry policy
   for large batch queries, separate from a cluster that handles short
   queries.

Advanced configuration
----------------------
@@ -264,6 +286,17 @@ fault-tolerant execution:
reschedule tasks in case of a failure.
- (JVM heap size * 0.15)
- Only ``TASK``
* - ``fault-tolerant-execution-partition-count``
- Number of partitions to use for distributed joins and aggregations,
  similar in function to the ``query.hash-partition-count`` :doc:`query
  management property </admin/properties-query-management>`. Increasing this
  value above the default of ``50`` is not recommended, as it may result in
  instability and poor performance. The value can be overridden for the
  current session with the ``fault_tolerant_execution_partition_count``
  :ref:`session property <session-properties-definition>`.
- ``50``
- Only ``TASK``
* - ``max-tasks-waiting-for-node-per-stage``
- Allow for up to configured number of tasks to wait for node allocation
per stage, before pausing scheduling for other tasks from this stage.
@@ -280,6 +313,9 @@
fault-tolerant execution. You can configure a filesystem-based exchange manager
that stores spooled data in a specified location, either an S3-compatible
storage system or a local filesystem.

Configuration
^^^^^^^^^^^^^

To configure an exchange manager, create a new
``etc/exchange-manager.properties`` configuration file on the coordinator and
all worker nodes. In this file, set the ``exchange-manager.name`` configuration
@@ -294,8 +330,8 @@
for your storage solution.
- Description
- Default value
* - ``exchange.base-directories``
- Comma-separated list of URI locations that the exchange manager uses to
store spooling data. Only supports S3 and local filesystems.
-
* - ``exchange.encryption-enabled``
- Enable encryption of spooling data.
@@ -339,19 +375,45 @@ for your storage solution.
- Part size for S3 multi-part upload.
- ``5MB``

It is also recommended to set the ``exchange.compression-enabled`` property to
``true`` in the cluster's ``config.properties`` file, to reduce the exchange
manager's overall I/O load.
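
For example, in ``config.properties``:

.. code-block:: properties

   exchange.compression-enabled=true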

S3 bucket storage
~~~~~~~~~~~~~~~~~

If using an AWS S3 bucket, it is recommended to configure a bucket lifecycle
rule in AWS to automatically expire abandoned objects in the event of a node
crash.
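
As a sketch, a bucket lifecycle configuration like the following expires
leftover objects automatically; the rule ID and the three-day expiration
window are illustrative choices, not recommendations:

.. code-block:: json

   {
     "Rules": [
       {
         "ID": "expire-abandoned-exchange-spooling",
         "Status": "Enabled",
         "Filter": {},
         "Expiration": {
           "Days": 3
         }
       }
     ]
   }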

The following example ``exchange-manager.properties`` configuration specifies an
AWS S3 bucket as the spooling storage destination. Note that the destination
does not have to be in AWS, but can be any S3-compatible storage system.

.. code-block:: properties

   exchange-manager.name=filesystem
   exchange.base-directories=s3n://exchange-spooling-bucket
   exchange.encryption-enabled=true
   exchange.s3.region=us-west-1
   exchange.s3.aws-access-key=example-access-key
   exchange.s3.aws-secret-key=example-secret-key

You can configure multiple S3 buckets for the exchange manager to distribute
spooled data across buckets, reducing the I/O load on any one bucket. If a query
fails with the error message
"software.amazon.awssdk.services.s3.model.S3Exception: Please reduce your
request rate", this indicates that the workload is I/O intensive, and you should
specify multiple S3 buckets in ``exchange.base-directories`` to balance the
load:

.. code-block:: properties

   exchange.base-directories=s3n://exchange-spooling-bucket-1,s3n://exchange-spooling-bucket-2

Local filesystem storage
~~~~~~~~~~~~~~~~~~~~~~~~

The following example ``exchange-manager.properties`` configuration specifies a
local directory, ``/tmp/trino-exchange-manager``, as the spooling storage
destination.
20 changes: 20 additions & 0 deletions docs/src/main/sphinx/admin/properties-query-management.rst
@@ -46,6 +46,16 @@
memory availability. Supports the following values:
* ``total-reservation-on-blocked-nodes`` - Kill the query currently using the
most memory specifically on nodes that are now out of memory.

``query.low-memory-killer.delay``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** :ref:`prop-type-duration`
* **Default value:** ``5m``

The amount of time a query is allowed to recover between running out of memory
and being killed, if ``query.low-memory-killer.policy`` is set to
``total-reservation`` or ``total-reservation-on-blocked-nodes``.
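
For example, to remove the recovery window entirely, as recommended when
fault-tolerant execution is enabled with a ``TASK`` retry policy:

.. code-block:: properties

   query.low-memory-killer.delay=0s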

``query.max-execution-time``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

@@ -118,6 +128,16 @@
The minimal age of a query in the history before it is expired. An expired
query is removed from the query history buffer and no longer available in
the :doc:`/admin/web-interface`.

``query.remote-task.max-error-duration``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** :ref:`prop-type-duration`
* **Default value:** ``5m``

Timeout value for remote tasks that fail to communicate with the
coordinator. If the coordinator is unable to receive updates from a remote task
before this value is reached, the coordinator treats the task as failed.
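
For example, to tolerate longer network interruptions before the coordinator
treats a remote task as failed; the ``10m`` value is illustrative:

.. code-block:: properties

   query.remote-task.max-error-duration=10m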

``retry-policy``
^^^^^^^^^^^^^^^^

