Set up tests with redis #26

Merged: 9 commits, Nov 11, 2023
54 changes: 32 additions & 22 deletions .github/workflows/ci.yml
@@ -52,15 +52,14 @@ jobs:
      # Label used to access the service container
      redis:
        image: redislabs/redisearch:latest
        # Expose the port that the service is listening on.
        ports:
          - "6379:6379"
        # Set health checks to wait until redis has started
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - "6379:6379"
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python ${{ matrix.python-version }} + Poetry ${{ env.POETRY_VERSION }}
@@ -70,25 +69,36 @@ jobs:
          poetry-version: ${{ env.POETRY_VERSION }}
          working-directory: .
          cache-key: langserve-all

      - name: Install dependencies
        shell: bash
      - name: install redis
        run: |
          echo "Running tests, installing dependencies with poetry..."
          poetry install --with test,lint

      - name: Run tests
        run: make test
        env:
          REDIS_URL: "redis://redis:6379"
      - name: Ensure the tests did not create any additional files
        shell: bash
          pip install redis
      - name: check redis is running
        run: |
          set -eu

          STATUS="$(git status)"
          echo "$STATUS"
          python -c "import redis; redis.Redis(host='localhost', port=6379).ping()"

          # grep will exit non-zero if the target message isn't found,
          # and `set -e` above will cause the step to fail.
          echo "$STATUS" | grep 'nothing to commit, working tree clean'
      # - name: Install dependencies
      #   shell: bash
      #   run: |
      #     echo "Running tests, installing dependencies with poetry..."
      #     poetry install --with test,lint
      #
      # - name: Run tests
      #   run: make test
      #   env:
      #     # Adding a DB number to match unit tests
      #     # Unit tests use DB 3, so user doesn't accidentally wipe their data
      #     # if they run the tests against a real redis instance.
      #     # Need to follow up and swap out default port numbers and add `test`
      #     # password
      #     REDIS_URL: "redis://redis:6379/0"
      # - name: Ensure the tests did not create any additional files
      #   shell: bash
      #   run: |
      #     set -eu
      #
      #     STATUS="$(git status)"
      #     echo "$STATUS"
      #
      #     # grep will exit non-zero if the target message isn't found,
      #     # and `set -e` above will cause the step to fail.
      #     echo "$STATUS" | grep 'nothing to commit, working tree clean'
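
For context, a minimal sketch of how a test suite could consume the Redis service this workflow provisions, assuming the tests read a REDIS_URL environment variable as in the commented-out step above. The fixture, key names, and default URL are illustrative only and are not part of this PR.

# Hypothetical test sketch (not part of this PR): verify the CI Redis service
# is reachable before exercising Redis-backed code paths.
import os

import pytest
import redis


@pytest.fixture(scope="session")
def redis_client() -> redis.Redis:
    # Default mirrors the commented-out REDIS_URL above; the DB number is illustrative.
    url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
    client = redis.Redis.from_url(url)
    try:
        client.ping()
    except redis.exceptions.ConnectionError:
        pytest.skip("Redis is not reachable; skipping Redis-backed tests")
    return client


def test_redis_roundtrip(redis_client: redis.Redis) -> None:
    redis_client.set("ci-smoke-test", "ok")
    assert redis_client.get("ci-smoke-test") == b"ok"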
4 changes: 3 additions & 1 deletion backend/packages/agent-executor/agent_executor/history.py
@@ -7,14 +7,16 @@
from langchain.pydantic_v1 import BaseModel, create_model
from langchain.schema.chat_history import BaseChatMessageHistory
from langchain.schema.messages import BaseMessage
from langchain.schema.runnable.base import Runnable, RunnableBindingBase, RunnableLambda
from langchain.schema.runnable.base import Runnable, RunnableLambda
from langchain.schema.runnable.config import RunnableConfig
from langchain.schema.runnable.passthrough import RunnablePassthrough
from langchain.schema.runnable.utils import (
    ConfigurableFieldSpec,
    get_unique_config_specs,
)

from agent_executor.runnables import RunnableBindingBase


class RunnableWithMessageHistory(RunnableBindingBase):
    factory: Callable[[str], BaseChatMessageHistory]
257 changes: 257 additions & 0 deletions backend/packages/agent-executor/agent_executor/runnables.py
@@ -0,0 +1,257 @@
"""Temporary code for RunnableBinding.

This is temporary code for Runnable Binding while it isn't available on released
LangChain.
"""
from __future__ import annotations

from typing import (
    Any,
    AsyncIterator,
    Callable,
    Iterator,
    List,
    Mapping,
    Optional,
    Sequence,
    Type,
    TypeVar,
    Union,
    cast,
)

from langchain.pydantic_v1 import BaseModel, Field
from langchain.schema.runnable import Runnable, RunnableSerializable
from langchain.schema.runnable.config import (
    RunnableConfig,
    merge_configs,
)
from langchain.schema.runnable.utils import (
    ConfigurableFieldSpec,
    Input,
    Output,
)

Other = TypeVar("Other")


class RunnableBindingBase(RunnableSerializable[Input, Output]):
    """A runnable that delegates calls to another runnable with a set of kwargs."""

    bound: Runnable[Input, Output]

    kwargs: Mapping[str, Any] = Field(default_factory=dict)

    config: RunnableConfig = Field(default_factory=dict)

    config_factories: List[Callable[[RunnableConfig], RunnableConfig]] = Field(
        default_factory=list
    )

    # Union[Type[Input], BaseModel] + things like List[str]
    custom_input_type: Optional[Any] = None
    # Union[Type[Output], BaseModel] + things like List[str]
    custom_output_type: Optional[Any] = None

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        *,
        bound: Runnable[Input, Output],
        kwargs: Optional[Mapping[str, Any]] = None,
        config: Optional[RunnableConfig] = None,
        config_factories: Optional[
            List[Callable[[RunnableConfig], RunnableConfig]]
        ] = None,
        custom_input_type: Optional[Union[Type[Input], BaseModel]] = None,
        custom_output_type: Optional[Union[Type[Output], BaseModel]] = None,
        **other_kwargs: Any,
    ) -> None:
        config = config or {}
        # config_specs contains the list of valid `configurable` keys
        if configurable := config.get("configurable", None):
            allowed_keys = set(s.id for s in bound.config_specs)
            for key in configurable:
                if key not in allowed_keys:
                    raise ValueError(
                        f"Configurable key '{key}' not found in runnable with"
                        f" config keys: {allowed_keys}"
                    )
        super().__init__(
            bound=bound,
            kwargs=kwargs or {},
            config=config or {},
            config_factories=config_factories or [],
            custom_input_type=custom_input_type,
            custom_output_type=custom_output_type,
            **other_kwargs,
        )

    @property
    def InputType(self) -> Type[Input]:
        return (
            cast(Type[Input], self.custom_input_type)
            if self.custom_input_type is not None
            else self.bound.InputType
        )

    @property
    def OutputType(self) -> Type[Output]:
        return (
            cast(Type[Output], self.custom_output_type)
            if self.custom_output_type is not None
            else self.bound.OutputType
        )

    def get_input_schema(
        self, config: Optional[RunnableConfig] = None
    ) -> Type[BaseModel]:
        if self.custom_input_type is not None:
            return super().get_input_schema(config)
        return self.bound.get_input_schema(merge_configs(self.config, config))

    def get_output_schema(
        self, config: Optional[RunnableConfig] = None
    ) -> Type[BaseModel]:
        if self.custom_output_type is not None:
            return super().get_output_schema(config)
        return self.bound.get_output_schema(merge_configs(self.config, config))

    @property
    def config_specs(self) -> Sequence[ConfigurableFieldSpec]:
        return self.bound.config_specs

    @classmethod
    def is_lc_serializable(cls) -> bool:
        return True

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        return cls.__module__.split(".")[:-1]

    def _merge_configs(self, *configs: Optional[RunnableConfig]) -> RunnableConfig:
        config = merge_configs(self.config, *configs)
        return merge_configs(config, *(f(config) for f in self.config_factories))

    def invoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Output:
        return self.bound.invoke(
            input,
            self._merge_configs(config),
            **{**self.kwargs, **kwargs},
        )

    async def ainvoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Output:
        return await self.bound.ainvoke(
            input,
            self._merge_configs(config),
            **{**self.kwargs, **kwargs},
        )

    def batch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> List[Output]:
        if isinstance(config, list):
            configs = cast(
                List[RunnableConfig],
                [self._merge_configs(conf) for conf in config],
            )
        else:
            configs = [self._merge_configs(config) for _ in range(len(inputs))]
        return self.bound.batch(
            inputs,
            configs,
            return_exceptions=return_exceptions,
            **{**self.kwargs, **kwargs},
        )

    async def abatch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> List[Output]:
        if isinstance(config, list):
            configs = cast(
                List[RunnableConfig],
                [self._merge_configs(conf) for conf in config],
            )
        else:
            configs = [self._merge_configs(config) for _ in range(len(inputs))]
        return await self.bound.abatch(
            inputs,
            configs,
            return_exceptions=return_exceptions,
            **{**self.kwargs, **kwargs},
        )

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Iterator[Output]:
        yield from self.bound.stream(
            input,
            self._merge_configs(config),
            **{**self.kwargs, **kwargs},
        )

    async def astream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Output]:
        async for item in self.bound.astream(
            input,
            self._merge_configs(config),
            **{**self.kwargs, **kwargs},
        ):
            yield item

    def transform(
        self,
        input: Iterator[Input],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[Output]:
        yield from self.bound.transform(
            input,
            self._merge_configs(config),
            **{**self.kwargs, **kwargs},
        )

    async def atransform(
        self,
        input: AsyncIterator[Input],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> AsyncIterator[Output]:
        async for item in self.bound.atransform(
            input,
            self._merge_configs(config),
            **{**self.kwargs, **kwargs},
        ):
            yield item


RunnableBindingBase.update_forward_refs(RunnableConfig=RunnableConfig)
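
As a quick illustration of how this vendored base class behaves (a sketch, not part of the diff; the wrapped lambda and tag value are made up), the binding forwards calls to the bound runnable with its stored config and kwargs merged in:

# Hypothetical usage sketch for the vendored RunnableBindingBase above;
# the wrapped runnable and the tag are illustrative only.
from langchain.schema.runnable import RunnableLambda

from agent_executor.runnables import RunnableBindingBase

add_one = RunnableLambda(lambda x: x + 1)

# Bind a default config to the underlying runnable; invoke/batch delegate to it.
bound = RunnableBindingBase(bound=add_one, config={"tags": ["demo"]})

assert bound.invoke(1) == 2
assert bound.batch([1, 2, 3]) == [2, 3, 4]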
1 change: 0 additions & 1 deletion backend/packages/agent-executor/agent_executor/upload.py
@@ -48,7 +48,6 @@ def _convert_ingestion_input_to_blob(data: BinaryIO) -> Blob:
class IngestRunnable(RunnableSerializable[BinaryIO, List[str]]):
    text_splitter: TextSplitter
    vectorstore: VectorStore
    input_key: str
    assistant_id: Optional[str]

    class Config:
8 changes: 4 additions & 4 deletions backend/poetry.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions backend/pyproject.toml
@@ -13,12 +13,16 @@ tomli-w = "^1.0.0"
uvicorn = "^0.23.2"
fastapi = "^0.103.2"
langserve = ">=0.0.16"
# Uncomment if you need to work from a development branch
# This will only work for local development though!
# langchain = { git = "git@github.com:langchain-ai/langchain.git/", branch = "nc/subclass-runnable-binding" , subdirectory = "libs/langchain"}
gizmo-agent = {path = "packages/gizmo-agent", develop = true}
agent-executor = {path = "packages/agent-executor", develop = true}
orjson = "^3.9.10"
redis = "^5.0.1"
python-multipart = "^0.0.6"
tiktoken = "^0.5.1"
langchain = "^0.0.334"

[tool.poetry.group.dev.dependencies]
uvicorn = "^0.23.2"