-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patheccorun.py
169 lines (146 loc) · 4.96 KB
/
eccorun.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import argparse
import gzip
import json
import glob
import re
import itertools
def yield_examples(args):
    """Yield the JSON records from args.ecco_jsonl that belong to this worker.

    The gzipped JSONL input is sharded round-robin: line i is handled by the
    worker whose rank equals i % args.total_workers.
    """
    with gzip.open(args.ecco_jsonl, "rt") as handle:
        for line_no, raw_line in enumerate(handle):
            # Skip lines assigned to other ranks.
            if line_no % args.total_workers != args.worker_rank:
                continue
            yield json.loads(raw_line)
def gather_all_completed(args):
    """Return the set of URLs this rank has already finished, across all jobids.

    Reads every .completed.txt marker file previously written by this
    worker rank in args.out_dir.
    """
    completed = set()
    pattern = f"{args.out_dir}/rank_{args.worker_rank}_*.completed.txt"
    for path in glob.glob(pattern):
        with open(path) as handle:
            completed.update(line.strip() for line in handle)
    return completed
def gather_all_failed(args):
    """Return a mapping url -> number of recorded failures for this rank.

    Each line in a .failed.txt file counts as one failure; the same URL may
    appear in several files (one per jobid) and across retries.
    """
    counts = {}
    pattern = f"{args.out_dir}/rank_{args.worker_rank}_*.failed.txt"
    for path in glob.glob(pattern):
        with open(path) as handle:
            for raw in handle:
                url = raw.strip()
                if url in counts:
                    counts[url] += 1
                else:
                    counts[url] = 1
    return counts
def save_completed(example_dict, args):
    """Append a finished example to this job's output files.

    Writes the full record to the gzipped JSONL log and the URL alone to the
    plain-text marker file that gather_all_completed() later reads.
    """
    base = f"{args.out_dir}/rank_{args.worker_rank}_of_{args.total_workers}_{args.jobid}"
    with gzip.open(f"{base}.completed.jsonl.gz", "at") as record_file:
        record_file.write(json.dumps(example_dict) + "\n")
        record_file.flush()
    with open(f"{base}.completed.txt", "at") as url_file:
        url_file.write(example_dict["url"] + "\n")
        url_file.flush()
def save_failed(example_dict, args):
    """Record a failed example's URL so later runs can count its failures."""
    marker = f"{args.out_dir}/rank_{args.worker_rank}_of_{args.total_workers}_{args.jobid}.failed.txt"
    with open(marker, "at") as handle:
        handle.write(example_dict["url"] + "\n")
        handle.flush()
def split_text(txt, args):
    """Normalise whitespace in txt and cut it into fixed-size word chunks.

    Runs of spaces collapse to a single space and runs of 3+ newlines to a
    blank line; the text is then split into chunks of args.chunk_length
    space-separated words, the last chunk being shorter when the word count
    is not an exact multiple.
    """
    cleaned = re.sub(r"\n{3,}", "\n\n", re.sub(" +", " ", txt.strip()))
    words = cleaned.split(" ")
    # n copies of the same iterator advance in lockstep, so each zip row is
    # chunk_length consecutive words; "" padding on the last row is removed
    # by the strip() after joining.
    rows = itertools.zip_longest(*[iter(words)] * args.chunk_length, fillvalue="")
    return [" ".join(row).strip() for row in rows]
def grouper(iterable, n, *, incomplete='fill', fillvalue=None):
"Collect data into non-overlapping fixed-length chunks or blocks."
# grouper('ABCDEFG', 3, fillvalue='x') → ABC DEF Gxx
# grouper('ABCDEFG', 3, incomplete='strict') → ABC DEF ValueError
# grouper('ABCDEFG', 3, incomplete='ignore') → ABC DEF
iterators = [iter(iterable)] * n
match incomplete:
case 'fill':
return itertools.zip_longest(*iterators, fillvalue=fillvalue)
case 'strict':
return zip(*iterators, strict=True)
case 'ignore':
return zip(*iterators)
case _:
raise ValueError('Expected fill, strict, or ignore')
def test_loop(args):
    """Placeholder driver: walk this rank's shard and emit a stub result for
    every example not already completed or failed too many times.

    NOTE(review): text_pieces is computed but unused, and the saved
    "texts_fixed" value is hard-coded — presumably scaffolding where the real
    model call will go. TODO confirm before relying on the output.
    """
    done_urls = gather_all_completed(args)
    fail_counts = gather_all_failed(args)
    for example in yield_examples(args):
        url = example["url"]
        if url in done_urls:
            continue
        # Give up on examples that have already failed more than max_fails times.
        if fail_counts.get(url, 0) > args.max_fails:
            continue
        text_pieces = split_text(example["text"], args)
        stub = {"url": url, "texts_fixed": ["hi", "ho"]}
        save_completed(stub, args)
def parse_args():
    """Build and parse the command-line arguments for an ecco worker run.

    Returns the parsed argparse.Namespace; --worker-rank and --jobid are
    required, everything else has a default shown in --help.
    """
    p = argparse.ArgumentParser(
        description="Process and manage job examples with specified parameters.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    p.add_argument('--ecco-jsonl', type=str, default="all_ecco.jsonl.gz",
                   help='Path to the ECCO .jsonl.gz input file')
    p.add_argument('--total-workers', type=int, default=1000,
                   help='Total number of worker processes participating in the job')
    p.add_argument('--worker-rank', type=int, required=True,
                   help='Rank identifier for the current worker (0 to total_workers-1)')
    p.add_argument('--out-dir', type=str, default="ecco_run_out",
                   help='Directory where output files will be saved')
    p.add_argument('--jobid', type=str, required=True,
                   help='Unique identifier for the current job')
    p.add_argument('--chunk-length', type=int, default=300,
                   help='Number of words per text chunk')
    p.add_argument('--max-fails', type=int, default=3,
                   help='Maximum number of allowed failures per example')
    p.add_argument('--model-name', default="meta-llama/Llama-3.1-70B-Instruct",
                   help='Model to run')
    p.add_argument('--max-time', type=int, default=None,
                   help='Max time in sec this job should run, None for forever')
    return p.parse_args()
if __name__=="__main__":
    # Script entry point: parse CLI flags, then run the work loop for the
    # shard assigned to this worker rank.
    args=parse_args()
    test_loop(args)