diff --git a/openfl-tutorials/Federated_PyTorch_LLM.ipynb b/openfl-tutorials/Federated_PyTorch_LLM.ipynb deleted file mode 100644 index ba5b2bede9..0000000000 --- a/openfl-tutorials/Federated_PyTorch_LLM.ipynb +++ /dev/null @@ -1,510 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Federated PyTorch LLM Tutorial" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "This notebook is an example of LLM fine-tuning.\n", - "\n", - "A custom DataLoader is used with the OpenFL Python API." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "#Install dependencies if not already installed\n", - "!pip install torch torchvision peft transformers sentencepiece huggingface_hub accelerate datasets evaluate seqeval\n", - "%load_ext autoreload\n", - "%autoreload 2" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from typing import Any, Mapping\n", - "import numpy as np\n", - "import openfl.native as fx\n", - "import torch\n", - "import torch as pt\n", - "from datasets import Dataset, load_dataset, load_metric\n", - "from openfl.federated import PyTorchTaskRunner\n", - "from openfl.federated.task.runner_pt import change_tags\n", - "from openfl.utilities import Metric, TensorKey\n", - "from openfl.utilities.data_splitters import EqualNumPyDataSplitter\n", - "from peft import LoraConfig, TaskType, get_peft_model\n", - "from peft.utils import get_peft_model_state_dict, set_peft_model_state_dict\n", - "from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss\n", - "from torch.optim import AdamW\n", - "from torch.utils.data import DataLoader\n", - "from tqdm import tqdm\n", - "import torch.nn as nn\n", - "from transformers.trainer_pt_utils import get_parameter_names\n", - "from transformers import (AutoModelForSequenceClassification,\n", - " AutoTokenizer, DataCollatorWithPadding, get_scheduler)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "After importing the required packages, the next step is to set up our OpenFL workspace. To do this, simply run the `fx.init()` command as follows:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "#Setup default workspace, logging, etc.\n", - "fx.init('torch_cnn_mnist')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Now we are ready to define our dataset and model to perform federated learning on. We start with a simple RoBERTa model that is fine-tuned on the GLUE MRPC dataset.
" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Download the data" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def get_glue_mrpc_dataset(tokenizer):\n", - " dataset = load_dataset(\"glue\", \"mrpc\")\n", - "\n", - " def tokenize_function(examples):\n", - " # max_length=None => use the model max length (it's actually the default)\n", - " outputs = tokenizer(\n", - " examples[\"sentence1\"],\n", - " examples[\"sentence2\"],\n", - " truncation=True,\n", - " max_length=None,\n", - " )\n", - " return outputs\n", - "\n", - " tokenized_datasets = dataset.map(\n", - " tokenize_function,\n", - " batched=True,\n", - " remove_columns=[\"idx\", \"sentence1\", \"sentence2\"],\n", - " )\n", - " tokenized_datasets = tokenized_datasets.rename_column(\"label\", \"labels\")\n", - " tokenized_datasets.set_format(\"torch\")\n", - " data_collator = DataCollatorWithPadding(tokenizer=tokenizer, padding=\"longest\")\n", - " return data_collator, tokenized_datasets\n", - "\n", - "base_model_name = \"roberta-base\"\n", - "padding_side = \"right\"\n", - "tokenizer = AutoTokenizer.from_pretrained(base_model_name, padding_side=padding_side)\n", - "if getattr(tokenizer, \"pad_token_id\") is None:\n", - " tokenizer.pad_token_id = tokenizer.eos_token_id\n", - "data_collator, tokenized_datasets = get_glue_mrpc_dataset(tokenizer)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Describe the dataset" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "class GlueMrpc(Dataset):\n", - " \"\"\"\n", - " Has 5.8k pairs of sentences with annotations if the two sentences are equivalent\n", - " \"\"\" \n", - " def get_shape(self):\n", - " \n", - " if not hasattr(self, 'saved_shape'):\n", - " self.saved_shape = max([len(i) for i in self.data['input_ids']])\n", - " return self.saved_shape\n", - "\n", - "train_set = GlueMrpc.from_dict(tokenized_datasets['train'].to_dict())\n", - "valid_set = GlueMrpc.from_dict(tokenized_datasets['test'].to_dict())\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Implement Federated dataset\n", - "We have to implement `split` method" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "class GlueMrpcFederatedDataset(DataLoader):\n", - " def __init__(self, train_set, valid_set, batch_size, data_collator=None):\n", - " self.data_splitter = EqualNumPyDataSplitter(shuffle=True)\n", - " if isinstance(train_set,Dataset):\n", - " self.train_set = GlueMrpc.from_dict(train_set.to_dict())\n", - " else:\n", - " self.train_set = train_set\n", - " \n", - " if isinstance(valid_set,Dataset):\n", - " self.valid_set = GlueMrpc.from_dict(valid_set.to_dict())\n", - " else:\n", - " self.valid_set = valid_set \n", - " \n", - " self.batch_size = batch_size\n", - " self.data_collator = data_collator\n", - " \n", - " def split(self, num_collaborators):\n", - " train_split = self.data_splitter.split(self.train_set, num_collaborators)\n", - " valid_split = self.data_splitter.split(self.valid_set, num_collaborators)\n", - " return [\n", - " GlueMrpcFederatedDataset(\n", - " self.train_set.select(train_split[i]),\n", - " self.valid_set.select(valid_split[i]),\n", - " self.batch_size\n", - " )\n", - " for i in range(num_collaborators)\n", - " ]\n", - " \n", - " def get_feature_shape(self):\n", - " return self.train_set.get_shape()\n", - " 
\n", - " def get_train_loader(self, num_batches=None):\n", - " return DataLoader(self.train_set, batch_size=self.batch_size, collate_fn=data_collator)\n", - " \n", - " def get_valid_loader(self):\n", - " return DataLoader(self.valid_set, batch_size=self.batch_size, collate_fn=data_collator)\n", - " \n", - " def get_train_data_size(self):\n", - " return len(self.train_set)\n", - " \n", - " def get_valid_data_size(self):\n", - " return len(self.valid_set)\n", - " \n", - "fl_data = GlueMrpcFederatedDataset(train_set, valid_set, batch_size=32)\n", - "metric = load_metric('glue', \"mrpc\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Define model" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "class LLMTaskRunner(PyTorchTaskRunner):\n", - " def __init__(\n", - " self, base_model_name, data_loader, device=None, metric=None, **kwargs\n", - " ):\n", - " kwargs[\"data_loader\"] = data_loader\n", - " super().__init__(device, **kwargs)\n", - " self.base_model_name = base_model_name\n", - " self.metric = metric\n", - " self._init_model()\n", - " self._init_optimizer()\n", - " self.save_models = []\n", - "\n", - " def _init_model(self):\n", - " model = AutoModelForSequenceClassification.from_pretrained(\n", - " self.base_model_name, return_dict=True\n", - " )\n", - " peft_config = LoraConfig(\n", - " task_type=TaskType.SEQ_CLS,\n", - " inference_mode=False,\n", - " r=16,\n", - " lora_alpha=16,\n", - " lora_dropout=0.1,\n", - " bias=\"lora_only\",\n", - " )\n", - " self.model = get_peft_model(model, peft_config)\n", - "\n", - " def _init_optimizer(self):\n", - " ALL_LAYERNORM_LAYERS = [nn.LayerNorm]\n", - " decay_parameters = get_parameter_names(self.model, ALL_LAYERNORM_LAYERS)\n", - " decay_parameters = [name for name in decay_parameters if \"bias\" not in name]\n", - "\n", - " optimizer_grouped_parameters = [\n", - " {\n", - " \"params\": [\n", - " p\n", - " for n, p in self.model.named_parameters()\n", - " if (n in decay_parameters and p.requires_grad)\n", - " ],\n", - " \"weight_decay\": 0.01,\n", - " },\n", - " {\n", - " \"params\": [\n", - " p\n", - " for n, p in self.model.named_parameters()\n", - " if (n not in decay_parameters and p.requires_grad)\n", - " ],\n", - " \"weight_decay\": 0.0,\n", - " },\n", - " ]\n", - " self.optimizer = AdamW(optimizer_grouped_parameters, lr=0.001)\n", - " self.lr_scheduler = get_scheduler(\n", - " name=\"linear\",\n", - " optimizer=self.optimizer,\n", - " num_warmup_steps=0,\n", - " num_training_steps=len(self.data_loader.train_set) * 5,\n", - " )\n", - "\n", - " self.training_round_completed = False\n", - " self.initialize_tensorkeys_for_functions()\n", - "\n", - " def train(self):\n", - " return self.model.train()\n", - "\n", - " def state_dict(self):\n", - " return get_peft_model_state_dict(self.model)\n", - "\n", - " def load_state_dict(self, state_dict: Mapping[str, Any], strict: bool = True):\n", - " return set_peft_model_state_dict(self.model, state_dict)\n", - "\n", - " def validate(\n", - " self, col_name, round_num, input_tensor_dict, use_tqdm=False, **kwargs\n", - " ):\n", - " \"\"\"Validate.\n", - "\n", - " Run validation of the model on the local data.\n", - "\n", - " Args:\n", - " col_name: Name of the collaborator\n", - " round_num: What round is it\n", - " input_tensor_dict: Required input tensors (for model)\n", - " use_tqdm (bool): Use tqdm to print a progress bar (Default=True)\n", - "\n", - " Returns:\n", - " global_output_dict: Tensors to 
send back to the aggregator\n", - " local_output_dict: Tensors to maintain in the local TensorDB\n", - "\n", - " \"\"\"\n", - " self.save_models.append(input_tensor_dict.copy())\n", - " self.rebuild_model(round_num, input_tensor_dict, validation=True)\n", - " self.model.eval()\n", - " \n", - "\n", - " self.model.to(self.device)\n", - " val_score = 0\n", - " total_samples = 0\n", - "\n", - " loader = self.data_loader.get_valid_loader()\n", - " if use_tqdm:\n", - " loader = tqdm(loader, desc=\"validate\")\n", - "\n", - " with pt.no_grad():\n", - " for sample in loader:\n", - " samples = sample[\"input_ids\"].shape[0]\n", - " total_samples += samples\n", - " output = self.model(**sample)\n", - " # get the index of the max log-probability\n", - " logits = output.logits\n", - " predictions = torch.argmax(logits, dim=-1)\n", - " self.metric.add_batch(predictions=predictions, references=sample[\"labels\"])\n", - " val_score = self.metric.compute()[\"accuracy\"]\n", - "\n", - " origin = col_name\n", - " suffix = \"validate\"\n", - " if kwargs[\"apply\"] == \"local\":\n", - " suffix += \"_local\"\n", - " else:\n", - " suffix += \"_agg\"\n", - " tags = (\"metric\",)\n", - " tags = change_tags(tags, add_field=suffix)\n", - " # TODO figure out a better way to pass in metric for this pytorch\n", - " # validate function\n", - " output_tensor_dict = {\n", - " TensorKey(\"acc\", origin, round_num, True, tags): np.array(val_score)\n", - " }\n", - "\n", - " # Empty list represents metrics that should only be stored locally\n", - " return output_tensor_dict, {}\n", - "\n", - " def train_epoch(self, batch_generator) -> Metric:\n", - " \"\"\"Train single epoch.\n", - "\n", - " Override this function in order to use custom training.\n", - "\n", - " Args:\n", - " batch_generator: Train dataset batch generator. Yields batches (dicts of tensors)\n", - " of size = `self.data_loader.batch_size`.\n", - " Returns:\n", - " Metric: An object containing name and np.ndarray value.\n", - " \"\"\"\n", - " losses = []\n", - " for sample in batch_generator:\n", - " self.model.zero_grad()\n", - " output = self.model(**sample)\n", - " loss = output.loss\n", - " loss.backward()\n", - " torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)\n", - " self.optimizer.step()\n", - " self.lr_scheduler.step()\n", - " losses.append(loss.detach().cpu().numpy())\n", - " loss = np.mean(losses)\n", - " if self.model.config.problem_type == \"regression\":\n", - " loss_fct = MSELoss()\n", - " elif self.model.config.problem_type == \"single_label_classification\":\n", - " loss_fct = CrossEntropyLoss()\n", - " elif self.model.config.problem_type == \"multi_label_classification\":\n", - " loss_fct = BCEWithLogitsLoss()\n", - " return Metric(name=loss_fct._get_name(), value=np.array(loss))\n", - "\n", - " def save_native(\n", - " self,\n", - " filepath,\n", - " model_state_dict_key=\"model_state_dict\",\n", - " optimizer_state_dict_key=\"optimizer_state_dict\",\n", - " **kwargs,\n", - " ):\n", - " \"\"\"\n", - " Save model and optimizer states in a pickled file specified by the \\\n", - " filepath. model_/optimizer_state_dicts are stored in the keys provided.
\\\n", - " Uses pt.save().\n", - "\n", - " Args:\n", - " filepath (string) : Path to pickle file to be\n", - " created by pt.save().\n", - " model_state_dict_key (string) : key for model state dict\n", - " in pickled file.\n", - " optimizer_state_dict_key (string) : key for optimizer state\n", - " dict in picked file.\n", - " kwargs : unused\n", - "\n", - " Returns:\n", - " None\n", - " \"\"\"\n", - " pickle_dict = {\n", - " model_state_dict_key: get_peft_model_state_dict(self.model),\n", - " optimizer_state_dict_key: self.optimizer.state_dict(),\n", - " }\n", - " pt.save(pickle_dict, filepath)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "num_collaborators = 2\n", - "collaborator_models = [\n", - " LLMTaskRunner(\n", - " base_model_name,\n", - " data_loader=data_slice,\n", - " metric=metric\n", - " )\n", - " for data_slice in fl_data.split(num_collaborators)]\n", - "collaborators = {'one':collaborator_models[0],'two':collaborator_models[1]}#, 'three':collaborator_models[2]}" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "#Original TinyImageNet dataset\n", - "print(f'Original training data size: {len(fl_data.train_set)}')\n", - "print(f'Original validation data size: {len(fl_data.valid_set)}\\n')\n", - "\n", - "#Collaborator one's data\n", - "for i, model in enumerate(collaborator_models):\n", - " print(f'Collaborator {i}\\'s training data size: {len(model.data_loader.train_set)}')\n", - " print(f'Collaborator {i}\\'s validation data size: {len(model.data_loader.valid_set)}\\n')\n", - "\n", - "#Collaborator three's data\n", - "#print(f'Collaborator three\\'s training data size: {len(collaborator_models[2].data_loader.X_train)}')\n", - "#print(f'Collaborator three\\'s validation data size: {len(collaborator_models[2].data_loader.X_valid)}')" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "#Run experiment, return trained FederatedModel\n", - "final_fl_model = fx.run_experiment(collaborators,{'aggregator.settings.rounds_to_train':10,\"tasks.train.kwargs.epochs\":2})" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "#Save final model\n", - "final_fl_model.save_native('final_model.pth')" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "llama-env", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.0" - } - }, - "nbformat": 4, - "nbformat_minor": 4 -}