-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfiles_db.ml
653 lines (619 loc) · 22.8 KB
/
files_db.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
open Lwt.Syntax
open Lwt.Infix
(* Module-level aliases for the repository settings in [Db_config]: the
   branch to work on, the top-level input and output folder names, the
   scripts folder name, and the configuration file name. *)
let branch, inputs, outputs, scripts_folder, config =
  Db_config.(branch, inputs, outputs, scripts, config)
(** A parsed configuration file. The on-disk format is the newline-separated
    [key=value] text written by [create_config] and read by [parse_config]. *)
type config = {
  step : int;  (** From the [step=] line; required. *)
  confidence : float;  (** From the [confidence=] line; required. *)
  time_left_bound : int option;
      (** From the [time_left_bound=] line; [None] when the value is empty. *)
  time_right_bound : int option;
      (** From the [time_right_bound=] line; [None] when the value is empty. *)
  curve : string option;  (** From the [curve=] line; [None] when empty. *)
  column : string option;  (** From the [column=] line; [None] when empty. *)
  value : string option;  (** From the [value=] line; [None] when empty. *)
  script : string;  (** From the [script=] line; required. *)
  typ : [ `Custom | `Default ];
      (** Not stored in the file. It is supplied by the caller of
          [parse_config]: the readers in this module pass [`Custom] for a
          folder's own config file and [`Default] for the root fallback. *)
}
(** [typ_to_str t] is the lowercase name of the config origin tag [t]. *)
let typ_to_str t = match t with `Default -> "default" | `Custom -> "custom"
(** [opt_to_str o] is the wrapped string, or ["none"] when [o] is [None]. *)
let opt_to_str o = Option.value o ~default:"none"
(** [print_config c] prints every field of [c] to standard output as one
    [key=value] line per field, then flushes ([@.]).

    Absent optional fields are shown as ["none"]. The time bounds and the
    curve were previously missing from the output. *)
let print_config c =
  let int_opt_to_str o = Option.map string_of_int o |> opt_to_str in
  Format.printf
    "step=%d\n\
     confidence=%.2f\n\
     time_left_bound=%s\n\
     time_right_bound=%s\n\
     curve=%s\n\
     column=%s\n\
     value=%s\n\
     script=%s\n\
     typ=%s@."
    c.step c.confidence
    (int_opt_to_str c.time_left_bound)
    (int_opt_to_str c.time_right_bound)
    (opt_to_str c.curve) (opt_to_str c.column) (opt_to_str c.value) c.script
    (typ_to_str c.typ)
(** [parse_config s typ] parses the text produced by [create_config] into a
    [config] record tagged with [typ].

    [s] (after trimming) must consist of exactly eight newline-separated
    [key=value] lines, in this order: [step], [confidence],
    [time_left_bound], [time_right_bound], [curve], [column], [value],
    [script].

    Each line is split at its FIRST ['='], so values may themselves contain
    ['=']. A trailing ['\r'] (CRLF files) is ignored. An empty value yields
    [None] for the optional fields.

    @raise Failure if the line count or a key is wrong, if a required value
    ([step], [confidence], [script]) is empty, or if a numeric field does
    not parse. *)
let parse_config s typ =
  let parse_error msg = failwith ("Failed to parse config: " ^ msg) in
  (* Parse one "key=value" line and check that its key is [pref].
     Returns [None] for an empty value and [Some (f value)] otherwise. *)
  let extract_value line pref f =
    let line =
      let len = String.length line in
      if len > 0 && line.[len - 1] = '\r' then String.sub line 0 (len - 1)
      else line
    in
    match String.index_opt line '=' with
    | None -> parse_error (Format.sprintf "missing '=' in line %S" line)
    | Some i ->
        let key = String.sub line 0 i in
        let v = String.sub line (i + 1) (String.length line - i - 1) in
        if key <> pref then
          parse_error (Format.sprintf "expected key %S, found %S" pref key)
        else if v = "" then None
        else Some (f v)
  in
  let required key = function
    | Some v -> v
    | None -> parse_error (Format.sprintf "%S must not be empty" key)
  in
  match s |> String.trim |> String.split_on_char '\n' with
  | [
   step;
   confidence;
   time_left_bound;
   time_right_bound;
   curve;
   column;
   value;
   script;
  ] ->
      {
        step = extract_value step "step" int_of_string |> required "step";
        confidence =
          extract_value confidence "confidence" float_of_string
          |> required "confidence";
        time_left_bound =
          extract_value time_left_bound "time_left_bound" int_of_string;
        time_right_bound =
          extract_value time_right_bound "time_right_bound" int_of_string;
        curve = extract_value curve "curve" Fun.id;
        column = extract_value column "column" Fun.id;
        value = extract_value value "value" Fun.id;
        script = extract_value script "script" Fun.id |> required "script";
        typ;
      }
  | _ -> failwith "Failed to parse config"
