From ada81f34d8b8738f09dc9b14a879e2caf12c8db1 Mon Sep 17 00:00:00 2001 From: Gabriel Erzse Date: Wed, 12 Jun 2024 13:29:41 +0300 Subject: [PATCH] Timeseries insertion filters for close samples (#3228) Support timeseries insertion filters for samples that are close to each other in time and value. Use the documented way to disable compression, i.e. `ENCODING UNCOMPRESSED` instead of `UNCOMPRESSED`. Polish the documentation related to timeseries. Align things needed around CI, to make sure all tests are actually executed. BREAKING CHANGES: 1. Remove the `uncompressed` flag from TS.ALTER, since compression of existing timeseries cannot be changed. This should not have been used, so there should be no real impact. 2. For the TS.ADD command (TimeSeriesCommands.add method): the `duplicate_policy` Python parameter that was mapping to `ON DUPLICATE` was now rewired to map to `DUPLICATE POLICY`. A new Python parameter called `on_duplicate` was added, that maps to `ON DUPLICATE`. The expected impact of this change is low. --- .github/workflows/integration.yaml | 164 ++--- docker-compose.yml | 2 +- redis/commands/timeseries/commands.py | 939 ++++++++++++++++---------- tests/conftest.py | 6 +- tests/test_asyncio/test_timeseries.py | 160 ++++- tests/test_timeseries.py | 195 +++++- 6 files changed, 982 insertions(+), 484 deletions(-) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 537409b5f1..11b08c934e 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -23,99 +23,101 @@ concurrency: permissions: contents: read # to fetch code (actions/checkout) -jobs: +env: + REDIS_STACK_IMAGE: redis/redis-stack-server:7.4.0-rc1 - dependency-audit: - name: Dependency audit - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: pypa/gh-action-pip-audit@v1.0.8 - with: - inputs: requirements.txt dev_requirements.txt - ignore-vulns: | - GHSA-w596-4wvx-j9j6 # subversion related git pull, dependency for pytest. There is no impact here. +jobs: + dependency-audit: + name: Dependency audit + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: pypa/gh-action-pip-audit@v1.0.8 + with: + inputs: requirements.txt dev_requirements.txt + ignore-vulns: | + GHSA-w596-4wvx-j9j6 # subversion related git pull, dependency for pytest. There is no impact here. - lint: - name: Code linters - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 - with: - python-version: 3.9 - cache: 'pip' - - name: run code linters - run: | - pip install -r dev_requirements.txt - invoke linters + lint: + name: Code linters + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.9 + cache: 'pip' + - name: run code linters + run: | + pip install -r dev_requirements.txt + invoke linters - run-tests: - runs-on: ubuntu-latest - timeout-minutes: 60 - strategy: - max-parallel: 15 - fail-fast: false - matrix: - python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', 'pypy-3.7', 'pypy-3.8', 'pypy-3.9'] - test-type: ['standalone', 'cluster'] - connection-type: ['hiredis', 'plain'] - env: - ACTIONS_ALLOW_UNSECURE_COMMANDS: true - name: Python ${{ matrix.python-version }} ${{matrix.test-type}}-${{matrix.connection-type}} tests - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - cache: 'pip' - - name: run tests - run: | - pip install -U setuptools wheel - pip install -r requirements.txt - pip install -r dev_requirements.txt - if [ "${{matrix.connection-type}}" == "hiredis" ]; then - pip install hiredis - fi - invoke devenv - sleep 10 # time to settle - invoke ${{matrix.test-type}}-tests + run-tests: + runs-on: ubuntu-latest + timeout-minutes: 60 + strategy: + max-parallel: 15 + fail-fast: false + matrix: + python-version: ['3.8', '3.9', '3.10', '3.11', 'pypy-3.8', 'pypy-3.9'] + test-type: ['standalone', 'cluster'] + connection-type: ['hiredis', 'plain'] + env: + ACTIONS_ALLOW_UNSECURE_COMMANDS: true + name: Python ${{ matrix.python-version }} ${{matrix.test-type}}-${{matrix.connection-type}} tests + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: 'pip' + - name: run tests + run: | + pip install -U setuptools wheel + pip install -r requirements.txt + pip install -r dev_requirements.txt + if [ "${{matrix.connection-type}}" == "hiredis" ]; then + pip install hiredis + fi + invoke devenv + sleep 10 # time to settle + invoke ${{matrix.test-type}}-tests - - uses: actions/upload-artifact@v4 - if: success() || failure() - with: - name: pytest-results-${{matrix.test-type}}-${{matrix.connection-type}}-${{matrix.python-version}} - path: '${{matrix.test-type}}*results.xml' + - uses: actions/upload-artifact@v4 + if: success() || failure() + with: + name: pytest-results-${{matrix.test-type}}-${{matrix.connection-type}}-${{matrix.python-version}} + path: '${{matrix.test-type}}*results.xml' - - name: Upload codecov coverage - uses: codecov/codecov-action@v4 - with: - fail_ci_if_error: false + - name: Upload codecov coverage + uses: codecov/codecov-action@v4 + with: + fail_ci_if_error: false - - name: View Test Results - uses: dorny/test-reporter@v1 - if: success() || failure() - continue-on-error: true - with: - name: Test Results ${{matrix.python-version}} ${{matrix.test-type}}-${{matrix.connection-type}} - path: '*.xml' - reporter: java-junit - list-suites: all - list-tests: all - max-annotations: 10 - fail-on-error: 'false' + - name: View Test Results + uses: dorny/test-reporter@v1 + if: success() || failure() + continue-on-error: true + with: + name: Test Results ${{matrix.python-version}} ${{matrix.test-type}}-${{matrix.connection-type}} + path: '*.xml' + reporter: java-junit + list-suites: all + list-tests: all + max-annotations: 10 + fail-on-error: 'false' - resp3_tests: + resp3_tests: runs-on: ubuntu-latest strategy: fail-fast: false matrix: - python-version: ['3.7', '3.11'] + python-version: ['3.8', '3.11'] test-type: ['standalone', 'cluster'] connection-type: ['hiredis', 'plain'] protocol: ['3'] env: - ACTIONS_ALLOW_UNSECURE_COMMANDS: true + ACTIONS_ALLOW_UNSECURE_COMMANDS: true name: RESP3 [${{ matrix.python-version }} ${{matrix.test-type}}-${{matrix.connection-type}}] steps: - uses: actions/checkout@v4 @@ -136,7 +138,7 @@ jobs: invoke ${{matrix.test-type}}-tests invoke ${{matrix.test-type}}-tests --uvloop - build_and_test_package: + build_and_test_package: name: Validate building and installing the package runs-on: ubuntu-latest needs: [run-tests] @@ -153,13 +155,13 @@ jobs: run: | bash .github/workflows/install_and_test.sh ${{ matrix.extension }} - install_package_from_commit: + install_package_from_commit: name: Install package from commit hash runs-on: ubuntu-latest strategy: fail-fast: false matrix: - python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', 'pypy-3.7', 'pypy-3.8', 'pypy-3.9'] + python-version: ['3.8', '3.9', '3.10', '3.11', 'pypy-3.8', 'pypy-3.9'] steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 diff --git a/docker-compose.yml b/docker-compose.yml index 09418ed094..72c43c2252 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -105,7 +105,7 @@ services: - all redis-stack: - image: redis/redis-stack-server:edge + image: ${REDIS_STACK_IMAGE:-redis/redis-stack-server:edge} container_name: redis-stack ports: - 6479:6379 diff --git a/redis/commands/timeseries/commands.py b/redis/commands/timeseries/commands.py index 208ddfb09f..f8dfe8b5c0 100644 --- a/redis/commands/timeseries/commands.py +++ b/redis/commands/timeseries/commands.py @@ -33,44 +33,67 @@ def create( labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, duplicate_policy: Optional[str] = None, + ignore_max_time_diff: Optional[int] = None, + ignore_max_val_diff: Optional[Number] = None, ): """ Create a new time-series. - Args: + For more information see https://redis.io/commands/ts.create/ - key: - time-series key - retention_msecs: - Maximum age for samples compared to highest reported timestamp (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - uncompressed: - Changes data storage from compressed (by default) to uncompressed - labels: - Set of label-value pairs that represent metadata labels of the key. - chunk_size: - Memory size, in bytes, allocated for each data chunk. - Must be a multiple of 8 in the range [128 .. 1048576]. - duplicate_policy: - Policy for handling multiple samples with identical timestamps. - Can be one of: - - 'block': an error will occur for any out of order sample. - - 'first': ignore the new value. - - 'last': override with latest value. - - 'min': only override if the value is lower than the existing value. - - 'max': only override if the value is higher than the existing value. - - 'sum': If a previous sample exists, add the new sample to it so that \ - the updated value is equal to (previous + new). If no previous sample \ - exists, set the updated value equal to the new value. - - For more information: https://redis.io/commands/ts.create/ - """ # noqa + Args: + key: + The time-series key. + retention_msecs: + Maximum age for samples, compared to the highest reported timestamp in + milliseconds. If `None` or `0` is passed, the series is not trimmed at + all. + uncompressed: + Changes data storage from compressed (default) to uncompressed. + labels: + A dictionary of label-value pairs that represent metadata labels of the + key. + chunk_size: + Memory size, in bytes, allocated for each data chunk. Must be a multiple + of 8 in the range `[48..1048576]`. In earlier versions of the module the + minimum value was different. + duplicate_policy: + Policy for handling multiple samples with identical timestamps. Can be + one of: + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing + value. + - 'max': Only override if the value is higher than the existing + value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: + A non-negative integer value, in milliseconds, that sets an ignore + threshold for added timestamps. If the difference between the last + timestamp and the new timestamp is lower than this threshold, the new + entry is ignored. Only applicable if `duplicate_policy` is set to + `last`, and if `ignore_max_val_diff` is also set. Available since + RedisTimeSeries version 1.12.0. + ignore_max_val_diff: + A non-negative floating point value, that sets an ignore threshold for + added values. If the difference between the last value and the new value + is lower than this threshold, the new entry is ignored. Only applicable + if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is + also set. Available since RedisTimeSeries version 1.12.0. + """ params = [key] self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) self._append_chunk_size(params, chunk_size) - self._append_duplicate_policy(params, CREATE_CMD, duplicate_policy) + self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) + self._append_insertion_filters( + params, ignore_max_time_diff, ignore_max_val_diff + ) return self.execute_command(CREATE_CMD, *params) @@ -81,42 +104,65 @@ def alter( labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, duplicate_policy: Optional[str] = None, + ignore_max_time_diff: Optional[int] = None, + ignore_max_val_diff: Optional[Number] = None, ): """ - Update the retention, chunk size, duplicate policy, and labels of an existing - time series. + Update an existing time series. - Args: + For more information see https://redis.io/commands/ts.alter/ - key: - time-series key - retention_msecs: - Maximum retention period, compared to maximal existing timestamp (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - labels: - Set of label-value pairs that represent metadata labels of the key. - chunk_size: - Memory size, in bytes, allocated for each data chunk. - Must be a multiple of 8 in the range [128 .. 1048576]. - duplicate_policy: - Policy for handling multiple samples with identical timestamps. - Can be one of: - - 'block': an error will occur for any out of order sample. - - 'first': ignore the new value. - - 'last': override with latest value. - - 'min': only override if the value is lower than the existing value. - - 'max': only override if the value is higher than the existing value. - - 'sum': If a previous sample exists, add the new sample to it so that \ - the updated value is equal to (previous + new). If no previous sample \ - exists, set the updated value equal to the new value. - - For more information: https://redis.io/commands/ts.alter/ - """ # noqa + Args: + key: + The time-series key. + retention_msecs: + Maximum age for samples, compared to the highest reported timestamp in + milliseconds. If `None` or `0` is passed, the series is not trimmed at + all. + labels: + A dictionary of label-value pairs that represent metadata labels of the + key. + chunk_size: + Memory size, in bytes, allocated for each data chunk. Must be a multiple + of 8 in the range `[48..1048576]`. In earlier versions of the module the + minimum value was different. Changing this value does not affect + existing chunks. + duplicate_policy: + Policy for handling multiple samples with identical timestamps. Can be + one of: + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing + value. + - 'max': Only override if the value is higher than the existing + value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: + A non-negative integer value, in milliseconds, that sets an ignore + threshold for added timestamps. If the difference between the last + timestamp and the new timestamp is lower than this threshold, the new + entry is ignored. Only applicable if `duplicate_policy` is set to + `last`, and if `ignore_max_val_diff` is also set. Available since + RedisTimeSeries version 1.12.0. + ignore_max_val_diff: + A non-negative floating point value, that sets an ignore threshold for + added values. If the difference between the last value and the new value + is lower than this threshold, the new entry is ignored. Only applicable + if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is + also set. Available since RedisTimeSeries version 1.12.0. + """ params = [key] self._append_retention(params, retention_msecs) self._append_chunk_size(params, chunk_size) - self._append_duplicate_policy(params, ALTER_CMD, duplicate_policy) + self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) + self._append_insertion_filters( + params, ignore_max_time_diff, ignore_max_val_diff + ) return self.execute_command(ALTER_CMD, *params) @@ -130,60 +176,104 @@ def add( labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, duplicate_policy: Optional[str] = None, + ignore_max_time_diff: Optional[int] = None, + ignore_max_val_diff: Optional[Number] = None, + on_duplicate: Optional[str] = None, ): """ - Append (or create and append) a new sample to a time series. + Append a sample to a time series. When the specified key does not exist, a new + time series is created. - Args: + For more information see https://redis.io/commands/ts.add/ - key: - time-series key - timestamp: - Timestamp of the sample. * can be used for automatic timestamp (using the system clock). - value: - Numeric data value of the sample - retention_msecs: - Maximum retention period, compared to maximal existing timestamp (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - uncompressed: - Changes data storage from compressed (by default) to uncompressed - labels: - Set of label-value pairs that represent metadata labels of the key. - chunk_size: - Memory size, in bytes, allocated for each data chunk. - Must be a multiple of 8 in the range [128 .. 1048576]. - duplicate_policy: - Policy for handling multiple samples with identical timestamps. - Can be one of: - - 'block': an error will occur for any out of order sample. - - 'first': ignore the new value. - - 'last': override with latest value. - - 'min': only override if the value is lower than the existing value. - - 'max': only override if the value is higher than the existing value. - - 'sum': If a previous sample exists, add the new sample to it so that \ - the updated value is equal to (previous + new). If no previous sample \ - exists, set the updated value equal to the new value. - - For more information: https://redis.io/commands/ts.add/ - """ # noqa + Args: + key: + The time-series key. + timestamp: + Timestamp of the sample. `*` can be used for automatic timestamp (using + the system clock). + value: + Numeric data value of the sample. + retention_msecs: + Maximum age for samples, compared to the highest reported timestamp in + milliseconds. If `None` or `0` is passed, the series is not trimmed at + all. + uncompressed: + Changes data storage from compressed (default) to uncompressed. + labels: + A dictionary of label-value pairs that represent metadata labels of the + key. + chunk_size: + Memory size, in bytes, allocated for each data chunk. Must be a multiple + of 8 in the range `[48..1048576]`. In earlier versions of the module the + minimum value was different. + duplicate_policy: + Policy for handling multiple samples with identical timestamps. Can be + one of: + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing + value. + - 'max': Only override if the value is higher than the existing + value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: + A non-negative integer value, in milliseconds, that sets an ignore + threshold for added timestamps. If the difference between the last + timestamp and the new timestamp is lower than this threshold, the new + entry is ignored. Only applicable if `duplicate_policy` is set to + `last`, and if `ignore_max_val_diff` is also set. Available since + RedisTimeSeries version 1.12.0. + ignore_max_val_diff: + A non-negative floating point value, that sets an ignore threshold for + added values. If the difference between the last value and the new value + is lower than this threshold, the new entry is ignored. Only applicable + if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is + also set. Available since RedisTimeSeries version 1.12.0. + on_duplicate: + Use a specific duplicate policy for the specified timestamp. Overrides + the duplicate policy set by `duplicate_policy`. + """ params = [key, timestamp, value] self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) self._append_chunk_size(params, chunk_size) - self._append_duplicate_policy(params, ADD_CMD, duplicate_policy) + self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) + self._append_insertion_filters( + params, ignore_max_time_diff, ignore_max_val_diff + ) + self._append_on_duplicate(params, on_duplicate) return self.execute_command(ADD_CMD, *params) def madd(self, ktv_tuples: List[Tuple[KeyT, Union[int, str], Number]]): """ - Append (or create and append) a new `value` to series - `key` with `timestamp`. - Expects a list of `tuples` as (`key`,`timestamp`, `value`). - Return value is an array with timestamps of insertions. + Append new samples to one or more time series. + + Each time series must already exist. + + The method expects a list of tuples. Each tuple should contain three elements: + (`key`, `timestamp`, `value`). The `value` will be appended to the time series + identified by 'key', at the given 'timestamp'. - For more information: https://redis.io/commands/ts.madd/ - """ # noqa + For more information see https://redis.io/commands/ts.madd/ + + Args: + ktv_tuples: + A list of tuples, where each tuple contains: + - `key`: The key of the time series. + - `timestamp`: The timestamp at which the value should be appended. + - `value`: The value to append to the time series. + + Returns: + A list that contains, for each sample, either the timestamp that was used, + or an error, if the sample could not be added. + """ params = [] for ktv in ktv_tuples: params.extend(ktv) @@ -199,37 +289,86 @@ def incrby( uncompressed: Optional[bool] = False, labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, + duplicate_policy: Optional[str] = None, + ignore_max_time_diff: Optional[int] = None, + ignore_max_val_diff: Optional[Number] = None, ): """ - Increment (or create an time-series and increment) the latest sample's of a series. - This command can be used as a counter or gauge that automatically gets history as a time series. + Increment the latest sample's of a series. When the specified key does not + exist, a new time series is created. - Args: + This command can be used as a counter or gauge that automatically gets history + as a time series. + + For more information see https://redis.io/commands/ts.incrby/ - key: - time-series key - value: - Numeric data value of the sample - timestamp: - Timestamp of the sample. * can be used for automatic timestamp (using the system clock). - retention_msecs: - Maximum age for samples compared to last event time (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - uncompressed: - Changes data storage from compressed (by default) to uncompressed - labels: - Set of label-value pairs that represent metadata labels of the key. - chunk_size: - Memory size, in bytes, allocated for each data chunk. - - For more information: https://redis.io/commands/ts.incrby/ - """ # noqa + Args: + key: + The time-series key. + value: + Numeric value to be added (addend). + timestamp: + Timestamp of the sample. `*` can be used for automatic timestamp (using + the system clock). `timestamp` must be equal to or higher than the + maximum existing timestamp in the series. When equal, the value of the + sample with the maximum existing timestamp is increased. If it is + higher, a new sample with a timestamp set to `timestamp` is created, and + its value is set to the value of the sample with the maximum existing + timestamp plus the addend. + retention_msecs: + Maximum age for samples, compared to the highest reported timestamp in + milliseconds. If `None` or `0` is passed, the series is not trimmed at + all. + uncompressed: + Changes data storage from compressed (default) to uncompressed. + labels: + A dictionary of label-value pairs that represent metadata labels of the + key. + chunk_size: + Memory size, in bytes, allocated for each data chunk. Must be a multiple + of 8 in the range `[48..1048576]`. In earlier versions of the module the + minimum value was different. + duplicate_policy: + Policy for handling multiple samples with identical timestamps. Can be + one of: + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing + value. + - 'max': Only override if the value is higher than the existing + value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: + A non-negative integer value, in milliseconds, that sets an ignore + threshold for added timestamps. If the difference between the last + timestamp and the new timestamp is lower than this threshold, the new + entry is ignored. Only applicable if `duplicate_policy` is set to + `last`, and if `ignore_max_val_diff` is also set. Available since + RedisTimeSeries version 1.12.0. + ignore_max_val_diff: + A non-negative floating point value, that sets an ignore threshold for + added values. If the difference between the last value and the new value + is lower than this threshold, the new entry is ignored. Only applicable + if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is + also set. Available since RedisTimeSeries version 1.12.0. + + Returns: + The timestamp of the sample that was modified or added. + """ params = [key, value] self._append_timestamp(params, timestamp) self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) self._append_chunk_size(params, chunk_size) + self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) + self._append_insertion_filters( + params, ignore_max_time_diff, ignore_max_val_diff + ) return self.execute_command(INCRBY_CMD, *params) @@ -242,37 +381,86 @@ def decrby( uncompressed: Optional[bool] = False, labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, + duplicate_policy: Optional[str] = None, + ignore_max_time_diff: Optional[int] = None, + ignore_max_val_diff: Optional[Number] = None, ): """ - Decrement (or create an time-series and decrement) the latest sample's of a series. - This command can be used as a counter or gauge that automatically gets history as a time series. + Decrement the latest sample's of a series. When the specified key does not + exist, a new time series is created. - Args: + This command can be used as a counter or gauge that automatically gets history + as a time series. - key: - time-series key - value: - Numeric data value of the sample - timestamp: - Timestamp of the sample. * can be used for automatic timestamp (using the system clock). - retention_msecs: - Maximum age for samples compared to last event time (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - uncompressed: - Changes data storage from compressed (by default) to uncompressed - labels: - Set of label-value pairs that represent metadata labels of the key. - chunk_size: - Memory size, in bytes, allocated for each data chunk. - - For more information: https://redis.io/commands/ts.decrby/ - """ # noqa + For more information see https://redis.io/commands/ts.decrby/ + + Args: + key: + The time-series key. + value: + Numeric value to subtract (subtrahend). + timestamp: + Timestamp of the sample. `*` can be used for automatic timestamp (using + the system clock). `timestamp` must be equal to or higher than the + maximum existing timestamp in the series. When equal, the value of the + sample with the maximum existing timestamp is decreased. If it is + higher, a new sample with a timestamp set to `timestamp` is created, and + its value is set to the value of the sample with the maximum existing + timestamp minus subtrahend. + retention_msecs: + Maximum age for samples, compared to the highest reported timestamp in + milliseconds. If `None` or `0` is passed, the series is not trimmed at + all. + uncompressed: + Changes data storage from compressed (default) to uncompressed. + labels: + A dictionary of label-value pairs that represent metadata labels of the + key. + chunk_size: + Memory size, in bytes, allocated for each data chunk. Must be a multiple + of 8 in the range `[48..1048576]`. In earlier versions of the module the + minimum value was different. + duplicate_policy: + Policy for handling multiple samples with identical timestamps. Can be + one of: + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing + value. + - 'max': Only override if the value is higher than the existing + value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: + A non-negative integer value, in milliseconds, that sets an ignore + threshold for added timestamps. If the difference between the last + timestamp and the new timestamp is lower than this threshold, the new + entry is ignored. Only applicable if `duplicate_policy` is set to + `last`, and if `ignore_max_val_diff` is also set. Available since + RedisTimeSeries version 1.12.0. + ignore_max_val_diff: + A non-negative floating point value, that sets an ignore threshold for + added values. If the difference between the last value and the new value + is lower than this threshold, the new entry is ignored. Only applicable + if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is + also set. Available since RedisTimeSeries version 1.12.0. + + Returns: + The timestamp of the sample that was modified or added. + """ params = [key, value] self._append_timestamp(params, timestamp) self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) self._append_chunk_size(params, chunk_size) + self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) + self._append_insertion_filters( + params, ignore_max_time_diff, ignore_max_val_diff + ) return self.execute_command(DECRBY_CMD, *params) @@ -280,17 +468,22 @@ def delete(self, key: KeyT, from_time: int, to_time: int): """ Delete all samples between two timestamps for a given time series. - Args: + The given timestamp interval is closed (inclusive), meaning that samples whose + timestamp equals `from_time` or `to_time` are also deleted. - key: - time-series key. - from_time: - Start timestamp for the range deletion. - to_time: - End timestamp for the range deletion. + For more information see https://redis.io/commands/ts.del/ - For more information: https://redis.io/commands/ts.del/ - """ # noqa + Args: + key: + The time-series key. + from_time: + Start timestamp for the range deletion. + to_time: + End timestamp for the range deletion. + + Returns: + The number of samples deleted. + """ return self.execute_command(DEL_CMD, key, from_time, to_time) def createrule( @@ -304,24 +497,23 @@ def createrule( """ Create a compaction rule from values added to `source_key` into `dest_key`. - Args: + For more information see https://redis.io/commands/ts.createrule/ - source_key: - Key name for source time series - dest_key: - Key name for destination (compacted) time series - aggregation_type: - Aggregation type: One of the following: - [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, `std.p`, - `std.s`, `var.p`, `var.s`, `twa`] - bucket_size_msec: - Duration of each bucket, in milliseconds - align_timestamp: - Assure that there is a bucket that starts at exactly align_timestamp and - align all other buckets accordingly. - - For more information: https://redis.io/commands/ts.createrule/ - """ # noqa + Args: + source_key: + Key name for source time series. + dest_key: + Key name for destination (compacted) time series. + aggregation_type: + Aggregation type: One of the following: + [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, `std.p`, + `std.s`, `var.p`, `var.s`, `twa`] + bucket_size_msec: + Duration of each bucket, in milliseconds. + align_timestamp: + Assure that there is a bucket that starts at exactly align_timestamp and + align all other buckets accordingly. + """ params = [source_key, dest_key] self._append_aggregation(params, aggregation_type, bucket_size_msec) if align_timestamp is not None: @@ -331,10 +523,10 @@ def createrule( def deleterule(self, source_key: KeyT, dest_key: KeyT): """ - Delete a compaction rule from `source_key` to `dest_key`.. + Delete a compaction rule from `source_key` to `dest_key`. - For more information: https://redis.io/commands/ts.deleterule/ - """ # noqa + For more information see https://redis.io/commands/ts.deleterule/ + """ return self.execute_command(DELETERULE_CMD, source_key, dest_key) def __range_params( @@ -383,42 +575,46 @@ def range( empty: Optional[bool] = False, ): """ - Query a range in forward direction for a specific time-serie. + Query a range in forward direction for a specific time-series. - Args: + For more information see https://redis.io/commands/ts.range/ - key: - Key name for timeseries. - from_time: - Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). - to_time: - End timestamp for range query, + can be used to express the maximum possible timestamp. - count: - Limits the number of returned samples. - aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, - `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] - bucket_size_msec: - Time bucket for aggregation in milliseconds. - filter_by_ts: - List of timestamps to filter the result by specific timestamps. - filter_by_min_value: - Filter result by minimum value (must mention also filter by_max_value). - filter_by_max_value: - Filter result by maximum value (must mention also filter by_min_value). - align: - Timestamp for alignment control for aggregation. - latest: - Used when a time series is a compaction, reports the compacted value of the - latest possibly partial bucket - bucket_timestamp: - Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, - `high`, `~`, `mid`]. - empty: - Reports aggregations for empty buckets. - - For more information: https://redis.io/commands/ts.range/ - """ # noqa + Args: + key: + Key name for timeseries. + from_time: + Start timestamp for the range query. `-` can be used to express the + minimum possible timestamp (0). + to_time: + End timestamp for range query, `+` can be used to express the maximum + possible timestamp. + count: + Limits the number of returned samples. + aggregation_type: + Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, + `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, + `twa`] + bucket_size_msec: + Time bucket for aggregation in milliseconds. + filter_by_ts: + List of timestamps to filter the result by specific timestamps. + filter_by_min_value: + Filter result by minimum value (must mention also + `filter by_max_value`). + filter_by_max_value: + Filter result by maximum value (must mention also + `filter by_min_value`). + align: + Timestamp for alignment control for aggregation. + latest: + Used when a time series is a compaction, reports the compacted value of + the latest possibly partial bucket. + bucket_timestamp: + Controls how bucket timestamps are reported. Can be one of [`-`, `low`, + `+`, `high`, `~`, `mid`]. + empty: + Reports aggregations for empty buckets. + """ params = self.__range_params( key, from_time, @@ -457,40 +653,44 @@ def revrange( **Note**: This command is only available since RedisTimeSeries >= v1.4 - Args: + For more information see https://redis.io/commands/ts.revrange/ - key: - Key name for timeseries. - from_time: - Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). - to_time: - End timestamp for range query, + can be used to express the maximum possible timestamp. - count: - Limits the number of returned samples. - aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, - `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] - bucket_size_msec: - Time bucket for aggregation in milliseconds. - filter_by_ts: - List of timestamps to filter the result by specific timestamps. - filter_by_min_value: - Filter result by minimum value (must mention also filter_by_max_value). - filter_by_max_value: - Filter result by maximum value (must mention also filter_by_min_value). - align: - Timestamp for alignment control for aggregation. - latest: - Used when a time series is a compaction, reports the compacted value of the - latest possibly partial bucket - bucket_timestamp: - Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, - `high`, `~`, `mid`]. - empty: - Reports aggregations for empty buckets. - - For more information: https://redis.io/commands/ts.revrange/ - """ # noqa + Args: + key: + Key name for timeseries. + from_time: + Start timestamp for the range query. `-` can be used to express the + minimum possible timestamp (0). + to_time: + End timestamp for range query, `+` can be used to express the maximum + possible timestamp. + count: + Limits the number of returned samples. + aggregation_type: + Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, + `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, + `twa`] + bucket_size_msec: + Time bucket for aggregation in milliseconds. + filter_by_ts: + List of timestamps to filter the result by specific timestamps. + filter_by_min_value: + Filter result by minimum value (must mention also + `filter_by_max_value`). + filter_by_max_value: + Filter result by maximum value (must mention also + `filter_by_min_value`). + align: + Timestamp for alignment control for aggregation. + latest: + Used when a time series is a compaction, reports the compacted value of + the latest possibly partial bucket. + bucket_timestamp: + Controls how bucket timestamps are reported. Can be one of [`-`, `low`, + `+`, `high`, `~`, `mid`]. + empty: + Reports aggregations for empty buckets. + """ params = self.__range_params( key, from_time, @@ -567,49 +767,55 @@ def mrange( """ Query a range across multiple time-series by filters in forward direction. - Args: + For more information see https://redis.io/commands/ts.mrange/ - from_time: - Start timestamp for the range query. `-` can be used to express the minimum possible timestamp (0). - to_time: - End timestamp for range query, `+` can be used to express the maximum possible timestamp. - filters: - filter to match the time-series labels. - count: - Limits the number of returned samples. - aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, - `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] - bucket_size_msec: - Time bucket for aggregation in milliseconds. - with_labels: - Include in the reply all label-value pairs representing metadata labels of the time series. - filter_by_ts: - List of timestamps to filter the result by specific timestamps. - filter_by_min_value: - Filter result by minimum value (must mention also filter_by_max_value). - filter_by_max_value: - Filter result by maximum value (must mention also filter_by_min_value). - groupby: - Grouping by fields the results (must mention also reduce). - reduce: - Applying reducer functions on each group. Can be one of [`avg` `sum`, `min`, - `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`]. - select_labels: - Include in the reply only a subset of the key-value pair labels of a series. - align: - Timestamp for alignment control for aggregation. - latest: - Used when a time series is a compaction, reports the compacted - value of the latest possibly partial bucket - bucket_timestamp: - Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, - `high`, `~`, `mid`]. - empty: - Reports aggregations for empty buckets. - - For more information: https://redis.io/commands/ts.mrange/ - """ # noqa + Args: + from_time: + Start timestamp for the range query. `-` can be used to express the + minimum possible timestamp (0). + to_time: + End timestamp for range query, `+` can be used to express the maximum + possible timestamp. + filters: + Filter to match the time-series labels. + count: + Limits the number of returned samples. + aggregation_type: + Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, + `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, + `twa`] + bucket_size_msec: + Time bucket for aggregation in milliseconds. + with_labels: + Include in the reply all label-value pairs representing metadata labels + of the time series. + filter_by_ts: + List of timestamps to filter the result by specific timestamps. + filter_by_min_value: + Filter result by minimum value (must mention also + `filter_by_max_value`). + filter_by_max_value: + Filter result by maximum value (must mention also + `filter_by_min_value`). + groupby: + Grouping by fields the results (must mention also `reduce`). + reduce: + Applying reducer functions on each group. Can be one of [`avg` `sum`, + `min`, `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`]. + select_labels: + Include in the reply only a subset of the key-value pair labels of a + series. + align: + Timestamp for alignment control for aggregation. + latest: + Used when a time series is a compaction, reports the compacted value of + the latest possibly partial bucket. + bucket_timestamp: + Controls how bucket timestamps are reported. Can be one of [`-`, `low`, + `+`, `high`, `~`, `mid`]. + empty: + Reports aggregations for empty buckets. + """ params = self.__mrange_params( aggregation_type, bucket_size_msec, @@ -655,49 +861,55 @@ def mrevrange( """ Query a range across multiple time-series by filters in reverse direction. - Args: + For more information see https://redis.io/commands/ts.mrevrange/ - from_time: - Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). - to_time: - End timestamp for range query, + can be used to express the maximum possible timestamp. - filters: - Filter to match the time-series labels. - count: - Limits the number of returned samples. - aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, - `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] - bucket_size_msec: - Time bucket for aggregation in milliseconds. - with_labels: - Include in the reply all label-value pairs representing metadata labels of the time series. - filter_by_ts: - List of timestamps to filter the result by specific timestamps. - filter_by_min_value: - Filter result by minimum value (must mention also filter_by_max_value). - filter_by_max_value: - Filter result by maximum value (must mention also filter_by_min_value). - groupby: - Grouping by fields the results (must mention also reduce). - reduce: - Applying reducer functions on each group. Can be one of [`avg` `sum`, `min`, - `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`]. - select_labels: - Include in the reply only a subset of the key-value pair labels of a series. - align: - Timestamp for alignment control for aggregation. - latest: - Used when a time series is a compaction, reports the compacted - value of the latest possibly partial bucket - bucket_timestamp: - Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, - `high`, `~`, `mid`]. - empty: - Reports aggregations for empty buckets. - - For more information: https://redis.io/commands/ts.mrevrange/ - """ # noqa + Args: + from_time: + Start timestamp for the range query. '-' can be used to express the + minimum possible timestamp (0). + to_time: + End timestamp for range query, '+' can be used to express the maximum + possible timestamp. + filters: + Filter to match the time-series labels. + count: + Limits the number of returned samples. + aggregation_type: + Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, + `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, + `twa`]. + bucket_size_msec: + Time bucket for aggregation in milliseconds. + with_labels: + Include in the reply all label-value pairs representing metadata labels + of the time series. + filter_by_ts: + List of timestamps to filter the result by specific timestamps. + filter_by_min_value: + Filter result by minimum value (must mention also + `filter_by_max_value`). + filter_by_max_value: + Filter result by maximum value (must mention also + `filter_by_min_value`). + groupby: + Grouping by fields the results (must mention also `reduce`). + reduce: + Applying reducer functions on each group. Can be one of [`avg` `sum`, + `min`, `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`]. + select_labels: + Include in the reply only a subset of the key-value pair labels of a + series. + align: + Timestamp for alignment control for aggregation. + latest: + Used when a time series is a compaction, reports the compacted value of + the latest possibly partial bucket. + bucket_timestamp: + Controls how bucket timestamps are reported. Can be one of [`-`, `low`, + `+`, `high`, `~`, `mid`]. + empty: + Reports aggregations for empty buckets. + """ params = self.__mrange_params( aggregation_type, bucket_size_msec, @@ -721,13 +933,16 @@ def mrevrange( return self.execute_command(MREVRANGE_CMD, *params) def get(self, key: KeyT, latest: Optional[bool] = False): - """# noqa + """ Get the last sample of `key`. - `latest` used when a time series is a compaction, reports the compacted - value of the latest (possibly partial) bucket - For more information: https://redis.io/commands/ts.get/ - """ # noqa + For more information see https://redis.io/commands/ts.get/ + + Args: + latest: + Used when a time series is a compaction, reports the compacted value of + the latest (possibly partial) bucket. + """ params = [key] self._append_latest(params, latest) return self.execute_command(GET_CMD, *params, keys=[key]) @@ -739,24 +954,24 @@ def mget( select_labels: Optional[List[str]] = None, latest: Optional[bool] = False, ): - """# noqa + """ Get the last samples matching the specific `filter`. - Args: + For more information see https://redis.io/commands/ts.mget/ - filters: - Filter to match the time-series labels. - with_labels: - Include in the reply all label-value pairs representing metadata - labels of the time series. - select_labels: - Include in the reply only a subset of the key-value pair labels of a series. - latest: - Used when a time series is a compaction, reports the compacted - value of the latest possibly partial bucket - - For more information: https://redis.io/commands/ts.mget/ - """ # noqa + Args: + filters: + Filter to match the time-series labels. + with_labels: + Include in the reply all label-value pairs representing metadata labels + of the time series. + select_labels: + Include in the reply only a subset of the key-value pair labels o the + time series. + latest: + Used when a time series is a compaction, reports the compacted value of + the latest possibly partial bucket. + """ params = [] self._append_latest(params, latest) self._append_with_labels(params, with_labels, select_labels) @@ -765,26 +980,26 @@ def mget( return self.execute_command(MGET_CMD, *params) def info(self, key: KeyT): - """# noqa + """ Get information of `key`. - For more information: https://redis.io/commands/ts.info/ - """ # noqa + For more information see https://redis.io/commands/ts.info/ + """ return self.execute_command(INFO_CMD, key, keys=[key]) def queryindex(self, filters: List[str]): - """# noqa + """ Get all time series keys matching the `filter` list. - For more information: https://redis.io/commands/ts.queryindex/ - """ # noq + For more information see https://redis.io/commands/ts.queryindex/ + """ return self.execute_command(QUERYINDEX_CMD, *filters) @staticmethod def _append_uncompressed(params: List[str], uncompressed: Optional[bool]): """Append UNCOMPRESSED tag to params.""" if uncompressed: - params.extend(["UNCOMPRESSED"]) + params.extend(["ENCODING", "UNCOMPRESSED"]) @staticmethod def _append_with_labels( @@ -860,17 +1075,16 @@ def _append_chunk_size(params: List[str], chunk_size: Optional[int]): params.extend(["CHUNK_SIZE", chunk_size]) @staticmethod - def _append_duplicate_policy( - params: List[str], command: Optional[str], duplicate_policy: Optional[str] - ): - """Append DUPLICATE_POLICY property to params on CREATE - and ON_DUPLICATE on ADD. - """ + def _append_duplicate_policy(params: List[str], duplicate_policy: Optional[str]): + """Append DUPLICATE_POLICY property to params.""" if duplicate_policy is not None: - if command == "TS.ADD": - params.extend(["ON_DUPLICATE", duplicate_policy]) - else: - params.extend(["DUPLICATE_POLICY", duplicate_policy]) + params.extend(["DUPLICATE_POLICY", duplicate_policy]) + + @staticmethod + def _append_on_duplicate(params: List[str], on_duplicate: Optional[str]): + """Append ON_DUPLICATE property to params.""" + if on_duplicate is not None: + params.extend(["ON_DUPLICATE", on_duplicate]) @staticmethod def _append_filer_by_ts(params: List[str], ts_list: Optional[List[int]]): @@ -903,3 +1117,20 @@ def _append_empty(params: List[str], empty: Optional[bool]): """Append EMPTY property to params.""" if empty: params.append("EMPTY") + + @staticmethod + def _append_insertion_filters( + params: List[str], + ignore_max_time_diff: Optional[int] = None, + ignore_max_val_diff: Optional[Number] = None, + ): + """Append insertion filters to params.""" + if (ignore_max_time_diff is None) != (ignore_max_val_diff is None): + raise ValueError( + "Both ignore_max_time_diff and ignore_max_val_diff must be set." + ) + + if ignore_max_time_diff is not None and ignore_max_val_diff is not None: + params.extend( + ["IGNORE", str(ignore_max_time_diff), str(ignore_max_val_diff)] + ) diff --git a/tests/conftest.py b/tests/conftest.py index 9263c4353d..6df6875845 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -157,8 +157,12 @@ def pytest_sessionstart(session): session.config.REDIS_INFO = REDIS_INFO # module info + stack_url = redis_url + if stack_url == default_redis_url: + stack_url = default_redismod_url try: - REDIS_INFO["modules"] = info["modules"] + stack_info = _get_info(stack_url) + REDIS_INFO["modules"] = stack_info["modules"] except (KeyError, redis.exceptions.ConnectionError): pass diff --git a/tests/test_asyncio/test_timeseries.py b/tests/test_asyncio/test_timeseries.py index 0c78ce0941..c93af1ea5b 100644 --- a/tests/test_asyncio/test_timeseries.py +++ b/tests/test_asyncio/test_timeseries.py @@ -75,7 +75,7 @@ async def test_alter(decoded_r: redis.Redis): @pytest.mark.redismod @skip_ifmodversion_lt("1.4.0", "timeseries") -async def test_alter_diplicate_policy(decoded_r: redis.Redis): +async def test_alter_duplicate_policy(decoded_r: redis.Redis): assert await decoded_r.ts().create(1) info = await decoded_r.ts().info(1) assert_resp_response( @@ -113,42 +113,44 @@ async def test_add(decoded_r: redis.Redis): @pytest.mark.redismod @skip_ifmodversion_lt("1.4.0", "timeseries") -async def test_add_duplicate_policy(r: redis.Redis): +async def test_add_duplicate_policy(decoded_r: redis.Redis): # Test for duplicate policy BLOCK - assert 1 == await r.ts().add("time-serie-add-ooo-block", 1, 5.0) + assert 1 == await decoded_r.ts().add("time-serie-add-ooo-block", 1, 5.0) with pytest.raises(Exception): - await r.ts().add("time-serie-add-ooo-block", 1, 5.0, duplicate_policy="block") + await decoded_r.ts().add( + "time-serie-add-ooo-block", 1, 5.0, on_duplicate="block" + ) # Test for duplicate policy LAST - assert 1 == await r.ts().add("time-serie-add-ooo-last", 1, 5.0) - assert 1 == await r.ts().add( - "time-serie-add-ooo-last", 1, 10.0, duplicate_policy="last" + assert 1 == await decoded_r.ts().add("time-serie-add-ooo-last", 1, 5.0) + assert 1 == await decoded_r.ts().add( + "time-serie-add-ooo-last", 1, 10.0, on_duplicate="last" ) - res = await r.ts().get("time-serie-add-ooo-last") + res = await decoded_r.ts().get("time-serie-add-ooo-last") assert 10.0 == res[1] # Test for duplicate policy FIRST - assert 1 == await r.ts().add("time-serie-add-ooo-first", 1, 5.0) - assert 1 == await r.ts().add( - "time-serie-add-ooo-first", 1, 10.0, duplicate_policy="first" + assert 1 == await decoded_r.ts().add("time-serie-add-ooo-first", 1, 5.0) + assert 1 == await decoded_r.ts().add( + "time-serie-add-ooo-first", 1, 10.0, on_duplicate="first" ) - res = await r.ts().get("time-serie-add-ooo-first") + res = await decoded_r.ts().get("time-serie-add-ooo-first") assert 5.0 == res[1] # Test for duplicate policy MAX - assert 1 == await r.ts().add("time-serie-add-ooo-max", 1, 5.0) - assert 1 == await r.ts().add( - "time-serie-add-ooo-max", 1, 10.0, duplicate_policy="max" + assert 1 == await decoded_r.ts().add("time-serie-add-ooo-max", 1, 5.0) + assert 1 == await decoded_r.ts().add( + "time-serie-add-ooo-max", 1, 10.0, on_duplicate="max" ) - res = await r.ts().get("time-serie-add-ooo-max") + res = await decoded_r.ts().get("time-serie-add-ooo-max") assert 10.0 == res[1] # Test for duplicate policy MIN - assert 1 == await r.ts().add("time-serie-add-ooo-min", 1, 5.0) - assert 1 == await r.ts().add( - "time-serie-add-ooo-min", 1, 10.0, duplicate_policy="min" + assert 1 == await decoded_r.ts().add("time-serie-add-ooo-min", 1, 5.0) + assert 1 == await decoded_r.ts().add( + "time-serie-add-ooo-min", 1, 10.0, on_duplicate="min" ) - res = await r.ts().get("time-serie-add-ooo-min") + res = await decoded_r.ts().get("time-serie-add-ooo-min") assert 5.0 == res[1] @@ -214,7 +216,7 @@ async def test_create_and_delete_rule(decoded_r: redis.Redis): @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") async def test_del_range(decoded_r: redis.Redis): try: await decoded_r.ts().delete("test", 0, 100) @@ -248,7 +250,7 @@ async def test_range(decoded_r: redis.Redis): @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") async def test_range_advanced(decoded_r: redis.Redis): for i in range(100): await decoded_r.ts().add(1, i, i % 7) @@ -279,7 +281,7 @@ async def test_range_advanced(decoded_r: redis.Redis): @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") async def test_rev_range(decoded_r: redis.Redis): for i in range(100): await decoded_r.ts().add(1, i, i % 7) @@ -379,7 +381,7 @@ async def test_multi_range(decoded_r: redis.Redis): @pytest.mark.onlynoncluster @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") async def test_multi_range_advanced(decoded_r: redis.Redis): await decoded_r.ts().create(1, labels={"Test": "This", "team": "ny"}) await decoded_r.ts().create( @@ -497,7 +499,7 @@ async def test_multi_range_advanced(decoded_r: redis.Redis): @pytest.mark.onlynoncluster @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") async def test_multi_reverse_range(decoded_r: redis.Redis): await decoded_r.ts().create(1, labels={"Test": "This", "team": "ny"}) await decoded_r.ts().create( @@ -752,9 +754,117 @@ async def test_query_index(decoded_r: redis.Redis): async def test_uncompressed(decoded_r: redis.Redis): await decoded_r.ts().create("compressed") await decoded_r.ts().create("uncompressed", uncompressed=True) + for i in range(1000): + await decoded_r.ts().add("compressed", i, i) + await decoded_r.ts().add("uncompressed", i, i) compressed_info = await decoded_r.ts().info("compressed") uncompressed_info = await decoded_r.ts().info("uncompressed") if is_resp2_connection(decoded_r): assert compressed_info.memory_usage != uncompressed_info.memory_usage else: assert compressed_info["memoryUsage"] != uncompressed_info["memoryUsage"] + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +async def test_create_with_insertion_filters(decoded_r: redis.Redis): + await decoded_r.ts().create( + "time-series-1", + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + assert 1000 == await decoded_r.ts().add("time-series-1", 1000, 1.0) + assert 1010 == await decoded_r.ts().add("time-series-1", 1010, 11.0) + assert 1010 == await decoded_r.ts().add("time-series-1", 1013, 10.0) + assert 1020 == await decoded_r.ts().add("time-series-1", 1020, 11.5) + assert 1021 == await decoded_r.ts().add("time-series-1", 1021, 22.0) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0), (1010, 11.0), (1020, 11.5), (1021, 22.0)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +async def test_alter_with_insertion_filters(decoded_r: redis.Redis): + assert 1000 == await decoded_r.ts().add("time-series-1", 1000, 1.0) + assert 1010 == await decoded_r.ts().add("time-series-1", 1010, 11.0) + assert 1013 == await decoded_r.ts().add("time-series-1", 1013, 10.0) + + await decoded_r.ts().alter( + "time-series-1", + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1013 == await decoded_r.ts().add("time-series-1", 1015, 11.5) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0), (1010, 11.0), (1013, 10.0)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +async def test_add_with_insertion_filters(decoded_r: redis.Redis): + assert 1000 == await decoded_r.ts().add( + "time-series-1", + 1000, + 1.0, + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1000 == await decoded_r.ts().add("time-series-1", 1004, 3.0) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +async def test_incrby_with_insertion_filters(decoded_r: redis.Redis): + assert 1000 == await decoded_r.ts().incrby( + "time-series-1", + 1.0, + timestamp=1000, + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1000 == await decoded_r.ts().incrby("time-series-1", 3.0, timestamp=1000) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0)] + assert expected_points == data_points + + assert 1000 == await decoded_r.ts().incrby("time-series-1", 10.1, timestamp=1000) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 11.1)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +async def test_decrby_with_insertion_filters(decoded_r: redis.Redis): + assert 1000 == await decoded_r.ts().decrby( + "time-series-1", + 1.0, + timestamp=1000, + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1000 == await decoded_r.ts().decrby("time-series-1", 3.0, timestamp=1000) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, -1.0)] + assert expected_points == data_points + + assert 1000 == await decoded_r.ts().decrby("time-series-1", 10.1, timestamp=1000) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, -11.1)] + assert expected_points == data_points diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index 5318818e79..5647bd45c6 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -84,7 +84,7 @@ def test_alter(client): @pytest.mark.redismod @skip_ifmodversion_lt("1.4.0", "timeseries") -def test_alter_diplicate_policy(client): +def test_alter_duplicate_policy(client): assert client.ts().create(1) info = client.ts().info(1) assert_resp_response( @@ -122,38 +122,32 @@ def test_add(client): @pytest.mark.redismod @skip_ifmodversion_lt("1.4.0", "timeseries") -def test_add_duplicate_policy(client): +def test_add_on_duplicate(client): # Test for duplicate policy BLOCK assert 1 == client.ts().add("time-serie-add-ooo-block", 1, 5.0) with pytest.raises(Exception): - client.ts().add("time-serie-add-ooo-block", 1, 5.0, duplicate_policy="block") + client.ts().add("time-serie-add-ooo-block", 1, 5.0, on_duplicate="block") # Test for duplicate policy LAST assert 1 == client.ts().add("time-serie-add-ooo-last", 1, 5.0) - assert 1 == client.ts().add( - "time-serie-add-ooo-last", 1, 10.0, duplicate_policy="last" - ) + assert 1 == client.ts().add("time-serie-add-ooo-last", 1, 10.0, on_duplicate="last") assert 10.0 == client.ts().get("time-serie-add-ooo-last")[1] # Test for duplicate policy FIRST assert 1 == client.ts().add("time-serie-add-ooo-first", 1, 5.0) assert 1 == client.ts().add( - "time-serie-add-ooo-first", 1, 10.0, duplicate_policy="first" + "time-serie-add-ooo-first", 1, 10.0, on_duplicate="first" ) assert 5.0 == client.ts().get("time-serie-add-ooo-first")[1] # Test for duplicate policy MAX assert 1 == client.ts().add("time-serie-add-ooo-max", 1, 5.0) - assert 1 == client.ts().add( - "time-serie-add-ooo-max", 1, 10.0, duplicate_policy="max" - ) + assert 1 == client.ts().add("time-serie-add-ooo-max", 1, 10.0, on_duplicate="max") assert 10.0 == client.ts().get("time-serie-add-ooo-max")[1] # Test for duplicate policy MIN assert 1 == client.ts().add("time-serie-add-ooo-min", 1, 5.0) - assert 1 == client.ts().add( - "time-serie-add-ooo-min", 1, 10.0, duplicate_policy="min" - ) + assert 1 == client.ts().add("time-serie-add-ooo-min", 1, 10.0, on_duplicate="min") assert 5.0 == client.ts().get("time-serie-add-ooo-min")[1] @@ -163,6 +157,15 @@ def test_madd(client): assert [1, 2, 3] == client.ts().madd([("a", 1, 5), ("a", 2, 10), ("a", 3, 15)]) +@pytest.mark.redismod +def test_madd_missing_timeseries(client): + response = client.ts().madd([("a", 1, 5), ("a", 2, 10)]) + assert isinstance(response, list) + assert len(response) == 2 + assert isinstance(response[0], redis.ResponseError) + assert isinstance(response[1], redis.ResponseError) + + @pytest.mark.redismod def test_incrby_decrby(client): for _ in range(100): @@ -217,12 +220,12 @@ def test_create_and_delete_rule(client): @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") def test_del_range(client): try: client.ts().delete("test", 0, 100) - except Exception as e: - assert e.__str__() != "" + except redis.ResponseError as e: + assert "key does not exist" in str(e) for i in range(100): client.ts().add(1, i, i % 7) @@ -247,7 +250,7 @@ def test_range(client): @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") def test_range_advanced(client): for i in range(100): client.ts().add(1, i, i % 7) @@ -381,7 +384,7 @@ def test_range_empty(client: redis.Redis): @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") def test_rev_range(client): for i in range(100): client.ts().add(1, i, i % 7) @@ -578,7 +581,7 @@ def test_mrange(client): @pytest.mark.onlynoncluster @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") def test_multi_range_advanced(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) client.ts().create(2, labels={"Test": "This", "Taste": "That", "team": "sf"}) @@ -722,7 +725,7 @@ def test_mrange_latest(client: redis.Redis): @pytest.mark.onlynoncluster @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") def test_multi_reverse_range(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) client.ts().create(2, labels={"Test": "This", "Taste": "That", "team": "sf"}) @@ -1010,9 +1013,157 @@ def test_pipeline(client): def test_uncompressed(client): client.ts().create("compressed") client.ts().create("uncompressed", uncompressed=True) + for i in range(1000): + client.ts().add("compressed", i, i) + client.ts().add("uncompressed", i, i) compressed_info = client.ts().info("compressed") uncompressed_info = client.ts().info("uncompressed") if is_resp2_connection(client): - assert compressed_info.memory_usage != uncompressed_info.memory_usage + assert compressed_info.memory_usage < uncompressed_info.memory_usage else: - assert compressed_info["memoryUsage"] != uncompressed_info["memoryUsage"] + assert compressed_info["memoryUsage"] < uncompressed_info["memoryUsage"] + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_create_with_insertion_filters(client): + client.ts().create( + "time-series-1", + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + assert 1000 == client.ts().add("time-series-1", 1000, 1.0) + assert 1010 == client.ts().add("time-series-1", 1010, 11.0) + assert 1010 == client.ts().add("time-series-1", 1013, 10.0) + assert 1020 == client.ts().add("time-series-1", 1020, 11.5) + assert 1021 == client.ts().add("time-series-1", 1021, 22.0) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0), (1010, 11.0), (1020, 11.5), (1021, 22.0)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_create_with_insertion_filters_other_duplicate_policy(client): + client.ts().create( + "time-series-1", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + assert 1000 == client.ts().add("time-series-1", 1000, 1.0) + assert 1010 == client.ts().add("time-series-1", 1010, 11.0) + # Still accepted because the duplicate_policy is not `last`. + assert 1013 == client.ts().add("time-series-1", 1013, 10.0) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0), (1010, 11.0), (1013, 10)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_alter_with_insertion_filters(client): + assert 1000 == client.ts().add("time-series-1", 1000, 1.0) + assert 1010 == client.ts().add("time-series-1", 1010, 11.0) + assert 1013 == client.ts().add("time-series-1", 1013, 10.0) + + client.ts().alter( + "time-series-1", + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1013 == client.ts().add("time-series-1", 1015, 11.5) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0), (1010, 11.0), (1013, 10.0)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_add_with_insertion_filters(client): + assert 1000 == client.ts().add( + "time-series-1", + 1000, + 1.0, + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1000 == client.ts().add("time-series-1", 1004, 3.0) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_incrby_with_insertion_filters(client): + assert 1000 == client.ts().incrby( + "time-series-1", + 1.0, + timestamp=1000, + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1000 == client.ts().incrby("time-series-1", 3.0, timestamp=1000) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0)] + assert expected_points == data_points + + assert 1000 == client.ts().incrby("time-series-1", 10.1, timestamp=1000) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 11.1)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_decrby_with_insertion_filters(client): + assert 1000 == client.ts().decrby( + "time-series-1", + 1.0, + timestamp=1000, + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1000 == client.ts().decrby("time-series-1", 3.0, timestamp=1000) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, -1.0)] + assert expected_points == data_points + + assert 1000 == client.ts().decrby("time-series-1", 10.1, timestamp=1000) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, -11.1)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_madd_with_insertion_filters(client): + client.ts().create( + "time-series-1", + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + assert 1010 == client.ts().add("time-series-1", 1010, 1.0) + assert [1010, 1010, 1020, 1021] == client.ts().madd( + [ + ("time-series-1", 1011, 11.0), + ("time-series-1", 1013, 10.0), + ("time-series-1", 1020, 2.0), + ("time-series-1", 1021, 22.0), + ] + ) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1010, 1.0), (1020, 2.0), (1021, 22.0)] + assert expected_points == data_points