Skip to content

Commit

Permalink
[Feature] TensorDictMap Query module
Browse files Browse the repository at this point in the history
ghstack-source-id: d0ef69f42342b66e5d3e39f4029818373169bca0
Pull Request resolved: #2305
  • Loading branch information
vmoens committed Oct 14, 2024
1 parent 1a4b2cc commit 4860674
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 2 deletions.
66 changes: 65 additions & 1 deletion test/test_storage_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import torch

from torchrl.data.map import BinaryToDecimal, RandomProjectionHash, SipHash
from tensordict import TensorDict
from torchrl.data.map import BinaryToDecimal, QueryModule, RandomProjectionHash, SipHash

_has_gym = importlib.util.find_spec("gymnasium", None) or importlib.util.find_spec(
"gym", None
Expand Down Expand Up @@ -51,6 +52,69 @@ def test_randomprojection_hash(self, n_components, scale):
assert y.unique().numel() == y.numel()


class TestQuery:
def test_query_construct(self):
query_module = QueryModule(
in_keys=[(("key1",),), (("another",), "key2")],
index_key=("some", ("_index",)),
hash_module=SipHash(),
clone=False,
)
assert not query_module.clone
assert query_module.in_keys == ["key1", ("another", "key2")]
assert query_module.index_key == ("some", "_index")
assert isinstance(query_module.hash_module, dict)
assert isinstance(
query_module.aggregator,
type(query_module.hash_module[query_module.in_keys[0]]),
)
query_module = QueryModule(
in_keys=[(("key1",),), (("another",), "key2")],
index_key=("some", ("_index",)),
hash_module=SipHash(),
clone=False,
aggregator=SipHash(),
)
# assert not isinstance(query_module.aggregator is not query_module.hash_module[0]
assert isinstance(query_module.aggregator, SipHash)
query_module = QueryModule(
in_keys=[(("key1",),), (("another",), "key2")],
index_key=("some", ("_index",)),
hash_module=[SipHash(), SipHash()],
clone=False,
)
# assert query_module.aggregator is not query_module.hash_module[0]
assert isinstance(query_module.aggregator, SipHash)

@pytest.mark.parametrize("index_key", ["index", ("another", "index")])
@pytest.mark.parametrize("clone", [True, False])
def test_query(self, clone, index_key):
query_module = QueryModule(
in_keys=["key1", "key2"],
index_key=index_key,
hash_module=SipHash(),
clone=clone,
)

query = TensorDict(
{
"key1": torch.Tensor([[1], [1], [1], [2]]),
"key2": torch.Tensor([[3], [3], [2], [3]]),
},
batch_size=(4,),
)
res = query_module(query)
if clone:
assert res is not query
else:
assert res is query
assert index_key in res

assert res[index_key][0] == res[index_key][1]
for i in range(1, 3):
assert res[index_key][i].item() != res[index_key][i + 1].item()


if __name__ == "__main__":
args, unknown = argparse.ArgumentParser().parse_known_args()
pytest.main([__file__, "--capture", "no", "--exitfirst"] + unknown)
2 changes: 1 addition & 1 deletion torchrl/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from .map import BinaryToDecimal, RandomProjectionHash, SipHash
from .map import BinaryToDecimal, HashToInt, QueryModule, RandomProjectionHash, SipHash
from .postprocs import MultiStep
from .replay_buffers import (
Flat2TED,
Expand Down
1 change: 1 addition & 0 deletions torchrl/data/map/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
# LICENSE file in the root directory of this source tree.

from .hash import BinaryToDecimal, RandomProjectionHash, SipHash
from .query import HashToInt, QueryModule
197 changes: 197 additions & 0 deletions torchrl/data/map/query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations

from copy import deepcopy
from typing import Any, Callable, Dict, List, Mapping, TypeVar

import torch
import torch.nn as nn
from tensordict import NestedKey, TensorDictBase
from tensordict.nn.common import TensorDictModuleBase
from torchrl._utils import logger as torchrl_logger
from torchrl.data.map import SipHash

K = TypeVar("K")
V = TypeVar("V")


class HashToInt(nn.Module):
"""Converts a hash value to an integer that can be used for indexing a contiguous storage."""

def __init__(self):
super().__init__()
self._index_to_index = {}

def __call__(self, key: torch.Tensor, extend: bool = False) -> torch.Tensor:
result = []
if extend:
for _item in key.tolist():
result.append(
self._index_to_index.setdefault(_item, len(self._index_to_index))
)
else:
for _item in key.tolist():
result.append(
self._index_to_index.get(_item, len(self._index_to_index))
)
return torch.tensor(result, device=key.device, dtype=key.dtype)

def state_dict(self) -> Dict[str, torch.Tensor]:
values = torch.tensor(self._index_to_index.values())
keys = torch.tensor(self._index_to_index.keys())
return {"keys": keys, "values": values}

def load_state_dict(
self, state_dict: Mapping[str, Any], strict: bool = True, assign: bool = False
):
keys = state_dict["keys"]
values = state_dict["values"]
self._index_to_index = {
key: val for key, val in zip(keys.tolist(), values.tolist())
}


class QueryModule(TensorDictModuleBase):
"""A Module to generate compatible indices for storage.
A module that queries a storage and return required index of that storage.
Currently, it only outputs integer indices (torch.int64).
Args:
in_keys (list of NestedKeys): keys of the input tensordict that
will be used to generate the hash value.
index_key (NestedKey): the output key where the index value will be written.
Defaults to ``"_index"``.
Keyword Args:
hash_key (NestedKey): the output key where the hash value will be written.
Defaults to ``"_hash"``.
hash_module (Callable[[Any], int] or a list of these, optional): a hash
module similar to :class:`~tensordict.nn.SipHash` (default).
If a list of callables is provided, its length must equate the number of in_keys.
hash_to_int (Callable[[int], int], optional): a stateful function that
maps a hash value to a non-negative integer corresponding to an index in a
storage. Defaults to :class:`~torchrl.data.map.HashToInt`.
aggregator (Callable[[int], int], optional): a hash function to group multiple hashes
together. This argument should only be passed when there is more than one ``in_keys``.
If a single ``hash_module`` is provided but no aggregator is passed, it will take
the value of the hash_module. If no ``hash_module`` or a list of ``hash_modules`` is
provided but no aggregator is passed, it will default to ``SipHash``.
clone (bool, optional): if ``True``, a shallow clone of the input TensorDict will be
returned. This can be used to retrieve the integer index within the storage,
corresponding to a given input tensordict.
Defaults to ``False``.
d
Examples:
>>> query_module = QueryModule(
... in_keys=["key1", "key2"],
... index_key="index",
... hash_module=SipHash(),
... )
>>> query = TensorDict(
... {
... "key1": torch.Tensor([[1], [1], [1], [2]]),
... "key2": torch.Tensor([[3], [3], [2], [3]]),
... "other": torch.randn(4),
... },
... batch_size=(4,),
... )
>>> res = query_module(query)
>>> # The first two pairs of key1 and key2 match
>>> assert res["index"][0] == res["index"][1]
>>> # The last three pairs of key1 and key2 have at least one mismatching value
>>> assert res["index"][1] != res["index"][2]
>>> assert res["index"][2] != res["index"][3]
"""

def __init__(
self,
in_keys: List[NestedKey],
index_key: NestedKey = "_index",
hash_key: NestedKey = "_hash",
*,
hash_module: Callable[[Any], int] | List[Callable[[Any], int]] | None = None,
hash_to_int: Callable[[int], int] | None = None,
aggregator: Callable[[Any], int] = None,
clone: bool = False,
):
if len(in_keys) == 0:
raise ValueError("`in_keys` cannot be empty.")
in_keys = in_keys if isinstance(in_keys, List) else [in_keys]

super().__init__()
in_keys = self.in_keys = in_keys
self.out_keys = [index_key, hash_key]
index_key = self.out_keys[0]
self.hash_key = self.out_keys[1]

if aggregator is not None and len(self.in_keys) == 1:
torchrl_logger.warn(
"An aggregator was provided but there is only one in-key to be read. "
"This module will be ignored."
)
elif aggregator is None:
if hash_module is not None and not isinstance(hash_module, list):
aggregator = hash_module
else:
aggregator = SipHash()
if hash_module is None:
hash_module = [SipHash() for _ in range(len(self.in_keys))]
elif not isinstance(hash_module, list):
try:
hash_module = [
deepcopy(hash_module) if len(self.in_keys) > 1 else hash_module
for _ in range(len(self.in_keys))
]
except Exception as err:
raise RuntimeError(
"failed to deepcopy the hash module. Please provide a list of hash modules instead."
) from err
elif len(hash_module) != len(self.in_keys):
raise ValueError(
"The number of hash_modules must match the number of in_keys. "
f"Got {len(hash_module)} hash modules but {len(in_keys)} in_keys."
)
if hash_to_int is None:
hash_to_int = HashToInt()

self.aggregator = aggregator
self.hash_module = dict(zip(self.in_keys, hash_module))
self.hash_to_int = hash_to_int

self.index_key = index_key
self.clone = clone

def forward(
self,
tensordict: TensorDictBase,
extend: bool = True,
write_hash: bool = True,
) -> TensorDictBase:
hash_values = []

for k in self.in_keys:
hash_values.append(self.hash_module[k](tensordict.get(k)))
if len(self.in_keys) > 1:
hash_values = torch.stack(
hash_values,
dim=-1,
)
hash_values = self.aggregator(hash_values)
else:
hash_values = hash_values[0]

td_hash_value = self.hash_to_int(hash_values, extend=extend)

if self.clone:
output = tensordict.copy()
else:
output = tensordict

output.set(self.index_key, td_hash_value)
if write_hash:
output.set(self.hash_key, hash_values)
return output

1 comment on commit 4860674

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'CPU Benchmark Results'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 4860674 Previous: 1a4b2cc Ratio
benchmarks/test_replaybuffer_benchmark.py::test_rb_sample[TensorDictReplayBuffer-LazyMemmapStorage-RandomSampler-10000] 604.2554994519732 iter/sec (stddev: 0.03722597784803409) 2097.1285480160527 iter/sec (stddev: 0.00002054648155243566) 3.47

This comment was automatically generated by workflow using github-action-benchmark.

CC: @vmoens

Please sign in to comment.