diff --git a/.buildkite/rllib.rayci.yml b/.buildkite/rllib.rayci.yml index 6e058618b74e..b72005002c38 100644 --- a/.buildkite/rllib.rayci.yml +++ b/.buildkite/rllib.rayci.yml @@ -51,7 +51,7 @@ steps: - label: ":brain: rllib: examples" tags: rllib - parallelism: 5 + parallelism: 6 instance_type: large commands: - bazel run //ci/ray_ci:test_in_docker -- //rllib/... rllib diff --git a/.buildkite/rllib_contrib.rayci.yml b/.buildkite/rllib_contrib.rayci.yml deleted file mode 100644 index 614e396a57ad..000000000000 --- a/.buildkite/rllib_contrib.rayci.yml +++ /dev/null @@ -1,50 +0,0 @@ -group: rllib contrib tests -sort_key: "|rllib_contrib" -depends_on: - - oss-ci-base_build -steps: - - block: "run rllib contrib tests" - if: build.env("BUILDKITE_PIPELINE_ID") == "0189942e-0876-4b8f-80a4-617f988ec59b" && build.branch == "master" - - - label: ":collaborator: rllib contrib: {{matrix}} tests" - if: build.env("BUILDKITE_PIPELINE_ID") == "0189942e-0876-4b8f-80a4-617f988ec59b" - instance_type: large - tags: rllib_contrib - commands: - - ./ci/ray_ci/rllib_contrib/rllib_contrib_ci.sh test_{{matrix}} - job_env: oss-ci-base_build - matrix: - - a2c - - alpha_star - - alpha_zero - - apex_ddpg - - apex_dqn - - ars - - bandit - - ddpg - - es - - maddpg - - maml - - pg - - qmix - - r2d2 - - simple_q - - slate_q - - td3 - - - label: ":collaborator: rllib contrib: {{matrix}} tests" - if: build.env("BUILDKITE_PIPELINE_ID") == "0189942e-0876-4b8f-80a4-617f988ec59b" - soft_fail: true - instance_type: large - tags: rllib_contrib - commands: - - ./ci/ray_ci/rllib_contrib/rllib_contrib_ci.sh test_{{matrix}} - job_env: oss-ci-base_build - matrix: - - a3c - - crr - - ddppo - - dt - - leela_chess_zero - - mbmpo - - td3 diff --git a/doc/source/rllib/rllib-models.rst b/doc/source/rllib/rllib-models.rst index 717c6bb196c6..8b475081af3b 100644 --- a/doc/source/rllib/rllib-models.rst +++ b/doc/source/rllib/rllib-models.rst @@ -390,29 +390,6 @@ Take a look at this model example that does exactly that: :end-before: __sphinx_doc_end__ -**Using the Trajectory View API: Passing in the last n actions (or rewards or observations) as inputs to a custom Model** - -It is sometimes helpful for learning not only to look at the current observation -in order to calculate the next action, but also at the past n observations. -In other cases, you may want to provide the most recent rewards or actions to the model as well -(like our LSTM wrapper does if you specify: ``use_lstm=True`` and ``lstm_use_prev_action/reward=True``). -All this may even be useful when not working with partially observable environments (PO-MDPs) -and/or RNN/Attention models, as for example in classic Atari runs, where we usually use framestacking of -the last four observed images. - -The trajectory view API allows your models to specify these more complex "view requirements". - -Here is a simple (non-RNN/Attention) example of a Model that takes as input -the last 3 observations (very similar to the recommended "framestacking" for -learning in Atari environments): - -.. literalinclude:: ../../../rllib/examples/_old_api_stack/models/trajectory_view_utilizing_models.py - :language: python - :start-after: __sphinx_doc_begin__ - :end-before: __sphinx_doc_end__ - -A PyTorch version of the above model is also `given in the same file `__. - Custom Action Distributions --------------------------- diff --git a/rllib/BUILD b/rllib/BUILD index ef46f2177719..417cae83345c 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -1455,13 +1455,6 @@ py_test( srcs = ["models/tests/test_lstms.py"] ) -py_test( - name = "test_models", - tags = ["team:rllib", "models"], - size = "medium", - srcs = ["models/tests/test_models.py"] -) - py_test( name = "test_preprocessors", tags = ["team:rllib", "models"], @@ -1978,34 +1971,6 @@ py_test( # ---------------------- # Old API stack examples # ---------------------- -# subdirectory: _old_api_stack/ -py_test( - name = "examples/_old_api_stack/complex_struct_space_tf", - main = "examples/_old_api_stack/complex_struct_space.py", - tags = ["team:rllib", "exclusive", "examples", "old_api_stack"], - size = "small", - srcs = ["examples/_old_api_stack/complex_struct_space.py"], - args = ["--framework=tf"], -) - -py_test( - name = "examples/_old_api_stack/complex_struct_space_tf_eager", - main = "examples/_old_api_stack/complex_struct_space.py", - tags = ["team:rllib", "exclusive", "examples", "old_api_stack"], - size = "small", - srcs = ["examples/_old_api_stack/complex_struct_space.py"], - args = ["--framework=tf2"], -) - -py_test( - name = "examples/_old_api_stack/complex_struct_space_torch", - main = "examples/_old_api_stack/complex_struct_space.py", - tags = ["team:rllib", "exclusive", "examples", "old_api_stack"], - size = "small", - srcs = ["examples/_old_api_stack/complex_struct_space.py"], - args = ["--framework=torch"], -) - # subdirectory: _old_api_stack/connectors/ py_test( name = "examples/_old_api_stack/connectors/run_connector_policy", @@ -2024,25 +1989,6 @@ py_test( args = ["--use-lstm"], ) -py_test( - name = "examples/_old_api_stack/connectors/adapt_connector_policy", - main = "examples/_old_api_stack/connectors/adapt_connector_policy.py", - tags = ["team:rllib", "exclusive", "examples", "old_api_stack"], - size = "small", - srcs = ["examples/_old_api_stack/connectors/adapt_connector_policy.py"], -) - -# py_test( -# name = "examples/_old_api_stack/connectors/self_play_with_policy_checkpoint", -# main = "examples/_old_api_stack/connectors/self_play_with_policy_checkpoint.py", -# tags = ["team:rllib", "exclusive", "examples", "old_api_stack"], -# size = "small", -# srcs = ["examples/_old_api_stack/connectors/self_play_with_policy_checkpoint.py"], -# args = [ -# "--train_iteration=1" # Smoke test. -# ] -# ) - # ---------------------- # New API stack # Note: This includes to-be-translated-to-new-API-stack examples @@ -2631,26 +2577,6 @@ py_test( args = ["--enable-new-api-stack", "--stop-reward=150.0"] ) -#@OldAPIStack -py_test( - name = "examples/inference/policy_inference_after_training_with_attention_tf", - main = "examples/inference/policy_inference_after_training_with_attention.py", - tags = ["team:rllib", "exclusive", "examples"], - size = "medium", - srcs = ["examples/inference/policy_inference_after_training_with_attention.py"], - args = ["--stop-iters=2", "--framework=tf"] -) - -#@OldAPIStack -py_test( - name = "examples/inference/policy_inference_after_training_with_attention_torch", - main = "examples/inference/policy_inference_after_training_with_attention.py", - tags = ["team:rllib", "exclusive", "examples"], - size = "medium", - srcs = ["examples/inference/policy_inference_after_training_with_attention.py"], - args = ["--stop-iters=2", "--framework=torch"] -) - #@OldAPIStack py_test( name = "examples/inference/policy_inference_after_training_with_lstm_tf", @@ -2897,17 +2823,6 @@ py_test( # args = ["--as-test"] # ) -#@OldAPIStack -# TODO (sven): Doesn't seem to learn at the moment. Uncomment once fixed. -# py_test( -# name = "examples/offline_rl/custom_input_api_cql_torch", -# main = "examples/offline_rl/custom_input_api.py", -# tags = ["team:rllib", "exclusive", "examples"], -# size = "medium", -# srcs = ["examples/offline_rl/custom_input_api.py"], -# args = ["--as-test", "--stop-reward=-300", "--stop-iters=1"] -# ) - #@OldAPIStack py_test( name = "examples/offline_rl/offline_rl_torch_old_api_stack", diff --git a/rllib/examples/_old_api_stack/complex_struct_space.py b/rllib/examples/_old_api_stack/complex_struct_space.py deleted file mode 100644 index 1a1ecd28e122..000000000000 --- a/rllib/examples/_old_api_stack/complex_struct_space.py +++ /dev/null @@ -1,57 +0,0 @@ -# @OldAPIStack -"""Example of using variable-length Repeated or struct observation spaces. - -This example demonstrates the following: - - using a custom environment with Repeated / struct observations - - using a custom model to view the batched list observations - -For PyTorch / TF eager mode, use the `--framework=[torch|tf2]` flag. -""" - -import argparse -import os - -import ray -from ray import air, tune -from ray.rllib.algorithms.ppo import PPOConfig -from ray.rllib.models import ModelCatalog -from ray.rllib.examples.envs.classes.simple_rpg import SimpleRPG -from ray.rllib.examples._old_api_stack.models.simple_rpg_model import ( - CustomTorchRPGModel, - CustomTFRPGModel, -) -from ray.rllib.utils.metrics import NUM_ENV_STEPS_SAMPLED_LIFETIME - -parser = argparse.ArgumentParser() -parser.add_argument( - "--framework", - choices=["tf", "tf2", "torch"], - default="tf2", - help="The DL framework specifier.", -) - -if __name__ == "__main__": - ray.init() - args = parser.parse_args() - if args.framework == "torch": - ModelCatalog.register_custom_model("my_model", CustomTorchRPGModel) - else: - ModelCatalog.register_custom_model("my_model", CustomTFRPGModel) - - config = ( - PPOConfig() - .environment(SimpleRPG) - .framework(args.framework) - .env_runners(rollout_fragment_length=1, num_env_runners=0) - .training(train_batch_size=2, model={"custom_model": "my_model"}) - # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0. - .resources(num_gpus=int(os.environ.get("RLLIB_NUM_GPUS", "0"))) - ) - - stop = {NUM_ENV_STEPS_SAMPLED_LIFETIME: 1} - - tuner = tune.Tuner( - "PPO", - param_space=config.to_dict(), - run_config=air.RunConfig(stop=stop, verbose=1), - ) diff --git a/rllib/examples/_old_api_stack/connectors/adapt_connector_policy.py b/rllib/examples/_old_api_stack/connectors/adapt_connector_policy.py deleted file mode 100644 index db59a49dcdbc..000000000000 --- a/rllib/examples/_old_api_stack/connectors/adapt_connector_policy.py +++ /dev/null @@ -1,134 +0,0 @@ -# @OldAPIStack -"""This example script shows how to load a connector enabled policy, -and adapt or use it with a different version of the environment. -""" - -import gymnasium as gym -import numpy as np -import os -import tempfile -from typing import Dict - -from ray.rllib.connectors.connector import ConnectorContext -from ray.rllib.connectors.action.lambdas import register_lambda_action_connector -from ray.rllib.connectors.agent.lambdas import register_lambda_agent_connector -from ray.rllib.examples._old_api_stack.connectors.prepare_checkpoint import ( - # For demo purpose only. Would normally not need this. - create_appo_cartpole_checkpoint, -) -from ray.rllib.policy.policy import Policy -from ray.rllib.policy.sample_batch import SampleBatch -from ray.rllib.utils.policy import local_policy_inference -from ray.rllib.utils.typing import ( - PolicyOutputType, - StateBatches, - TensorStructType, -) - - -# __sphinx_doc_begin__ -class MyCartPole(gym.Env): - """A mock CartPole environment. - - Gives 2 additional observation states and takes 2 discrete actions. - """ - - def __init__(self): - self._env = gym.make("CartPole-v1") - self.observation_space = gym.spaces.Box(low=-10, high=10, shape=(6,)) - self.action_space = gym.spaces.MultiDiscrete(nvec=[2, 2]) - - def step(self, actions): - # Take the first action. - action = actions[0] - obs, reward, done, truncated, info = self._env.step(action) - # Fake additional data points to the obs. - obs = np.hstack((obs, [8.0, 6.0])) - return obs, reward, done, truncated, info - - def reset(self, *, seed=None, options=None): - obs, info = self._env.reset() - return np.hstack((obs, [8.0, 6.0])), info - - -# Custom agent connector to drop the last 2 feature values. -def v2_to_v1_obs(data: Dict[str, TensorStructType]) -> Dict[str, TensorStructType]: - data[SampleBatch.NEXT_OBS] = data[SampleBatch.NEXT_OBS][:-2] - return data - - -# Agent connector that adapts observations from the new CartPole env -# into old format. -V2ToV1ObsAgentConnector = register_lambda_agent_connector( - "V2ToV1ObsAgentConnector", v2_to_v1_obs -) - - -# Custom action connector to add a placeholder action as the addtional action input. -def v1_to_v2_action( - actions: TensorStructType, states: StateBatches, fetches: Dict -) -> PolicyOutputType: - return np.hstack((actions, [0])), states, fetches - - -# Action connector that adapts action outputs from the old policy -# into new actions for the mock environment. -V1ToV2ActionConnector = register_lambda_action_connector( - "V1ToV2ActionConnector", v1_to_v2_action -) - - -def run(checkpoint_path, policy_id): - # Restore policy. - policy = Policy.from_checkpoint( - checkpoint=checkpoint_path, - policy_ids=[policy_id], - ) - - # Adapt policy trained for standard CartPole to the new env. - ctx: ConnectorContext = ConnectorContext.from_policy(policy) - - # When this policy was trained, it relied on FlattenDataAgentConnector - # to add a batch dimension to single observations. - # This is not necessary anymore, so we first remove the previously used - # FlattenDataAgentConnector. - policy.agent_connectors.remove("FlattenDataAgentConnector") - - # We then add the two adapter connectors. - policy.agent_connectors.prepend(V2ToV1ObsAgentConnector(ctx)) - policy.action_connectors.append(V1ToV2ActionConnector(ctx)) - - # Run CartPole. - env = MyCartPole() - obs, info = env.reset() - done = False - step = 0 - while not done: - step += 1 - - # Use local_policy_inference() to easily run poicy with observations. - policy_outputs = local_policy_inference(policy, "env_1", "agent_1", obs) - assert len(policy_outputs) == 1 - actions, _, _ = policy_outputs[0] - print(f"step {step}", obs, actions) - - obs, _, done, _, _ = env.step(actions) - - -# __sphinx_doc_end__ - - -if __name__ == "__main__": - with tempfile.TemporaryDirectory() as tmpdir: - policy_id = "default_policy" - - # Note, this is just for demo purpose. - # Normally, you would use a policy checkpoint from a real training run. - create_appo_cartpole_checkpoint(tmpdir) - policy_checkpoint_path = os.path.join( - tmpdir, - "policies", - policy_id, - ) - - run(policy_checkpoint_path, policy_id) diff --git a/rllib/examples/_old_api_stack/custom_keras_model.py b/rllib/examples/_old_api_stack/custom_keras_model.py deleted file mode 100644 index e3ccad874b30..000000000000 --- a/rllib/examples/_old_api_stack/custom_keras_model.py +++ /dev/null @@ -1,160 +0,0 @@ -# @OldAPIStack -"""Example of using a custom ModelV2 Keras-style model.""" - -import argparse -import os - -import ray -from ray import air, tune -from ray.rllib.algorithms.callbacks import DefaultCallbacks -from ray.rllib.algorithms.dqn.dqn import DQNConfig -from ray.rllib.algorithms.dqn.distributional_q_tf_model import DistributionalQTFModel -from ray.rllib.models import ModelCatalog -from ray.rllib.models.tf.misc import normc_initializer -from ray.rllib.models.tf.tf_modelv2 import TFModelV2 -from ray.rllib.models.tf.visionnet import VisionNetwork as MyVisionNetwork -from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID -from ray.rllib.utils.framework import try_import_tf -from ray.rllib.utils.metrics import ( - ENV_RUNNER_RESULTS, - EPISODE_RETURN_MEAN, -) -from ray.rllib.utils.metrics.learner_info import LEARNER_INFO, LEARNER_STATS_KEY -from ray.tune.registry import get_trainable_cls - -tf1, tf, tfv = try_import_tf() - -parser = argparse.ArgumentParser() -parser.add_argument( - "--run", type=str, default="DQN", help="The RLlib-registered algorithm to use." -) -parser.add_argument("--stop", type=int, default=200) -parser.add_argument("--use-vision-network", action="store_true") -parser.add_argument("--num-cpus", type=int, default=0) - - -class MyKerasModel(TFModelV2): - """Custom model for policy gradient algorithms.""" - - def __init__(self, obs_space, action_space, num_outputs, model_config, name): - super(MyKerasModel, self).__init__( - obs_space, action_space, num_outputs, model_config, name - ) - self.inputs = tf.keras.layers.Input(shape=obs_space.shape, name="observations") - layer_1 = tf.keras.layers.Dense( - 128, - name="my_layer1", - activation=tf.nn.relu, - kernel_initializer=normc_initializer(1.0), - )(self.inputs) - layer_out = tf.keras.layers.Dense( - num_outputs, - name="my_out", - activation=None, - kernel_initializer=normc_initializer(0.01), - )(layer_1) - value_out = tf.keras.layers.Dense( - 1, - name="value_out", - activation=None, - kernel_initializer=normc_initializer(0.01), - )(layer_1) - self.base_model = tf.keras.Model(self.inputs, [layer_out, value_out]) - - def forward(self, input_dict, state, seq_lens): - model_out, self._value_out = self.base_model(input_dict["obs"]) - return model_out, state - - def value_function(self): - return tf.reshape(self._value_out, [-1]) - - def metrics(self): - return {"foo": tf.constant(42.0)} - - -class MyKerasQModel(DistributionalQTFModel): - """Custom model for DQN.""" - - def __init__(self, obs_space, action_space, num_outputs, model_config, name, **kw): - super(MyKerasQModel, self).__init__( - obs_space, action_space, num_outputs, model_config, name, **kw - ) - - # Define the core model layers which are used by the other - # output heads of DistributionalQModel - self.inputs = tf.keras.layers.Input(shape=obs_space.shape, name="observations") - layer_1 = tf.keras.layers.Dense( - 128, - name="my_layer1", - activation=tf.nn.relu, - kernel_initializer=normc_initializer(1.0), - )(self.inputs) - layer_out = tf.keras.layers.Dense( - num_outputs, - name="my_out", - activation=tf.nn.relu, - kernel_initializer=normc_initializer(1.0), - )(layer_1) - self.base_model = tf.keras.Model(self.inputs, layer_out) - - # Implement the core forward method. - def forward(self, input_dict, state, seq_lens): - model_out = self.base_model(input_dict["obs"]) - return model_out, state - - def metrics(self): - return {"foo": tf.constant(42.0)} - - -if __name__ == "__main__": - args = parser.parse_args() - ray.init(num_cpus=args.num_cpus or None) - ModelCatalog.register_custom_model( - "keras_model", MyVisionNetwork if args.use_vision_network else MyKerasModel - ) - ModelCatalog.register_custom_model( - "keras_q_model", MyVisionNetwork if args.use_vision_network else MyKerasQModel - ) - - # Tests https://github.com/ray-project/ray/issues/7293 - class MyCallbacks(DefaultCallbacks): - def on_train_result(self, *, algorithm, result, **kwargs): - r = result["result"]["info"][LEARNER_INFO] - if DEFAULT_POLICY_ID in r: - r = r[DEFAULT_POLICY_ID].get(LEARNER_STATS_KEY, r[DEFAULT_POLICY_ID]) - assert r["model"]["foo"] == 42, result - - config = ( - get_trainable_cls(args.run) - .get_default_config() - .environment( - "ale_py:ALE/Breakout-v5" if args.use_vision_network else "CartPole-v1" - ) - .framework("tf") - .callbacks(MyCallbacks) - .training( - model={ - "custom_model": "keras_q_model" if args.run == "DQN" else "keras_model" - } - ) - # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0. - .resources(num_gpus=int(os.environ.get("RLLIB_NUM_GPUS", "0"))) - ) - - if args.run == "DQN": - config = ( - DQNConfig() - .update_from_dict(config.to_dict()) - .training(num_steps_sampled_before_learning_starts=0) - ) - - stop = { - f"{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}": args.stop, - } - - tuner = tune.Tuner( - args.run, - param_space=config, - run_config=air.RunConfig(stop=stop), - ) - tuner.fit() diff --git a/rllib/examples/_old_api_stack/models/batch_norm_model.py b/rllib/examples/_old_api_stack/models/batch_norm_model.py deleted file mode 100644 index 0153bf166dec..000000000000 --- a/rllib/examples/_old_api_stack/models/batch_norm_model.py +++ /dev/null @@ -1,237 +0,0 @@ -# @OldAPIStack -import numpy as np - -from ray.rllib.models.modelv2 import ModelV2 -from ray.rllib.models.tf.misc import normc_initializer -from ray.rllib.models.tf.tf_modelv2 import TFModelV2 -from ray.rllib.models.torch.misc import ( - SlimFC, - normc_initializer as torch_normc_initializer, -) -from ray.rllib.models.torch.torch_modelv2 import TorchModelV2 -from ray.rllib.policy.sample_batch import SampleBatch -from ray.rllib.utils.annotations import override -from ray.rllib.utils.framework import try_import_tf, try_import_torch - -tf1, tf, tfv = try_import_tf() -torch, nn = try_import_torch() - - -class KerasBatchNormModel(TFModelV2): - """Keras version of above BatchNormModel with exactly the same structure. - - IMORTANT NOTE: This model will not work with PPO due to a bug in keras - that surfaces when having more than one input placeholder (here: `inputs` - and `is_training`) AND using the `make_tf_callable` helper (e.g. used by - PPO), in which auto-placeholders are generated, then passed through the - tf.keras. models.Model. In this last step, the connection between 1) the - provided value in the auto-placeholder and 2) the keras `is_training` - Input is broken and keras complains. - Use the below `BatchNormModel` (a non-keras based TFModelV2), instead. - """ - - def __init__(self, obs_space, action_space, num_outputs, model_config, name): - super().__init__(obs_space, action_space, num_outputs, model_config, name) - inputs = tf.keras.layers.Input(shape=obs_space.shape, name="inputs") - # Have to batch the is_training flag (its batch size will always be 1). - is_training = tf.keras.layers.Input( - shape=(), dtype=tf.bool, batch_size=1, name="is_training" - ) - last_layer = inputs - hiddens = [256, 256] - for i, size in enumerate(hiddens): - label = "fc{}".format(i) - last_layer = tf.keras.layers.Dense( - units=size, - kernel_initializer=normc_initializer(1.0), - activation=tf.nn.tanh, - name=label, - )(last_layer) - # Add a batch norm layer - last_layer = tf.keras.layers.BatchNormalization()( - last_layer, training=is_training[0] - ) - output = tf.keras.layers.Dense( - units=self.num_outputs, - kernel_initializer=normc_initializer(0.01), - activation=None, - name="fc_out", - )(last_layer) - value_out = tf.keras.layers.Dense( - units=1, - kernel_initializer=normc_initializer(0.01), - activation=None, - name="value_out", - )(last_layer) - - self.base_model = tf.keras.models.Model( - inputs=[inputs, is_training], outputs=[output, value_out] - ) - - @override(ModelV2) - def forward(self, input_dict, state, seq_lens): - if isinstance(input_dict, SampleBatch): - is_training = input_dict.is_training - else: - is_training = input_dict["is_training"] - # Have to batch the is_training flag (B=1). - out, self._value_out = self.base_model( - [input_dict["obs"], tf.expand_dims(is_training, 0)] - ) - return out, [] - - @override(ModelV2) - def value_function(self): - return tf.reshape(self._value_out, [-1]) - - -class BatchNormModel(TFModelV2): - """Example of a TFModelV2 that is built w/o using tf.keras. - - NOTE: The above keras-based example model does not work with PPO (due to - a bug in keras related to missing values for input placeholders, even - though these input values have been provided in a forward pass through the - actual keras Model). - - All Model logic (layers) is defined in the `forward` method (incl. - the batch_normalization layers). Also, all variables are registered - (only once) at the end of `forward`, so an optimizer knows which tensors - to train on. A standard `value_function` override is used. - """ - - capture_index = 0 - - def __init__(self, obs_space, action_space, num_outputs, model_config, name): - super().__init__(obs_space, action_space, num_outputs, model_config, name) - # Have we registered our vars yet (see `forward`)? - self._registered = False - - @override(ModelV2) - def forward(self, input_dict, state, seq_lens): - last_layer = input_dict["obs"] - hiddens = [256, 256] - with tf1.variable_scope("model", reuse=tf1.AUTO_REUSE): - if isinstance(input_dict, SampleBatch): - is_training = input_dict.is_training - else: - is_training = input_dict["is_training"] - for i, size in enumerate(hiddens): - last_layer = tf1.layers.dense( - last_layer, - size, - kernel_initializer=normc_initializer(1.0), - activation=tf.nn.tanh, - name="fc{}".format(i), - ) - # Add a batch norm layer - last_layer = tf1.layers.batch_normalization( - last_layer, training=is_training, name="bn_{}".format(i) - ) - - output = tf1.layers.dense( - last_layer, - self.num_outputs, - kernel_initializer=normc_initializer(0.01), - activation=None, - name="out", - ) - self._value_out = tf1.layers.dense( - last_layer, - 1, - kernel_initializer=normc_initializer(1.0), - activation=None, - name="vf", - ) - - # Register variables. - # NOTE: This is not the recommended way of doing things. We would - # prefer creating keras-style Layers like it's done in the - # `KerasBatchNormModel` class above and then have TFModelV2 auto-detect - # the created vars. However, since there is a bug - # in keras/tf that prevents us from using that KerasBatchNormModel - # example (see comments above), we do variable registration the old, - # manual way for this example Model here. - if not self._registered: - # Register already auto-detected variables (from the wrapping - # Model, e.g. DQNTFModel). - self.register_variables(self.variables()) - # Then register everything we added to the graph in this `forward` - # call. - self.register_variables( - tf1.get_collection( - tf1.GraphKeys.TRAINABLE_VARIABLES, scope=".+/model/.+" - ) - ) - self._registered = True - - return output, [] - - @override(ModelV2) - def value_function(self): - return tf.reshape(self._value_out, [-1]) - - -class TorchBatchNormModel(TorchModelV2, nn.Module): - """Example of a TorchModelV2 using batch normalization.""" - - capture_index = 0 - - def __init__( - self, obs_space, action_space, num_outputs, model_config, name, **kwargs - ): - TorchModelV2.__init__( - self, obs_space, action_space, num_outputs, model_config, name - ) - nn.Module.__init__(self) - layers = [] - prev_layer_size = int(np.prod(obs_space.shape)) - self._logits = None - - # Create layers 0 to second-last. - for size in [256, 256]: - layers.append( - SlimFC( - in_size=prev_layer_size, - out_size=size, - initializer=torch_normc_initializer(1.0), - activation_fn=nn.ReLU, - ) - ) - prev_layer_size = size - # Add a batch norm layer. - layers.append(nn.BatchNorm1d(prev_layer_size)) - - self._logits = SlimFC( - in_size=prev_layer_size, - out_size=self.num_outputs, - initializer=torch_normc_initializer(0.01), - activation_fn=None, - ) - - self._value_branch = SlimFC( - in_size=prev_layer_size, - out_size=1, - initializer=torch_normc_initializer(1.0), - activation_fn=None, - ) - - self._hidden_layers = nn.Sequential(*layers) - self._hidden_out = None - - @override(ModelV2) - def forward(self, input_dict, state, seq_lens): - if isinstance(input_dict, SampleBatch): - is_training = bool(input_dict.is_training) - else: - is_training = bool(input_dict.get("is_training", False)) - # Set the correct train-mode for our hidden module (only important - # b/c we have some batch-norm layers). - self._hidden_layers.train(mode=is_training) - self._hidden_out = self._hidden_layers(input_dict["obs"]) - logits = self._logits(self._hidden_out) - return logits, [] - - @override(ModelV2) - def value_function(self): - assert self._hidden_out is not None, "must call forward first!" - return torch.reshape(self._value_branch(self._hidden_out), [-1]) diff --git a/rllib/examples/_old_api_stack/models/custom_model_api.py b/rllib/examples/_old_api_stack/models/custom_model_api.py deleted file mode 100644 index 7297faa89038..000000000000 --- a/rllib/examples/_old_api_stack/models/custom_model_api.py +++ /dev/null @@ -1,177 +0,0 @@ -# @OldAPIStack -from gymnasium.spaces import Box - -from ray.rllib.models.tf.fcnet import FullyConnectedNetwork -from ray.rllib.models.tf.tf_modelv2 import TFModelV2 -from ray.rllib.models.torch.fcnet import ( - FullyConnectedNetwork as TorchFullyConnectedNetwork, -) -from ray.rllib.models.torch.misc import SlimFC -from ray.rllib.models.torch.torch_modelv2 import TorchModelV2 -from ray.rllib.utils.framework import try_import_tf, try_import_torch - -tf1, tf, tfv = try_import_tf() -torch, nn = try_import_torch() - - -# __sphinx_doc_model_api_1_begin__ -class DuelingQModel(TFModelV2): # or: TorchModelV2 - """A simple, hard-coded dueling head model.""" - - def __init__(self, obs_space, action_space, num_outputs, model_config, name): - # Pass num_outputs=None into super constructor (so that no action/ - # logits output layer is built). - # Alternatively, you can pass in num_outputs=[last layer size of - # config[model][fcnet_hiddens]] AND set no_last_linear=True, but - # this seems more tedious as you will have to explain users of this - # class that num_outputs is NOT the size of your Q-output layer. - super(DuelingQModel, self).__init__( - obs_space, action_space, None, model_config, name - ) - # Now: self.num_outputs contains the last layer's size, which - # we can use to construct the dueling head (see torch: SlimFC - # below). - - # Construct advantage head ... - self.A = tf.keras.layers.Dense(num_outputs) - # torch: - # self.A = SlimFC( - # in_size=self.num_outputs, out_size=num_outputs) - - # ... and value head. - self.V = tf.keras.layers.Dense(1) - # torch: - # self.V = SlimFC(in_size=self.num_outputs, out_size=1) - - def get_q_values(self, underlying_output): - # Calculate q-values following dueling logic: - v = self.V(underlying_output) # value - a = self.A(underlying_output) # advantages (per action) - advantages_mean = tf.reduce_mean(a, 1) - advantages_centered = a - tf.expand_dims(advantages_mean, 1) - return v + advantages_centered # q-values - - -# __sphinx_doc_model_api_1_end__ - - -class TorchDuelingQModel(TorchModelV2): - """A simple, hard-coded dueling head model.""" - - def __init__(self, obs_space, action_space, num_outputs, model_config, name): - # Pass num_outputs=None into super constructor (so that no action/ - # logits output layer is built). - # Alternatively, you can pass in num_outputs=[last layer size of - # config[model][fcnet_hiddens]] AND set no_last_linear=True, but - # this seems more tedious as you will have to explain users of this - # class that num_outputs is NOT the size of your Q-output layer. - nn.Module.__init__(self) - super(TorchDuelingQModel, self).__init__( - obs_space, action_space, None, model_config, name - ) - # Now: self.num_outputs contains the last layer's size, which - # we can use to construct the dueling head (see torch: SlimFC - # below). - - # Construct advantage head ... - self.A = SlimFC(in_size=self.num_outputs, out_size=num_outputs) - - # ... and value head. - self.V = SlimFC(in_size=self.num_outputs, out_size=1) - - def get_q_values(self, underlying_output): - # Calculate q-values following dueling logic: - v = self.V(underlying_output) # value - a = self.A(underlying_output) # advantages (per action) - advantages_mean = torch.mean(a, 1) - advantages_centered = a - torch.unsqueeze(advantages_mean, 1) - return v + advantages_centered # q-values - - -class ContActionQModel(TFModelV2): - """A simple, q-value-from-cont-action model (for e.g. SAC type algos).""" - - def __init__(self, obs_space, action_space, num_outputs, model_config, name): - # Pass num_outputs=None into super constructor (so that no action/ - # logits output layer is built). - # Alternatively, you can pass in num_outputs=[last layer size of - # config[model][fcnet_hiddens]] AND set no_last_linear=True, but - # this seems more tedious as you will have to explain users of this - # class that num_outputs is NOT the size of your Q-output layer. - super(ContActionQModel, self).__init__( - obs_space, action_space, None, model_config, name - ) - - # Now: self.num_outputs contains the last layer's size, which - # we can use to construct the single q-value computing head. - - # Nest an RLlib FullyConnectedNetwork (torch or tf) into this one here - # to be used for Q-value calculation. - # Use the current value of self.num_outputs, which is the wrapped - # model's output layer size. - combined_space = Box(-1.0, 1.0, (self.num_outputs + action_space.shape[0],)) - self.q_head = FullyConnectedNetwork( - combined_space, action_space, 1, model_config, "q_head" - ) - - # Missing here: Probably still have to provide action output layer - # and value layer and make sure self.num_outputs is correctly set. - - def get_single_q_value(self, underlying_output, action): - # Calculate the q-value after concating the underlying output with - # the given action. - input_ = tf.concat([underlying_output, action], axis=-1) - # Construct a simple input_dict (needed for self.q_head as it's an - # RLlib ModelV2). - input_dict = {"obs": input_} - # Ignore state outputs. - q_values, _ = self.q_head(input_dict) - return q_values - - -# __sphinx_doc_model_api_2_begin__ - - -class TorchContActionQModel(TorchModelV2): - """A simple, q-value-from-cont-action model (for e.g. SAC type algos).""" - - def __init__(self, obs_space, action_space, num_outputs, model_config, name): - nn.Module.__init__(self) - # Pass num_outputs=None into super constructor (so that no action/ - # logits output layer is built). - # Alternatively, you can pass in num_outputs=[last layer size of - # config[model][fcnet_hiddens]] AND set no_last_linear=True, but - # this seems more tedious as you will have to explain users of this - # class that num_outputs is NOT the size of your Q-output layer. - super(TorchContActionQModel, self).__init__( - obs_space, action_space, None, model_config, name - ) - - # Now: self.num_outputs contains the last layer's size, which - # we can use to construct the single q-value computing head. - - # Nest an RLlib FullyConnectedNetwork (torch or tf) into this one here - # to be used for Q-value calculation. - # Use the current value of self.num_outputs, which is the wrapped - # model's output layer size. - combined_space = Box(-1.0, 1.0, (self.num_outputs + action_space.shape[0],)) - self.q_head = TorchFullyConnectedNetwork( - combined_space, action_space, 1, model_config, "q_head" - ) - - # Missing here: Probably still have to provide action output layer - # and value layer and make sure self.num_outputs is correctly set. - - def get_single_q_value(self, underlying_output, action): - # Calculate the q-value after concating the underlying output with - # the given action. - input_ = torch.cat([underlying_output, action], dim=-1) - # Construct a simple input_dict (needed for self.q_head as it's an - # RLlib ModelV2). - input_dict = {"obs": input_} - # Ignore state outputs. - q_values, _ = self.q_head(input_dict) - return q_values - - -# __sphinx_doc_model_api_2_end__ diff --git a/rllib/examples/_old_api_stack/models/eager_model.py b/rllib/examples/_old_api_stack/models/eager_model.py deleted file mode 100644 index 1628fcda4abe..000000000000 --- a/rllib/examples/_old_api_stack/models/eager_model.py +++ /dev/null @@ -1,64 +0,0 @@ -# @OldAPIStack -import random - -from ray.rllib.models.modelv2 import ModelV2 -from ray.rllib.models.tf.fcnet import FullyConnectedNetwork -from ray.rllib.models.tf.tf_modelv2 import TFModelV2 -from ray.rllib.utils.annotations import override -from ray.rllib.utils.framework import try_import_tf - -tf1, tf, tfv = try_import_tf() - - -class EagerModel(TFModelV2): - """Example of using embedded eager execution in a custom model. - - This shows how to use tf.py_function() to execute a snippet of TF code - in eager mode. Here the `self.forward_eager` method just prints out - the intermediate tensor for debug purposes, but you can in general - perform any TF eager operation in tf.py_function(). - """ - - def __init__( - self, observation_space, action_space, num_outputs, model_config, name - ): - super().__init__( - observation_space, action_space, num_outputs, model_config, name - ) - - inputs = tf.keras.layers.Input(shape=observation_space.shape) - self.fcnet = FullyConnectedNetwork( - obs_space=self.obs_space, - action_space=self.action_space, - num_outputs=self.num_outputs, - model_config=self.model_config, - name="fc1", - ) - out, value_out = self.fcnet.base_model(inputs) - - def lambda_(x): - eager_out = tf.py_function(self.forward_eager, [x], tf.float32) - with tf1.control_dependencies([eager_out]): - eager_out.set_shape(x.shape) - return eager_out - - out = tf.keras.layers.Lambda(lambda_)(out) - self.base_model = tf.keras.models.Model(inputs, [out, value_out]) - - @override(ModelV2) - def forward(self, input_dict, state, seq_lens): - out, self._value_out = self.base_model(input_dict["obs"], state, seq_lens) - return out, [] - - @override(ModelV2) - def value_function(self): - return tf.reshape(self._value_out, [-1]) - - def forward_eager(self, feature_layer): - assert tf.executing_eagerly() - if random.random() > 0.99: - print( - "Eagerly printing the feature layer mean value", - tf.reduce_mean(feature_layer), - ) - return feature_layer diff --git a/rllib/examples/_old_api_stack/models/modelv3.py b/rllib/examples/_old_api_stack/models/modelv3.py deleted file mode 100644 index a93879510455..000000000000 --- a/rllib/examples/_old_api_stack/models/modelv3.py +++ /dev/null @@ -1,61 +0,0 @@ -# @OldAPIStack -import numpy as np - -from ray.rllib.policy.sample_batch import SampleBatch -from ray.rllib.utils.framework import try_import_tf, try_import_torch - -tf1, tf, tfv = try_import_tf() -torch, nn = try_import_torch() - - -class RNNModel(tf.keras.models.Model if tf else object): - """Example of using the Keras functional API to define an RNN model.""" - - def __init__( - self, - input_space, - action_space, - num_outputs, - *, - name="", - hiddens_size=256, - cell_size=64 - ): - super().__init__(name=name) - - self.cell_size = cell_size - - # Preprocess observation with a hidden layer and send to LSTM cell - self.dense = tf.keras.layers.Dense( - hiddens_size, activation=tf.nn.relu, name="dense1" - ) - self.lstm = tf.keras.layers.LSTM( - cell_size, return_sequences=True, return_state=True, name="lstm" - ) - - # Postprocess LSTM output with another hidden layer and compute - # values. - self.logits = tf.keras.layers.Dense( - num_outputs, activation=tf.keras.activations.linear, name="logits" - ) - self.values = tf.keras.layers.Dense(1, activation=None, name="values") - - def call(self, sample_batch): - dense_out = self.dense(sample_batch["obs"]) - B = tf.shape(sample_batch[SampleBatch.SEQ_LENS])[0] - lstm_in = tf.reshape(dense_out, [B, -1, dense_out.shape.as_list()[1]]) - lstm_out, h, c = self.lstm( - inputs=lstm_in, - mask=tf.sequence_mask(sample_batch[SampleBatch.SEQ_LENS]), - initial_state=[sample_batch["state_in_0"], sample_batch["state_in_1"]], - ) - lstm_out = tf.reshape(lstm_out, [-1, lstm_out.shape.as_list()[2]]) - logits = self.logits(lstm_out) - values = tf.reshape(self.values(lstm_out), [-1]) - return logits, [h, c], {SampleBatch.VF_PREDS: values} - - def get_initial_state(self): - return [ - np.zeros(self.cell_size, np.float32), - np.zeros(self.cell_size, np.float32), - ] diff --git a/rllib/examples/_old_api_stack/models/rnn_model.py b/rllib/examples/_old_api_stack/models/rnn_model.py deleted file mode 100644 index bdbc8b6a9c85..000000000000 --- a/rllib/examples/_old_api_stack/models/rnn_model.py +++ /dev/null @@ -1,142 +0,0 @@ -# @OldAPIStack -import numpy as np - -from ray.rllib.models.modelv2 import ModelV2 -from ray.rllib.models.preprocessors import get_preprocessor -from ray.rllib.models.tf.recurrent_net import RecurrentNetwork -from ray.rllib.models.torch.recurrent_net import RecurrentNetwork as TorchRNN -from ray.rllib.utils.annotations import override -from ray.rllib.utils.framework import try_import_tf, try_import_torch - -tf1, tf, tfv = try_import_tf() -torch, nn = try_import_torch() - - -class RNNModel(RecurrentNetwork): - """Example of using the Keras functional API to define a RNN model.""" - - def __init__( - self, - obs_space, - action_space, - num_outputs, - model_config, - name, - hiddens_size=256, - cell_size=64, - ): - super(RNNModel, self).__init__( - obs_space, action_space, num_outputs, model_config, name - ) - self.cell_size = cell_size - - # Define input layers - input_layer = tf.keras.layers.Input( - shape=(None, obs_space.shape[0]), name="inputs" - ) - state_in_h = tf.keras.layers.Input(shape=(cell_size,), name="h") - state_in_c = tf.keras.layers.Input(shape=(cell_size,), name="c") - seq_in = tf.keras.layers.Input(shape=(), name="seq_in", dtype=tf.int32) - - # Preprocess observation with a hidden layer and send to LSTM cell - dense1 = tf.keras.layers.Dense( - hiddens_size, activation=tf.nn.relu, name="dense1" - )(input_layer) - lstm_out, state_h, state_c = tf.keras.layers.LSTM( - cell_size, return_sequences=True, return_state=True, name="lstm" - )( - inputs=dense1, - mask=tf.sequence_mask(seq_in), - initial_state=[state_in_h, state_in_c], - ) - - # Postprocess LSTM output with another hidden layer and compute values - logits = tf.keras.layers.Dense( - self.num_outputs, activation=tf.keras.activations.linear, name="logits" - )(lstm_out) - values = tf.keras.layers.Dense(1, activation=None, name="values")(lstm_out) - - # Create the RNN model - self.rnn_model = tf.keras.Model( - inputs=[input_layer, seq_in, state_in_h, state_in_c], - outputs=[logits, values, state_h, state_c], - ) - self.rnn_model.summary() - - @override(RecurrentNetwork) - def forward_rnn(self, inputs, state, seq_lens): - model_out, self._value_out, h, c = self.rnn_model([inputs, seq_lens] + state) - return model_out, [h, c] - - @override(ModelV2) - def get_initial_state(self): - return [ - np.zeros(self.cell_size, np.float32), - np.zeros(self.cell_size, np.float32), - ] - - @override(ModelV2) - def value_function(self): - return tf.reshape(self._value_out, [-1]) - - -class TorchRNNModel(TorchRNN, nn.Module): - def __init__( - self, - obs_space, - action_space, - num_outputs, - model_config, - name, - fc_size=64, - lstm_state_size=256, - ): - nn.Module.__init__(self) - super().__init__(obs_space, action_space, num_outputs, model_config, name) - - self.obs_size = get_preprocessor(obs_space)(obs_space).size - self.fc_size = fc_size - self.lstm_state_size = lstm_state_size - - # Build the Module from fc + LSTM + 2xfc (action + value outs). - self.fc1 = nn.Linear(self.obs_size, self.fc_size) - self.lstm = nn.LSTM(self.fc_size, self.lstm_state_size, batch_first=True) - self.action_branch = nn.Linear(self.lstm_state_size, num_outputs) - self.value_branch = nn.Linear(self.lstm_state_size, 1) - # Holds the current "base" output (before logits layer). - self._features = None - - @override(ModelV2) - def get_initial_state(self): - # TODO: (sven): Get rid of `get_initial_state` once Trajectory - # View API is supported across all of RLlib. - # Place hidden states on same device as model. - h = [ - self.fc1.weight.new(1, self.lstm_state_size).zero_().squeeze(0), - self.fc1.weight.new(1, self.lstm_state_size).zero_().squeeze(0), - ] - return h - - @override(ModelV2) - def value_function(self): - assert self._features is not None, "must call forward() first" - return torch.reshape(self.value_branch(self._features), [-1]) - - @override(TorchRNN) - def forward_rnn(self, inputs, state, seq_lens): - """Feeds `inputs` (B x T x ..) through the Gru Unit. - - Returns the resulting outputs as a sequence (B x T x ...). - Values are stored in self._cur_value in simple (B) shape (where B - contains both the B and T dims!). - - Returns: - NN Outputs (B x T x ...) as sequence. - The state batches as a List of two items (c- and h-states). - """ - x = nn.functional.relu(self.fc1(inputs)) - self._features, [h, c] = self.lstm( - x, [torch.unsqueeze(state[0], 0), torch.unsqueeze(state[1], 0)] - ) - action_out = self.action_branch(self._features) - return action_out, [torch.squeeze(h, 0), torch.squeeze(c, 0)] diff --git a/rllib/examples/_old_api_stack/models/rnn_spy_model.py b/rllib/examples/_old_api_stack/models/rnn_spy_model.py deleted file mode 100644 index 337990a60759..000000000000 --- a/rllib/examples/_old_api_stack/models/rnn_spy_model.py +++ /dev/null @@ -1,142 +0,0 @@ -# @OldAPIStack -import numpy as np -import pickle - -import ray -from ray.rllib.models.modelv2 import ModelV2 -from ray.rllib.models.tf.misc import normc_initializer -from ray.rllib.models.tf.recurrent_net import RecurrentNetwork -from ray.rllib.utils.annotations import override -from ray.rllib.utils.framework import try_import_tf - -tf1, tf, tfv = try_import_tf() - - -class SpyLayer(tf.keras.layers.Layer): - """A keras Layer, which intercepts its inputs and stored them as pickled.""" - - output = np.array(0, dtype=np.int64) - - def __init__(self, num_outputs, **kwargs): - super().__init__(**kwargs) - - self.dense = tf.keras.layers.Dense( - units=num_outputs, kernel_initializer=normc_initializer(0.01) - ) - - def call(self, inputs, **kwargs): - """Does a forward pass through our Dense, but also intercepts inputs.""" - - del kwargs - spy_fn = tf1.py_func( - self.spy, - [ - inputs[0], # observations - inputs[2], # seq_lens - inputs[3], # h_in - inputs[4], # c_in - inputs[5], # h_out - inputs[6], # c_out - ], - tf.int64, # Must match SpyLayer.output's type. - stateful=True, - ) - - # Compute outputs - with tf1.control_dependencies([spy_fn]): - return self.dense(inputs[1]) - - @staticmethod - def spy(inputs, seq_lens, h_in, c_in, h_out, c_out): - """The actual spy operation: Store inputs in internal_kv.""" - - if len(inputs) == 1: - # don't capture inference inputs - return SpyLayer.output - # TF runs this function in an isolated context, so we have to use - # redis to communicate back to our suite - ray.experimental.internal_kv._internal_kv_put( - "rnn_spy_in_{}".format(RNNSpyModel.capture_index), - pickle.dumps( - { - "sequences": inputs, - "seq_lens": seq_lens, - "state_in": [h_in, c_in], - "state_out": [h_out, c_out], - } - ), - overwrite=True, - ) - RNNSpyModel.capture_index += 1 - return SpyLayer.output - - -class RNNSpyModel(RecurrentNetwork): - capture_index = 0 - cell_size = 3 - - def __init__(self, obs_space, action_space, num_outputs, model_config, name): - super().__init__(obs_space, action_space, num_outputs, model_config, name) - self.cell_size = RNNSpyModel.cell_size - - # Create a keras LSTM model. - inputs = tf.keras.layers.Input(shape=(None,) + obs_space.shape, name="input") - state_in_h = tf.keras.layers.Input(shape=(self.cell_size,), name="h") - state_in_c = tf.keras.layers.Input(shape=(self.cell_size,), name="c") - seq_lens = tf.keras.layers.Input(shape=(), name="seq_lens", dtype=tf.int32) - - lstm_out, state_out_h, state_out_c = tf.keras.layers.LSTM( - self.cell_size, return_sequences=True, return_state=True, name="lstm" - )( - inputs=inputs, - mask=tf.sequence_mask(seq_lens), - initial_state=[state_in_h, state_in_c], - ) - - logits = SpyLayer(num_outputs=self.num_outputs)( - [ - inputs, - lstm_out, - seq_lens, - state_in_h, - state_in_c, - state_out_h, - state_out_c, - ] - ) - - # Value branch. - value_out = tf.keras.layers.Dense( - units=1, kernel_initializer=normc_initializer(1.0) - )(lstm_out) - - self.base_model = tf.keras.Model( - [inputs, seq_lens, state_in_h, state_in_c], - [logits, value_out, state_out_h, state_out_c], - ) - self.base_model.summary() - - @override(RecurrentNetwork) - def forward_rnn(self, inputs, state, seq_lens): - # Previously, a new class object was created during - # deserialization and this `capture_index` - # variable would be refreshed between class instantiations. - # This behavior is no longer the case, so we manually refresh - # the variable. - RNNSpyModel.capture_index = 0 - model_out, value_out, h, c = self.base_model( - [inputs, seq_lens, state[0], state[1]] - ) - self._value_out = value_out - return model_out, [h, c] - - @override(ModelV2) - def value_function(self): - return tf.reshape(self._value_out, [-1]) - - @override(ModelV2) - def get_initial_state(self): - return [ - np.zeros(self.cell_size, np.float32), - np.zeros(self.cell_size, np.float32), - ] diff --git a/rllib/examples/_old_api_stack/models/trajectory_view_utilizing_models.py b/rllib/examples/_old_api_stack/models/trajectory_view_utilizing_models.py deleted file mode 100644 index ed7e2919ede3..000000000000 --- a/rllib/examples/_old_api_stack/models/trajectory_view_utilizing_models.py +++ /dev/null @@ -1,129 +0,0 @@ -# @OldAPIStack -from ray.rllib.models.tf.tf_modelv2 import TFModelV2 -from ray.rllib.models.torch.misc import SlimFC -from ray.rllib.models.torch.torch_modelv2 import TorchModelV2 -from ray.rllib.policy.view_requirement import ViewRequirement -from ray.rllib.utils.framework import try_import_tf, try_import_torch -from ray.rllib.utils.tf_utils import one_hot -from ray.rllib.utils.torch_utils import one_hot as torch_one_hot - -tf1, tf, tfv = try_import_tf() -torch, nn = try_import_torch() - -# __sphinx_doc_begin__ - - -class FrameStackingCartPoleModel(TFModelV2): - """A simple FC model that takes the last n observations as input.""" - - def __init__( - self, obs_space, action_space, num_outputs, model_config, name, num_frames=3 - ): - super(FrameStackingCartPoleModel, self).__init__( - obs_space, action_space, None, model_config, name - ) - - self.num_frames = num_frames - self.num_outputs = num_outputs - - # Construct actual (very simple) FC model. - assert len(obs_space.shape) == 1 - obs = tf.keras.layers.Input(shape=(self.num_frames, obs_space.shape[0])) - obs_reshaped = tf.keras.layers.Reshape([obs_space.shape[0] * self.num_frames])( - obs - ) - rewards = tf.keras.layers.Input(shape=(self.num_frames)) - rewards_reshaped = tf.keras.layers.Reshape([self.num_frames])(rewards) - actions = tf.keras.layers.Input(shape=(self.num_frames, self.action_space.n)) - actions_reshaped = tf.keras.layers.Reshape([action_space.n * self.num_frames])( - actions - ) - input_ = tf.keras.layers.Concatenate(axis=-1)( - [obs_reshaped, actions_reshaped, rewards_reshaped] - ) - layer1 = tf.keras.layers.Dense(256, activation=tf.nn.relu)(input_) - layer2 = tf.keras.layers.Dense(256, activation=tf.nn.relu)(layer1) - out = tf.keras.layers.Dense(self.num_outputs)(layer2) - values = tf.keras.layers.Dense(1)(layer1) - self.base_model = tf.keras.models.Model([obs, actions, rewards], [out, values]) - self._last_value = None - - self.view_requirements["prev_n_obs"] = ViewRequirement( - data_col="obs", shift="-{}:0".format(num_frames - 1), space=obs_space - ) - self.view_requirements["prev_n_rewards"] = ViewRequirement( - data_col="rewards", shift="-{}:-1".format(self.num_frames) - ) - self.view_requirements["prev_n_actions"] = ViewRequirement( - data_col="actions", - shift="-{}:-1".format(self.num_frames), - space=self.action_space, - ) - - def forward(self, input_dict, states, seq_lens): - obs = tf.cast(input_dict["prev_n_obs"], tf.float32) - rewards = tf.cast(input_dict["prev_n_rewards"], tf.float32) - actions = one_hot(input_dict["prev_n_actions"], self.action_space) - out, self._last_value = self.base_model([obs, actions, rewards]) - return out, [] - - def value_function(self): - return tf.squeeze(self._last_value, -1) - - -# __sphinx_doc_end__ - - -class TorchFrameStackingCartPoleModel(TorchModelV2, nn.Module): - """A simple FC model that takes the last n observations as input.""" - - def __init__( - self, obs_space, action_space, num_outputs, model_config, name, num_frames=3 - ): - nn.Module.__init__(self) - super(TorchFrameStackingCartPoleModel, self).__init__( - obs_space, action_space, None, model_config, name - ) - - self.num_frames = num_frames - self.num_outputs = num_outputs - - # Construct actual (very simple) FC model. - assert len(obs_space.shape) == 1 - in_size = self.num_frames * (obs_space.shape[0] + action_space.n + 1) - self.layer1 = SlimFC(in_size=in_size, out_size=256, activation_fn="relu") - self.layer2 = SlimFC(in_size=256, out_size=256, activation_fn="relu") - self.out = SlimFC( - in_size=256, out_size=self.num_outputs, activation_fn="linear" - ) - self.values = SlimFC(in_size=256, out_size=1, activation_fn="linear") - - self._last_value = None - - self.view_requirements["prev_n_obs"] = ViewRequirement( - data_col="obs", shift="-{}:0".format(num_frames - 1), space=obs_space - ) - self.view_requirements["prev_n_rewards"] = ViewRequirement( - data_col="rewards", shift="-{}:-1".format(self.num_frames) - ) - self.view_requirements["prev_n_actions"] = ViewRequirement( - data_col="actions", - shift="-{}:-1".format(self.num_frames), - space=self.action_space, - ) - - def forward(self, input_dict, states, seq_lens): - obs = input_dict["prev_n_obs"] - obs = torch.reshape(obs, [-1, self.obs_space.shape[0] * self.num_frames]) - rewards = torch.reshape(input_dict["prev_n_rewards"], [-1, self.num_frames]) - actions = torch_one_hot(input_dict["prev_n_actions"], self.action_space) - actions = torch.reshape(actions, [-1, self.num_frames * actions.shape[-1]]) - input_ = torch.cat([obs, actions, rewards], dim=-1) - features = self.layer1(input_) - features = self.layer2(features) - out = self.out(features) - self._last_value = self.values(features) - return out, [] - - def value_function(self): - return torch.squeeze(self._last_value, -1) diff --git a/rllib/examples/_old_api_stack/policy/episode_env_aware_policy.py b/rllib/examples/_old_api_stack/policy/episode_env_aware_policy.py deleted file mode 100644 index f33197eed96c..000000000000 --- a/rllib/examples/_old_api_stack/policy/episode_env_aware_policy.py +++ /dev/null @@ -1,134 +0,0 @@ -# @OldAPIStack -import numpy as np -from gymnasium.spaces import Box - -from ray.rllib.core.columns import Columns -from ray.rllib.examples._old_api_stack.policy.random_policy import RandomPolicy -from ray.rllib.examples.rl_modules.classes.random_rlm import StatefulRandomRLModule -from ray.rllib.policy.policy import Policy -from ray.rllib.policy.sample_batch import SampleBatch -from ray.rllib.policy.view_requirement import ViewRequirement -from ray.rllib.utils.annotations import override - - -class StatefulRandomPolicy(RandomPolicy): - """A Policy that has acts randomly and has stateful view requirements.""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.model = StatefulRandomRLModule( - action_space=self.action_space, - model_config={ - "max_seq_len": 50, - "lstm_use_prev_action": False, - "lstm_use_prev_reward": False, - }, - ) - - self.view_requirements = self.model.update_default_view_requirements( - self.view_requirements - ) - - @override(Policy) - def is_recurrent(self): - return True - - @override(Policy) - def postprocess_trajectory( - self, sample_batch, other_agent_batches=None, episode=None - ): - sample_batch["2xobs"] = sample_batch["obs"] * 2.0 - return sample_batch - - @override(Policy) - def compute_actions_from_input_dict(self, input_dict, *args, **kwargs): - fwd_out = self.model.forward_exploration(input_dict) - actions = fwd_out[SampleBatch.ACTIONS] - state_out = fwd_out[Columns.STATE_OUT] - return actions, state_out, {} - - -class EpisodeEnvAwareAttentionPolicy(RandomPolicy): - """A Policy that always knows the current EpisodeID and EnvID and - returns these in its actions.""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.state_space = Box(-1.0, 1.0, (1,)) - self.config["model"] = {"max_seq_len": 50} - - class _fake_model: - def __init__(self, state_space, config): - self.state_space = state_space - self.view_requirements = { - SampleBatch.AGENT_INDEX: ViewRequirement(), - SampleBatch.EPS_ID: ViewRequirement(), - "env_id": ViewRequirement(), - "t": ViewRequirement(), - SampleBatch.OBS: ViewRequirement(), - "state_in_0": ViewRequirement( - "state_out_0", - # Provide state outs -50 to -1 as "state-in". - shift="-50:-1", - # Repeat the incoming state every n time steps (usually max seq - # len). - batch_repeat_value=config["model"]["max_seq_len"], - space=state_space, - ), - "state_out_0": ViewRequirement( - space=state_space, used_for_compute_actions=False - ), - } - - def compile(self, *args, **kwargs): - """Dummy method for compatibility with TorchRLModule. - - This is hit when RolloutWorker tries to compile TorchRLModule.""" - pass - - # We need to provide at least an initial state if we want agent collector - # to build episodes with state_in_ and state_out_ - def get_initial_state(self): - return [self.state_space.sample()] - - self.model = _fake_model(self.state_space, self.config) - - self.view_requirements = dict( - super()._get_default_view_requirements(), **self.model.view_requirements - ) - - @override(Policy) - def get_initial_state(self): - return self.model.get_initial_state() - - @override(Policy) - def is_recurrent(self): - return True - - @override(Policy) - def compute_actions_from_input_dict( - self, input_dict, explore=None, timestep=None, **kwargs - ): - ts = input_dict["t"] - print(ts) - # Always return [episodeID, envID] as actions. - actions = np.array( - [ - [ - input_dict[SampleBatch.AGENT_INDEX][i], - input_dict[SampleBatch.EPS_ID][i], - input_dict["env_id"][i], - ] - for i, _ in enumerate(input_dict["obs"]) - ] - ) - states = [np.array([[ts[i]] for i in range(len(input_dict["obs"]))])] - self.global_timestep += 1 - return actions, states, {} - - @override(Policy) - def postprocess_trajectory( - self, sample_batch, other_agent_batches=None, episode=None - ): - sample_batch["3xobs"] = sample_batch["obs"] * 3.0 - return sample_batch diff --git a/rllib/examples/_old_api_stack/policy/memory_leaking_policy.py b/rllib/examples/_old_api_stack/policy/memory_leaking_policy.py deleted file mode 100644 index 3a5fa13ed509..000000000000 --- a/rllib/examples/_old_api_stack/policy/memory_leaking_policy.py +++ /dev/null @@ -1,60 +0,0 @@ -# @OldAPIStack -import gymnasium as gym - -from ray.rllib.examples._old_api_stack.policy.random_policy import RandomPolicy -from ray.rllib.utils.annotations import override -from ray.rllib.utils.typing import AlgorithmConfigDict - - -class MemoryLeakingPolicy(RandomPolicy): - """A Policy that leaks very little memory. - - Useful for proving that our memory-leak tests can catch the - slightest leaks. - """ - - def __init__( - self, - observation_space: gym.Space, - action_space: gym.Space, - config: AlgorithmConfigDict, - ): - super().__init__(observation_space, action_space, config) - self._leakage_size = config.get("leakage_size", "small") - self._leak = [] - - @override(RandomPolicy) - def compute_actions(self, *args, **kwargs): - # Leak. - if self._leakage_size == "small": - self._leak.append(1.5) - else: - self._leak.append([1.5] * 100) - return super().compute_actions(*args, **kwargs) - - @override(RandomPolicy) - def compute_actions_from_input_dict(self, *args, **kwargs): - # Leak. - if self._leakage_size == "small": - self._leak.append(1) - else: - self._leak.append([1] * 100) - return super().compute_actions_from_input_dict(*args, **kwargs) - - @override(RandomPolicy) - def learn_on_batch(self, samples): - # Leak. - if self._leakage_size == "small": - self._leak.append(False) - else: - self._leak.append([False] * 100) - return super().learn_on_batch(samples) - - @override(RandomPolicy) - def compute_log_likelihoods(self, *args, **kwargs): - # Leak. - if self._leakage_size == "small": - self._leak.append("test") - else: - self._leak.append(["test"] * 100) - return super().compute_log_likelihoods(*args, **kwargs) diff --git a/rllib/examples/_old_api_stack/policy/rock_paper_scissors_dummies.py b/rllib/examples/_old_api_stack/policy/rock_paper_scissors_dummies.py deleted file mode 100644 index dbaa0e401038..000000000000 --- a/rllib/examples/_old_api_stack/policy/rock_paper_scissors_dummies.py +++ /dev/null @@ -1,96 +0,0 @@ -# @OldAPIStack -import gymnasium as gym -import numpy as np -import random -from ray.rllib.policy.policy import Policy -from ray.rllib.policy.view_requirement import ViewRequirement - -ROCK = 0 -PAPER = 1 -SCISSORS = 2 - - -class AlwaysSameHeuristic(Policy): - """Pick a random move and stick with it for the entire episode.""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.exploration = self._create_exploration() - self.view_requirements.update( - { - "state_in_0": ViewRequirement( - "state_out_0", - shift=-1, - space=gym.spaces.Box(ROCK, SCISSORS, shape=(1,), dtype=np.int32), - ) - } - ) - - def get_initial_state(self): - return [random.choice([ROCK, PAPER, SCISSORS])] - - def is_recurrent(self) -> bool: - return True - - def compute_actions( - self, - obs_batch, - state_batches=None, - prev_action_batch=None, - prev_reward_batch=None, - info_batch=None, - episodes=None, - **kwargs - ): - return [state_batches[0][0] for x in obs_batch], state_batches, {} - - -class BeatLastHeuristic(Policy): - """Play the move that would beat the last move of the opponent.""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.exploration = self._create_exploration() - - def compute_actions( - self, - obs_batch, - state_batches=None, - prev_action_batch=None, - prev_reward_batch=None, - info_batch=None, - episodes=None, - **kwargs - ): - def successor(x): - # Make this also work w/o one-hot preprocessing. - if isinstance(self.observation_space, gym.spaces.Discrete): - if x == ROCK: - return PAPER - elif x == PAPER: - return SCISSORS - elif x == SCISSORS: - return ROCK - else: - return random.choice([ROCK, PAPER, SCISSORS]) - # One-hot (auto-preprocessed) inputs. - else: - if x[ROCK] == 1: - return PAPER - elif x[PAPER] == 1: - return SCISSORS - elif x[SCISSORS] == 1: - return ROCK - elif x[-1] == 1: - return random.choice([ROCK, PAPER, SCISSORS]) - - return [successor(x) for x in obs_batch], [], {} - - def learn_on_batch(self, samples): - pass - - def get_weights(self): - pass - - def set_weights(self, weights): - pass diff --git a/rllib/examples/_old_api_stack/remote_envs_with_inference_done_on_main_node.py b/rllib/examples/_old_api_stack/remote_envs_with_inference_done_on_main_node.py deleted file mode 100644 index eac4adbc6064..000000000000 --- a/rllib/examples/_old_api_stack/remote_envs_with_inference_done_on_main_node.py +++ /dev/null @@ -1,186 +0,0 @@ -# @OldAPIStack -""" -This script specifies n (vectorized) envs -as Ray remote (actors), such that stepping through these occurs in parallel. -Also, actions for each env step are calculated on the "main" node. - -This behavior can be useful if the "main" node is a GPU machine and you would like to -speed up batched action calculations, similar to DeepMind's SEED -architecture, described here: - -https://ai.googleblog.com/2020/03/massively-scaling-reinforcement.html -""" -import argparse -import os -from typing import Union - -import ray -from ray import air, tune -from ray.air.constants import TRAINING_ITERATION -from ray.rllib.algorithms.ppo import PPO, PPOConfig -from ray.rllib.algorithms.algorithm import Algorithm -from ray.rllib.algorithms.algorithm_config import AlgorithmConfig -from ray.rllib.utils.annotations import override -from ray.rllib.utils.metrics import ( - ENV_RUNNER_RESULTS, - EPISODE_RETURN_MEAN, - NUM_ENV_STEPS_SAMPLED_LIFETIME, -) -from ray.rllib.utils.test_utils import check_learning_achieved -from ray.rllib.utils.typing import PartialAlgorithmConfigDict -from ray.tune import PlacementGroupFactory -from ray.tune.logger import pretty_print - - -def get_cli_args(): - """Create CLI parser and return parsed arguments""" - parser = argparse.ArgumentParser() - - # example-specific args - # This should be >1, otherwise, remote envs make no sense. - parser.add_argument("--num-envs-per-env-runner", type=int, default=4) - - # general args - parser.add_argument( - "--framework", - choices=["tf", "tf2", "torch"], - default="torch", - help="The DL framework specifier.", - ) - parser.add_argument( - "--as-test", - action="store_true", - help="Whether this script should be run as a test: --stop-reward must " - "be achieved within --stop-timesteps AND --stop-iters.", - ) - parser.add_argument( - "--stop-iters", type=int, default=50, help="Number of iterations to train." - ) - parser.add_argument( - "--stop-timesteps", - type=int, - default=100000, - help="Number of timesteps to train.", - ) - parser.add_argument( - "--stop-reward", - type=float, - default=150.0, - help="Reward at which we stop training.", - ) - parser.add_argument( - "--no-tune", - action="store_true", - help="Run without Tune using a manual train loop instead. Here," - "there is no TensorBoard support.", - ) - parser.add_argument( - "--local-mode", - action="store_true", - help="Init Ray in local mode for easier debugging.", - ) - - args = parser.parse_args() - print(f"Running with following CLI args: {args}") - return args - - -# The modified Algorithm class we use: -# Subclassing from PPO, our algo only modifies `default_resource_request`, -# telling Ray Tune that it's ok (not mandatory) to place our n remote envs on a -# different node (each env using 1 CPU). -class PPORemoteInference(PPO): - @classmethod - @override(Algorithm) - def default_resource_request( - cls, - config: Union[AlgorithmConfig, PartialAlgorithmConfigDict], - ): - if isinstance(config, AlgorithmConfig): - cf = config - else: - cf = cls.get_default_config().update_from_dict(config) - - # Return PlacementGroupFactory containing all needed resources - # (already properly defined as device bundles). - return PlacementGroupFactory( - bundles=[ - { - # Single CPU for the local worker. This CPU hosts the - # main model in this example (num_env_runners=0). - "CPU": 1, - # Possibly add n GPUs to this. - "GPU": cf.num_gpus, - }, - { - # Different bundle (meaning: possibly different node) - # for your n "remote" envs (set remote_worker_envs=True). - "CPU": cf.num_envs_per_env_runner, - }, - ], - strategy=cf.placement_strategy, - ) - - -if __name__ == "__main__": - args = get_cli_args() - - ray.init(num_cpus=6, local_mode=args.local_mode) - - config = ( - PPOConfig() - .environment("CartPole-v1") - .framework(args.framework) - .env_runners( - # Force sub-envs to be ray.actor.ActorHandles, so we can step - # through them in parallel. - remote_worker_envs=True, - num_envs_per_env_runner=args.num_envs_per_env_runner, - # Use a single worker (however, with n parallelized remote envs, maybe - # even running on another node). - # Action computations occur on the "main" (GPU?) node, while - # the envs run on one or more CPU node(s). - num_env_runners=0, - ) - .resources( - # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0. - num_gpus=int(os.environ.get("RLLIB_NUM_GPUS", "0")), - # Set the number of CPUs used by the (local) worker, aka "driver" - # to match the number of Ray remote envs. - num_cpus_for_main_process=args.num_envs_per_env_runner + 1, - ) - ) - - # Run as manual training loop. - if args.no_tune: - # manual training loop using PPO and manually keeping track of state - algo = PPORemoteInference(config=config) - # run manual training loop and print results after each iteration - for _ in range(args.stop_iters): - result = algo.train() - print(pretty_print(result)) - # Stop training if the target train steps or reward are reached. - if ( - result[f"{NUM_ENV_STEPS_SAMPLED_LIFETIME}"] >= args.stop_timesteps - or result[ENV_RUNNER_RESULTS][EPISODE_RETURN_MEAN] >= args.stop_reward - ): - break - - # Run with Tune for auto env and algorithm creation and TensorBoard. - else: - stop = { - TRAINING_ITERATION: args.stop_iters, - NUM_ENV_STEPS_SAMPLED_LIFETIME: args.stop_timesteps, - f"{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}": args.stop_reward, - } - - results = tune.Tuner( - PPORemoteInference, - param_space=config, - run_config=air.RunConfig(stop=stop, verbose=1), - ).fit() - - if args.as_test: - check_learning_achieved(results, args.stop_reward) - - ray.shutdown() diff --git a/rllib/examples/_old_api_stack/sb2rllib_rllib_example.py b/rllib/examples/_old_api_stack/sb2rllib_rllib_example.py deleted file mode 100644 index 28b5ddd830b9..000000000000 --- a/rllib/examples/_old_api_stack/sb2rllib_rllib_example.py +++ /dev/null @@ -1,52 +0,0 @@ -# @OldAPIStack -""" -Example script that trains, saves, loads, and tests an RLlib agent. -Equivalent script with stable baselines: sb2rllib_sb_example.py. -Demonstrates transition from stable_baselines to Ray RLlib. - -Run example: python sb2rllib_rllib_example.py -""" -import gymnasium as gym -from ray import tune, air -import ray.rllib.algorithms.ppo as ppo -from ray.rllib.utils.metrics import NUM_ENV_STEPS_SAMPLED_LIFETIME - -# settings used for both stable baselines and rllib -env_name = "CartPole-v1" -train_steps = 10000 -learning_rate = 1e-3 -save_dir = "saved_models" - -# training and saving -analysis = tune.Tuner( - "PPO", - run_config=air.RunConfig( - stop={NUM_ENV_STEPS_SAMPLED_LIFETIME: train_steps}, - local_dir=save_dir, - checkpoint_config=air.CheckpointConfig( - checkpoint_at_end=True, - ), - ), - param_space={"env": env_name, "lr": learning_rate}, -).fit() -# retrieve the checkpoint path -analysis.default_metric = "episode_return_mean" -analysis.default_mode = "max" -checkpoint_path = analysis.get_best_checkpoint(trial=analysis.get_best_trial()) -print(f"Trained model saved at {checkpoint_path}") - -# load and restore model -agent = ppo.PPO(env=env_name) -agent.restore(checkpoint_path) -print(f"Agent loaded from saved model at {checkpoint_path}") - -# inference -env = gym.make(env_name) -obs, info = env.reset() -for i in range(1000): - action = agent.compute_single_action(obs) - obs, reward, terminated, truncated, info = env.step(action) - env.render() - if terminated or truncated: - print(f"Cart pole ended after {i} steps.") - break diff --git a/rllib/examples/_old_api_stack/sb2rllib_sb_example.py b/rllib/examples/_old_api_stack/sb2rllib_sb_example.py deleted file mode 100644 index 8e3686074935..000000000000 --- a/rllib/examples/_old_api_stack/sb2rllib_sb_example.py +++ /dev/null @@ -1,41 +0,0 @@ -# @OldAPIStack -""" -Example script that trains, saves, loads, and tests a stable baselines 2 agent. -Code taken and adjusted from SB2 docs: -https://stable-baselines.readthedocs.io/en/master/guide/quickstart.html -Equivalent script with RLlib: sb2rllib_rllib_example.py -""" -import gymnasium as gym - -from stable_baselines.common.policies import MlpPolicy -from stable_baselines import PPO2 - -# settings used for both stable baselines and rllib -env_name = "CartPole-v1" -train_steps = 10000 -learning_rate = 1e-3 -save_dir = "saved_models" - -save_path = f"{save_dir}/sb_model_{train_steps}steps" -env = gym.make(env_name) - -# training and saving -model = PPO2(MlpPolicy, env, learning_rate=learning_rate, verbose=1) -model.learn(total_timesteps=train_steps) -model.save(save_path) -print(f"Trained model saved at {save_path}") - -# delete and load model (just for illustration) -del model -model = PPO2.load(save_path) -print(f"Agent loaded from saved model at {save_path}") - -# inference -obs, info = env.reset() -for i in range(1000): - action, _states = model.predict(obs) - obs, reward, terminated, truncated, info = env.step(action) - env.render() - if terminated or truncated: - print(f"Cart pole ended after {i} steps.") - break diff --git a/rllib/models/tests/test_models.py b/rllib/models/tests/test_models.py deleted file mode 100644 index 8ba77be666e1..000000000000 --- a/rllib/models/tests/test_models.py +++ /dev/null @@ -1,93 +0,0 @@ -from gymnasium.spaces import Box -import numpy as np -import unittest - -import ray -import ray.rllib.algorithms.ppo as ppo -from ray.rllib.examples._old_api_stack.models.modelv3 import RNNModel -from ray.rllib.models.tf.tf_modelv2 import TFModelV2 -from ray.rllib.models.tf.fcnet import FullyConnectedNetwork -from ray.rllib.utils.framework import try_import_tf - -tf1, tf, tfv = try_import_tf() - - -class TestTFModel(TFModelV2): - def __init__(self, obs_space, action_space, num_outputs, model_config, name): - super().__init__(obs_space, action_space, num_outputs, model_config, name) - input_ = tf.keras.layers.Input(shape=(3,)) - output = tf.keras.layers.Dense(2)(input_) - # A keras model inside. - self.keras_model = tf.keras.models.Model([input_], [output]) - # A RLlib FullyConnectedNetwork (tf) inside (which is also a keras - # Model). - self.fc_net = FullyConnectedNetwork(obs_space, action_space, 3, {}, "fc1") - - def forward(self, input_dict, state, seq_lens): - obs = input_dict["obs_flat"] - out1 = self.keras_model(obs) - out2, _ = self.fc_net({"obs": obs}) - return tf.concat([out1, out2], axis=-1), [] - - -class TestModels(unittest.TestCase): - """Tests ModelV2 classes and their modularization capabilities.""" - - @classmethod - def setUpClass(cls) -> None: - ray.init() - - @classmethod - def tearDownClass(cls) -> None: - ray.shutdown() - - def test_tf_modelv2(self): - obs_space = Box(-1.0, 1.0, (3,)) - action_space = Box(-1.0, 1.0, (2,)) - my_tf_model = TestTFModel(obs_space, action_space, 5, {}, "my_tf_model") - # Call the model. - out, states = my_tf_model({"obs": np.array([obs_space.sample()])}) - self.assertTrue(out.shape == (1, 5)) - self.assertTrue(out.dtype == tf.float32) - self.assertTrue(states == []) - vars = my_tf_model.variables(as_dict=True) - self.assertTrue(len(vars) == 6) - self.assertTrue("keras_model.dense.kernel:0" in vars) - self.assertTrue("keras_model.dense.bias:0" in vars) - self.assertTrue("fc_net.base_model.fc_out.kernel:0" in vars) - self.assertTrue("fc_net.base_model.fc_out.bias:0" in vars) - self.assertTrue("fc_net.base_model.value_out.kernel:0" in vars) - self.assertTrue("fc_net.base_model.value_out.bias:0" in vars) - - def test_modelv3(self): - config = ( - ppo.PPOConfig() - .api_stack( - enable_env_runner_and_connector_v2=False, - enable_rl_module_and_learner=False, - ) - .environment("CartPole-v1") - .framework("tf") - .env_runners(num_env_runners=0) - .training( - model={ - "custom_model": RNNModel, - "custom_model_config": { - "hiddens_size": 64, - "cell_size": 128, - }, - } - ) - ) - algo = config.build() - for _ in range(2): - results = algo.train() - print(results) - algo.stop() - - -if __name__ == "__main__": - import pytest - import sys - - sys.exit(pytest.main(["-v", __file__])) diff --git a/rllib/tests/test_lstm.py b/rllib/tests/test_lstm.py index 9481205f9291..66ceda0b4f2b 100644 --- a/rllib/tests/test_lstm.py +++ b/rllib/tests/test_lstm.py @@ -1,16 +1,8 @@ import numpy as np -import pickle import unittest -import ray -from ray.rllib.algorithms.ppo import PPOConfig -from ray.rllib.examples.envs.classes.debug_counter_env import DebugCounterEnv -from ray.rllib.examples._old_api_stack.models.rnn_spy_model import RNNSpyModel -from ray.rllib.models import ModelCatalog from ray.rllib.policy.rnn_sequencing import chop_into_sequences -from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.utils.test_utils import check -from ray.tune.registry import register_env class TestLSTMUtils(unittest.TestCase): @@ -166,101 +158,6 @@ def test_dynamic_max_len(self): self.assertEqual(seq_lens.tolist(), [1, 2]) -class TestRNNSequencing(unittest.TestCase): - def setUp(self) -> None: - ray.init(num_cpus=4) - - def tearDown(self) -> None: - ray.shutdown() - - def test_minibatch_sequencing(self): - ModelCatalog.register_custom_model("rnn", RNNSpyModel) - register_env("counter", lambda _: DebugCounterEnv()) - config = ( - PPOConfig() - .api_stack( - enable_env_runner_and_connector_v2=False, - enable_rl_module_and_learner=False, - ) - .environment("counter") - .framework("tf") - .env_runners(num_env_runners=0, rollout_fragment_length=20) - .training( - train_batch_size=20, - minibatch_size=10, - num_epochs=1, - model={ - "custom_model": "rnn", - "max_seq_len": 4, - "vf_share_layers": True, - }, - ) - ) - ppo = config.build() - ppo.train() - ppo.train() - ppo.stop() - - # first epoch: 20 observations get split into 2 minibatches of 8 - # four observations are discarded - batch0 = pickle.loads( - ray.experimental.internal_kv._internal_kv_get("rnn_spy_in_0") - ) - batch1 = pickle.loads( - ray.experimental.internal_kv._internal_kv_get("rnn_spy_in_1") - ) - if batch0["sequences"][0][0][0] > batch1["sequences"][0][0][0]: - batch0, batch1 = batch1, batch0 # sort minibatches - self.assertEqual(batch0[SampleBatch.SEQ_LENS].tolist(), [4, 4, 2]) - self.assertEqual(batch1[SampleBatch.SEQ_LENS].tolist(), [2, 3, 4, 1]) - check( - batch0["sequences"], - [ - [[0], [1], [2], [3]], - [[4], [5], [6], [7]], - [[8], [9], [0], [0]], - ], - ) - check( - batch1["sequences"], - [ - [[10], [11], [0], [0]], - [[12], [13], [14], [0]], - [[0], [1], [2], [3]], - [[4], [0], [0], [0]], - ], - ) - - # second epoch: 20 observations get split into 2 minibatches of 8 - # four observations are discarded - batch2 = pickle.loads( - ray.experimental.internal_kv._internal_kv_get("rnn_spy_in_2") - ) - batch3 = pickle.loads( - ray.experimental.internal_kv._internal_kv_get("rnn_spy_in_3") - ) - if batch2["sequences"][0][0][0] > batch3["sequences"][0][0][0]: - batch2, batch3 = batch3, batch2 - self.assertEqual(batch2[SampleBatch.SEQ_LENS].tolist(), [4, 4, 2]) - self.assertEqual(batch3[SampleBatch.SEQ_LENS].tolist(), [4, 4, 2]) - check( - batch2["sequences"], - [ - [[0], [1], [2], [3]], - [[4], [5], [6], [7]], - [[8], [9], [0], [0]], - ], - ) - check( - batch3["sequences"], - [ - [[5], [6], [7], [8]], - [[9], [10], [11], [12]], - [[13], [14], [0], [0]], - ], - ) - - if __name__ == "__main__": import pytest import sys