Skip to content

Commit

Permalink
more updates for IO
Browse files Browse the repository at this point in the history
  • Loading branch information
zoj613 committed Dec 25, 2024
1 parent de070e8 commit 845a0fc
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 26 deletions.
2 changes: 1 addition & 1 deletion zarr-sync/src/storage.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module MemoryStore : Zarr.Memory.S with type 'a io := 'a
module ZipStore : Zarr.Zip.S with type 'a io := 'a

(** A blocking I/O Http storage backend for a Zarr v3 hierarchy. *)
module HttpStore : Zarr.Http.S with module Deferred = Deferred
module HttpStore : Zarr.Http.S with type 'a io := 'a

(** A blocking I/O local filesystem storage backend for a Zarr v3 hierarchy. *)
module FilesystemStore : sig
Expand Down
2 changes: 1 addition & 1 deletion zarr-sync/test/test_sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ let test_storage

module type SYNC_PARTIAL_STORE = sig
exception Not_implemented
include Zarr.Storage.STORE with type 'a Deferred.t = 'a
include Zarr.Storage.S with type 'a io := 'a
end

let test_readable_writable_only
Expand Down
45 changes: 21 additions & 24 deletions zarr/src/storage/http.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module type S = sig
exception Not_implemented
exception Request_failed of int * string
include Storage.STORE
include Storage.S

type auth = {user : string; pwd : string}

Expand All @@ -11,8 +11,8 @@ module type S = sig
?tries:int ->
?timeout:int ->
string ->
(t -> 'a Deferred.t) ->
'a Deferred.t
(t -> 'a io) ->
'a io
(** [with_open url f] connects to the Zarr store described by the url [url]
and applies function [f] to the store's open handle.
Expand All @@ -37,20 +37,17 @@ module type C = sig
include Ezcurl_core.S
end

module Make
(Deferred : Types.Deferred)
(C : C with type 'a io = 'a Deferred.t) : S with module Deferred = Deferred = struct
module Make (IO : Types.IO) (C : C with type 'a io = 'a IO.t) : S with type 'a io := 'a IO.t = struct
exception Not_implemented
exception Request_failed of int * string
open Deferred.Syntax
open IO.Syntax

let raise_error (code, s) = raise (Request_failed (Curl.int_of_curlCode code, s))
let fold_result = Result.fold ~error:raise_error ~ok:Fun.id

module IO = struct
module Deferred = Deferred

module Store = struct
type t = {tries : int; client : C.t; base_url : string; config : Ezcurl_core.Config.t}
type 'a io = 'a IO.t

let get t key =
let tries = t.tries and client = t.client and config = t.config in
Expand All @@ -60,29 +57,29 @@ module Make
| {code; body; _} when code = 200 -> body
| {code; body; _} -> raise (Request_failed (code, body))

let size t key = try Deferred.map String.length (get t key) with
| Request_failed (404, _) -> Deferred.return 0
let size t key = try IO.map String.length (get t key) with
| Request_failed (404, _) -> IO.return 0
(*let size t key =
let tries = t.tries and client = t.client and config = t.config in
let url = t.base_url ^ key in
let type' = if String.ends_with ~suffix:".json" key then "json" else "octet-stream" in
let headers = [("Content-Type", "application/" ^ type')] in
let* res = C.http ~headers ~tries ~client ~config ~url ~meth:HEAD () in
match res with
| Error _ -> Deferred.return 0
| Ok {code; _} when code = 404 -> Deferred.return 0
| Error _ -> IO.return 0
| Ok {code; _} when code = 404 -> IO.return 0
| Ok {headers; code; _} when code = 200 ->
begin match List.assoc_opt "content-length" headers with
| Some "0" -> Deferred.return 0
| Some l -> Deferred.return @@ int_of_string l
| Some "0" -> IO.return 0
| Some l -> IO.return @@ int_of_string l
| None ->
begin try print_endline "empty content-length header";
Deferred.map String.length (get t key) with
| Request_failed (404, _) -> Deferred.return 0 end
IO.map String.length (get t key) with
| Request_failed (404, _) -> IO.return 0 end
end
| Ok {code; body; _} -> raise (Request_failed (code, body)) *)

let is_member t key = Deferred.map (fun s -> if s > 0 then true else false) (size t key)
let is_member t key = IO.map (fun s -> if s > 0 then true else false) (size t key)

let get_partial_values t key ranges =
let tries = t.tries and client = t.client and config = t.config and url = t.base_url ^ key in
Expand All @@ -91,9 +88,9 @@ module Make
let read_range acc (ofs, len) =
let none = Printf.sprintf "%d-" ofs in
let range = Option.fold ~none ~some:(end_index ofs) len in
Deferred.map (fun r -> (fold_result r).body :: acc) (fetch range)
IO.map (fun r -> (fold_result r).body :: acc) (fetch range)
in
Deferred.fold_left read_range [] (List.rev ranges)
IO.fold_left read_range [] (List.rev ranges)

let set t key data =
let tries = t.tries and client = t.client and config = t.config
Expand All @@ -110,7 +107,7 @@ module Make
let set_partial_values t key ?(append=false) rsv =
let* size = size t key in
let* ov = match size with
| 0 -> Deferred.return String.empty
| 0 -> IO.return String.empty
| _ -> get t key
in
let f = if append || ov = String.empty then
Expand Down Expand Up @@ -149,9 +146,9 @@ module Make
|> Ezcurl_core.Config.username basic_auth.user
|> Ezcurl_core.Config.password basic_auth.pwd
in
f IO.{tries; client; config; base_url = url ^ "/"}
f Store.{tries; client; config; base_url = url ^ "/"}
in
C.with_client ~set_opts perform

include Storage.Make(IO)
include Storage.Make(IO)(Store)
end

0 comments on commit 845a0fc

Please sign in to comment.