(** [create_config step confidence time_left_bound time_right_bound curve
    column value script] renders a configuration as eight newline-separated
    [key=value] lines, in the order [parse_config] expects.

    Absent optional values are written as an empty value (e.g. ["curve="]).
    [confidence] is written with two decimals. *)
let create_config step confidence time_left_bound time_right_bound curve column
    value script =
  let or_empty = function Some s -> s | None -> "" in
  let int_or_empty = function Some i -> string_of_int i | None -> "" in
  [
    "step=" ^ string_of_int step;
    Format.sprintf "confidence=%.02f" confidence;
    "time_left_bound=" ^ int_or_empty time_left_bound;
    "time_right_bound=" ^ int_or_empty time_right_bound;
    "curve=" ^ or_empty curve;
    "column=" ^ or_empty column;
    "value=" ^ or_empty value;
    "script=" ^ script;
  ]
  |> String.concat "\n"
(** [fetch branch] opens the pipeline repository and runs [Files.fetch] for
    [branch]. It resolves to the fetch result paired with the repository
    handle; this is the [(node, repo)] pair the other functions here take. *)
let fetch branch =
  let* ctx = Files.ctx () in
  let* repo = Files.Store.Repo.v Db_config.Pipeline.c in
  let+ node = Files.fetch ~ctx ~repo ~branch in
  (node, repo)
(** [push branch] opens the pipeline repository and pushes [branch] via
    [Files.push], returning its result unchanged. *)
let push branch =
  let* ctx = Files.ctx () in
  let* repo = Files.Store.Repo.v Db_config.Pipeline.c in
  Files.push ~ctx ~repo ~branch
(** [fetch_all ()] opens the pipeline repository and runs [Files.fetch_all].
    It resolves to the fetch result paired with the repository handle. *)
let fetch_all () =
  let* ctx = Files.ctx () in
  let* repo = Files.Store.Repo.v Db_config.Pipeline.c in
  let+ res = Files.fetch_all ~ctx ~repo in
  (res, repo)
(** [branch_names ()] fetches all branches. On success it returns
    [Ok (Files.Store.Branch.list repo)]; otherwise it returns the fetch
    error unchanged.

    NOTE(review): if [Branch.list] returns a promise (as in Irmin), the [Ok]
    payload is itself an [Lwt.t] that callers still have to bind. *)
let branch_names () =
  let+ res, repo = fetch_all () in
  Result.map (fun _ -> Files.Store.Branch.list repo) res
(** [add_branch ~name ~folder_name] fetches [Db_config.branch], creates
    branch [name] at the current head of the store's main branch, and
    pushes it.

    Returns [Ok (fetch_result, repo)] on success. Returns [Error] if the
    fetch failed, if [inputs/folder_name] already exists as a folder, or if
    the push fails. *)
