diff --git a/langgraph/checkpoint/redis/__init__.py b/langgraph/checkpoint/redis/__init__.py index 2fbb151..4f67cf0 100644 --- a/langgraph/checkpoint/redis/__init__.py +++ b/langgraph/checkpoint/redis/__init__.py @@ -1,8 +1,8 @@ from __future__ import annotations import json -from contextlib import contextmanager import logging +from contextlib import contextmanager from typing import Any, Dict, Iterator, List, Optional, Tuple, Union, cast from langchain_core.runnables import RunnableConfig diff --git a/langgraph/checkpoint/redis/aio.py b/langgraph/checkpoint/redis/aio.py index a6c7fe8..3fd7211 100644 --- a/langgraph/checkpoint/redis/aio.py +++ b/langgraph/checkpoint/redis/aio.py @@ -595,16 +595,10 @@ async def aput( # Apply TTL if configured if self.ttl_config and "default_ttl" in self.ttl_config: - all_keys = ( - [checkpoint_key] + [key for key, _ in blobs] - if blobs - else [checkpoint_key] + await self._apply_ttl_to_keys( + checkpoint_key, + [key for key, _ in blobs] if blobs else None, ) - ttl_minutes = self.ttl_config.get("default_ttl") - ttl_seconds = int(ttl_minutes * 60) - - for key in all_keys: - await self._redis.expire(key, ttl_seconds) else: # For non-cluster mode, use pipeline with transaction for atomicity pipeline = self._redis.pipeline(transaction=True) @@ -622,19 +616,10 @@ async def aput( # Apply TTL to checkpoint and blob keys if configured if self.ttl_config and "default_ttl" in self.ttl_config: - all_keys = ( - [checkpoint_key] + [key for key, _ in blobs] - if blobs - else [checkpoint_key] + await self._apply_ttl_to_keys( + checkpoint_key, + [key for key, _ in blobs] if blobs else None, ) - ttl_minutes = self.ttl_config.get("default_ttl") - ttl_seconds = int(ttl_minutes * 60) - - # Use a new pipeline for TTL operations - ttl_pipeline = self._redis.pipeline() - for key in all_keys: - ttl_pipeline.expire(key, ttl_seconds) - await ttl_pipeline.execute() return next_config @@ -780,11 +765,10 @@ async def aput_writes( and self.ttl_config and "default_ttl" in self.ttl_config ): - ttl_minutes = self.ttl_config.get("default_ttl") - ttl_seconds = int(ttl_minutes * 60) - - for key in created_keys: - await self._redis.expire(key, ttl_seconds) + await self._apply_ttl_to_keys( + created_keys[0], + created_keys[1:] if len(created_keys) > 1 else None, + ) else: # For non-cluster mode, use transaction pipeline for atomicity pipeline = self._redis.pipeline(transaction=True) diff --git a/langgraph/checkpoint/redis/types.py b/langgraph/checkpoint/redis/types.py index 52c6ab2..7733fbf 100644 --- a/langgraph/checkpoint/redis/types.py +++ b/langgraph/checkpoint/redis/types.py @@ -2,8 +2,8 @@ from redis import Redis from redis.asyncio import Redis as AsyncRedis -from redis.cluster import RedisCluster from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster +from redis.cluster import RedisCluster from redisvl.index import AsyncSearchIndex, SearchIndex RedisClientType = TypeVar( diff --git a/tests/conftest.py b/tests/conftest.py index 1469572..aa58e2c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,7 @@ import asyncio import os +import socket +import time import pytest from redis.asyncio import Redis @@ -58,6 +60,20 @@ def redis_url(redis_container): on container port 6379 (mapped to an ephemeral port on the host). """ host, port = redis_container.get_service_host_and_port("redis", 6379) + + # Wait up to 15 seconds for the container to accept TCP connections. + deadline = time.time() + 15 + while True: + try: + with socket.create_connection((host, int(port)), timeout=1): + break # Redis is accepting connections + except OSError: + if time.time() > deadline: + pytest.skip( + "Redis container failed to become ready for this worker – skipping tests." + ) + time.sleep(0.5) + return f"redis://{host}:{port}" diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 1441e30..c6b70e6 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -3,12 +3,13 @@ services: redis: image: "${REDIS_IMAGE}" ports: - - "6379" + - target: 6379 + published: 0 + protocol: tcp + mode: host environment: - "REDIS_ARGS=--save '' --appendonly no" deploy: replicas: 1 restart_policy: condition: on-failure - labels: - - "com.docker.compose.publishers=redis,6379,6379" \ No newline at end of file diff --git a/tests/test_async_cluster_mode.py b/tests/test_async_cluster_mode.py index a942ee9..839d045 100644 --- a/tests/test_async_cluster_mode.py +++ b/tests/test_async_cluster_mode.py @@ -10,8 +10,8 @@ RedisCluster as AsyncRedisCluster, # Import actual for isinstance checks if needed by store ) -from langgraph.store.redis import AsyncRedisStore from langgraph.checkpoint.redis.aio import AsyncRedisSaver +from langgraph.store.redis import AsyncRedisStore # Override session-scoped redis_container fixture to prevent Docker operations and provide dummy host/port diff --git a/tests/test_cluster_mode.py b/tests/test_cluster_mode.py index a4edfba..cf80fa0 100644 --- a/tests/test_cluster_mode.py +++ b/tests/test_cluster_mode.py @@ -7,19 +7,19 @@ from unittest.mock import MagicMock import pytest +from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata from langgraph.store.base import GetOp, ListNamespacesOp, PutOp, SearchOp from redis import Redis from redis.cluster import RedisCluster as SyncRedisCluster from ulid import ULID +from langgraph.checkpoint.redis import RedisSaver from langgraph.store.redis import RedisStore from langgraph.store.redis.base import ( REDIS_KEY_SEPARATOR, STORE_PREFIX, STORE_VECTOR_PREFIX, ) -from langgraph.checkpoint.redis import RedisSaver -from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata # Override session-scoped redis_container fixture to prevent Docker operations and provide dummy host/port