forked from karpathy/llama2.c
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wiki.py
94 lines (84 loc) · 3.54 KB
/
wiki.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
Download, preprocess and serve the TinyStories dataset as a DataLoader.
"""
import argparse
import glob
import json
import os
import random
from typing import List
from concurrent.futures import ProcessPoolExecutor
from functools import partial
import numpy as np
import requests
import sentencepiece as spm
import torch
import torch.distributed as dist
from tqdm import tqdm
from tokenizer import Tokenizer
DATA_CACHE_DIR = "data"
DATA_NAME_DIRS = [
"wikipedia_en",
"wiki_zh"
]
class PretokDataset(torch.utils.data.IterableDataset):
"""Loads pretokenized examples from disk and yields them as PyTorch tensors."""
def __init__(self, split, max_seq_len, vocab_size, vocab_source):
super().__init__()
self.split = split
self.max_seq_len = max_seq_len
self.vocab_size = vocab_size
self.vocab_source = vocab_source
def __iter__(self):
# get worker info within a DataLoader
worker_info = torch.utils.data.get_worker_info()
worker_id = worker_info.id if worker_info else 0
# get DDP rank info
rank = dist.get_rank() if dist.is_initialized() else 0
# combine the worker_id and worker_rank to create a unique seed for rng
seed = 42 + worker_id + 1337 * rank
rng = random.Random(seed)
print(f"Created a PretokDataset with rng seed {seed}")
shard_filenames = []
for data_name in DATA_NAME_DIRS:
if self.vocab_source == "llama2":
# the .bin files are right along the .json files
bin_dir = os.path.join(DATA_CACHE_DIR, data_name)
sub_shards = sorted(glob.glob(os.path.join(bin_dir, "*.bin")))
elif self.vocab_source == "custom":
# the .bin files are in tok{N} directory
bin_dir = os.path.join(DATA_CACHE_DIR, data_name, f"tok{self.vocab_size}")
sub_shards = sorted(glob.glob(os.path.join(bin_dir, "*.bin")))
sub_shards = sub_shards[1:] if self.split == "train" else sub_shards[:1]
shard_filenames.extend(sub_shards)
# train/test split. let's use only shard 0 for test split, rest train
assert len(shard_filenames)>0, f"No bin files found in {bin_dir}"
while True:
rng.shuffle(shard_filenames)
for shard in shard_filenames:
# open the dataset for reading but keep it on disk with memmap
m = np.memmap(shard, dtype=np.uint16, mode="r")
num_batches = len(m) // self.max_seq_len
num_batches -= 1 # drop the last partial batch
assert num_batches > 0, "this shard is way too small? investigate."
ixs = list(range(num_batches))
rng.shuffle(ixs)
for ix in ixs:
start = ix * self.max_seq_len
end = start + self.max_seq_len + 1
# calling .astype will copy the data into a new numpy array, now in RAM
chunk = torch.from_numpy((m[start:end]).astype(np.int64))
x = chunk[:-1]
y = chunk[1:]
yield x, y
class Task:
@staticmethod
def iter_batches(batch_size, device, num_workers=0, **dataset_kwargs):
ds = PretokDataset(**dataset_kwargs)
dl = torch.utils.data.DataLoader(
ds, batch_size=batch_size, pin_memory=True, num_workers=num_workers
)
for x, y in dl:
x = x.to(device, non_blocking=True)
y = y.to(device, non_blocking=True)
yield x, y