let add_branch ~name ~folder_name =
  let* node, repo = fetch Db_config.branch in
  match node with
  | Error (`Msg msg) -> Lwt.return (Error msg)
  | Ok _ -> (
      let* t = Files.Store.main repo in
      let* existing = Files.Store.kind t [ inputs; folder_name ] in
      match existing with
      | Some `Node ->
          Lwt.return (Error "A folder with that name already exists!")
      | _ -> (
          let* head = Files.Store.Head.get t in
          let* () = Files.Store.Branch.set repo name head in
          let+ pushed = push name in
          match pushed with
          | Ok () -> Ok (node, repo)
          | Error (`Msg msg) -> Error msg
          | Error `Detached_head -> Error "Detached head"))
(** [handle_write_error res] turns a store write error into a
    human-readable [Error].

    Only meant to be called on the [Error] branch of a [set]/[remove]-style
    result. [Ok _] and [`Test_was] are treated as impossible. *)
let handle_write_error (res : (_, Files.Store.write_error) result) =
  match res with
  | Ok _ | Error (`Test_was _) -> assert false
  | Error (`Conflict str) -> Error ("Database merging conflict! " ^ str)
  | Error (`Too_many_retries i) ->
      Error ("Too many retries: " ^ string_of_int i)
(* GETS *)
(** [all_files ?branch np] returns the sorted tree of the [inputs] folder on
    [branch], as built by [Files.Folder.get_custom_tree].

    @raise Failure with the fetch error message if the fetch failed. *)
let all_files ?(branch = branch) (node, repo) =
  match node with
  | Error (`Msg msg) -> failwith msg
  | Ok _ ->
      let* t = Files.Store.of_branch repo branch in
      let* tree = Files.Store.get_tree t [] in
      Files.Folder.get_custom_tree ~order:`Sorted ~tree ~where:[ inputs ]
(** [folder ?branch folder_name np] returns the sorted tree of the input
    sub-folder [inputs/folder_name] on [branch].

    @raise Failure with the fetch error message if the fetch failed. *)
let folder ?(branch = branch) folder_name (node, repo) : Files.Folder.t Lwt.t =
  match node with
  | Error (`Msg msg) -> failwith msg
  | Ok _ ->
      let* t = Files.Store.of_branch repo branch in
      let* tree = Files.Store.get_tree t [] in
      Files.Folder.get_custom_tree ~order:`Sorted ~tree
        ~where:(inputs :: folder_name)
(** [config_file ?branch folder_name np] reads the configuration that
    applies to the input folder [folder_name] on [branch].

    If the folder has its own config file, that file is used and tagged
    [`Custom]. Otherwise the repository-root config file is used and tagged
    [`Default].

    @raise Failure with the fetch error message if the fetch failed. *)
let config_file ?(branch = branch) folder_name (node, repo) : config Lwt.t =
  match node with
  | Error (`Msg msg) -> failwith msg
  | Ok _ ->
      let* t = Files.Store.of_branch repo branch in
      let local_path = (inputs :: folder_name) @ [ config ] in
      let* has_local = Files.Store.mem t local_path in
      let path, typ =
        if has_local then (local_path, `Custom) else ([ config ], `Default)
      in
      let+ content = Files.Store.get t path in
      parse_config content typ
(** [scripts ?branch script_folder np] returns the file names that
    [Files.Folder.file_names] reports under [script_folder] on [branch],
    excluding ["README.md"].

    @raise Failure with the fetch error message if the fetch failed. *)
let scripts ?(branch = branch) script_folder (node, repo) : string list Lwt.t =
  match node with
  | Error (`Msg msg) -> failwith msg
  | Ok _ ->
      let* t = Files.Store.of_branch repo branch in
      let* tree = Files.Store.get_tree t [] in
      let+ names =
        Files.Folder.file_names ~order:`Undefined ~tree ~where:[ script_folder ]
      in
      List.filter (fun n -> n <> "README.md") names
(** [get_file ?branch ~in_where ~out_where ~name np] delegates to
    [Files.File.get_file] on the store for [branch].

    @raise Failure with the fetch error message if the fetch failed. *)
let get_file ?(branch = branch) ~in_where ~out_where ~name (node, repo) =
  match node with
  | Error (`Msg msg) -> failwith msg
  | Ok _ ->
      Files.Store.of_branch repo branch >>= fun t ->
      Files.File.get_file ~t ~in_where ~out_where ~name
(** [get_output ?branch ~where ~name np] reads the contents stored at
    [where/name] on [branch], via the subtree at [where].

    @raise Failure with the fetch error message if the fetch failed. *)
let get_output ?(branch = branch) ~where ~name (node, repo) =
  match node with
  | Error (`Msg msg) -> failwith msg
  | Ok _ ->
      let* t = Files.Store.of_branch repo branch in
      let* subtree = Files.Store.get_tree t where in
      Files.Store.Tree.get subtree [ name ]
(** [historic_config_from_commit_hash ~hash ~in_where np] reads the
    configuration as of the commit identified by [hash].

    The config file in [in_where] is used if present (tagged [`Custom]).
    Otherwise the root config file is used (tagged [`Default]).

    Returns [Error] if the fetch failed or if no commit matches [hash]. *)
let historic_config_from_commit_hash ~hash ~in_where (node, repo) =
  match node with
  | Error (`Msg msg) -> Lwt.return (Error msg)
  | Ok _ -> (
      let* commit_opt = Files.Store.Commit.of_hash repo hash in
      match commit_opt with
      | None -> Lwt.return (Error "No commit matches hash")
      | Some commit ->
          let* t = Files.Store.of_commit commit in
          let local_path = in_where @ [ config ] in
          let* has_local = Files.Store.mem t local_path in
          let path, typ =
            if has_local then (local_path, `Custom) else ([ config ], `Default)
          in
          let+ str = Files.Store.get t path in
          Ok (parse_config str typ))
(** [find_earliest_commit_with_file ?branch ~path np] returns the first
    commit that [Files.Store.last_modified] reports for [path] on [branch].

    NOTE(review): despite the name, which commit this is depends on the
    ordering of [last_modified]'s result. Confirm it really is the earliest.

    Returns [Error] if the fetch failed, or if no commit touching [path] was
    found. The empty case used to escape as [Failure "hd"]. *)
let find_earliest_commit_with_file ?(branch = branch) ~path (node, repo) =
  match node with
  | Error (`Msg msg) -> Error msg |> Lwt.return
  | Ok _ -> (
      let* t = Files.Store.of_branch repo branch in
      let+ commits = Files.Store.last_modified t path in
      match commits with
      | commit :: _ -> Ok commit
      | [] -> Error "No commit found that modified the given path")
(** [historic_input_config_and_script ~branch ~in_where ~out_where ~name np]
    gathers what produced the output file [out_where/name].

    It takes the commit returned by [find_earliest_commit_with_file] for
    that output path. As of that commit it reads:
    - the input file: [name] with all extensions stripped plus [".csv"],
      looked up in [in_where];
    - the config: [in_where/<config>] if present (tagged [`Custom]),
      otherwise the root config file (tagged [`Default]);
    - the script: [<scripts_folder>/<script>.r], where [<script>] is
      extracted from [name] as described below.

    Output names are expected to look like [n.script.date.csv] or
    [n.script.type.date.pdf].

    Returns [Ok (input_content, config, script_content)], or the [Error]
    returned by [find_earliest_commit_with_file]. *)
let historic_input_config_and_script ~branch ~in_where ~out_where ~name np =
  let input_file =
    let name =
      (* name.script.date.csv *)
      name |> Fpath.v |> Fpath.rem_ext ~multi:true
      (* name *) |> Fpath.add_ext ".csv" (* name.csv *)
      |> Fpath.to_string
    in
    in_where @ [ name ]
  in
  let script_file =
    (* n.script.date.csv or n.script.type.date.pdf *)
    let name_fpath = name |> Fpath.v in
    let tmp =
      name_fpath |> Fpath.rem_ext
      |> Fpath.rem_ext (* n.script or n.script.type *)
    in
    let name =
      (* A .csv output has no [type] segment, so after two removals we are
         already at n.script. Any other extension (presumably .pdf) needs
         one more removal. *)
      (if name_fpath |> Fpath.get_ext = ".csv" then tmp
       else tmp |> Fpath.rem_ext)
      (* n.script *) |> Fpath.get_ext (* .script *)
      |> Files.remove_dot |> Fpath.v (* script *)
      |> Fpath.add_ext ".r" (* script.r *)
      |> Fpath.to_string
    in
    [ scripts_folder; name ]
  in
  let output_file = out_where @ [ name ] in
  let config_file = in_where @ [ config ] in
  find_earliest_commit_with_file ~branch ~path:output_file np >>= function
  | Ok commit -> begin
      Files.Store.of_commit commit >>= fun t ->
      (* The three reads are independent and are combined with [and+].
         Inside the right-hand side of [let+ config = ...], [config] still
         refers to the module-level config file name, because [let+] is not
         recursive. *)
      let+ config =
        let* b = Files.Store.mem t config_file in
        if b then
          Files.Store.get t config_file >|= fun str -> parse_config str `Custom
        else
          Files.Store.get t [ config ] >|= fun str -> parse_config str `Default
      and+ input_content = Files.Store.get t input_file
      and+ script_content = Files.Store.get t script_file in
      Ok (input_content, config, script_content)
    end
  | Error _ as err -> err |> Lwt.return
(* SETS / REMOVES / UPDATES *)
(** [add_config ?branch ~user_email ~path ~step ~confidence ~time_left_bound
    ~time_right_bound ~curve ~column ~value ~script np] writes a new config
    file, rendered by [create_config], into the existing folder [path] on
    [branch]. It then commits (info built from [user_email]) and pushes
    [branch].

    Returns [Error] if [path] is not an existing folder, on a write
    conflict, or if the push fails. *)
let add_config ?(branch = branch) ~user_email ~path ~step ~confidence
    ~time_left_bound ~time_right_bound ~curve ~column ~value ~script (node, repo)
    =
  (* Use the shared config file name (Db_config.config) rather than a
     hard-coded "config". [config_file] and the historic readers look the
     file up under that name. *)
  let config_path = path @ [ config ] in
  match node with
  | Error (`Msg msg) -> Error msg |> Lwt.return
  | Ok _ -> (
      let* t = Files.Store.of_branch repo branch in
      let content =
        create_config step confidence time_left_bound time_right_bound curve
          column value script
      in
      let* kind = Files.Store.kind t path in
      match kind with
      | Some `Node -> (
          let* written =
            Files.Store.set t
              ~info:(Db_config.Pipeline.info "Adding config" user_email)
              config_path content
          in
          match written with
          | Error _ as err -> handle_write_error err |> Lwt.return
          | Ok () -> (
              let+ pushed = push branch in
              match pushed with
              | Ok () -> Ok ()
              | Error (`Msg msg) -> Error msg
              | Error `Detached_head -> Error "Detached head"))
      | _ -> Error "Parent directory does not exist" |> Lwt.return)
(** [update_config ?branch ~user_email ~path ~step ~confidence
    ~time_left_bound ~time_right_bound ~curve ~column ~value ~script np]
    overwrites the config file at [path] (the full file path) on [branch]
    with the text rendered by [create_config]. It then commits and pushes
    [branch].

    Returns [Error] if [path] does not exist, on a write conflict, or if the
    push fails. *)
let update_config ?(branch = branch) ~user_email ~path ~step ~confidence
    ~time_left_bound ~time_right_bound ~curve ~column ~value ~script (node, repo)
    =
  match node with
  | Error (`Msg msg) -> Lwt.return (Error msg)
  | Ok _ -> (
      let* t = Files.Store.of_branch repo branch in
      let rendered =
        create_config step confidence time_left_bound time_right_bound curve
          column value script
      in
      let* exists = Files.Store.mem t path in
      if not exists then Lwt.return (Error "Config does not exist")
      else
        let* written =
          Files.Store.set t
            ~info:(Db_config.Pipeline.info "Updating config" user_email)
            path rendered
        in
        match written with
        | Error _ as err -> Lwt.return (handle_write_error err)
        | Ok () -> (
            let+ pushed = push branch in
            match pushed with
            | Ok () -> Ok ()
            | Error (`Msg msg) -> Error msg
            | Error `Detached_head -> Error "Detached head"))
(** [remove_config ?branch ~user_email ~path np] deletes the config file at
    [path] on [branch], then commits and pushes [branch].

    Returns [Error] if [path] does not exist, on a write conflict, or if the
    push fails. *)
let remove_config ?(branch = branch) ~user_email ~path (node, repo) =
  match node with
  | Error (`Msg msg) -> Lwt.return (Error msg)
  | Ok _ -> (
      let* t = Files.Store.of_branch repo branch in
      let* exists = Files.Store.mem t path in
      if not exists then Lwt.return (Error "Config does not exist")
      else
        let* removed =
          Files.Store.remove t
            ~info:(Db_config.Pipeline.info "Removing config" user_email)
            path
        in
        match removed with
        | Error _ as err -> Lwt.return (handle_write_error err)
        | Ok () -> (
            let+ pushed = push branch in
            match pushed with
            | Ok () -> Ok ()
            | Error (`Msg msg) -> Error msg
            | Error `Detached_head -> Error "Detached head"))
(* TODO: Add output folder *)
(** [add_folder ?branch ~user_email ~in_where ~out_where ~name np] creates
    folder [name] under both [in_where] and [out_where] on [branch], in one
    commit, and pushes [branch]. Empty path segments are ignored.

    Each folder is materialised by an empty [.ign] file, since the store
    only keeps folders that contain something. Returns [Error] if
    [in_where] is not an existing folder, on a write conflict, or if the
    push fails. *)
let add_folder ?(branch = branch) ~user_email ~in_where ~out_where ~name
    (node, repo) =
  let drop_empty = List.filter (fun seg -> seg <> "") in
  let in_where = drop_empty in_where and out_where = drop_empty out_where in
  let marker_in = in_where @ [ name; ".ign" ] in
  let marker_out = out_where @ [ name; ".ign" ] in
  match node with
  | Error (`Msg msg) -> Lwt.return (Error msg)
  | Ok _ -> (
      let* t = Files.Store.of_branch repo branch in
      let* parent_kind = Files.Store.kind t in_where in
      let* root_tree = Files.Store.tree t in
      match parent_kind with
      | Some `Node -> (
          let* tree = Files.Store.Tree.add root_tree marker_in "" in
          let* tree = Files.Store.Tree.add tree marker_out "" in
          let* written =
            Files.Store.set_tree
              ~info:(Db_config.Pipeline.info "Adding folder" user_email)
              t [] tree
          in
          match written with
          | Error _ as err -> Lwt.return (handle_write_error err)
          | Ok () -> (
              let+ pushed = push branch in
              match pushed with
              | Ok () -> Ok ()
              | Error (`Msg msg) -> Error msg
              | Error `Detached_head -> Error "Detached head"))
      | _ -> Lwt.return (Error "Parent directory does not exist"))
(* TODO: Remove output folder (MAYBE?) *)
(** [remove_folder ?branch ~user_email ~path np] deletes the folder at
    [path] on [branch] (empty segments ignored), then commits (info built
    from [user_email]) and pushes [branch].

    Returns [Error] in these cases:
    - [path] is empty after filtering; this would otherwise delete the whole
      repository tree;
    - [path] is not an existing folder;
    - there is a write conflict;
    - the push fails. *)
let remove_folder ?(branch = branch) ~user_email ~path (node, repo) =
  let path = List.filter (( <> ) "") path in
  match node with
  | Error (`Msg msg) -> Error msg |> Lwt.return
  | Ok _ -> (
      match path with
      | [] -> Error "Refusing to remove the root directory" |> Lwt.return
      | _ :: _ -> (
          let* t = Files.Store.of_branch repo branch in
          let* kind = Files.Store.kind t path in
          match kind with
          | Some `Node -> (
              let* removed =
                Files.Store.remove t
                  ~info:(Db_config.Pipeline.info "Removing folder" user_email)
                  path
              in
              match removed with
              | Error _ as err -> handle_write_error err |> Lwt.return
              | Ok () -> (
                  let+ pushed = push branch in
                  match pushed with
                  | Ok () -> Ok ()
                  | Error (`Msg msg) -> Error msg
                  | Error `Detached_head -> Error "Detached head"))
          (* The folder itself was checked, not its parent. *)
          | _ -> Error "Directory does not exist" |> Lwt.return))
(* TODO: Rename output folder*)
(** [rename_folder ?branch ~user_email ~in_where ~out_where ~old_name
    ~new_name np] moves [in_where/old_name] to [in_where/new_name] and
    [out_where/old_name] to [out_where/new_name]. Both moves happen in a
    single commit on [branch], which is then pushed. Empty path segments are
    ignored.

    Returns [Ok ()] without committing when [old_name = new_name]. The old
    code wrote the folder onto itself and then removed it, which lost it.

    Returns [Error] in these cases:
    - the old input folder does not exist;
    - something already exists at either new path (it would otherwise be
      silently overwritten);
    - there is a write conflict;
    - the push fails. *)
let rename_folder ?(branch = branch) ~user_email ~in_where ~out_where ~old_name
    ~new_name (node, repo) =
  let in_where, out_where =
    let f = List.filter (( <> ) "") in
    (f in_where, f out_where)
  in
  let in_old_folder_path = in_where @ [ old_name ] in
  let in_new_folder_path = in_where @ [ new_name ] in
  let out_old_folder_path = out_where @ [ old_name ] in
  let out_new_folder_path = out_where @ [ new_name ] in
  match node with
  | Error (`Msg msg) -> Error msg |> Lwt.return
  | Ok _ ->
      let* t = Files.Store.of_branch repo branch in
      let* old_kind = Files.Store.kind t in_old_folder_path in
      let* in_new_kind = Files.Store.kind t in_new_folder_path in
      let* out_new_kind = Files.Store.kind t out_new_folder_path in
      if old_kind <> Some `Node then
        Lwt.return (Error "Directory does not exist")
      else if old_name = new_name then Lwt.return (Ok ())
      else if in_new_kind <> None || out_new_kind <> None then
        Lwt.return (Error "A folder with that name already exists!")
      else
        let* root_tree = Files.Store.tree t in
        let* in_file_tree =
          Files.Store.Tree.get_tree root_tree in_old_folder_path
        in
        let* out_file_tree =
          Files.Store.Tree.get_tree root_tree out_old_folder_path
        in
        let* tree =
          Files.Store.Tree.update_tree root_tree in_new_folder_path (fun _ ->
              Some in_file_tree)
        in
        let* tree =
          Files.Store.Tree.update_tree tree out_new_folder_path (fun _ ->
              Some out_file_tree)
        in
        let* tree = Files.Store.Tree.remove tree in_old_folder_path in
        let* tree = Files.Store.Tree.remove tree out_old_folder_path in
        let* written =
          Files.Store.set_tree
            ~info:(Db_config.Pipeline.info "Renaming folder" user_email)
            t [] tree
        in
        match written with
        | Error _ as err -> handle_write_error err |> Lwt.return
        | Ok () -> (
            let+ pushed = push branch in
            match pushed with
            | Ok () -> Ok ()
            | Error (`Msg msg) -> Error msg
            | Error `Detached_head -> Error "Detached head")
(** [add_file ?branch ~user_email ~path ~name ~content np] writes [content]
    to [path/name] on [branch] (empty segments of [path] ignored), then
    commits and pushes [branch].

    Returns [Error] if [path] is not an existing folder, on a write
    conflict, or if the push fails. *)
let add_file ?(branch = branch) ~user_email ~path ~name ~content (node, repo) =
  let parent = List.filter (fun seg -> seg <> "") path in
  let target = parent @ [ name ] in
  match node with
  | Error (`Msg msg) -> Lwt.return (Error msg)
  | Ok _ -> (
      let* t = Files.Store.of_branch repo branch in
      let* parent_kind = Files.Store.kind t parent in
      match parent_kind with
      | Some `Node -> (
          let* written =
            Files.Store.set t
              ~info:(Db_config.Pipeline.info "Adding file" user_email)
              target content
          in
          match written with
          | Error _ as err -> Lwt.return (handle_write_error err)
          | Ok () -> (
              let+ pushed = push branch in
              match pushed with
              | Ok () -> Ok ()
              | Error (`Msg msg) -> Error msg
              | Error `Detached_head -> Error "Detached head"))
      | _ -> Lwt.return (Error "Parent directory does not exist"))
(** [add_files ?branch ~user_email ~path ~name ~contents np] writes every
    sheet in [contents] into the existing folder [path] (empty segments
    ignored), then pushes [branch].

    Each sheet is sanitized with [Files.Csv.sanitize] and written as
    [<name without extensions>_sheet_<i>.csv], with [i] counting from 0.
    All sheets go into a single commit on [branch].

    If any element of [contents] is an [Error] (a sheet that could not be
    converted), nothing is written and an [Error] is returned instead of
    crashing. [Error] is also returned if [path] is not an existing folder,
    on a write conflict, or if the push fails. *)
let add_files ?(branch = branch) ~user_email ~path ~name ~contents (node, repo)
    =
  let path = List.filter (( <> ) "") path in
  match node with
  | Error (`Msg msg) -> Error msg |> Lwt.return
  | Ok _ -> (
      let* t = Files.Store.of_branch repo branch in
      let* c = Files.Store.kind t path in
      let* root_tree = Files.Store.tree t in
      if c <> Some `Node then
        Lwt.return (Error "Parent directory does not exist")
      else if List.exists Result.is_error contents then
        (* All sheets are committed together below, so bailing out here
           leaves the store untouched. *)
        Lwt.return
          (Error "Failed to convert one of the sheets; no file was written")
      else
        let sheets = List.filter_map Result.to_option contents in
        let* tree = Files.Store.Tree.get_tree root_tree path in
        let* tree, _ =
          Lwt_list.fold_left_s
            (fun (tree, counter) content ->
              (* Drop any extension of the uploaded file (.xlsx, ...). *)
              let name_no_ext =
                name |> Fpath.v |> Fpath.rem_ext ~multi:true |> Fpath.to_string
              in
              let sheet_name =
                Format.sprintf "%s_sheet_%d.csv" name_no_ext counter
              in
              let content = Files.Csv.sanitize content in
              let+ tree = Files.Store.Tree.add tree [ sheet_name ] content in
              (tree, counter + 1))
            (tree, 0) sheets
        in
        let* written =
          Files.Store.set_tree
            ~info:(Db_config.Pipeline.info "Adding files" user_email)
            t path tree
        in
        match written with
        | Error _ as err -> handle_write_error err |> Lwt.return
        | Ok () -> (
            let+ pushed = push branch in
            match pushed with
            | Ok () -> Ok ()
            | Error (`Msg msg) -> Error msg
            | Error `Detached_head -> Error "Detached head"))
(** [edit_file ?branch ~user_email ~path content np] overwrites the file at
    [path] on [branch] with [content], then commits and pushes [branch].

    Returns [Error] if [path] does not exist, on a write conflict, or if the
    push fails. *)
let edit_file ?(branch = branch) ~user_email ~path content (node, repo) =
  match node with
  | Error (`Msg msg) -> Lwt.return (Error msg)
  | Ok _ -> (
      let* t = Files.Store.of_branch repo branch in
      let* exists = Files.Store.mem t path in
      if not exists then Lwt.return (Error "File does not exist")
      else
        let* written =
          Files.Store.set t
            ~info:(Db_config.Pipeline.info "Editing file" user_email)
            path content
        in
        match written with
        | Error _ as err -> Lwt.return (handle_write_error err)
        | Ok () -> (
            let+ pushed = push branch in
            match pushed with
            | Ok () -> Ok ()
            | Error (`Msg msg) -> Error msg
            | Error `Detached_head -> Error "Detached head"))
(** [remove_file ?branch ~user_email ~in_where ~out_where ~name np] deletes
    the input file [in_where/name] together with every output that
    [Files.File.get_file] lists for it. Everything happens in a single
    commit on [branch], which is then pushed. Output paths are resolved
    under the first segment of [out_where].

    [List.hd out_where] is evaluated before anything else, so an empty
    [out_where] raises [Failure]. Returns [Error] if [in_where] is not an
    existing folder, on a write conflict, or if the push fails. *)
let remove_file ?(branch = branch) ~user_email ~in_where ~out_where ~name
    (node, repo) =
  let out_root = List.hd out_where in
  let in_path = in_where @ [ name ] in
  match node with
  | Error (`Msg msg) -> Lwt.return (Error msg)
  | Ok _ -> (
      let* t = Files.Store.of_branch repo branch in
      let* parent_kind = Files.Store.kind t in_where in
      let* root_tree = Files.Store.get_tree t [] in
      match parent_kind with
      | Some `Node -> (
          let* file = Files.File.get_file ~t ~in_where ~out_where ~name in
          let* tree = Files.Store.Tree.remove root_tree in_path in
          (* Collect the store path of every output ([`Csv] or [`Pdf]) in
             the outputs table, rooted at [out_root]. Paths are prepended
             and then reversed, which keeps table/list order. *)
          let output_paths =
            Hashtbl.fold
              (fun _ outs acc ->
                List.fold_left
                  (fun acc out ->
                    let _, _, output_path =
                      match out with `Csv x | `Pdf x -> x
                    in
                    let segments =
                      output_path |> Fpath.to_string
                      |> String.split_on_char '/'
                    in
                    (out_root :: segments) :: acc)
                  acc outs)
              file.outputs []
            |> List.rev
          in
          let* tree =
            Lwt_list.fold_left_s Files.Store.Tree.remove tree output_paths
          in
          let* written =
            Files.Store.set_tree
              ~info:(Db_config.Pipeline.info "Removing file" user_email)
              t [] tree
          in
          match written with
          | Error _ as err -> Lwt.return (handle_write_error err)
          | Ok () -> (
              let+ pushed = push branch in
              match pushed with
              | Ok () -> Ok ()
              | Error (`Msg msg) -> Error msg
              | Error `Detached_head -> Error "Detached head"))
      | _ -> Lwt.return (Error "Parent directory does not exist"))
(* Only the new folder (its input and output subtrees) is copied from the
   contribution branch into the main branch, not the rest of that branch,
   because the contribution branch may have been created before some inputs
   were deleted.
*)
(** [merge_branches ~into ~from ~name ~user_email np] copies the input
    folder [inputs/name] and the output folder [outputs/name] from branch
    [from] into branch [into]. The copy is made in one commit (info built
    from [user_email]), and then [into] is pushed.

    Returns [Error] in these cases:
    - [inputs/name] already exists in [into], as a folder or a file;
    - there is a write conflict (reported via [handle_write_error], like the
      other writers, instead of raising);
    - the push fails. *)
let merge_branches ~into ~from ~name ~user_email (node, repo) =
  match node with
  | Error (`Msg msg) -> Error msg |> Lwt.return
  | Ok _ -> (
      let* into_t = Files.Store.of_branch repo into
      and* from_t = Files.Store.of_branch repo from in
      let* into_tree = Files.Store.tree into_t
      and* from_tree = Files.Store.tree from_t in
      let* into_tree_folder_kind =
        Files.Store.Tree.kind into_tree [ inputs; name ]
      and* from_tree_input_folder =
        Files.Store.Tree.get_tree from_tree [ inputs; name ]
      and* from_tree_output_folder =
        Files.Store.Tree.get_tree from_tree [ outputs; name ]
      in
      match into_tree_folder_kind with
      | Some `Node ->
          Error
            "A folder with that name already exists, this should not happen!"
          |> Lwt.return
      | None -> (
          let* tree =
            Files.Store.Tree.add_tree into_tree [ inputs; name ]
              from_tree_input_folder
          in
          let* tree =
            Files.Store.Tree.add_tree tree [ outputs; name ]
              from_tree_output_folder
          in
          let* written =
            Files.Store.set_tree
              ~info:
                (Db_config.Pipeline.info
                   "Merging branches %s and %s (input folder)" into from
                   user_email)
              into_t [] tree
          in
          match written with
          | Error _ as err -> handle_write_error err |> Lwt.return
          | Ok () -> (
              (* Push the branch the merge commit was made on. Pushing the
                 module-level [branch] would miss it whenever [into]
                 differs. *)
              let+ pushed = push into in
              match pushed with
              | Ok () -> Ok ()
              | Error (`Msg msg) -> Error msg
              | Error `Detached_head -> Error "Detached head"))
      | _ ->
          let msg =
            "The folder name is the same as an existing file. Not able to \
             merge."
          in
          Error msg |> Lwt.return)