Skip to content

Fix docker issues, TTL cleanup #58

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion langgraph/checkpoint/redis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from __future__ import annotations

import json
from contextlib import contextmanager
import logging
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union, cast

from langchain_core.runnables import RunnableConfig
Expand Down
36 changes: 10 additions & 26 deletions langgraph/checkpoint/redis/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,16 +595,10 @@ async def aput(

# Apply TTL if configured
if self.ttl_config and "default_ttl" in self.ttl_config:
all_keys = (
[checkpoint_key] + [key for key, _ in blobs]
if blobs
else [checkpoint_key]
await self._apply_ttl_to_keys(
checkpoint_key,
[key for key, _ in blobs] if blobs else None,
)
ttl_minutes = self.ttl_config.get("default_ttl")
ttl_seconds = int(ttl_minutes * 60)

for key in all_keys:
await self._redis.expire(key, ttl_seconds)
else:
# For non-cluster mode, use pipeline with transaction for atomicity
pipeline = self._redis.pipeline(transaction=True)
Expand All @@ -622,19 +616,10 @@ async def aput(

# Apply TTL to checkpoint and blob keys if configured
if self.ttl_config and "default_ttl" in self.ttl_config:
all_keys = (
[checkpoint_key] + [key for key, _ in blobs]
if blobs
else [checkpoint_key]
await self._apply_ttl_to_keys(
checkpoint_key,
[key for key, _ in blobs] if blobs else None,
)
ttl_minutes = self.ttl_config.get("default_ttl")
ttl_seconds = int(ttl_minutes * 60)

# Use a new pipeline for TTL operations
ttl_pipeline = self._redis.pipeline()
for key in all_keys:
ttl_pipeline.expire(key, ttl_seconds)
await ttl_pipeline.execute()

return next_config

Expand Down Expand Up @@ -780,11 +765,10 @@ async def aput_writes(
and self.ttl_config
and "default_ttl" in self.ttl_config
):
ttl_minutes = self.ttl_config.get("default_ttl")
ttl_seconds = int(ttl_minutes * 60)

for key in created_keys:
await self._redis.expire(key, ttl_seconds)
await self._apply_ttl_to_keys(
created_keys[0],
created_keys[1:] if len(created_keys) > 1 else None,
)
else:
# For non-cluster mode, use transaction pipeline for atomicity
pipeline = self._redis.pipeline(transaction=True)
Expand Down
2 changes: 1 addition & 1 deletion langgraph/checkpoint/redis/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from redis import Redis
from redis.asyncio import Redis as AsyncRedis
from redis.cluster import RedisCluster
from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster
from redis.cluster import RedisCluster
from redisvl.index import AsyncSearchIndex, SearchIndex

RedisClientType = TypeVar(
Expand Down
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import os
import socket
import time

import pytest
from redis.asyncio import Redis
Expand Down Expand Up @@ -58,6 +60,20 @@ def redis_url(redis_container):
on container port 6379 (mapped to an ephemeral port on the host).
"""
host, port = redis_container.get_service_host_and_port("redis", 6379)

# Wait up to 15 seconds for the container to accept TCP connections.
deadline = time.time() + 15
while True:
try:
with socket.create_connection((host, int(port)), timeout=1):
break # Redis is accepting connections
except OSError:
if time.time() > deadline:
pytest.skip(
"Redis container failed to become ready for this worker – skipping tests."
)
time.sleep(0.5)

return f"redis://{host}:{port}"


Expand Down
7 changes: 4 additions & 3 deletions tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ services:
redis:
image: "${REDIS_IMAGE}"
ports:
- "6379"
- target: 6379
published: 0
protocol: tcp
mode: host
environment:
- "REDIS_ARGS=--save '' --appendonly no"
deploy:
replicas: 1
restart_policy:
condition: on-failure
labels:
- "com.docker.compose.publishers=redis,6379,6379"
2 changes: 1 addition & 1 deletion tests/test_async_cluster_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
RedisCluster as AsyncRedisCluster, # Import actual for isinstance checks if needed by store
)

from langgraph.store.redis import AsyncRedisStore
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from langgraph.store.redis import AsyncRedisStore


# Override session-scoped redis_container fixture to prevent Docker operations and provide dummy host/port
Expand Down
4 changes: 2 additions & 2 deletions tests/test_cluster_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@
from unittest.mock import MagicMock

import pytest
from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata
from langgraph.store.base import GetOp, ListNamespacesOp, PutOp, SearchOp
from redis import Redis
from redis.cluster import RedisCluster as SyncRedisCluster
from ulid import ULID

from langgraph.checkpoint.redis import RedisSaver
from langgraph.store.redis import RedisStore
from langgraph.store.redis.base import (
REDIS_KEY_SEPARATOR,
STORE_PREFIX,
STORE_VECTOR_PREFIX,
)
from langgraph.checkpoint.redis import RedisSaver
from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata


# Override session-scoped redis_container fixture to prevent Docker operations and provide dummy host/port
Expand Down