Skip to content

Enable creating an index "from_existing" #174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 95 additions & 23 deletions redisvl/index/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@

from redisvl.index.storage import HashStorage, JsonStorage
from redisvl.query.query import BaseQuery, CountQuery, FilterQuery
from redisvl.redis.connection import RedisConnectionFactory
from redisvl.redis.connection import (
RedisConnectionFactory,
convert_index_info_to_schema,
validate_modules,
)
from redisvl.redis.utils import convert_bytes
from redisvl.schema import IndexSchema, StorageType
from redisvl.utils.log import get_logger
Expand Down Expand Up @@ -102,7 +106,7 @@ def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if not self.exists():
raise ValueError(
raise RuntimeError(
f"Index has not been created. Must be created before calling {func.__name__}"
)
return func(self, *args, **kwargs)
Expand Down Expand Up @@ -162,7 +166,6 @@ def __init__(

self.schema = schema

# set custom lib name
self._lib_name: Optional[str] = kwargs.pop("lib_name", None)

# set up redis connection
Expand Down Expand Up @@ -317,6 +320,34 @@ class SearchIndex(BaseSearchIndex):

"""

@classmethod
def from_existing(
cls,
name: str,
redis_client: Optional[redis.Redis] = None,
redis_url: Optional[str] = None,
**kwargs,
):
# Handle redis instance
if redis_url:
redis_client = RedisConnectionFactory.connect(
redis_url=redis_url, use_async=False, **kwargs
)
if not redis_client:
raise ValueError(
"Must provide either a redis_url or redis_client to fetch Redis index info."
)

# Validate modules
installed_modules = RedisConnectionFactory._get_modules(redis_client)
validate_modules(installed_modules, [{"name": "search", "ver": 20810}])
Copy link
Collaborator

@bsbodden bsbodden Jul 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is search the only one we need to validate? I'm looking at the remote possibility that somebody did their own module add in the configuration with mismatched versions, but that likely entail us to have a compatibility matrix with at least 3 columns, Redis version , Search module version, and JSON module version

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the ability to read in from the FT.INFO output, it's not tied to any module besides Search. Not sure we can invest in supporting additional/ad-hoc modules (yet). Will likely require a bit of work to get there. But I do want to clean up the module and connection factory interfaces soon to make them a bit cleaner and more generic.


# Fetch index info and convert to schema
index_info = cls._info(name, redis_client)
schema_dict = convert_index_info_to_schema(index_info)
schema = IndexSchema.from_dict(schema_dict)
return cls(schema, redis_client, **kwargs)

def connect(self, redis_url: Optional[str] = None, **kwargs):
"""Connect to a Redis instance using the provided `redis_url`, falling
back to the `REDIS_URL` environment variable (if available).
Expand Down Expand Up @@ -653,22 +684,28 @@ def exists(self) -> bool:
"""
return self.schema.index.name in self.listall()

@staticmethod
def _info(name: str, redis_client: redis.Redis) -> Dict[str, Any]:
"""Run FT.INFO to fetch information about the index."""
try:
return convert_bytes(redis_client.ft(name).info()) # type: ignore
except:
logger.exception(f"Error while fetching {name} index info")
raise

@check_index_exists()
def info(self) -> Dict[str, Any]:
def info(self, name: Optional[str] = None) -> Dict[str, Any]:
"""Get information about the index.

Args:
name (str, optional): Index name to fetch info about.
Defaults to None.

Returns:
dict: A dictionary containing the information about the index.
"""
try:
return convert_bytes(
self._redis_client.ft(self.schema.index.name).info() # type: ignore
)
except:
logger.exception(
f"Error while fetching {self.schema.index.name} index info"
)
raise
index_name = name or self.schema.index.name
return self._info(index_name, self._redis_client) # type: ignore


class AsyncSearchIndex(BaseSearchIndex):
Expand Down Expand Up @@ -698,6 +735,36 @@ class AsyncSearchIndex(BaseSearchIndex):

"""

@classmethod
async def from_existing(
cls,
name: str,
redis_client: Optional[aredis.Redis] = None,
redis_url: Optional[str] = None,
**kwargs,
):
if redis_url:
redis_client = RedisConnectionFactory.connect(
redis_url=redis_url, use_async=True, **kwargs
)

if not redis_client:
raise ValueError(
"Must provide either a redis_url or redis_client to fetch Redis index info."
)

# Validate modules
installed_modules = await RedisConnectionFactory._get_modules_async(
redis_client
)
validate_modules(installed_modules, [{"name": "search", "ver": 20810}])

# Fetch index info and convert to schema
index_info = await cls._info(name, redis_client)
schema_dict = convert_index_info_to_schema(index_info)
schema = IndexSchema.from_dict(schema_dict)
return cls(schema, redis_client, **kwargs)

def connect(self, redis_url: Optional[str] = None, **kwargs):
"""Connect to a Redis instance using the provided `redis_url`, falling
back to the `REDIS_URL` environment variable (if available).
Expand Down Expand Up @@ -1035,19 +1102,24 @@ async def exists(self) -> bool:
"""
return self.schema.index.name in await self.listall()

@staticmethod
async def _info(name: str, redis_client: aredis.Redis) -> Dict[str, Any]:
try:
return convert_bytes(await redis_client.ft(name).info()) # type: ignore
except:
logger.exception(f"Error while fetching {name} index info")
raise

@check_async_index_exists()
async def info(self) -> Dict[str, Any]:
async def info(self, name: Optional[str] = None) -> Dict[str, Any]:
"""Get information about the index.

Args:
name (str, optional): Index name to fetch info about.
Defaults to None.

Returns:
dict: A dictionary containing the information about the index.
"""
try:
return convert_bytes(
await self._redis_client.ft(self.schema.index.name).info() # type: ignore
)
except:
logger.exception(
f"Error while fetching {self.schema.index.name} index info"
)
raise
index_name = name or self.schema.index.name
return await self._info(index_name, self._redis_client) # type: ignore
139 changes: 100 additions & 39 deletions redisvl/redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@
)
from redis.exceptions import ResponseError

from redisvl.redis.constants import REDIS_REQUIRED_MODULES
from redisvl.redis.constants import DEFAULT_REQUIRED_MODULES
from redisvl.redis.utils import convert_bytes
from redisvl.version import __version__


def unpack_redis_modules(module_list: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Unpack a list of Redis modules pulled from the MODULES LIST command."""
return {module["name"]: module["ver"] for module in module_list}


def get_address_from_env() -> str:
"""Get a redis connection from environment variables.

Expand All @@ -43,6 +48,82 @@ def make_lib_name(*args) -> str:
return f"redis-py({custom_libs})"


def convert_index_info_to_schema(index_info: Dict[str, Any]) -> Dict[str, Any]:
"""Convert the output of FT.INFO into a schema-ready dictionary.

Args:
index_info (Dict[str, Any]): Output of the Redis FT.INFO command.

Returns:
Dict[str, Any]: Schema dictionary.
"""
index_name = index_info["index_name"]
prefixes = index_info["index_definition"][3][0]
storage_type = index_info["index_definition"][1].lower()

index_fields = index_info["attributes"]

def parse_vector_attrs(attrs):
vector_attrs = {attrs[i].lower(): attrs[i + 1] for i in range(6, len(attrs), 2)}
vector_attrs["dims"] = int(vector_attrs.pop("dim"))
vector_attrs["distance_metric"] = vector_attrs.pop("distance_metric").lower()
vector_attrs["algorithm"] = vector_attrs.pop("algorithm").lower()
vector_attrs["datatype"] = vector_attrs.pop("data_type").lower()
return vector_attrs

def parse_attrs(attrs):
return {attrs[i].lower(): attrs[i + 1] for i in range(6, len(attrs), 2)}

schema_fields = []

for field_attrs in index_fields:
# parse field info
name = field_attrs[1] if storage_type == "hash" else field_attrs[3]
field = {"name": name, "type": field_attrs[5].lower()}
if storage_type == "json":
field["path"] = field_attrs[1]
# parse field attrs
if field_attrs[5] == "VECTOR":
field["attrs"] = parse_vector_attrs(field_attrs)
else:
field["attrs"] = parse_attrs(field_attrs)
# append field
schema_fields.append(field)

return {
"index": {"name": index_name, "prefix": prefixes, "storage_type": storage_type},
"fields": schema_fields,
}


def validate_modules(
installed_modules: Dict[str, Any],
required_modules: Optional[List[Dict[str, Any]]] = None,
) -> None:
"""
Validates if required Redis modules are installed.

Args:
installed_modules: List of installed modules.
required_modules: List of required modules.

Raises:
ValueError: If required Redis modules are not installed.
"""
required_modules = required_modules or DEFAULT_REQUIRED_MODULES

for required_module in required_modules:
if required_module["name"] in installed_modules:
installed_version = installed_modules[required_module["name"]] # type: ignore
if int(installed_version) >= int(required_module["ver"]): # type: ignore
return

raise ValueError(
f"Required Redis database module {required_module['name']} with version >= {required_module['ver']} not installed. "
"See Redis Stack documentation: https://redis.io/docs/stack/"
)


class RedisConnectionFactory:
"""Builds connections to a Redis database, supporting both synchronous and
asynchronous clients.
Expand Down Expand Up @@ -128,14 +209,14 @@ def get_async_redis_connection(url: Optional[str] = None, **kwargs) -> AsyncRedi
def validate_redis(
client: Union[Redis, AsyncRedis],
lib_name: Optional[str] = None,
redis_required_modules: Optional[List[Dict[str, Any]]] = None,
required_modules: Optional[List[Dict[str, Any]]] = None,
) -> None:
"""Validates the Redis connection.

Args:
client (Redis or AsyncRedis): Redis client.
lib_name (str): Library name to set on the Redis client.
redis_required_modules (List[Dict[str, Any]]): List of required modules and their versions.
required_modules (List[Dict[str, Any]]): List of required modules and their versions.

Raises:
ValueError: If required Redis modules are not installed.
Expand All @@ -145,18 +226,26 @@ def validate_redis(
RedisConnectionFactory._validate_async_redis,
client,
lib_name,
redis_required_modules,
required_modules,
)
else:
RedisConnectionFactory._validate_sync_redis(
client, lib_name, redis_required_modules
client, lib_name, required_modules
)

@staticmethod
def _get_modules(client: Redis) -> Dict[str, Any]:
return unpack_redis_modules(convert_bytes(client.module_list()))

@staticmethod
async def _get_modules_async(client: AsyncRedis) -> Dict[str, Any]:
return unpack_redis_modules(convert_bytes(await client.module_list()))

@staticmethod
def _validate_sync_redis(
client: Redis,
lib_name: Optional[str],
redis_required_modules: Optional[List[Dict[str, Any]]],
required_modules: Optional[List[Dict[str, Any]]],
) -> None:
"""Validates the sync client."""
# Set client library name
Expand All @@ -168,16 +257,16 @@ def _validate_sync_redis(
client.echo(_lib_name)

# Get list of modules
modules_list = convert_bytes(client.module_list())
installed_modules = RedisConnectionFactory._get_modules(client)

# Validate available modules
RedisConnectionFactory._validate_modules(modules_list, redis_required_modules)
validate_modules(installed_modules, required_modules)

@staticmethod
async def _validate_async_redis(
client: AsyncRedis,
lib_name: Optional[str],
redis_required_modules: Optional[List[Dict[str, Any]]],
required_modules: Optional[List[Dict[str, Any]]],
) -> None:
"""Validates the async client."""
# Set client library name
Expand All @@ -189,10 +278,10 @@ async def _validate_async_redis(
await client.echo(_lib_name)

# Get list of modules
modules_list = convert_bytes(await client.module_list())
installed_modules = await RedisConnectionFactory._get_modules_async(client)

# Validate available modules
RedisConnectionFactory._validate_modules(modules_list, redis_required_modules)
validate_modules(installed_modules, required_modules)

@staticmethod
def _run_async(coro, *args, **kwargs):
Expand Down Expand Up @@ -232,31 +321,3 @@ def _run_async(coro, *args, **kwargs):
finally:
# Close the event loop to release resources
loop.close()

@staticmethod
def _validate_modules(
installed_modules, redis_required_modules: Optional[List[Dict[str, Any]]] = None
) -> None:
"""
Validates if required Redis modules are installed.

Args:
installed_modules: List of installed modules.
redis_required_modules: List of required modules.

Raises:
ValueError: If required Redis modules are not installed.
"""
installed_modules = {module["name"]: module for module in installed_modules}
redis_required_modules = redis_required_modules or REDIS_REQUIRED_MODULES

for required_module in redis_required_modules:
if required_module["name"] in installed_modules:
installed_version = installed_modules[required_module["name"]]["ver"]
if int(installed_version) >= int(required_module["ver"]): # type: ignore
return

raise ValueError(
f"Required Redis database module {required_module['name']} with version >= {required_module['ver']} not installed. "
"Refer to Redis Stack documentation: https://redis.io/docs/stack/"
)
2 changes: 1 addition & 1 deletion redisvl/redis/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# required modules
REDIS_REQUIRED_MODULES = [
DEFAULT_REQUIRED_MODULES = [
{"name": "search", "ver": 20600},
{"name": "searchlight", "ver": 20600},
]
Expand Down
Loading
Loading