6 changes: 3 additions & 3 deletions .github/workflows/unittest.yaml
@@ -76,12 +76,12 @@ jobs:
TYPE="${{ steps.test_type.outputs.type }}"
if [ "$TYPE" = "all" ]; then
echo "tests_run=true" >> $GITHUB_ENV
docker compose exec trinity-node-1 pytest tests -v -s --ignore=tests/data --ctrf report.json
docker compose exec trinity-node-1 pytest tests -v -s --ctrf report.json
elif [ "$TYPE" = "diff" ]; then
if [ -s ../../../test_dirs.txt ]; then
echo "tests_run=true" >> $GITHUB_ENV
TEST_DIRS=$(cat ../../../test_dirs.txt | xargs)
docker compose exec trinity-node-1 pytest $TEST_DIRS -v -s --ignore=tests/data --ctrf report.json
docker compose exec trinity-node-1 pytest $TEST_DIRS -v -s --ctrf report.json
else
echo "No changed modules detected, skipping tests."
echo "tests_run=false" >> $GITHUB_ENV
@@ -90,7 +90,7 @@
MODULE="${{ steps.test_type.outputs.module }}"
if [ -n "$MODULE" ]; then
echo "tests_run=true" >> $GITHUB_ENV
docker compose exec trinity-node-1 pytest tests/$MODULE -v -s --ignore=tests/data --ctrf report.json
docker compose exec trinity-node-1 pytest tests/$MODULE -v -s --ctrf report.json
else
echo "No module specified, skipping tests."
echo "tests_run=false" >> $GITHUB_ENV
50 changes: 33 additions & 17 deletions docs/sphinx_doc/source/tutorial/trinity_configs.md
@@ -40,6 +40,14 @@ monitor:
data_processor:
# Preprocessing data settings
...

service:
# Services to use
...

log:
# Ray actor logging
...
```

Each of these sections will be explained in detail below.
@@ -395,28 +403,36 @@ trainer:

---

## Data Processor Configuration
## Service Configuration

Configures preprocessing and data cleaning pipelines.
Configures services used by Trinity-RFT. Only the Data Juicer service is supported for now.

```yaml
data_processor:
source_data_path: /PATH/TO/DATASET
load_kwargs:
split: 'train'
format:
prompt_key: 'question'
response_key: 'answer'
dj_config_path: 'tests/test_configs/active_iterator_test_dj_cfg.yaml'
clean_strategy: 'iterative'
db_url: 'postgresql://{username}@localhost:5432/{db_name}'
service:
data_juicer:
server_url: 'http://127.0.0.1:5005'
auto_start: true
port: 5005
```

- `server_url`: The URL of the Data Juicer server.
- `auto_start`: Whether to automatically start the Data Juicer service.
- `port`: The port for the Data Juicer service when `auto_start` is `true`.
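
As a quick sanity check before starting a run, you can verify that the configured `server_url` is actually reachable. The snippet below is a minimal sketch using only the Python standard library; it is not part of Trinity-RFT's API, and the URL value is simply the one from the example above.

```python
import socket
from urllib.parse import urlparse

# Value taken from the example config above; adjust to your deployment.
server_url = "http://127.0.0.1:5005"

parsed = urlparse(server_url)
host = parsed.hostname or "127.0.0.1"
port = parsed.port or 5005

try:
    # Open a plain TCP connection to confirm something is listening on the
    # Data Juicer service address. No Data Juicer API calls are made here.
    with socket.create_connection((host, port), timeout=3):
        print(f"Data Juicer service reachable at {host}:{port}")
except OSError as exc:
    print(f"Data Juicer service not reachable at {host}:{port}: {exc}")
```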

---

## Log Configuration

Configures logging for Ray actors.

```yaml
log:
level: INFO
group_by_node: False
```

- `source_data_path`: Path to the task dataset.
- `load_kwargs`: Arguments passed to HuggingFace’s `load_dataset()`.
- `dj_config_path`: Path to Data-Juicer configuration for cleaning.
- `clean_strategy`: Strategy for iterative data cleaning.
- `db_url`: Database URL if using SQL backend.
- `level`: The logging level (supports `DEBUG`, `INFO`, `WARNING`, `ERROR`).
- `group_by_node`: Whether to group logs by node IP. If set to `True`, an actor's logs will be saved to `<checkpoint_root_dir>/<project>/<name>/log/<node_ip>/<actor_name>.log`; otherwise they will be saved to `<checkpoint_root_dir>/<project>/<name>/log/<actor_name>.log`.
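
For illustration, here is a minimal sketch of how a Ray actor obtains a file-backed logger under this configuration, following the usage exercised in `tests/utils/log_test.py` and `tests/trainer/trainer_test.py`; the actor name, log path, and messages are hypothetical.

```python
import logging

import ray

from trinity.common.constants import LOG_DIR_ENV_VAR, LOG_LEVEL_ENV_VAR
from trinity.utils.log import get_logger


@ray.remote
class MyActor:
    def __init__(self):
        # Inside a Ray actor, get_logger(..., in_ray_actor=True) writes to
        # <save_dir>/my_actor.log, or <save_dir>/<node_ip>/my_actor.log when
        # group_by_node is enabled.
        self.logger = get_logger("my_actor", in_ray_actor=True, level=logging.INFO)

    def ping(self):
        self.logger.info("ping")


# The log directory and level are propagated to Ray workers through
# environment variables, as in the unit tests; the path below is hypothetical.
ray.init(
    runtime_env={
        "env_vars": {
            LOG_DIR_ENV_VAR: "/tmp/trinity_logs",
            LOG_LEVEL_ENV_VAR: "INFO",
        }
    }
)
actor = MyActor.remote()
ray.get(actor.ping.remote())
ray.shutdown()
```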

---

5 changes: 4 additions & 1 deletion tests/tools.py
@@ -13,7 +13,10 @@
def get_template_config() -> Config:
config_path = os.path.join(os.path.dirname(__file__), "template", "config.yaml")
config = load_config(config_path)
config.ray_namespace = ray.get_runtime_context().namespace
if ray.is_initialized():
config.ray_namespace = ray.get_runtime_context().namespace
else:
config.ray_namespace = "trinity_unittest"
return config


44 changes: 41 additions & 3 deletions tests/trainer/trainer_test.py
@@ -21,7 +21,13 @@
)
from trinity.cli.launcher import bench, both, explore, train
from trinity.common.config import Config, StorageConfig
from trinity.common.constants import StorageType, SyncMethod, SyncStyle
from trinity.common.constants import (
LOG_DIR_ENV_VAR,
LOG_LEVEL_ENV_VAR,
StorageType,
SyncMethod,
SyncStyle,
)
from trinity.common.models.utils import get_checkpoint_dir_with_step_num
from trinity.manager.manager import CacheManager

@@ -355,12 +361,28 @@ def tearDown(self):


def run_trainer(config: Config) -> None:
ray.init(namespace=config.ray_namespace)
ray.init(
namespace=config.ray_namespace,
runtime_env={
"env_vars": {
LOG_DIR_ENV_VAR: config.log.save_dir,
LOG_LEVEL_ENV_VAR: "INFO",
}
},
)
train(config)


def run_explorer(config: Config) -> None:
ray.init(namespace=config.ray_namespace)
ray.init(
namespace=config.ray_namespace,
runtime_env={
"env_vars": {
LOG_DIR_ENV_VAR: config.log.save_dir,
LOG_LEVEL_ENV_VAR: "INFO",
}
},
)
explore(config)


@@ -487,6 +509,22 @@ def test_fully_async_mode(self, name, use_priority_queue):
)[1],
8,
)
log_files = os.listdir(os.path.join(explorer1_config.checkpoint_job_dir, "log"))
self.assertTrue("trainer.log" in log_files)
self.assertTrue("synchronizer.log" in log_files)
self.assertTrue("explorer1.log" in log_files)
self.assertTrue("explorer2.log" in log_files)
self.assertTrue("explorer1_runner_0.log" in log_files)
self.assertTrue("explorer1_runner_7.log" in log_files)
self.assertTrue("explorer2_runner_0.log" in log_files)
self.assertTrue("explorer2_runner_7.log" in log_files)
self.assertTrue("explorer1_experience_pipeline.log" in log_files)
self.assertTrue("explorer2_experience_pipeline.log" in log_files)
files_to_check = ["trainer.log", "synchronizer.log", "explorer1.log", "explorer2.log"]
for file_name in files_to_check:
with open(os.path.join(explorer1_config.checkpoint_job_dir, "log", file_name)) as f:
lines = f.readlines()
self.assertTrue(len(lines) > 0)
ray.shutdown()

def tearDown(self):
166 changes: 166 additions & 0 deletions tests/utils/log_test.py
@@ -0,0 +1,166 @@
import logging
import os
import shutil
import unittest

import ray
from ray.runtime_env import RuntimeEnv

from tests.tools import get_template_config
from trinity.common.constants import (
LOG_DIR_ENV_VAR,
LOG_LEVEL_ENV_VAR,
LOG_NODE_IP_ENV_VAR,
)
from trinity.utils.log import get_logger


def log_outside_actor(log_level=logging.INFO):
logger = get_logger("outside_actor", level=log_level)
logger.info("Outside logger initialized")
logger.debug("Outside logger initialized")


class ModuleInActor:
def __init__(self):
self.logger = get_logger("module_in_actor", in_ray_actor=True)
self.logger.info("ModuleInActor initialized")
self.logger.debug("ModuleInActor initialized")


class ModuleInActor2:
def __init__(self):
# modules created inside an actor should automatically inherit the logger created by the root actor
self.logger = get_logger("module_in_actor2")
self.logger.info("ModuleInActor2 initialized")
self.logger.debug("ModuleInActor2 initialized")


@ray.remote
class ActorInActor:
"""An actor created inside an actor"""

def __init__(self, parent_name, log_level):
self.logger = get_logger(f"{parent_name}_nested", in_ray_actor=True, level=log_level)
self.logger.info("ActorInActor initialized")
self.logger.debug("ActorInActor initialized")


@ray.remote
class LogActor:
def __init__(self, aid: int, log_level=logging.INFO):
assert os.environ.get(LOG_DIR_ENV_VAR) is not None, "LOG_DIR_ENV_VAR must be set"
self.logger = get_logger(f"actor_{aid}", in_ray_actor=True, level=log_level)
self.logger.info(f"LogActor {aid} initialized ")
self.logger.debug(f"LogActor {aid} initialized")
self.aid = aid
self.actor = ActorInActor.remote(f"actor_{aid}", log_level)
ray.get(self.actor.__ray_ready__.remote())

def log_info(self, message: str):
self.logger.info(f"LogActor {self.aid} info: {message}")
self.logger.debug(f"LogActor {self.aid} debug: {message}")
ModuleInActor()
ModuleInActor2()


class LogTest(unittest.TestCase):
def setUp(self):
if ray.is_initialized():
ray.shutdown()
self.config = get_template_config()
self.config.check_and_update()
self.log_dir = self.config.log.save_dir
shutil.rmtree(self.log_dir, ignore_errors=True)
os.makedirs(self.log_dir, exist_ok=True)

def test_no_actor_log(self):
ray.init(
namespace=self.config.ray_namespace,
runtime_env=RuntimeEnv(
env_vars={LOG_DIR_ENV_VAR: self.log_dir, LOG_LEVEL_ENV_VAR: "INFO"}
),
)
try:
logger = get_logger("outside_actor", level=logging.DEBUG)
logger.info("Outside logger initialized")
logger.debug("Outside logger initialized")
self.assertFalse(os.path.exists(os.path.join(self.log_dir, "outside_actor.log")))

logger = get_logger(
"outside_actor", in_ray_actor=True
) # in_ray_actor should not take effect
logger.info("Outside logger initialized")
self.assertFalse(os.path.exists(os.path.join(self.log_dir, "outside_actor.log")))

finally:
ray.shutdown(_exiting_interpreter=True)

def test_actor_log(self):
ray.init(
namespace=self.config.ray_namespace,
runtime_env=RuntimeEnv(
env_vars={
LOG_DIR_ENV_VAR: self.log_dir,
LOG_LEVEL_ENV_VAR: "INFO",
}
),
)
try:
actor1 = LogActor.remote(1, log_level=logging.INFO)
actor2 = LogActor.remote(2, log_level=logging.DEBUG)
actor3 = LogActor.remote(3, log_level=None)
ray.get(actor1.log_info.remote("Test message"))
ray.get(actor2.log_info.remote("Test message"))
ray.get(actor3.log_info.remote("Test message"))
self.assertTrue(os.path.exists(os.path.join(self.log_dir, "actor_1.log")))
self.assertTrue(os.path.exists(os.path.join(self.log_dir, "actor_2.log")))
self.assertTrue(os.path.exists(os.path.join(self.log_dir, "actor_3.log")))
self.assertTrue(os.path.exists(os.path.join(self.log_dir, "actor_1_nested.log")))
self.assertTrue(os.path.exists(os.path.join(self.log_dir, "actor_2_nested.log")))
self.assertTrue(os.path.exists(os.path.join(self.log_dir, "actor_3_nested.log")))
self.assertFalse(os.path.exists(os.path.join(self.log_dir, "module_in_actor.log")))
self.assertFalse(os.path.exists(os.path.join(self.log_dir, "module_in_actor2.log")))
with open(os.path.join(self.log_dir, "actor_1.log"), "r") as f:
lines = f.readlines()
self.assertEqual(len(lines), 4)
with open(os.path.join(self.log_dir, "actor_2.log"), "r") as f:
lines = f.readlines()
self.assertEqual(len(lines), 8)
with open(os.path.join(self.log_dir, "actor_3.log"), "r") as f:
lines = f.readlines()
self.assertEqual(len(lines), 4)
with open(os.path.join(self.log_dir, "actor_1_nested.log"), "r") as f:
lines = f.readlines()
self.assertEqual(len(lines), 1)
with open(os.path.join(self.log_dir, "actor_2_nested.log"), "r") as f:
lines = f.readlines()
self.assertEqual(len(lines), 2)
with open(os.path.join(self.log_dir, "actor_3_nested.log"), "r") as f:
lines = f.readlines()
self.assertEqual(len(lines), 1)
finally:
ray.shutdown(_exiting_interpreter=True)

def test_group_by_node(self):
ray.init(
namespace=self.config.ray_namespace,
runtime_env=RuntimeEnv(
env_vars={
LOG_DIR_ENV_VAR: self.log_dir,
LOG_LEVEL_ENV_VAR: "INFO",
LOG_NODE_IP_ENV_VAR: "1",
}
),
)
try:
actor = LogActor.remote(1, log_level=logging.INFO)
ray.get(actor.log_info.remote("Test message"))
ips = os.listdir(self.config.log.save_dir)
self.assertTrue(len(ips) > 0)
for ip in ips:
self.assertTrue(os.path.isdir(os.path.join(self.config.log.save_dir, ip)))
ip_logs = os.listdir(os.path.join(self.config.log.save_dir, ip))
self.assertTrue(len(ip_logs) > 0)
finally:
ray.shutdown(_exiting_interpreter=True)
2 changes: 1 addition & 1 deletion trinity/buffer/pipelines/experience_pipeline.py
@@ -34,7 +34,7 @@ class ExperiencePipeline:
"""

def __init__(self, config: Config):
self.logger = get_logger(__name__)
self.logger = get_logger(f"{config.explorer.name}_experience_pipeline", in_ray_actor=True)
load_plugins()
pipeline_config = config.data_processor.experience_pipeline
buffer_config = config.buffer
4 changes: 2 additions & 2 deletions trinity/buffer/ray_wrapper.py
@@ -35,7 +35,7 @@ class DBWrapper:
"""

def __init__(self, storage_config: StorageConfig, config: BufferConfig) -> None:
self.logger = get_logger(__name__)
self.logger = get_logger(f"sql_{storage_config.name}")
if storage_config.path is None:
storage_config.path = default_storage_path(storage_config, config)
self.engine = create_engine(storage_config.path, poolclass=NullPool)
@@ -220,7 +220,7 @@ class QueueWrapper:
"""An wrapper of a async queue."""

def __init__(self, storage_config: StorageConfig, config: BufferConfig) -> None:
self.logger = get_logger(__name__)
self.logger = get_logger(f"queue_{storage_config.name}")
self.config = config
self.capacity = storage_config.capacity
self.queue = QueueBuffer.get_queue(storage_config, config)
3 changes: 0 additions & 3 deletions trinity/buffer/reader/queue_reader.py
@@ -8,9 +8,6 @@
from trinity.buffer.ray_wrapper import QueueWrapper
from trinity.common.config import BufferConfig, StorageConfig
from trinity.common.constants import ReadStrategy, StorageType
from trinity.utils.log import get_logger

logger = get_logger(__name__)


class QueueReader(BufferReader):
3 changes: 0 additions & 3 deletions trinity/buffer/writer/queue_writer.py
@@ -7,9 +7,6 @@
from trinity.buffer.ray_wrapper import QueueWrapper
from trinity.common.config import BufferConfig, StorageConfig
from trinity.common.constants import StorageType
from trinity.utils.log import get_logger

logger = get_logger(__name__)


class QueueWriter(BufferWriter):