diff --git a/nemo/export/tensorrt_llm.py b/nemo/export/tensorrt_llm.py index 401ac2e930a6..7705f6553210 100644 --- a/nemo/export/tensorrt_llm.py +++ b/nemo/export/tensorrt_llm.py @@ -28,7 +28,8 @@ from nemo.deploy import ITritonDeployable from nemo.export.tarutils import TarPath, unpack_tarball -from nemo.export.trt_llm.nemo_utils import get_tokenzier, is_nemo_file, nemo_to_trtllm_config +from nemo.export.trt_llm.converter.model_converter import model_to_trtllm_ckpt +from nemo.export.trt_llm.nemo_ckpt_loader.nemo_file import get_tokenzier, is_nemo_file, load_nemo_model from nemo.export.trt_llm.qnemo import qnemo_to_tensorrt_llm from nemo.export.trt_llm.qnemo.tokenizer_utils import get_nmt_tokenizer from nemo.export.trt_llm.tensorrt_llm_build import build_and_save_engine @@ -225,15 +226,16 @@ def export( lora_target_modules=lora_target_modules, ) else: - weights_dicts, model_configs, self.tokenizer = nemo_to_trtllm_config( - in_file=nemo_checkpoint_path, + model, model_configs, self.tokenizer = load_nemo_model(nemo_checkpoint_path, nemo_export_dir) + weights_dicts, model_configs = model_to_trtllm_ckpt( + model=model, + nemo_model_config=model_configs, + nemo_export_dir=nemo_export_dir, decoder_type=model_type, dtype=dtype, tensor_parallel_size=tensor_parallel_size, pipeline_parallel_size=pipeline_parallel_size, use_parallel_embedding=use_parallel_embedding, - nemo_export_dir=nemo_export_dir, - save_nemo_model_config=save_nemo_model_config, ) for weight_dict, model_config in zip(weights_dicts, model_configs): diff --git a/nemo/export/trt_llm/converter/__init__.py b/nemo/export/trt_llm/converter/__init__.py new file mode 100644 index 000000000000..4fc50543f1d2 --- /dev/null +++ b/nemo/export/trt_llm/converter/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/nemo/export/trt_llm/nemo_utils.py b/nemo/export/trt_llm/converter/model_converter.py similarity index 54% rename from nemo/export/trt_llm/nemo_utils.py rename to nemo/export/trt_llm/converter/model_converter.py index 7e687ce020da..5e522d8bbff2 100644 --- a/nemo/export/trt_llm/nemo_utils.py +++ b/nemo/export/trt_llm/converter/model_converter.py @@ -13,14 +13,9 @@ # limitations under the License. 
-import argparse import csv -import datetime import logging -import os -import sys -from pathlib import Path -from typing import Dict, List, Tuple, Union +from typing import Dict, List, Tuple import numpy as np import tensorrt_llm @@ -28,20 +23,9 @@ from tensorrt_llm.functional import non_gated_version from tensorrt_llm.layers import MoeConfig from tensorrt_llm.models.modeling_utils import PretrainedConfig -from transformers import AutoTokenizer, LlamaConfig, PreTrainedTokenizer -from nemo.export.tarutils import TarPath -from nemo.export.trt_llm.nemo.nemo import UnpackedNemoCheckpointDir -from nemo.export.trt_llm.nemo.nemo_ckpt_convert import build_tokenizer, convert_dist_checkpoint - - -DECODER_MODEL_TYPE = { - "gptj": 'GPTForCausalLM', - "gptnext": 'GPTForCausalLM', - "llama": 'LLaMAForCausalLM', - "gemma": 'GemmaForCausalLM', - "falcon": 'FalconForCausalLM', -} +from nemo.export.trt_llm.converter.model_to_trt_llm_ckpt import convert_model_to_trt_llm_ckpt +from nemo.export.trt_llm.converter.utils import DECODER_MODEL_TYPE, split LOGGER = logging.getLogger("NeMo") @@ -80,181 +64,26 @@ def prompt_convert(prompt_config, prompt_weights): return vtokens_embeddings -def is_nemo_file(path): - flag = False - - if path is not None: - if len(path) > 5: - pc = pathlib.Path(path) - if pc.exists(): - if pc.is_file(): - if path[-5 : len(path)] == ".nemo": - flag = True - - return flag - - -def split(v, tp_size, idx, dim=0): - """Splits the np tensor v on dim and return the idx's slice.""" - if tp_size == 1: - return v - if len(v.shape) == 1: - return np.ascontiguousarray(np.split(v, tp_size)[idx]) - else: - return np.ascontiguousarray(np.split(v, tp_size, axis=dim)[idx]) - - -def _nemo_llm_decode( - in_file: str, - out_dir: str, - tensor_parallelism: int = 1, - processes: int = 1, - storage_type: str = "bfloat16", - load_checkpoints_on_gpu: bool = False, - decoder_type: str = "gptnext", - use_parallel_embedding: bool = False, - save_nemo_model_config: bool = False, -) -> Tuple[Dict[str, np.ndarray], PretrainedConfig, PreTrainedTokenizer]: - """Decodes the NEMO file and returns the weights dict, llm config and tokenizer.""" - args = argparse.Namespace() - args.out_dir = out_dir - args.tensor_parallelism = tensor_parallelism - args.processes = processes - args.storage_type = storage_type - args.load_checkpoints_on_gpu = load_checkpoints_on_gpu - args.verbose = False - args.decoder_type = decoder_type - args.use_parallel_embedding = use_parallel_embedding - - if not os.path.exists(in_file): - LOGGER.error("%s does not exist", in_file) - sys.exit(1) - - if os.path.isdir(in_file): - nemo_dir = Path(in_file) - else: - nemo_dir = TarPath(in_file) - - try: - unpacked_checkpoint_dir = UnpackedNemoCheckpointDir( - nemo_dir, load_checkpoints_to_cpu=not args.load_checkpoints_on_gpu - ) - - start_time = datetime.datetime.now() - dist_ckpt_folder = nemo_dir / "model_weights" - - if dist_ckpt_folder.exists(): - weights_dict, llm_config, tokenizer = convert_dist_checkpoint(unpacked_checkpoint_dir, args) - else: - raise Exception( - "Not a supported nemo file format. " "Only distributed mcore nemo checkpoints are support." - ) - - LOGGER.info("Spent %s (h:m:s) to convert the model", datetime.datetime.now() - start_time) - - if save_nemo_model_config: - # Copy the config file without using shutil.copy(...) 
because input may be a TarPath - with (unpacked_checkpoint_dir._checkpoints_dir / "model_config.yaml").open("rb") as infile: - with open(os.path.join(args.out_dir, "model_config.yaml"), "wb") as outfile: - outfile.write(infile.read()) - finally: - if isinstance(nemo_dir, TarPath): - nemo_dir.tarobject.close() - - return weights_dict, llm_config, tokenizer - - -def get_tokenzier(tokenizer_dir_or_path: Path) -> PreTrainedTokenizer: - """Loads the tokenizer from the decoded NEMO weights dir.""" - if os.path.isdir(os.path.join(tokenizer_dir_or_path, "huggingface_tokenizer")): - return AutoTokenizer.from_pretrained(os.path.join(tokenizer_dir_or_path, "huggingface_tokenizer")) - - model_path = tokenizer_dir_or_path / "tokenizer.model" if tokenizer_dir_or_path.is_dir() else tokenizer_dir_or_path - tokenizer_config = {"library": "sentencepiece", "model": str(model_path)} - return build_tokenizer(tokenizer_config) - - -def to_word_list_format( - word_dict: List[List[str]], - tokenizer=None, - ref_str="", -): - ''' - format of word_dict - len(word_dict) should be same to batch_size - word_dict[i] means the words for batch i - len(word_dict[i]) must be 1, which means it only contains 1 string - This string can contains several sentences and split by ",". - For example, if word_dict[2] = " I am happy, I am sad", then this function will return - the ids for two short sentences " I am happy" and " I am sad". - ''' - assert tokenizer is not None, "need to set tokenizer" - - flat_ids = [] - offsets = [] - # The encoding of a single word can't always be trusted. See - # https://github.com/NVIDIA/NeMo/blob/bb575b72fd0be51ae10cc77d9f89ddb9e9d3b96d/nemo/collections/nlp/modules/common/text_generation_strategy.py#L229 - ids_ref = tokenizer.encode(ref_str) - for word_dict_item in word_dict: - item_flat_ids = [] - item_offsets = [] - - if isinstance(word_dict_item[0], bytes): - word_dict_item = [word_dict_item[0].decode()] - - words = list(csv.reader(word_dict_item))[0] - for word in words: - ids = tokenizer.encode(f"{ref_str}{word}") - if ids[0 : len(ids_ref)] == ids_ref: - # It worked! We can obtain the token(s) associated to `word` by stripping the prefix tokens. - ids = ids[len(ids_ref) :] - else: - # Unfortunately the prefix was merged with `word`. We could try with a different prefix, but - # for now we just use the basic encoding since this should be a very rare edge case. 
- ids = tokenizer.encode(word) - logging.warning(f"The encoding of word '{word}' into tokens {ids} might be incorrect") - - if len(ids) == 0: - continue - - item_flat_ids += ids - item_offsets.append(len(ids)) - - flat_ids.append(np.array(item_flat_ids)) - offsets.append(np.cumsum(np.array(item_offsets))) - - pad_to = max(1, max(len(ids) for ids in flat_ids)) - - for i, (ids, offs) in enumerate(zip(flat_ids, offsets)): - flat_ids[i] = np.pad(ids, (0, pad_to - len(ids)), constant_values=0) - offsets[i] = np.pad(offs, (0, pad_to - len(offs)), constant_values=-1) - - return np.array([flat_ids, offsets], dtype="int32").transpose((1, 0, 2)) - - -def nemo_to_trtllm_config( - in_file: str, +def model_to_trtllm_ckpt( + model, + nemo_model_config, + nemo_export_dir, decoder_type: str, - nemo_export_dir: Union[str, Path], dtype: str = "bfloat16", tensor_parallel_size: int = 1, pipeline_parallel_size: int = 1, use_parallel_embedding: bool = False, - save_nemo_model_config: bool = False, -) -> Tuple[List[Dict], List[PretrainedConfig], PreTrainedTokenizer]: - """Converts the NEMO file and construct the `PretrainedConfig` before tensorrt_llm deployment.""" - dtype_str = dtype - - weights_dict, nemo_model_config, tokenizer = _nemo_llm_decode( - in_file=in_file, - out_dir=nemo_export_dir, - tensor_parallelism=tensor_parallel_size, +) -> Tuple[List[Dict], List[PretrainedConfig]]: + + weights_dict = convert_model_to_trt_llm_ckpt( + model=model, + nemo_model_config=nemo_model_config, + nemo_export_dir=nemo_export_dir, + inference_tp_size=tensor_parallel_size, processes=1, - storage_type=dtype_str, + storage_type=dtype, use_parallel_embedding=use_parallel_embedding, - load_checkpoints_on_gpu=False, decoder_type=decoder_type, - save_nemo_model_config=save_nemo_model_config, ) world_size = tensor_parallel_size * pipeline_parallel_size @@ -275,7 +104,7 @@ def nemo_to_trtllm_config( config = { 'architecture': DECODER_MODEL_TYPE[decoder_type], - 'dtype': dtype_str, + 'dtype': dtype, 'num_hidden_layers': nemo_model_config.get('num_layers'), 'num_attention_heads': nemo_model_config.get('num_attention_heads'), 'num_key_value_heads': nemo_model_config.get('num_query_groups', nemo_model_config['num_attention_heads']), @@ -387,4 +216,4 @@ def nemo_to_trtllm_config( model_configs.append(model_config) weights_dicts.append(weights_dict_local) - return weights_dicts, model_configs, tokenizer + return weights_dicts, model_configs diff --git a/nemo/export/trt_llm/converter/model_to_trt_llm_ckpt.py b/nemo/export/trt_llm/converter/model_to_trt_llm_ckpt.py new file mode 100644 index 000000000000..df7e43548a44 --- /dev/null +++ b/nemo/export/trt_llm/converter/model_to_trt_llm_ckpt.py @@ -0,0 +1,251 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import logging +import math +import multiprocessing +from collections import defaultdict +from pathlib import Path + +import numpy as np +import torch +from tensorrt_llm._utils import pad_vocab_size, str_dtype_to_torch, torch_to_numpy +from tqdm import tqdm + +from nemo.export.trt_llm.converter.utils import split_and_save_weight + +LOGGER = logging.getLogger("NeMo") + +layer_names = { + "position_embedding": "embedding.position_embeddings.weight", + "word_embedding": "embedding.word_embeddings.weight", + "output_layer": "output_layer.weight", + "final_layernorm.weight": "final_layernorm.weight", + "final_layernorm.bias": "final_layernorm.bias", +} + + +def extract_layers_with_prefix(model_, prefix): + length_to_trim = len(prefix) + model_state = model_.get("state_dict", model_) + return {key[length_to_trim:]: model_state[key] for key in model_state.keys() if prefix in key} + + +def get_layer_name(layer_type: str, prefix: str): + layer_dict = layer_names + if layer_type in layer_dict: + return prefix + layer_dict[layer_type] + else: + raise ValueError(f"Unknown layer type {layer_type}") + + +def get_layer_prefix(layer_names, is_mcore): + transformer_layer_prefix = None + + for layer_name in layer_names: + if 'self_attention' in layer_name: + transformer_layer_prefix = layer_name.split('layers')[0] + break + assert transformer_layer_prefix is not None, "Cannot extract transformer layer prefix from {layer_name}" + if is_mcore: + model_prefix = transformer_layer_prefix.split('decoder')[0] + else: + model_prefix = transformer_layer_prefix.split('encoder')[0] + assert model_prefix is not None, "Cannot extract model prefix from {layer_name}" + + return model_prefix, transformer_layer_prefix + + +def rename_key_dist_ckpt(old_key: str, layer: int): + new_key = old_key + + if "layers." 
in old_key: + split_key = old_key.split(".") + split_key.insert(1, str(layer)) + new_key = ".".join(split_key) + + if "self_attention" in new_key: + new_key = new_key.replace("self_attention", "attention") + if "attention.linear_qkv.layer_norm_weight" in new_key: + new_key = new_key.replace("attention.linear_qkv.layer_norm_weight", "input_layernorm.weight") + if "attention.linear_qkv.layer_norm_bias" in new_key: + new_key = new_key.replace("attention.linear_qkv.layer_norm_bias", "input_layernorm.bias") + if "mlp.linear_fc1.layer_norm_weight" in new_key: + new_key = new_key.replace("mlp.linear_fc1.layer_norm_weight", "post_attention_layernorm.weight") + if "mlp.linear_fc1.layer_norm_bias" in new_key: + new_key = new_key.replace("mlp.linear_fc1.layer_norm_bias", "post_attention_layernorm.bias") + + return new_key + + +@torch.no_grad() +def convert_model_to_trt_llm_ckpt( + nemo_model_config, + model, + nemo_export_dir, + storage_type, + inference_tp_size, + decoder_type, + use_parallel_embedding, + processes, +): + + # if checkpoints files could be found - start preparing output dir + out_dir = create_export_dir(nemo_export_dir) + storage_type = str_dtype_to_torch(storage_type) + is_mcore = nemo_model_config.get("mcore_gpt", False) + + # load position_embedding from rank 0 + model_state_dict = model.get("state_dict", model) + + prefix, transformer_layer_prefix = get_layer_prefix(model_state_dict.keys(), is_mcore) + + has_position_embedding = get_layer_name("position_embedding", prefix) in model_state_dict + has_lm_head = get_layer_name("output_layer", prefix) in model_state_dict + share_embeddings_and_output = nemo_model_config.get("share_embeddings_and_output_weights", False) + embedding_scaling = nemo_model_config.get("apply_embedding_scaling", False) + hidden_size = nemo_model_config["hidden_size"] + + num_layers = nemo_model_config["num_layers"] + training_tp_size = 1 + training_pp_size = 1 + num_kv_heads = nemo_model_config.get("num_query_groups", 0) + multi_query_mode = nemo_model_config.get("multi_query_mode", False) + num_attention_heads = nemo_model_config["num_attention_heads"] + kv_channels = nemo_model_config.get("kv_channels", None) + + if num_kv_heads == 0: + if multi_query_mode: + num_kv_heads = 1 + else: + num_kv_heads = num_attention_heads + + export_config = { + "apply_layernorm_1p": nemo_model_config.get("normalization", "") == "layernorm1p", + "tp_size": training_tp_size, + "split_gated_activation": nemo_model_config.get("activation", "gelu") + in ["swiglu", "geglu", "fast-swiglu", "fast-geglu"] + and (decoder_type == "gptnext" or is_mcore), + "num_attention_heads": num_attention_heads, + "num_kv_heads": num_kv_heads, + "kv_channels": kv_channels, + "use_attention_nemo_shape": True, + "transpose_weights": True, + "use_parallel_embedding": use_parallel_embedding, + } + + # split_factor: in how many parts a TP training node is split + split_factor = inference_tp_size + model_level_weights = defaultdict(list) + + def handle_model_level_weights(model, tp_idx: int, pp_idx: int): + if tp_idx == 0 and pp_idx == 0: + if has_position_embedding: + val = model[get_layer_name("position_embedding", prefix)] + val = torch_to_numpy(val.to(storage_type).cpu()) + model_level_weights["transformer.position_embedding.weight"].append(val) + if pp_idx == 0: + val = model.get("state_dict", model)[get_layer_name("word_embedding", prefix)] + if embedding_scaling: + val = val * float(math.sqrt(hidden_size)) + + vocab_size = val.shape[0] + if use_parallel_embedding: + # Pad vocab_size first + if 
vocab_size % inference_tp_size != 0: + vocab_size_padded = pad_vocab_size(vocab_size, inference_tp_size) + pad_width = vocab_size_padded - vocab_size + val = torch.nn.functional.pad(val, (0, 0, 0, pad_width), value=0) + + val = torch_to_numpy(val.to(storage_type).cpu()) + model_level_weights["transformer.vocab_embedding.weight"].append(val) + if share_embeddings_and_output: + val = model.get("state_dict", model)[get_layer_name("word_embedding", prefix)] + val = torch_to_numpy(val.to(storage_type).cpu()) + model_level_weights["lm_head.weight"].append(val) + if has_lm_head and pp_idx == training_pp_size - 1: + val = model.get("state_dict", model)[get_layer_name("output_layer", prefix)] + val = torch_to_numpy(val.to(storage_type).cpu()) + model_level_weights["lm_head.weight"].append(val) + + weights_dict = {} + + tp_rank = 0 + + handle_model_level_weights(model, 0, 0) + model = extract_layers_with_prefix(model, transformer_layer_prefix) + + starmap_args = [] + for key, val in model.items(): + if "_extra_state" not in key: + if len(val.size()) == 1: + starmap_args.append( + ( + tp_rank, + out_dir, + split_factor, + # Let's rename/map the key to the old layer name previously. You can try printing out + # the rename_key output of the old llama checkpoint and compare. + rename_key_dist_ckpt(key, 0), + # Since the state dict value has the full layers, let's select the ith layer weights/biases here. + [val], + storage_type, + None, + export_config, + ) + ) + else: + for i in range(num_layers): + starmap_args.append( + ( + tp_rank, + out_dir, + split_factor, + # Let's rename/map the key to the old layer name previously. You can try printing out + # the rename_key output of the old llama checkpoint and compare. + rename_key_dist_ckpt(key, i), + # Since the state dict value has the full layers, let's select the ith layer weights/biases here. + [val[i]], + storage_type, + None, + export_config, + ) + ) + + starmap_args = tqdm(starmap_args, desc="saving weights") + + if processes > 1: + with multiprocessing.Pool(processes) as pool: + weights_dicts = pool.starmap(split_and_save_weight, starmap_args) + weights_dict_local = {k: v for d in weights_dicts for k, v in d.items()} + else: + # simpler for debug situations + for starmap_arg in starmap_args: + weights_dict_local = split_and_save_weight(*starmap_arg) + + weights_dict.update(weights_dict_local) + + for key, values in model_level_weights.items(): + model_level_weights[key] = np.concatenate(values, axis=0) + weights_dict[key] = model_level_weights[key] + + return weights_dict + + +def create_export_dir(nemo_export_dir): + out_dir = Path(nemo_export_dir) + if not out_dir.exists(): + out_dir.mkdir(parents=True) + return out_dir diff --git a/nemo/export/trt_llm/nemo/convert.py b/nemo/export/trt_llm/converter/utils.py similarity index 97% rename from nemo/export/trt_llm/nemo/convert.py rename to nemo/export/trt_llm/converter/utils.py index aa2a29888703..469d624bdb18 100644 --- a/nemo/export/trt_llm/nemo/convert.py +++ b/nemo/export/trt_llm/converter/utils.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-"""Utilities for exporting a model to our custom format.""" import numpy as np import torch @@ -23,6 +22,15 @@ weights_dict = {} +DECODER_MODEL_TYPE = { + "gptj": 'GPTForCausalLM', + "gptnext": 'GPTForCausalLM', + "llama": 'LLaMAForCausalLM', + "gemma": 'GemmaForCausalLM', + "falcon": 'FalconForCausalLM', +} + + def save_val(val, dir, key, tp_num=None): suffix = "" if tp_num is None else f".{tp_num}.bin" # Transpose linear layer weights to the correct shape. @@ -396,3 +404,13 @@ def split_and_save_weight(tp_rank, saved_dir, split_factor, key, vals, storage_t global weights_dict return weights_dict + + +def split(v, tp_size, idx, dim=0): + """Splits the np tensor v on dim and return the idx's slice.""" + if tp_size == 1: + return v + if len(v.shape) == 1: + return np.ascontiguousarray(np.split(v, tp_size)[idx]) + else: + return np.ascontiguousarray(np.split(v, tp_size, axis=dim)[idx]) diff --git a/nemo/export/trt_llm/nemo/nemo.py b/nemo/export/trt_llm/nemo/nemo.py deleted file mode 100644 index 6276de5dddd9..000000000000 --- a/nemo/export/trt_llm/nemo/nemo.py +++ /dev/null @@ -1,255 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import functools -import logging -import pathlib -import typing - -import torch -import yaml -from transformers import FalconConfig, GPT2Config, LlamaConfig - -from nemo.export.tarutils import TarPath - -LOGGER = logging.getLogger("NeMo") - - -def cpu_map_location(storage, loc): - return storage.cpu() - - -def gpu_map_location(storage, loc): - if loc.startswith("cuda"): - training_gpu_idx = int(loc.split(":")[1]) - inference_gpu_idx = training_gpu_idx % torch.cuda.device_count() - return storage.cuda(inference_gpu_idx) - elif loc.startswith("cpu"): - return storage.cpu() - else: - raise ValueError(f"Not handled {loc}") - - -def nemo_to_llm_config(nemo_model_config, vocab_size, eos_id, bos_id, decoder_type): - convertion_dict = { - "activation_function": "activation", - "layer_norm_epsilon": "layernorm_epsilon", - "n_embd": "hidden_size", - "n_head": "num_attention_heads", - "n_layer": "num_layers", - "n_positions": "max_position_embeddings", - "rotary_pct": "rotary_percentage", - "rotary_base": "rotary_base", - "rotary_scaling": "seq_len_interpolation_factor", - "position_embedding_type": "position_embedding_type", - "bias": "bias", - "intermediate_size": "ffn_hidden_size", - "num_kv_heads": "num_query_groups", - "moe_num_experts": "num_moe_experts", - "moe_top_k": "moe_router_topk", - "moe_renorm_mode": "moe_renorm_mode", - "kv_channels": "kv_channels", - "norm_epsilon": "layernorm_epsilon", - } - - kwargs = {key: nemo_model_config[value] for key, value in convertion_dict.items() if value in nemo_model_config} - kwargs["vocab_size"] = vocab_size - kwargs["eos_token_id"] = eos_id - kwargs["bos_token_id"] = eos_id if decoder_type == 'falcon' else bos_id # in HF falcon eos==bos - if "moe_num_experts" not in kwargs: - kwargs["moe_num_experts"] = 0 - config_dict = {"llama": LlamaConfig, "falcon": 
FalconConfig, "gemma": LlamaConfig} - llm_config = config_dict[decoder_type] if decoder_type in config_dict else GPT2Config - - return llm_config(**kwargs) - - -def add_special_tokens_to_tokenizer(tokenizer): - # Need to add cls, sep, mask tokens to the tokenizer if they don't exist. - # If cls, sep and mask are not attributes of the tokenizer, add it. - if not hasattr(tokenizer, "cls_token"): - tokenizer.add_special_tokens({"cls_token": ""}) - if not hasattr(tokenizer.tokenizer, "sep_id"): - tokenizer.add_special_tokens({"sep_token": ""}) - if not hasattr(tokenizer.tokenizer, "mask_id"): - tokenizer.add_special_tokens({"mask_token": ""}) - - # bos, eos, pad and unk may be present in the provided spm .model file, if they are, use it. - if not hasattr(tokenizer, "pad_token"): - if hasattr(tokenizer.tokenizer, "pad_id") and tokenizer.tokenizer.pad_id() > 0: - tokenizer.pad_token = tokenizer.tokenizer.id_to_piece(tokenizer.tokenizer.pad_id()) - else: - tokenizer.add_special_tokens({"pad_token": ""}) - else: - tokenizer.add_special_tokens({"pad_token": ""}) - - if not hasattr(tokenizer, "bos_token"): - if hasattr(tokenizer.tokenizer, "bos_id") and tokenizer.tokenizer.bos_id() > 0: - tokenizer.bos_token = tokenizer.tokenizer.id_to_piece(tokenizer.tokenizer.bos_id()) - else: - tokenizer.add_special_tokens({"bos_token": ""}) - else: - tokenizer.add_special_tokens({"bos_token": ""}) - - if not hasattr(tokenizer, "eos_token"): - if hasattr(tokenizer.tokenizer, "eos_id") and tokenizer.tokenizer.eos_id() > 0: - tokenizer.eos_token = tokenizer.tokenizer.id_to_piece(tokenizer.tokenizer.eos_id()) - else: - tokenizer.add_special_tokens({"eos_token": ""}) - else: - tokenizer.add_special_tokens({"eos_token": ""}) - - -def extract_layers_with_prefix(model_, prefix): - length_to_trim = len(prefix) - model_state = model_.get("state_dict", model_) - return {key[length_to_trim:]: model_state[key] for key in model_state.keys() if prefix in key} - - -class UnpackedNemoCheckpointDir: - def __init__( - self, - checkpoints_dir: typing.Union[pathlib.Path, TarPath], - load_checkpoints_to_cpu: bool = False, - ): - assert isinstance(checkpoints_dir, (pathlib.Path, TarPath)) - self._checkpoints_dir = checkpoints_dir - self._load_checkpoints_to_cpu = load_checkpoints_to_cpu - - @property - @functools.lru_cache - def model_config(self): - model_config = None - - model_config_filename = "model_config.yaml" - model_configs_paths = list(self._checkpoints_dir.rglob(model_config_filename)) - if model_configs_paths: - if len(model_configs_paths) > 1: - LOGGER.debug(f"There are more than single {model_config_filename} in" f" {self._checkpoints_dir}") - model_config_path = model_configs_paths[0] - LOGGER.debug("Loading model config from %s", model_config_path) - with model_config_path.open("r") as model_config_file: - model_config = yaml.load(model_config_file, Loader=yaml.SafeLoader) - else: - LOGGER.debug("Searching model config in checkpoints") - # try to obtain from checkpoint - checkpoint_name = self.checkpoint_name - checkpoints_paths = sorted(self._checkpoints_dir.rglob(checkpoint_name)) - if checkpoints_paths: - # assume that parallel ranks 0 checkpoint should have model config embedded - checkpoint_path = checkpoints_paths[0] - - map_location_fn = cpu_map_location if self._load_checkpoints_to_cpu else gpu_map_location - - model_00 = torch.load(checkpoint_path, map_location=map_location_fn) - if "hyper_parameters" in model_00 and "cfg" in model_00["hyper_parameters"]: - model_config = model_00["hyper_parameters"]["cfg"] - 
LOGGER.debug("Loaded model config from checkpoint %s", checkpoint_path) - else: - LOGGER.debug("Could not find model config in checkpoint %s", checkpoint_path) - - del model_00 - - if model_config is None: - LOGGER.warning("Could not find checkpoint with NeMo model config in %s", self._checkpoints_dir) - - LOGGER.debug("Loaded model config %s", model_config) - - return model_config - - @property - def checkpoints_dir(self): - return self._checkpoints_dir - - def get_checkpoints_paths(self, tensor_model_parallel_size=1, pipeline_model_parallel_size=1): - """Injects tensor/pipeline model parallel ranks into the filepath. - Does nothing if not using model parallelism. - """ - checkpoint_path_without_rank = self.checkpoints_dir / self.checkpoint_name - - def _inject_parallel_ranks(tp_rank, pp_rank): - if tensor_model_parallel_size > 1 or pipeline_model_parallel_size > 1: - if pipeline_model_parallel_size is None or pipeline_model_parallel_size == 1: - checkpoint_path = ( - checkpoint_path_without_rank.parent - / f"mp_rank_{tp_rank:02d}" - / checkpoint_path_without_rank.name - ) - else: - checkpoint_path = ( - checkpoint_path_without_rank.parent - / f"tp_rank_{tp_rank:02d}_pp_rank_{pp_rank:03d}" - / checkpoint_path_without_rank.name - ) - return checkpoint_path - else: - return checkpoint_path_without_rank - - return [ - [ - _inject_parallel_ranks(tp_rank=tp_rank, pp_rank=pp_rank) - for pp_rank in range(pipeline_model_parallel_size) - ] - for tp_rank in range(tensor_model_parallel_size) - ] - - @property - @functools.lru_cache - def checkpoint_name(self): - patterns = [ - "model_weights.ckpt", # older megatron checkpoints - "*last.ckpt", # newer format of checkpoints - ] - for pattern in patterns: - model_files = sorted(list(self._checkpoints_dir.rglob(pattern))) - if model_files: - return model_files[0].name - - raise ValueError(f"Could not find checkpoint files in {self._checkpoints_dir}") - - @functools.lru_cache - def get_tokenizer_file_path(self, tokenizer_key, file_key, default_filename_pattern): - model_config = self.model_config - file_property = None - if tokenizer_key in model_config and file_key in model_config[tokenizer_key]: - file_property = model_config[tokenizer_key][file_key] - elif file_key in model_config: - file_property = model_config[file_key] - - LOGGER.debug("model_config[%s][%s]=%s", tokenizer_key, file_key, file_property) - - if file_property and file_property.startswith("nemo:"): - filename = file_property.split("nemo:")[1] - filename_pattern = f"*{filename}" - elif file_property and file_property.startswith("/artifacts/"): - filename = pathlib.Path(file_property).name - filename_pattern = f"*{filename}" - elif file_property is None or file_property == "None": - filename_pattern = None - else: - filename_pattern = default_filename_pattern - LOGGER.warning( - f"Tokenizer file from config: {tokenizer_key}.{file_key}={file_property} " - f"looks like unsupported path. Pattern {filename_pattern} will be used." - ) - - file_path = None - if filename_pattern is not None: - files_paths = list(self._checkpoints_dir.glob(filename_pattern)) - if files_paths: - assert len(files_paths) == 1 - file_path = files_paths[0] - - return file_path diff --git a/nemo/export/trt_llm/nemo/nemo_ckpt_convert.py b/nemo/export/trt_llm/nemo/nemo_ckpt_convert.py deleted file mode 100644 index d83129b43fab..000000000000 --- a/nemo/export/trt_llm/nemo/nemo_ckpt_convert.py +++ /dev/null @@ -1,647 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import configparser -import json -import logging -import math -import multiprocessing -import os -import typing -from collections import defaultdict -from pathlib import Path -from typing import Union - -import numpy as np -import tensorstore # This is important even though not used. Otherwise zarr raises error. -import torch -import zarr -from tensorrt_llm._utils import np_bfloat16, pad_vocab_size, str_dtype_to_torch, torch_to_numpy -from torch.distributed.checkpoint import FileSystemReader, TensorStorageMetadata -from torch.distributed.checkpoint.state_dict_loader import load_state_dict -from tqdm import tqdm -from transformers import AutoTokenizer, GPT2Tokenizer, LlamaConfig - -from nemo.export.tarutils import TarPath, ZarrPathStore -from nemo.export.trt_llm.nemo.convert import split_and_save_weight -from nemo.export.trt_llm.nemo.nemo import UnpackedNemoCheckpointDir, extract_layers_with_prefix, nemo_to_llm_config -from nemo.export.trt_llm.nemo.sentencepiece_tokenizer import SentencePieceTokenizer - -LOGGER = logging.getLogger("NeMo") - -layer_names = { - "position_embedding": "embedding.position_embeddings.weight", - "word_embedding": "embedding.word_embeddings.weight", - "output_layer": "output_layer.weight", - "final_layernorm.weight": "final_layernorm.weight", - "final_layernorm.bias": "final_layernorm.bias", -} - - -def get_layer_name(layer_type: str, prefix: str): - layer_dict = layer_names - if layer_type in layer_dict: - return prefix + layer_dict[layer_type] - else: - raise ValueError(f"Unknown layer type {layer_type}") - - -def get_layer_prefix(layer_names, is_mcore): - transformer_layer_prefix = None - - for layer_name in layer_names: - if 'self_attention' in layer_name: - transformer_layer_prefix = layer_name.split('layers')[0] - break - assert transformer_layer_prefix is not None, "Cannot extract transformer layer prefix from {layer_name}" - if is_mcore: - model_prefix = transformer_layer_prefix.split('decoder')[0] - else: - model_prefix = transformer_layer_prefix.split('encoder')[0] - assert model_prefix is not None, "Cannot extract model prefix from {layer_name}" - - return model_prefix, transformer_layer_prefix - - -def get_layer_index(split_key): - index = 0 - for key in split_key: - if key == "layers": - return index + 1 - index += 1 - - -def rename_key(old_key: str, pp_rank: int, num_layers: int, pp_size: int): - new_key = old_key - - if "layers." 
in old_key: - split_key = old_key.split(".") - layer_index = get_layer_index(split_key) - split_key[layer_index] = str(int(split_key[layer_index]) + pp_rank * num_layers // pp_size) - new_key = ".".join(split_key) - - if "self_attention" in new_key: - new_key = new_key.replace("self_attention", "attention") - if "attention.linear_qkv.layer_norm_weight" in new_key: - new_key = new_key.replace("attention.linear_qkv.layer_norm_weight", "input_layernorm.weight") - if "mlp.linear_fc1.layer_norm_weight" in new_key: - new_key = new_key.replace("mlp.linear_fc1.layer_norm_weight", "post_attention_layernorm.weight") - - return new_key - - -def rename_key_dist_ckpt(old_key: str, layer: int): - new_key = old_key - - if "layers." in old_key: - split_key = old_key.split(".") - split_key.insert(1, str(layer)) - new_key = ".".join(split_key) - - if "self_attention" in new_key: - new_key = new_key.replace("self_attention", "attention") - if "attention.linear_qkv.layer_norm_weight" in new_key: - new_key = new_key.replace("attention.linear_qkv.layer_norm_weight", "input_layernorm.weight") - if "attention.linear_qkv.layer_norm_bias" in new_key: - new_key = new_key.replace("attention.linear_qkv.layer_norm_bias", "input_layernorm.bias") - if "mlp.linear_fc1.layer_norm_weight" in new_key: - new_key = new_key.replace("mlp.linear_fc1.layer_norm_weight", "post_attention_layernorm.weight") - if "mlp.linear_fc1.layer_norm_bias" in new_key: - new_key = new_key.replace("mlp.linear_fc1.layer_norm_bias", "post_attention_layernorm.bias") - - return new_key - - -def load_sharded_metadata(checkpoint_dir: Union[Path, TarPath], torch_tensor=True): - with (checkpoint_dir / 'metadata.json').open(mode='r') as f: - config_dict = json.load(f) - if config_dict['sharded_backend'] == 'zarr': - return load_sharded_metadata_zarr(checkpoint_dir, torch_tensor) - elif config_dict['sharded_backend'] == 'torch_dist': - return load_sharded_metadata_torch_dist(checkpoint_dir, torch_tensor) - else: - raise NotImplementedError(f'Distributed checkpoint backend {config_dict["sharded_backend"]} not supported') - - -class TarFileSystemReader(FileSystemReader): - """Reader that accepts both Path and TarPath checkpoint directory. - - The FileSystemReader works with TarPath, but expects a pure Path. - It's enough to skip the Path check in __init__. 
- """ - - def __init__(self, path: Union[Path, TarPath]) -> None: - """No call to super().__init__ because it expects pure Path.""" - self.path = path - self.storage_data = dict() - - -def load_sharded_metadata_torch_dist(checkpoint_dir: Union[Path, TarPath], torch_tensor=True): - fs_reader = TarFileSystemReader(checkpoint_dir) - metadata = fs_reader.read_metadata() - - state_dict = { - k: torch.empty(tp.size, dtype=tp.properties.dtype) - for k, tp in metadata.state_dict_metadata.items() - if isinstance(tp, TensorStorageMetadata) - } - load_state_dict( - state_dict, - storage_reader=fs_reader, - no_dist=True, - ) - - if not torch_tensor: - for k, v in state_dict.items(): - if v.dtype == torch.bfloat16: - state_dict[k] = v.view(torch.int16).numpy().view(np_bfloat16) - else: - state_dict[k] = v.numpy() - return state_dict - - -def load_sharded_metadata_zarr(checkpoint_dir: Union[Path, TarPath], torch_tensor=True): - sharded_state_dict = {} - for subdir in checkpoint_dir.iterdir(): - if not subdir.is_dir() or not (subdir / '.zarray').exists(): - continue - key = subdir.name - - zstore = ZarrPathStore(subdir) - arr = zarr.open(zstore, 'r') - - if torch_tensor: - # sharded_state_dict[key] = torch.from_numpy(arr[:].astype("float32")).to(dtype=torch.bfloat16) - if arr.dtype.name == "bfloat16": - sharded_state_dict[key] = torch.from_numpy(arr[:].view(np.int16)).view(torch.bfloat16) - else: - sharded_state_dict[key] = torch.from_numpy(arr[:]) - else: - sharded_state_dict[key] = arr[:] - - return sharded_state_dict - - -@torch.no_grad() -def convert_dist_checkpoint(unpacked_checkpoints_dir: UnpackedNemoCheckpointDir, args): - nemo_model_config = unpacked_checkpoints_dir.model_config - checkpoints_path = unpacked_checkpoints_dir.checkpoints_dir / "model_weights" - - # if checkpoints files could be found - start preparing output dir - out_dir = create_out_dir(args) - - storage_type = str_dtype_to_torch(args.storage_type) - is_mcore = nemo_model_config.get("mcore_gpt", False) - - # load position_embedding from rank 0 - model = load_sharded_metadata(checkpoints_path) - model_state_dict = model.get("state_dict", model) - - prefix, transformer_layer_prefix = get_layer_prefix(model_state_dict.keys(), is_mcore) - - has_position_embedding = get_layer_name("position_embedding", prefix) in model_state_dict - has_lm_head = get_layer_name("output_layer", prefix) in model_state_dict - share_embeddings_and_output = nemo_model_config.get("share_embeddings_and_output_weights", False) - embedding_scaling = nemo_model_config.get("apply_embedding_scaling", False) - hidden_size = nemo_model_config["hidden_size"] - - num_layers = nemo_model_config["num_layers"] - training_tp_size = 1 - training_pp_size = 1 - inference_tp_size = args.tensor_parallelism - num_kv_heads = nemo_model_config.get("num_query_groups", 0) - multi_query_mode = nemo_model_config.get("multi_query_mode", False) - num_attention_heads = nemo_model_config["num_attention_heads"] - kv_channels = nemo_model_config.get("kv_channels", None) - use_parallel_embedding = args.use_parallel_embedding - if num_kv_heads == 0: - if multi_query_mode: - num_kv_heads = 1 - else: - num_kv_heads = num_attention_heads - - export_config = { - "apply_layernorm_1p": nemo_model_config.get("normalization", "") == "layernorm1p", - "tp_size": training_tp_size, - "split_gated_activation": nemo_model_config.get("activation", "gelu") - in ["swiglu", "geglu", "fast-swiglu", "fast-geglu"] - and (args.decoder_type == "gptnext" or is_mcore), - "num_attention_heads": num_attention_heads, 
- "num_kv_heads": num_kv_heads, - "kv_channels": kv_channels, - "use_attention_nemo_shape": True, - "transpose_weights": True, - "use_parallel_embedding": use_parallel_embedding, - } - - # split_factor: in how many parts a TP training node is split - split_factor = inference_tp_size - model_level_weights = defaultdict(list) - - def handle_model_level_weights(model, tp_idx: int, pp_idx: int): - if tp_idx == 0 and pp_idx == 0: - if has_position_embedding: - val = model[get_layer_name("position_embedding", prefix)] - val = torch_to_numpy(val.to(storage_type).cpu()) - model_level_weights["transformer.position_embedding.weight"].append(val) - if pp_idx == 0: - val = model.get("state_dict", model)[get_layer_name("word_embedding", prefix)] - if embedding_scaling: - val = val * float(math.sqrt(hidden_size)) - - vocab_size = val.shape[0] - if use_parallel_embedding: - # Pad vocab_size first - if vocab_size % inference_tp_size != 0: - vocab_size_padded = pad_vocab_size(vocab_size, inference_tp_size) - pad_width = vocab_size_padded - vocab_size - val = torch.nn.functional.pad(val, (0, 0, 0, pad_width), value=0) - - val = torch_to_numpy(val.to(storage_type).cpu()) - model_level_weights["transformer.vocab_embedding.weight"].append(val) - if share_embeddings_and_output: - val = model.get("state_dict", model)[get_layer_name("word_embedding", prefix)] - val = torch_to_numpy(val.to(storage_type).cpu()) - model_level_weights["lm_head.weight"].append(val) - if has_lm_head and pp_idx == training_pp_size - 1: - val = model.get("state_dict", model)[get_layer_name("output_layer", prefix)] - val = torch_to_numpy(val.to(storage_type).cpu()) - model_level_weights["lm_head.weight"].append(val) - - weights_dict = {} - - tp_rank = 0 - - handle_model_level_weights(model, 0, 0) - model = extract_layers_with_prefix(model, transformer_layer_prefix) - - starmap_args = [] - for key, val in model.items(): - if "_extra_state" not in key: - if len(val.size()) == 1: - starmap_args.append( - ( - tp_rank, - out_dir, - split_factor, - # Let's rename/map the key to the old layer name previously. You can try printing out - # the rename_key output of the old llama checkpoint and compare. - rename_key_dist_ckpt(key, 0), - # Since the state dict value has the full layers, let's select the ith layer weights/biases here. - [val], - storage_type, - None, - export_config, - ) - ) - else: - for i in range(num_layers): - starmap_args.append( - ( - tp_rank, - out_dir, - split_factor, - # Let's rename/map the key to the old layer name previously. You can try printing out - # the rename_key output of the old llama checkpoint and compare. - rename_key_dist_ckpt(key, i), - # Since the state dict value has the full layers, let's select the ith layer weights/biases here. 
- [val[i]], - storage_type, - None, - export_config, - ) - ) - - starmap_args = tqdm(starmap_args, desc="saving weights") - - if args.processes > 1: - with multiprocessing.Pool(args.processes) as pool: - weights_dicts = pool.starmap(split_and_save_weight, starmap_args) - weights_dict_local = {k: v for d in weights_dicts for k, v in d.items()} - else: - # simpler for debug situations - for starmap_arg in starmap_args: - weights_dict_local = split_and_save_weight(*starmap_arg) - - weights_dict.update(weights_dict_local) - - for key, values in model_level_weights.items(): - model_level_weights[key] = np.concatenate(values, axis=0) - - weights_dict[key] = model_level_weights[key] - - if nemo_model_config["tokenizer"].get("library", None) == "huggingface": - tokenizer = AutoTokenizer.from_pretrained( - nemo_model_config["tokenizer"]["type"], use_fast=nemo_model_config["tokenizer"].get("use_fast", False) - ) - else: - tokenizer_config = update_tokenizer_paths(nemo_model_config["tokenizer"], unpacked_checkpoints_dir) - copy_tokenizer_files(tokenizer_config, out_dir) - - tokenizer_config["model"] = os.path.join(out_dir, "tokenizer.model") - tokenizer = build_tokenizer(tokenizer_config) - - return weights_dict, nemo_model_config, tokenizer - - -@torch.no_grad() -def convert_nemo_model(nemo_model, nemo_model_config, storage_type_str, decoder_type=None): - from megatron.core import parallel_state - - is_mcore = nemo_model_config.get("mcore_gpt", False) - - nemo_model_state_dict = nemo_model.state_dict() - prefix, transformer_layer_prefix = get_layer_prefix(nemo_model_state_dict, is_mcore) - has_position_embedding = get_layer_name("position_embedding", prefix) in nemo_model_state_dict - has_lm_head = get_layer_name("output_layer", prefix) in nemo_model_state_dict - has_final_layer_bias = get_layer_name("final_layernorm.bias", transformer_layer_prefix) in nemo_model_state_dict - - tp_rank = parallel_state.get_tensor_model_parallel_rank() - tp_size = parallel_state.get_tensor_model_parallel_world_size() - pp_rank = parallel_state.get_pipeline_model_parallel_rank() - pp_size = parallel_state.get_pipeline_model_parallel_world_size() - pp_group = parallel_state.get_pipeline_model_parallel_group() - # split_factor = 1 - storage_type = str_dtype_to_torch(storage_type_str) - - num_layers = nemo_model_config["num_layers"] - training_tp_size = nemo_model_config.get("tensor_model_parallel_size", 1) - training_pp_size = nemo_model_config.get("pipeline_model_parallel_size", 1) - num_kv_heads = nemo_model_config.get("num_query_groups", 0) - multi_query_mode = nemo_model_config.get("multi_query_mode", False) - num_attention_heads = nemo_model_config["num_attention_heads"] - - # pp currently unsupported so reshard away PP - is_pp_resharding = False - if pp_size > 1: - is_pp_resharding = True - - if num_kv_heads == 0: - if multi_query_mode: - num_kv_heads = 1 - else: - num_kv_heads = num_attention_heads - - export_config = { - "apply_layernorm_1p": nemo_model_config.get("normalization", "") == "layernorm1p", - "tp_size": training_tp_size, - "split_gated_activation": "swiglu" in nemo_model_config.get("activation", "gelu") - and (decoder_type == "gptnext" or is_mcore), - "num_attention_heads": nemo_model_config["num_attention_heads"], - "num_kv_heads": num_kv_heads, - "use_attention_nemo_shape": True, - "transpose_weights": True, - "from_nemo_model": True, - } - - # Gather meta data from first and last PP stage - if is_pp_resharding: - has_lm_head = torch.tensor(has_lm_head).cuda() - src_rank = 
torch.distributed.get_global_rank(pp_group, pp_size - 1) - torch.distributed.broadcast(has_lm_head, src_rank, group=pp_group) - has_lm_head = has_lm_head.item() - - has_position_embedding = torch.tensor(has_position_embedding).cuda() - src_rank = torch.distributed.get_global_rank(pp_group, 0) - torch.distributed.broadcast(has_position_embedding, src_rank, group=pp_group) - has_position_embedding = has_position_embedding.item() - - has_final_layer_bias = torch.tensor(has_final_layer_bias).cuda() - src_rank = torch.distributed.get_global_rank(pp_group, pp_size - 1) - torch.distributed.broadcast(has_final_layer_bias, src_rank, group=pp_group) - has_final_layer_bias = has_final_layer_bias.item() - - trt_inflight_weights = {} - starmap_args = [] - - def handle_model_level_weights(model, tp_idx: int, pp_idx: int): - def _handle_weights(src_key: str, dst_key: str, pp_src_idx: int, tensor_dim: int): - src_pp_global_rank = torch.distributed.get_global_rank(pp_group, pp_src_idx) - # Broadcast the shape - if pp_idx == pp_src_idx: - gathered_tensor = model.get("state_dict", model)[src_key].type(storage_type).cuda() - shape = torch.IntTensor(list(gathered_tensor.shape)).cuda() - else: - shape = torch.zeros(tensor_dim, dtype=torch.int32).cuda() - torch.distributed.broadcast(shape, src_pp_global_rank, group=pp_group) - - # Collect the tensor - if pp_idx != pp_src_idx: - gathered_tensor = torch.zeros(*shape, dtype=storage_type).cuda() - torch.distributed.broadcast(gathered_tensor, src_pp_global_rank, group=pp_group) - - if "final_layernorm" not in src_key: - gathered_tensor = gathered_tensor.to(storage_type).cpu() - trt_inflight_weights[dst_key] = torch_to_numpy(gathered_tensor) - else: - starmap_args.append( - { - "tp_rank": tp_idx, - "saved_dir": trt_inflight_weights, - "split_factor": 1, - "key": dst_key, - "vals": [gathered_tensor], - "storage_type": storage_type, - "act_range": None, - "config": export_config, - } - ) - - if has_lm_head: - _handle_weights(get_layer_name("output_layer", prefix), "model.lm_head.weight.bin", pp_size - 1, 2) - if has_position_embedding: - _handle_weights(get_layer_name("position_embedding", prefix), "model.wpe.bin", 0, 2) - - _handle_weights(get_layer_name("word_embedding", prefix), "model.wte.bin", 0, 2) - _handle_weights( - get_layer_name("final_layernorm.weight", transformer_layer_prefix), - "final_layernorm.weight", - pp_size - 1, - 1, - ) - - if has_final_layer_bias: - _handle_weights( - get_layer_name("final_layernorm.bias", transformer_layer_prefix), - "final_layernorm.bias", - pp_size - 1, - 1, - ) - - torch.cuda.empty_cache() - - models = [] - - handle_model_level_weights(nemo_model_state_dict, tp_rank, pp_rank) - layers = extract_layers_with_prefix(nemo_model_state_dict, transformer_layer_prefix) - models.append(layers) - - for key in models[0].keys(): - # Skip final_layernorm. 
- if not key.startswith("layers."): - continue - if "_extra_state" not in key: - starmap_args.append( - { - "tp_rank": tp_rank, - "saved_dir": trt_inflight_weights, - "split_factor": 1, - "key": rename_key(key, pp_rank, num_layers, training_pp_size), - "vals": [model[key] for model in models], - "storage_type": storage_type, - "act_range": None, - "config": export_config, - } - ) - starmap_args = tqdm(starmap_args, desc="saving weights") - for starmap_arg in starmap_args: - save_weight_torch(**starmap_arg) - - # Collect weights from different pp stages - # Assume each rank has the same number of layers - if is_pp_resharding: - collect_pp_weights = {} - for key, val in trt_inflight_weights.items(): - # Skip embedding and final layer - if not key.startswith("model.layers"): - continue - # Convert numpy array to torch tensor and gather weights - curr_weight = trt_inflight_weights[key] - if curr_weight.dtype != np_bfloat16: - curr_weight = torch.tensor(curr_weight).cuda() - else: - curr_weight = torch.tensor(curr_weight.view(np.int16)).view(torch.bfloat16).cuda() - weight_list = [torch.zeros_like(curr_weight) for _ in range(pp_size)] - torch.distributed.all_gather(weight_list, curr_weight, group=pp_group) - # Collect weights name - for rank in range(pp_size): - split_key = key.split(".") - layer_index = get_layer_index(split_key) - split_key[layer_index] = str(int(split_key[layer_index]) + (rank - pp_rank) * num_layers // pp_size) - new_key = ".".join(split_key) - collect_pp_weights[new_key] = torch_to_numpy(weight_list[rank].to(storage_type).cpu()) - - trt_inflight_weights.update(collect_pp_weights) - - vocab_size = trt_inflight_weights["model.wte.bin"].shape[0] * tp_size - - llm_config = nemo_to_llm_config( - nemo_model_config, - vocab_size, - None, - None, - decoder_type=decoder_type, # how to get eos_id and bos_id from different tokenizer? 
- ) - llm_config.is_mcore = is_mcore - - config = configparser.ConfigParser() - model_name = "llama" if isinstance(llm_config, LlamaConfig) else "gpt" - config[model_name] = {k: str(v) for k, v in vars(llm_config).items()} - config[model_name]["storage_dtype"] = storage_type_str - - return trt_inflight_weights, llm_config - - -def create_out_dir(args): - out_dir = Path(args.out_dir) - if not out_dir.exists(): - out_dir.mkdir(parents=True) - return out_dir - - -def update_tokenizer_paths(tokenizer_config: typing.Dict, unpacked_checkpoints_dir): - def _update_config_entry(key, file_pattern): - old_path = tokenizer_config[key] - if old_path is None: - return - old_path = Path(old_path) - new_path = unpacked_checkpoints_dir.get_tokenizer_file_path("tokenizer", key, file_pattern) - if new_path: - LOGGER.debug(f"Update tokenizer {key} {old_path} -> {new_path}") - tokenizer_config[key] = new_path - elif not old_path.exists(): - LOGGER.warning(f"Tokenizer {key}'s path {old_path} does not exists: set it to None") - tokenizer_config[key] = None - - _update_config_entry("model", "*.model") - _update_config_entry("vocab_file", "*vocab*") - _update_config_entry("merge_file", "*merge*.txt") - - return tokenizer_config - - -def copy_tokenizer_files(config, out_dir): - basenames = { - "model": "tokenizer", - "vocab_file": "vocab", - "merge_file": "merges", - } - - for key in basenames.keys(): - if config[key] is None: - continue - - path = config[key] - - if isinstance(path, str): - path = Path(path) - - if not path.exists(): - LOGGER.debug(f"Tokenizer {key}: {path} file not found") - continue - - dst_path = out_dir / f"{basenames[key]}{path.suffix}" - LOGGER.debug(f"Copy tokenizer {key}: {path}->{dst_path}") - - # Copy 'path' to 'dst_path' without shutil.copy(...) 
because 'path' may be a TarPath - with path.open('rb') as infile: - with open(dst_path, 'wb') as outfile: - outfile.write(infile.read()) - - -def build_tokenizer(tokenizer): - if isinstance(tokenizer, dict): - tokenizer_config = tokenizer - if tokenizer_config["library"] == "sentencepiece": - return SentencePieceTokenizer(model_path=tokenizer_config["model"]) - elif "GPT2" in tokenizer_config["type"]: - tokenizer = GPT2Tokenizer(tokenizer_config["vocab_file"], tokenizer_config["merge_file"]) - else: - raise ValueError(f'Tokenizer type {tokenizer_config["library"]} not handled') - - if tokenizer.bos_token_id is None: - tokenizer.add_special_tokens({"bos_token": ""}) - if tokenizer.eos_token_id is None: - tokenizer.add_special_tokens({"eos_token": ""}) - else: - try: - # If NeMo tokenizer, monkey patch interface - from nemo.collections.common.tokenizers.tokenizer_spec import TokenizerSpec - - if isinstance(tokenizer, TokenizerSpec): - - def batch_encode_patch(self, ids): - if torch.is_tensor(ids): - ids = ids.cpu().numpy() - return self.ids_to_text(ids) - - tokenizer.bos_token_id = tokenizer.bos_id - tokenizer.eos_token_id = tokenizer.eos_id - tokenizer.encode = tokenizer.text_to_ids - TokenizerSpec.batch_decode = batch_encode_patch - except: - raise TypeError(f'Unsupported tokenizer build input: {type(tokenizer)}') - - return tokenizer diff --git a/nemo/export/trt_llm/nemo/__init__.py b/nemo/export/trt_llm/nemo_ckpt_loader/__init__.py similarity index 86% rename from nemo/export/trt_llm/nemo/__init__.py rename to nemo/export/trt_llm/nemo_ckpt_loader/__init__.py index 19059dfa144a..c9c6f65d27e0 100644 --- a/nemo/export/trt_llm/nemo/__init__.py +++ b/nemo/export/trt_llm/nemo_ckpt_loader/__init__.py @@ -13,4 +13,4 @@ # limitations under the License. -from nemo.export.trt_llm.nemo.sentencepiece_tokenizer import SentencePieceTokenizer +from nemo.export.trt_llm.nemo_ckpt_loader.sentencepiece_tokenizer import SentencePieceTokenizer diff --git a/nemo/export/trt_llm/nemo_ckpt_loader/nemo_file.py b/nemo/export/trt_llm/nemo_ckpt_loader/nemo_file.py new file mode 100644 index 000000000000..09eae628999a --- /dev/null +++ b/nemo/export/trt_llm/nemo_ckpt_loader/nemo_file.py @@ -0,0 +1,406 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import functools +import json +import logging +import os +from pathlib import Path +from typing import Dict, Union + +import numpy as np +import tensorstore # This is important even though not used. Otherwise zarr raises error. 
+import torch +import yaml +import zarr +from torch.distributed.checkpoint import FileSystemReader +from transformers import AutoTokenizer, PreTrainedTokenizer + +from nemo.export.tarutils import TarPath, ZarrPathStore +from nemo.export.trt_llm.nemo_ckpt_loader.sentencepiece_tokenizer import SentencePieceTokenizer + +LOGGER = logging.getLogger("NeMo") + + +def is_nemo_file(path): + flag = False + + if path is not None: + if len(path) > 5: + pc = Path(path) + if pc.exists(): + if pc.is_file(): + if path[-5 : len(path)] == ".nemo": + flag = True + + return flag + + +class TarFileSystemReader(FileSystemReader): + """Reader that accepts both Path and TarPath checkpoint directory. + + The FileSystemReader works with TarPath, but expects a pure Path. + It's enough to skip the Path check in __init__. + """ + + def __init__(self, path: Union[Path, TarPath]) -> None: + """No call to super().__init__ because it expects pure Path.""" + self.path = path + self.storage_data = dict() + + +def load_sharded_metadata_torch_dist(checkpoint_dir: Union[Path, TarPath], torch_tensor=True): + fs_reader = TarFileSystemReader(checkpoint_dir) + metadata = fs_reader.read_metadata() + + state_dict = { + k: torch.empty(tp.size, dtype=tp.properties.dtype) + for k, tp in metadata.state_dict_metadata.items() + if isinstance(tp, TensorStorageMetadata) + } + load_state_dict( + state_dict, + storage_reader=fs_reader, + no_dist=True, + ) + + if not torch_tensor: + for k, v in state_dict.items(): + if v.dtype == torch.bfloat16: + state_dict[k] = v.view(torch.int16).numpy().view(np_bfloat16) + else: + state_dict[k] = v.numpy() + return state_dict + + +def load_sharded_metadata_zarr(checkpoint_dir: Union[Path, TarPath], torch_tensor=True): + sharded_state_dict = {} + for subdir in checkpoint_dir.iterdir(): + if not subdir.is_dir() or not (subdir / '.zarray').exists(): + continue + key = subdir.name + + zstore = ZarrPathStore(subdir) + arr = zarr.open(zstore, 'r') + + if torch_tensor: + # sharded_state_dict[key] = torch.from_numpy(arr[:].astype("float32")).to(dtype=torch.bfloat16) + if arr.dtype.name == "bfloat16": + sharded_state_dict[key] = torch.from_numpy(arr[:].view(np.int16)).view(torch.bfloat16) + else: + sharded_state_dict[key] = torch.from_numpy(arr[:]) + else: + sharded_state_dict[key] = arr[:] + + return sharded_state_dict + + +def load_sharded_metadata(checkpoint_dir: Union[Path, TarPath], torch_tensor=True): + with (checkpoint_dir / 'metadata.json').open(mode='r') as f: + config_dict = json.load(f) + if config_dict['sharded_backend'] == 'zarr': + return load_sharded_metadata_zarr(checkpoint_dir, torch_tensor) + elif config_dict['sharded_backend'] == 'torch_dist': + return load_sharded_metadata_torch_dist(checkpoint_dir, torch_tensor) + else: + raise NotImplementedError(f'Distributed checkpoint backend {config_dict["sharded_backend"]} not supported') + + +def update_tokenizer_paths(tokenizer_config: Dict, unpacked_checkpoints_dir): + def _update_config_entry(key, file_pattern): + old_path = tokenizer_config[key] + if old_path is None: + return + old_path = Path(old_path) + new_path = unpacked_checkpoints_dir.get_tokenizer_file_path("tokenizer", key, file_pattern) + if new_path: + LOGGER.debug(f"Update tokenizer {key} {old_path} -> {new_path}") + tokenizer_config[key] = new_path + elif not old_path.exists(): + LOGGER.warning(f"Tokenizer {key}'s path {old_path} does not exists: set it to None") + tokenizer_config[key] = None + + _update_config_entry("model", "*.model") + _update_config_entry("vocab_file", "*vocab*") + 
+    _update_config_entry("merge_file", "*merge*.txt")
+
+    return tokenizer_config
+
+
+def copy_tokenizer_files(config, out_dir):
+    basenames = {
+        "model": "tokenizer",
+        "vocab_file": "vocab",
+        "merge_file": "merges",
+    }
+
+    for key in basenames.keys():
+        if config[key] is None:
+            continue
+
+        path = config[key]
+
+        if isinstance(path, str):
+            path = Path(path)
+
+        if not path.exists():
+            LOGGER.debug(f"Tokenizer {key}: {path} file not found")
+            continue
+
+        dst_path = out_dir / f"{basenames[key]}{path.suffix}"
+        LOGGER.debug(f"Copy tokenizer {key}: {path}->{dst_path}")
+
+        # Copy 'path' to 'dst_path' without shutil.copy(...) because 'path' may be a TarPath
+        with path.open('rb') as infile:
+            with open(dst_path, 'wb') as outfile:
+                outfile.write(infile.read())
+
+
+def get_tokenzier(tokenizer_dir_or_path: Path) -> PreTrainedTokenizer:
+    """Loads the tokenizer from the decoded NeMo weights dir."""
+    if os.path.isdir(os.path.join(tokenizer_dir_or_path, "huggingface_tokenizer")):
+        return AutoTokenizer.from_pretrained(os.path.join(tokenizer_dir_or_path, "huggingface_tokenizer"))
+
+    model_path = tokenizer_dir_or_path / "tokenizer.model" if tokenizer_dir_or_path.is_dir() else tokenizer_dir_or_path
+    tokenizer_config = {"library": "sentencepiece", "model": str(model_path)}
+    return build_tokenizer(tokenizer_config)
+
+
+def build_tokenizer(tokenizer):
+    if isinstance(tokenizer, dict):
+        tokenizer_config = tokenizer
+        if tokenizer_config["library"] == "sentencepiece":
+            return SentencePieceTokenizer(model_path=tokenizer_config["model"])
+        elif "GPT2" in tokenizer_config["type"]:
+            tokenizer = GPT2Tokenizer(tokenizer_config["vocab_file"], tokenizer_config["merge_file"])
+        else:
+            raise ValueError(f'Tokenizer type {tokenizer_config["library"]} not handled')
+
+        if tokenizer.bos_token_id is None:
+            tokenizer.add_special_tokens({"bos_token": "<s>"})
+        if tokenizer.eos_token_id is None:
+            tokenizer.add_special_tokens({"eos_token": "</s>"})
+    else:
+        try:
+            # If NeMo tokenizer, monkey patch interface
+            from nemo.collections.common.tokenizers.tokenizer_spec import TokenizerSpec
+
+            if isinstance(tokenizer, TokenizerSpec):
+
+                def batch_encode_patch(self, ids):
+                    if torch.is_tensor(ids):
+                        ids = ids.cpu().numpy()
+                    return self.ids_to_text(ids)
+
+                tokenizer.bos_token_id = tokenizer.bos_id
+                tokenizer.eos_token_id = tokenizer.eos_id
+                tokenizer.encode = tokenizer.text_to_ids
+                TokenizerSpec.batch_decode = batch_encode_patch
+        except Exception:
+            raise TypeError(f'Unsupported tokenizer build input: {type(tokenizer)}')
+
+    return tokenizer
+
+
+def load_nemo_model(nemo_ckpt: Union[str, Path], nemo_export_dir: Union[str, Path]):
+    """Loads the weights, model config and tokenizer from a .nemo checkpoint (file or unpacked directory)."""
+    if not os.path.exists(nemo_ckpt):
+        raise FileNotFoundError(f"{nemo_ckpt} does not exist")
+
+    if os.path.isdir(nemo_ckpt):
+        nemo_dir = Path(nemo_ckpt)
+    else:
+        nemo_dir = TarPath(nemo_ckpt)
+
+    try:
+        unpacked_checkpoint_dir = UnpackedNemoCheckpointDir(nemo_dir, load_checkpoints_to_cpu=True)
+
+        dist_ckpt_folder = nemo_dir / "model_weights"
+        if dist_ckpt_folder.exists():
+            model = load_sharded_metadata(dist_ckpt_folder)
+            nemo_model_config = unpacked_checkpoint_dir.model_config
+
+            if nemo_model_config["tokenizer"].get("library", None) == "huggingface":
+                tokenizer = AutoTokenizer.from_pretrained(
+                    nemo_model_config["tokenizer"]["type"],
+                    use_fast=nemo_model_config["tokenizer"].get("use_fast", False),
+                )
+            else:
+                tokenizer_config = update_tokenizer_paths(nemo_model_config["tokenizer"], unpacked_checkpoint_dir)
+                copy_tokenizer_files(tokenizer_config, nemo_export_dir)
+
+                tokenizer_config["model"] = os.path.join(nemo_export_dir, "tokenizer.model")
+                tokenizer = build_tokenizer(tokenizer_config)
+        else:
+            raise Exception(
+                "Not a supported NeMo file format: only distributed mcore NeMo checkpoints are supported."
+            )
+    finally:
+        if isinstance(nemo_dir, TarPath):
+            nemo_dir.tarobject.close()
+
+    return model, nemo_model_config, tokenizer
+
+
+def cpu_map_location(storage, loc):
+    return storage.cpu()
+
+
+def gpu_map_location(storage, loc):
+    if loc.startswith("cuda"):
+        training_gpu_idx = int(loc.split(":")[1])
+        inference_gpu_idx = training_gpu_idx % torch.cuda.device_count()
+        return storage.cuda(inference_gpu_idx)
+    elif loc.startswith("cpu"):
+        return storage.cpu()
+    else:
+        raise ValueError(f"Unhandled map location: {loc}")
+
+
+class UnpackedNemoCheckpointDir:
+    def __init__(
+        self,
+        checkpoints_dir: Union[Path, TarPath],
+        load_checkpoints_to_cpu: bool = False,
+    ):
+        assert isinstance(checkpoints_dir, (Path, TarPath))
+        self._checkpoints_dir = checkpoints_dir
+        self._load_checkpoints_to_cpu = load_checkpoints_to_cpu
+
+    @property
+    @functools.lru_cache
+    def model_config(self):
+        model_config = None
+
+        model_config_filename = "model_config.yaml"
+        model_configs_paths = list(self._checkpoints_dir.rglob(model_config_filename))
+        if model_configs_paths:
+            if len(model_configs_paths) > 1:
+                LOGGER.debug(f"There is more than one {model_config_filename} in {self._checkpoints_dir}")
+            model_config_path = model_configs_paths[0]
+            LOGGER.debug("Loading model config from %s", model_config_path)
+            with model_config_path.open("r") as model_config_file:
+                model_config = yaml.load(model_config_file, Loader=yaml.SafeLoader)
+        else:
+            LOGGER.debug("Searching model config in checkpoints")
+            # try to obtain the config from a checkpoint file
+            checkpoint_name = self.checkpoint_name
+            checkpoints_paths = sorted(self._checkpoints_dir.rglob(checkpoint_name))
+            if checkpoints_paths:
+                # assume the parallel rank 0 checkpoint has the model config embedded
+                checkpoint_path = checkpoints_paths[0]
+
+                map_location_fn = cpu_map_location if self._load_checkpoints_to_cpu else gpu_map_location
+
+                model_00 = torch.load(checkpoint_path, map_location=map_location_fn)
+                if "hyper_parameters" in model_00 and "cfg" in model_00["hyper_parameters"]:
+                    model_config = model_00["hyper_parameters"]["cfg"]
+                    LOGGER.debug("Loaded model config from checkpoint %s", checkpoint_path)
+                else:
+                    LOGGER.debug("Could not find model config in checkpoint %s", checkpoint_path)
+
+                del model_00
+
+        if model_config is None:
+            LOGGER.warning("Could not find checkpoint with NeMo model config in %s", self._checkpoints_dir)
+
+        LOGGER.debug("Loaded model config %s", model_config)
+
+        return model_config
+
+    @property
+    def checkpoints_dir(self):
+        return self._checkpoints_dir
+
+    def get_checkpoints_paths(self, tensor_model_parallel_size=1, pipeline_model_parallel_size=1):
+        """Injects tensor/pipeline model parallel ranks into the filepath.
+        Does nothing if not using model parallelism.
+ """ + checkpoint_path_without_rank = self.checkpoints_dir / self.checkpoint_name + + def _inject_parallel_ranks(tp_rank, pp_rank): + if tensor_model_parallel_size > 1 or pipeline_model_parallel_size > 1: + if pipeline_model_parallel_size is None or pipeline_model_parallel_size == 1: + checkpoint_path = ( + checkpoint_path_without_rank.parent + / f"mp_rank_{tp_rank:02d}" + / checkpoint_path_without_rank.name + ) + else: + checkpoint_path = ( + checkpoint_path_without_rank.parent + / f"tp_rank_{tp_rank:02d}_pp_rank_{pp_rank:03d}" + / checkpoint_path_without_rank.name + ) + return checkpoint_path + else: + return checkpoint_path_without_rank + + return [ + [ + _inject_parallel_ranks(tp_rank=tp_rank, pp_rank=pp_rank) + for pp_rank in range(pipeline_model_parallel_size) + ] + for tp_rank in range(tensor_model_parallel_size) + ] + + @property + @functools.lru_cache + def checkpoint_name(self): + patterns = [ + "model_weights.ckpt", # older megatron checkpoints + "*last.ckpt", # newer format of checkpoints + ] + for pattern in patterns: + model_files = sorted(list(self._checkpoints_dir.rglob(pattern))) + if model_files: + return model_files[0].name + + raise ValueError(f"Could not find checkpoint files in {self._checkpoints_dir}") + + @functools.lru_cache + def get_tokenizer_file_path(self, tokenizer_key, file_key, default_filename_pattern): + model_config = self.model_config + file_property = None + if tokenizer_key in model_config and file_key in model_config[tokenizer_key]: + file_property = model_config[tokenizer_key][file_key] + elif file_key in model_config: + file_property = model_config[file_key] + + LOGGER.debug("model_config[%s][%s]=%s", tokenizer_key, file_key, file_property) + + if file_property and file_property.startswith("nemo:"): + filename = file_property.split("nemo:")[1] + filename_pattern = f"*{filename}" + elif file_property and file_property.startswith("/artifacts/"): + filename = Path(file_property).name + filename_pattern = f"*{filename}" + elif file_property is None or file_property == "None": + filename_pattern = None + else: + filename_pattern = default_filename_pattern + LOGGER.warning( + f"Tokenizer file from config: {tokenizer_key}.{file_key}={file_property} " + f"looks like unsupported path. Pattern {filename_pattern} will be used." 
+            )
+
+        file_path = None
+        if filename_pattern is not None:
+            files_paths = list(self._checkpoints_dir.glob(filename_pattern))
+            if files_paths:
+                assert len(files_paths) == 1
+                file_path = files_paths[0]
+
+        return file_path
diff --git a/nemo/export/trt_llm/nemo/sentencepiece_tokenizer.py b/nemo/export/trt_llm/nemo_ckpt_loader/sentencepiece_tokenizer.py
similarity index 100%
rename from nemo/export/trt_llm/nemo/sentencepiece_tokenizer.py
rename to nemo/export/trt_llm/nemo_ckpt_loader/sentencepiece_tokenizer.py
diff --git a/nemo/export/trt_llm/qnemo/tokenizer_utils.py b/nemo/export/trt_llm/qnemo/tokenizer_utils.py
index 3fde26253af6..4b0775a0aa2a 100644
--- a/nemo/export/trt_llm/qnemo/tokenizer_utils.py
+++ b/nemo/export/trt_llm/qnemo/tokenizer_utils.py
@@ -17,7 +17,7 @@
 from omegaconf import OmegaConf
 from transformers import AutoTokenizer
 
-from nemo.export.trt_llm.nemo.sentencepiece_tokenizer import SentencePieceTokenizer
+from nemo.export.trt_llm.nemo_ckpt_loader.sentencepiece_tokenizer import SentencePieceTokenizer
 
 # TODO: use get_nmt_tokenizer helper below to instantiate tokenizer once environment / dependencies get stable
 # from nemo.collections.nlp.modules.common.tokenizer_utils import get_nmt_tokenizer
diff --git a/nemo/export/trt_llm/tensorrt_llm_build.py b/nemo/export/trt_llm/tensorrt_llm_build.py
index 30490cc91254..bbafec319fd5 100644
--- a/nemo/export/trt_llm/tensorrt_llm_build.py
+++ b/nemo/export/trt_llm/tensorrt_llm_build.py
@@ -13,25 +13,15 @@
 # limitations under the License.
 
-import argparse
 import logging
-import os
-import time
-from pathlib import Path
-from typing import List
-
 import tensorrt_llm
 from tensorrt_llm._common import check_max_num_tokens
-from tensorrt_llm._utils import np_dtype_to_trt
 from tensorrt_llm.builder import BuildConfig, Builder
 from tensorrt_llm.commands.build import build as build_trtllm
 from tensorrt_llm.logger import logger
 from tensorrt_llm.lora_manager import LoraBuildConfig
 from tensorrt_llm.models.modeling_utils import add_lora, optimize_model, preprocess_weights
-from tensorrt_llm.network import net_guard
 from tensorrt_llm.plugin import PluginConfig
-from tensorrt_llm.plugin.plugin import ContextFMHAType
-from tensorrt_llm.quantization import QuantMode
 
 MODEL_NAME = "NeMo"
diff --git a/nemo/export/trt_llm/tensorrt_llm_run.py b/nemo/export/trt_llm/tensorrt_llm_run.py
index f79d6ddce4bc..8fdd747dcb90 100644
--- a/nemo/export/trt_llm/tensorrt_llm_run.py
+++ b/nemo/export/trt_llm/tensorrt_llm_run.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
+import csv
 import json
 import logging
 import os
@@ -30,8 +31,6 @@
 from tensorrt_llm.runtime import ModelConfig, ModelRunner, ModelRunnerCpp, SamplingConfig
 from transformers import PreTrainedTokenizer
 
-from nemo.export.trt_llm.nemo_utils import to_word_list_format  # isort:skip
-
 LOGGER = logging.getLogger("NeMo")
 
@@ -627,3 +626,61 @@ def unload(host_context: TensorrtLLMHostContext):
     global tensorrt_llm_worker_context
     tensorrt_llm_worker_context.decoder = None
     tensorrt_llm_worker_context = TensorrtLLMWorkerContext()
+
+
+def to_word_list_format(
+    word_dict: List[List[str]],
+    tokenizer=None,
+    ref_str="<extra_id_1>",
+):
+    '''
+    Format of word_dict:
+        len(word_dict) should be the same as batch_size.
+        word_dict[i] means the words for batch i.
+        len(word_dict[i]) must be 1, which means it only contains one string.
+        This string can contain several sentences, split by ",".
+ For example, if word_dict[2] = " I am happy, I am sad", then this function will return + the ids for two short sentences " I am happy" and " I am sad". + ''' + assert tokenizer is not None, "need to set tokenizer" + + flat_ids = [] + offsets = [] + # The encoding of a single word can't always be trusted. See + # https://github.com/NVIDIA/NeMo/blob/bb575b72fd0be51ae10cc77d9f89ddb9e9d3b96d/nemo/collections/nlp/modules/common/text_generation_strategy.py#L229 + ids_ref = tokenizer.encode(ref_str) + for word_dict_item in word_dict: + item_flat_ids = [] + item_offsets = [] + + if isinstance(word_dict_item[0], bytes): + word_dict_item = [word_dict_item[0].decode()] + + words = list(csv.reader(word_dict_item))[0] + for word in words: + ids = tokenizer.encode(f"{ref_str}{word}") + if ids[0 : len(ids_ref)] == ids_ref: + # It worked! We can obtain the token(s) associated to `word` by stripping the prefix tokens. + ids = ids[len(ids_ref) :] + else: + # Unfortunately the prefix was merged with `word`. We could try with a different prefix, but + # for now we just use the basic encoding since this should be a very rare edge case. + ids = tokenizer.encode(word) + logging.warning(f"The encoding of word '{word}' into tokens {ids} might be incorrect") + + if len(ids) == 0: + continue + + item_flat_ids += ids + item_offsets.append(len(ids)) + + flat_ids.append(np.array(item_flat_ids)) + offsets.append(np.cumsum(np.array(item_offsets))) + + pad_to = max(1, max(len(ids) for ids in flat_ids)) + + for i, (ids, offs) in enumerate(zip(flat_ids, offsets)): + flat_ids[i] = np.pad(ids, (0, pad_to - len(ids)), constant_values=0) + offsets[i] = np.pad(offs, (0, pad_to - len(offs)), constant_values=-1) + + return np.array([flat_ids, offsets], dtype="int32").transpose((1, 0, 2))
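For reference, a minimal sanity check of the relocated to_word_list_format helper (not part of the patch): the toy tokenizer below is hypothetical and only mimics the encode() interface the function relies on, and the import path assumes this patch is applied.

from nemo.export.trt_llm.tensorrt_llm_run import to_word_list_format


class ToyTokenizer:
    # Stand-in tokenizer: one fake id per character, enough to exercise the prefix-stripping logic.
    def encode(self, text):
        return [ord(c) for c in text]


words = [["bad,worse"], ["stop"]]  # one comma-separated string per batch entry
out = to_word_list_format(words, tokenizer=ToyTokenizer())
# Result shape is (batch, 2, padded_len): row 0 holds the flattened token ids,
# row 1 holds the cumulative word offsets, padded with -1.
print(out.shape)  # (2, 2, 8) with this toy tokenizer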