Add per-step token utilization to tensorboard and progress tracker. (#…
justinxzhao authored Jan 12, 2024
1 parent a325bd8 commit b0795e7
Showing 12 changed files with 328 additions and 43 deletions.
Empty file added ludwig/accounting/__init__.py
55 changes: 55 additions & 0 deletions ludwig/accounting/used_tokens.py
@@ -0,0 +1,55 @@
from typing import Dict, Union

import torch


def get_used_tokens_for_gbm(inputs: Union[torch.Tensor, Dict[str, torch.Tensor]]) -> int:
    """Returns the number of used tokens for a GBM model.

    The number of used tokens is:
    1. the size of the input tensor, which corresponds to 1 token for each input feature
        (binary, category, number) in the batch.
    2. batch_size, which corresponds to 1 token for the batch of target features.

    Args:
        inputs: The input tensors that are fed to the gbm.forward() method.
    """
    if isinstance(inputs, torch.Tensor):
        # Inputs may be a tensor for evaluation.
        # Use the total number of inputs + the batch size as the number of used tokens.
        return torch.flatten(inputs).shape[0] + inputs.shape[0]
    return len(inputs.keys()) + 1


def get_used_tokens_for_ecd(inputs: Dict[str, torch.Tensor], targets: Dict[str, torch.Tensor]) -> int:
    """Returns the number of used tokens for an ECD model.

    The number of used tokens is the total size of the input and output tensors, which corresponds to 1 token for
    binary, category, and number features, and a variable number of tokens for text features, for each example in
    the batch.

    Args:
        inputs: The input tensors for one forward pass through ecd.
        targets: The target tensors for one forward pass through ecd.
    """
    used_tokens = 0
    for input_feature_tensor in inputs.values():
        used_tokens += torch.flatten(input_feature_tensor).shape[0]
    if targets is not None:
        # targets may be None for evaluation.
        for output_feature_tensor in targets.values():
            used_tokens += torch.flatten(output_feature_tensor).shape[0]
    return used_tokens


def get_used_tokens_for_llm(model_inputs: torch.Tensor, tokenizer) -> int:
    """Returns the number of used tokens for an LLM model.

    Args:
        model_inputs: torch.Tensor with the merged input and target IDs.
        tokenizer: The tokenizer used to encode the inputs.

    Returns:
        The total number of non-pad tokens, for all examples in the batch.
    """
    return torch.sum(model_inputs != tokenizer.pad_token_id).item()
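For illustration, a minimal usage sketch of the two non-LLM helpers above (not part of this diff; the feature names and shapes are made up). It shows the counting rules from the docstrings: one token per feature per batch for GBM dict inputs, and one token per tensor element for ECD:

import torch

from ludwig.accounting.used_tokens import get_used_tokens_for_ecd, get_used_tokens_for_gbm

# GBM with a dict of three input features: 3 input tokens + 1 token for the target batch.
gbm_inputs = {"age": torch.rand(8), "income": torch.rand(8), "clicked": torch.rand(8)}
assert get_used_tokens_for_gbm(gbm_inputs) == 4

# ECD: every element of every input/target tensor counts as one token.
ecd_inputs = {"review_text": torch.zeros(8, 128, dtype=torch.long)}  # 8 examples x 128 token IDs
ecd_targets = {"sentiment": torch.zeros(8, dtype=torch.long)}        # 1 token per example
assert get_used_tokens_for_ecd(ecd_inputs, ecd_targets) == 8 * 128 + 8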
7 changes: 5 additions & 2 deletions ludwig/collect.py
@@ -20,6 +20,7 @@
from typing import List, Optional, Union

import numpy as np
import torch
import torchinfo

from ludwig.api import LudwigModel
@@ -156,8 +157,10 @@ def save_tensors(collected_tensors, output_directory):
filenames = []
for tensor_name, tensor_value in collected_tensors:
np_filename = os.path.join(output_directory, make_safe_filename(tensor_name) + ".npy")
np.save(np_filename, tensor_value.detach().cpu().numpy())
filenames.append(np_filename)
if isinstance(tensor_value, torch.Tensor):
# Skip non-tensor collected artifacts, e.g. used_tokens.
np.save(np_filename, tensor_value.detach().cpu().numpy())
filenames.append(np_filename)
return filenames


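As a small illustrative sketch of why the guard above is needed (not part of this diff; the tensor name is invented): collected outputs can now carry a plain integer under the used_tokens key, and only torch.Tensor values should be written to .npy files.

import os

import numpy as np
import torch

collected_tensors = [("category_out__logits", torch.rand(4, 3)), ("used_tokens", 1032)]
for tensor_name, tensor_value in collected_tensors:
    if isinstance(tensor_value, torch.Tensor):
        # Only real tensors are serialized; the integer token count is skipped.
        np.save(os.path.join("/tmp", tensor_name + ".npy"), tensor_value.detach().cpu().numpy())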
1 change: 1 addition & 0 deletions ludwig/constants.py
@@ -47,6 +47,7 @@
USE_PRETRAINED = "use_pretrained"
TRAINABLE = "trainable"
CLASS_WEIGHTS = "class_weights"
USED_TOKENS = "used_tokens"
LOSS = "loss"
ROC_AUC = "roc_auc"
EVAL_LOSS = "eval_loss"
9 changes: 7 additions & 2 deletions ludwig/models/ecd.py
@@ -5,8 +5,9 @@
import numpy as np
import torch

from ludwig.accounting.used_tokens import get_used_tokens_for_ecd
from ludwig.combiners.combiners import create_combiner
from ludwig.constants import MODEL_ECD, MODEL_LLM
from ludwig.constants import MODEL_ECD, MODEL_LLM, USED_TOKENS
from ludwig.globals import MODEL_WEIGHTS_FILE_NAME
from ludwig.models.base import BaseModel
from ludwig.schema.model_types.ecd import ECDModelConfig
@@ -146,7 +147,11 @@ def forward(

encoder_outputs = self.encode(inputs)
combiner_outputs = self.combine(encoder_outputs)
return self.decode(combiner_outputs, targets, mask)
decoder_outputs = self.decode(combiner_outputs, targets, mask)

# Compute the number of used tokens.
decoder_outputs[USED_TOKENS] = get_used_tokens_for_ecd(inputs, targets)
return decoder_outputs

def unskip(self):
for k in self.input_features.keys():
5 changes: 4 additions & 1 deletion ludwig/models/gbm.py
@@ -7,7 +7,8 @@
import torch
from hummingbird.ml import convert

from ludwig.constants import BINARY, LOGITS, MODEL_GBM, NUMBER
from ludwig.accounting.used_tokens import get_used_tokens_for_gbm
from ludwig.constants import BINARY, LOGITS, MODEL_GBM, NUMBER, USED_TOKENS
from ludwig.features.base_feature import OutputFeature
from ludwig.globals import MODEL_WEIGHTS_FILE_NAME
from ludwig.models.base import BaseModel
@@ -177,6 +178,8 @@ def forward(

output_feature_utils.set_output_feature_tensor(output_logits, output_feature_name, LOGITS, logits)

# 1 token for each input feature + 1 token for the output feature.
output_logits[USED_TOKENS] = get_used_tokens_for_gbm(inputs)
return output_logits

def save(self, save_path):
17 changes: 10 additions & 7 deletions ludwig/models/llm.py
@@ -7,7 +7,8 @@
import torch
from transformers import AutoConfig, GenerationConfig

from ludwig.constants import IGNORE_INDEX_TOKEN_ID, LOGITS, MODEL_LLM, PREDICTIONS, TEXT
from ludwig.accounting.used_tokens import get_used_tokens_for_llm
from ludwig.constants import IGNORE_INDEX_TOKEN_ID, LOGITS, MODEL_LLM, PREDICTIONS, TEXT, USED_TOKENS
from ludwig.features.base_feature import ModuleWrapper, OutputFeature
from ludwig.features.feature_utils import LudwigFeatureDict
from ludwig.features.text_feature import TextOutputFeature
@@ -85,7 +86,7 @@ def __init__(
self,
config_obj: LLMModelConfig,
random_seed=None,
device=None,
_device=None,
**_kwargs,
):
super().__init__(random_seed=random_seed)
@@ -125,7 +126,7 @@ def __init__(
except KeyError as e:
raise KeyError(
f"An input feature has a name that conflicts with a class attribute of torch's ModuleDict: {e}"
)
) from e

# This is used to store the model inputs during the forward pass when fine-tuning LLMs. This allows us to have
# access to the joint model inputs (input_ids and target_ids) when computing metrics. In particular, the target
@@ -298,6 +299,8 @@ def forward(
# (which is already the case)
outputs[prediction_key] = prediction_tensor.type(torch.float32)

# Add token usage.
outputs[USED_TOKENS] = get_used_tokens_for_llm(self.model_inputs, self.tokenizer)
return outputs

def generate(
@@ -566,10 +569,10 @@ def save_base_model(self, save_path):
if self.config_obj.trainer.type != "none":
weights_save_path = os.path.join(save_path, MODEL_WEIGHTS_FILE_NAME)
self.model.base_model.save_pretrained(weights_save_path)
"""While this class initializes the tokenizer (from the base_model) automatically, and hence does not
need to be saved if inference is to be done using LudwigModel.predict(), the rationale for saving the
tokenizer to HuggingFace Hub is to provide access to models fine-tuned and persisted to HuggingFace Hub
using Ludwig at a later time, with the ability to perform inference, independently of Ludwig itself."""
# While this class initializes the tokenizer (from the base_model) automatically, and hence does not
# need to be saved if inference is to be done using LudwigModel.predict(), the rationale for saving the
# tokenizer to HuggingFace Hub is to provide access to models fine-tuned and persisted to HuggingFace Hub
# using Ludwig at a later time, with the ability to perform inference, independently of Ludwig itself.
self.tokenizer.save_pretrained(weights_save_path)
else:
logger.info("Skipped saving LLM without weight adjustments.")
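To make the LLM accounting concrete, here is a minimal sketch (not part of this diff) of the non-pad counting that the LLM forward() now records under USED_TOKENS; the stand-in tokenizer only needs a pad_token_id attribute.

from types import SimpleNamespace

import torch

from ludwig.accounting.used_tokens import get_used_tokens_for_llm

tokenizer = SimpleNamespace(pad_token_id=0)  # stand-in for the real HF tokenizer

# Merged input + target IDs for a batch of 2, right-padded with pad_token_id.
model_inputs = torch.tensor([[5, 9, 12, 0, 0], [7, 3, 0, 0, 0]])
assert get_used_tokens_for_llm(model_inputs, tokenizer) == 5  # 3 + 2 non-pad tokens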
77 changes: 50 additions & 27 deletions ludwig/trainers/trainer.py
@@ -33,7 +33,17 @@
import torch
from torch.utils.tensorboard import SummaryWriter

from ludwig.constants import AUTO, LOSS, MAX_CPU_BATCH_SIZE, MINIMIZE, MODEL_ECD, TEST, TRAINING, VALIDATION
from ludwig.constants import (
AUTO,
LOSS,
MAX_CPU_BATCH_SIZE,
MINIMIZE,
MODEL_ECD,
TEST,
TRAINING,
USED_TOKENS,
VALIDATION,
)
from ludwig.data.dataset.base import Dataset
from ludwig.distributed.base import DistributedStrategy, LocalStrategy
from ludwig.globals import (
@@ -263,7 +273,7 @@ def train_step(
targets: Dict[str, torch.Tensor],
should_step: bool = True,
profiler: Optional[torch.profiler.profile] = None,
) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
) -> Tuple[torch.Tensor, Dict[str, torch.Tensor], torch.Tensor]:
"""Performs a single training step.
Params:
@@ -272,7 +282,10 @@ def train_step(
should_step: Whether to perform a step of the optimizer after computing gradients.
Returns:
A tuple of the loss tensor and a dictionary of loss for every output feature.
A tuple of:
1. loss tensor
2. dictionary of loss for every output feature.
3. token usage for this step
"""
if isinstance(self.optimizer, torch.optim.LBFGS):
# NOTE: Horovod is not supported for L-BFGS.
@@ -303,7 +316,7 @@ def closure():
predictions = self.model.outputs_to_predictions(model_outputs)
self.model.update_metrics(targets, predictions)

return loss, all_losses
return loss, all_losses, model_outputs[USED_TOKENS]

with torch.cuda.amp.autocast() if self.use_amp else contextlib.nullcontext():
with self.distributed.prepare_model_update(self.dist_model, should_step=should_step):
@@ -314,6 +327,8 @@
)
loss = loss / self.gradient_accumulation_steps

used_tokens = model_outputs[USED_TOKENS]

# Begin the backward pass
variables = self.dist_model.parameters()
if self.use_amp:
@@ -323,7 +338,7 @@

if not should_step:
# Short-circuit the parameter updates if we are still accumulating gradients
return loss, all_losses
return loss, all_losses, used_tokens

# Wait for gradient aggregation to complete before clipping the gradients
# When using AMP, we need to do this before unscaling.
@@ -363,7 +378,7 @@ def closure():
if profiler:
profiler.step()

return loss, all_losses
return loss, all_losses, used_tokens

def clip_grads(self, variables):
"""Applies gradient clipping."""
@@ -393,10 +408,16 @@ def write_eval_summary(
summary_writer.flush()

@classmethod
def write_step_summary(cls, train_summary_writer, combined_loss, all_losses, step, learning_rate=None):
def write_step_summary(
cls, train_summary_writer, combined_loss, all_losses, step, used_tokens, total_tokens_used, learning_rate=None
):
if not train_summary_writer:
return

# token information.
train_summary_writer.add_scalar("tokens/tokens", used_tokens, global_step=step)
train_summary_writer.add_scalar("tokens/total_tokens_used", total_tokens_used, global_step=step)

# combined loss
train_summary_writer.add_scalar("combined/step_training_loss", combined_loss, global_step=step)

@@ -773,6 +794,17 @@ def run_evaluation(

return should_break

def save_checkpoint(self, progress_tracker: ProgressTracker, save_path: str, checkpoint_manager: CheckpointManager):
"""Checkpoints the model, progress tracker, and invokes the checkpoint callback."""
progress_tracker.increment_checkpoint()

checkpoint_manager.save(progress_tracker.steps)
if self.is_coordinator():
progress_tracker.save(os.path.join(save_path, TRAINING_PROGRESS_TRACKER_FILE_NAME))

# Callback that the checkpoint was reached, regardless of whether the model was evaluated.
self.callback(lambda c: c.on_checkpoint(self, progress_tracker))

def train(
self,
training_set,
@@ -1029,14 +1061,7 @@ def train(
break

if not self.skip_save_progress:
progress_tracker.checkpoint_number += 1

checkpoint_manager.save(progress_tracker.steps)
if self.is_coordinator():
progress_tracker.save(os.path.join(save_path, TRAINING_PROGRESS_TRACKER_FILE_NAME))

# Callback that the checkpoint was reached, regardless of whether the model was evaluated.
self.callback(lambda c: c.on_checkpoint(self, progress_tracker))
self.save_checkpoint(progress_tracker, save_path, checkpoint_manager)

if not self.skip_save_model and self.skip_all_evaluation:
# All evaluation was skipped, so save the current step as the best so far.
@@ -1199,17 +1224,22 @@ def _train_loop(
for o_feat in self.model.output_features.values()
}

loss, all_losses = self.train_step(inputs, targets, should_step=should_step, profiler=profiler)
loss, all_losses, used_tokens = self.train_step(inputs, targets, should_step=should_step, profiler=profiler)

# Update LR scheduler here instead of train loop to avoid updating during batch size tuning, etc.
self.scheduler.step()

# Update progress tracker with token information.
progress_tracker.set_token_usage_for_this_step(used_tokens)

if self.is_coordinator() and not self.skip_save_log:
self.write_step_summary(
train_summary_writer=train_summary_writer,
combined_loss=loss.detach().float(),
all_losses=all_losses,
step=progress_tracker.steps,
used_tokens=used_tokens,
total_tokens_used=progress_tracker.total_tokens_used,
learning_rate=progress_tracker.learning_rate,
)

@@ -1218,9 +1248,9 @@
progress_bar.update(1)
if self.is_coordinator():
logger.debug(
f"training: completed batch {progress_bar.total_steps} "
f"memory used: "
f"{psutil.Process(os.getpid()).memory_info()[0] / 1e6:0.2f}MB"
"training: completed batch %s memory used: %.2fMB",
progress_bar.total_steps,
psutil.Process(os.getpid()).memory_info()[0] / 1e6,
)

# Executing `on_batch_end` calls before `run_evaluation` enables more accurate
@@ -1271,14 +1301,7 @@
# this should not make a difference, except in the unlikely event an error occurs during eval and we
# want to resume from the last checkpoint, in which case we will lose slightly more progress this way.
if not self.skip_save_progress:
# This saves the latest checkpoint, which is the same as the current step.
progress_tracker.checkpoint_number += 1
checkpoint_manager.save(progress_tracker.steps)
if self.is_coordinator():
progress_tracker.save(os.path.join(save_path, TRAINING_PROGRESS_TRACKER_FILE_NAME))

# Callback that the checkpoint was reached, regardless of whether the model was evaluated or not.
self.callback(lambda c: c.on_checkpoint(self, progress_tracker))
self.save_checkpoint(progress_tracker, save_path, checkpoint_manager)

# If this was the last batch, then increment the epoch counter and invoke the `on_epoch_end` callback.
if batcher.last_batch():
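The ProgressTracker side of the change is among the files whose diffs are not shown below. A rough sketch, under the assumption that set_token_usage_for_this_step() simply accumulates the per-step count into total_tokens_used (both names are referenced by the trainer above; the implementation here is assumed), together with the two TensorBoard scalars that write_step_summary() now emits:

from torch.utils.tensorboard import SummaryWriter


class TokenAccounting:  # assumed shape of the ProgressTracker additions
    def __init__(self):
        self.tokens_used_this_step = 0  # attribute name assumed
        self.total_tokens_used = 0      # running total read by the trainer above

    def set_token_usage_for_this_step(self, used_tokens: int):
        self.tokens_used_this_step = used_tokens
        self.total_tokens_used += used_tokens


tracker = TokenAccounting()
writer = SummaryWriter(log_dir="./logs")
for step, used_tokens in enumerate([512, 480, 503], start=1):
    tracker.set_token_usage_for_this_step(used_tokens)
    # Mirrors the scalars added to write_step_summary in this commit.
    writer.add_scalar("tokens/tokens", used_tokens, global_step=step)
    writer.add_scalar("tokens/total_tokens_used", tracker.total_tokens_used, global_step=step)
writer.close()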
(The diffs for the remaining changed files are not shown.)
