Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Unify LevelDB usage to single instance #57

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/machi.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
%% @doc Currently 4 GiB; could be up to 64-bit, subject to the PB
%% message limit on chunk size
-define(DEFAULT_MAX_FILE_SIZE, ((1 bsl 32) - 1)).
-define(MINIMUM_OFFSET, 1024).
-define(MINIMUM_OFFSET, 0).

%% 0th draft of checksum typing with 1st byte.
-define(CSUM_TAG_NONE, 0). % No csum provided by client
Expand Down
295 changes: 195 additions & 100 deletions src/machi_csum_table.erl

Large diffs are not rendered by default.

166 changes: 86 additions & 80 deletions src/machi_file_proxy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@
filename :: string() | undefined,
data_path :: string() | undefined,
wedged = false :: boolean(),
csum_file :: string()|undefined,
csum_path :: string()|undefined,
data_filehandle :: file:io_device(),
csum_table :: machi_csum_table:table(),
eof_position = 0 :: non_neg_integer(),
Expand All @@ -102,12 +100,14 @@

%% Public API

% @doc Start a new instance of the file proxy service. Takes the filename
% and data directory as arguments. This function is typically called by the
% `machi_file_proxy_sup:start_proxy/2' function.
-spec start_link(FluName :: atom(), Filename :: string(), DataDir :: string()) -> any().
start_link(FluName, Filename, DataDir) ->
gen_server:start_link(?MODULE, {FluName, Filename, DataDir}, []).
% @doc Start a new instance of the file proxy service. Takes the
% filename and data directory as arguments. This function is typically
% called by the `machi_file_proxy_sup:start_proxy/2'
% function. Checksum table is also passed at startup.
-spec start_link(Filename :: string(),
DataDir :: string(), CsumTable :: machi_csum_table:table()) -> any().
start_link(Filename, DataDir, CsumTable) ->
gen_server:start_link(?MODULE, {Filename, DataDir, CsumTable}, []).

% @doc Request to stop an instance of the file proxy service.
-spec stop(Pid :: pid()) -> ok.
Expand Down Expand Up @@ -218,26 +218,24 @@ checksum_list(Pid) ->
%% gen_server callbacks

% @private
init({FluName, Filename, DataDir}) ->
CsumFile = machi_util:make_checksum_filename(DataDir, Filename),
init({Filename, DataDir, CsumTable}) ->
{_, DPath} = machi_util:make_data_filename(DataDir, Filename),
ok = filelib:ensure_dir(CsumFile),
ok = filelib:ensure_dir(DPath),
{ok, CsumTable} = machi_csum_table:open(CsumFile, []),
UnwrittenBytes = machi_csum_table:calc_unwritten_bytes(CsumTable),
CsumTable1 = case machi_csum_table:is_file_trimmed(CsumTable, list_to_binary(Filename)) of
false -> CsumTable;
true -> trimmed
end,

UnwrittenBytes = machi_csum_table:calc_unwritten_bytes(CsumTable, iolist_to_binary(Filename)),
{Eof, infinity} = lists:last(UnwrittenBytes),
{ok, FHd} = file:open(DPath, [read, write, binary, raw]),
%% Reserve space for EC (and similar metadata), to prevent hitting EOF on reads
ok = file:pwrite(FHd, 0, binary:copy(<<"so what?">>, ?MINIMUM_OFFSET div 8)),
Tref = schedule_tick(),
St = #state{
fluname = FluName,
filename = Filename,
data_dir = DataDir,
data_path = DPath,
csum_file = CsumFile,
data_filehandle = FHd,
csum_table = CsumTable,
csum_table = CsumTable1,
tref = Tref,
eof_position = Eof,
max_file_size = machi_config:max_file_size()},
Expand Down Expand Up @@ -281,6 +279,13 @@ handle_call({read, _Offset, _Length, _}, _From,
}) ->
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};

handle_call({read, _Offset, _Length, _Opts}, _From,
State = #state{
csum_table = trimmed,
reads = {T, Err}
}) ->
{reply, {error, trimmed}, State#state{reads = {T+1, Err+1}}};

handle_call({read, Offset, Length, _Opts}, _From,
State = #state{eof_position = Eof,
reads = {T, Err}
Expand Down Expand Up @@ -325,6 +330,11 @@ handle_call({write, _Offset, _ClientMeta, _Data}, _From,
}) ->
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};

handle_call({write, _, _, _}, _From,
State = #state{writes = {T, Err},
csum_table = trimmed}) ->
{reply, {error, trimmed}, State#state{writes = {T + 1, Err + 1}}};

handle_call({write, Offset, ClientMeta, Data}, _From,
State = #state{filename = F,
writes = {T, Err},
Expand All @@ -348,7 +358,7 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
{Error, Err + 1}
end
end,
{NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)),
{NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable, iolist_to_binary(F))),
lager:debug("Wrote ~p bytes at ~p of file ~p, NewEOF = ~p~n",
[iolist_size(Data), Offset, F, NewEof]),
{reply, Resp, State#state{writes = {T+1, NewErr},
Expand All @@ -365,35 +375,33 @@ handle_call({trim, _Offset, _ClientMeta, _Data}, _From,

handle_call({trim, Offset, Size, _TriggerGC}, _From,
State = #state{data_filehandle=FHd,
filename=Filename,
ops = Ops,
trims = {T, Err},
csum_table = CsumTable}) ->

case machi_csum_table:all_trimmed(CsumTable, Offset, Offset+Size) of
true ->
NewState = State#state{ops=Ops+1, trims={T, Err+1}},
%% All bytes of that range were already trimmed; return ok
%% here, not {error, trimmed}, which would mean the whole
%% file had been trimmed
F = iolist_to_binary(Filename),
LUpdate = maybe_regenerate_checksum(
FHd,
machi_csum_table:find_leftneighbor(CsumTable,
F,
Offset)),
RUpdate = maybe_regenerate_checksum(
FHd,
machi_csum_table:find_rightneighbor(CsumTable,
F,
Offset+Size)),

case machi_csum_table:trim(CsumTable, F, Offset,
Size, LUpdate, RUpdate) of
ok ->
{NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable, F)),
NewState = State#state{ops=Ops+1,
trims={T+1, Err},
eof_position=NewEof},
maybe_gc(ok, NewState);
false ->
LUpdate = maybe_regenerate_checksum(
FHd,
machi_csum_table:find_leftneighbor(CsumTable, Offset)),
RUpdate = maybe_regenerate_checksum(
FHd,
machi_csum_table:find_rightneighbor(CsumTable, Offset+Size)),

case machi_csum_table:trim(CsumTable, Offset, Size, LUpdate, RUpdate) of
ok ->
{NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)),
NewState = State#state{ops=Ops+1,
trims={T+1, Err},
eof_position=NewEof},
maybe_gc(ok, NewState);
Error ->
{reply, Error, State#state{ops=Ops+1, trims={T, Err+1}}}
end
Error ->
{reply, Error, State#state{ops=Ops+1, trims={T, Err+1}}}
end;

%% APPENDS
Expand Down Expand Up @@ -435,8 +443,9 @@ handle_call({append, ClientMeta, Extra, Data}, _From,
{reply, Resp, State#state{appends = {T+1, NewErr},
eof_position = NewEof}};

handle_call({checksum_list}, _FRom, State = #state{csum_table=T}) ->
All = machi_csum_table:all(T),
handle_call({checksum_list}, _FRom, State = #state{filename=Filename,
csum_table=T}) ->
All = machi_csum_table:all(T,iolist_to_binary(Filename)),
{reply, {ok, All}, State};

handle_call(Req, _From, State) ->
Expand Down Expand Up @@ -528,7 +537,6 @@ handle_info(Req, State) ->
% @private
terminate(Reason, #state{filename = F,
data_filehandle = FHd,
csum_table = T,
reads = {RT, RE},
writes = {WT, WE},
appends = {AT, AE}
Expand All @@ -544,14 +552,7 @@ terminate(Reason, #state{filename = F,
_ ->
ok = file:sync(FHd),
ok = file:close(FHd)
end,
case T of
undefined ->
noop; %% file deleted
_ ->
ok = machi_csum_table:close(T)
end,
ok.
end.

% @private
code_change(_OldVsn, State, _Extra) ->
Expand Down Expand Up @@ -622,7 +623,8 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
do_read(FHd, Filename, CsumTable, Offset, Size, _, _) ->
%% Note that find/3 returns only overlapping chunks; neither border
%% is necessarily aligned to the original Offset and Size.
ChunkCsums = machi_csum_table:find(CsumTable, Offset, Size),
ChunkCsums = machi_csum_table:find(CsumTable, iolist_to_binary(Filename),
Offset, Size),
read_all_ranges(FHd, Filename, ChunkCsums, [], []).

-spec read_all_ranges(file:io_device(), string(),
Expand Down Expand Up @@ -700,7 +702,7 @@ read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks, Trimm
handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
Size = iolist_size(Data),

case machi_csum_table:find(CsumTable, Offset, Size) of
case machi_csum_table:find(CsumTable, iolist_to_binary(Filename), Offset, Size) of
[] -> %% Nothing should be there
try
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data)
Expand All @@ -723,6 +725,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
ok;
{ok, _Other} ->
%% TODO: leave some debug/warning message here?
io:format(user, "baposdifa;lsdfkj<<<<<<<~n", []),
{error, written}
end;
[{Offset, Size, OtherCsum}] ->
Expand All @@ -731,11 +734,13 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
" a check for unwritten bytes gave us checksum ~p"
" but the data we were trying to write has checksum ~p",
[Offset, Filename, OtherCsum, TaggedCsum]),
io:format(user, "baposdifa;lsdfkj*************8~n", []),
{error, written};
_Chunks ->
%% TODO: Should we try to read all contiguous chunks to see
%% whether their total checksum matches the client-provided checksum?
case machi_csum_table:any_trimmed(CsumTable, Offset, Size) of
case machi_csum_table:any_trimmed(CsumTable, iolist_to_binary(Filename),
Offset, Size) of
true ->
%% More than a byte is trimmed, besides, do we
%% have to return exact written bytes? No. Clients
Expand All @@ -744,6 +749,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
{error, trimmed};
false ->
%% No byte is trimmed, but at least one byte is written
io:format(user, "baposdifa;lsdfkj*************8 ~p~n", [_Chunks]),
{error, written}
end
end.
Expand All @@ -761,6 +767,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
case file:pwrite(FHd, Offset, Data) of
ok ->
F = iolist_to_binary(Filename),
lager:debug("Successful write in file ~p at offset ~p, length ~p",
[Filename, Offset, Size]),

Expand All @@ -769,11 +776,15 @@ do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
%% as server_sha
LUpdate = maybe_regenerate_checksum(
FHd,
machi_csum_table:find_leftneighbor(CsumTable, Offset)),
machi_csum_table:find_leftneighbor(CsumTable,
F,
Offset)),
RUpdate = maybe_regenerate_checksum(
FHd,
machi_csum_table:find_rightneighbor(CsumTable, Offset+Size)),
ok = machi_csum_table:write(CsumTable, Offset, Size,
machi_csum_table:find_rightneighbor(CsumTable,
F,
Offset+Size)),
ok = machi_csum_table:write(CsumTable, F, Offset, Size,
TaggedCsum, LUpdate, RUpdate),
lager:debug("Successful write to checksum file for ~p",
[Filename]),
Expand Down Expand Up @@ -838,32 +849,27 @@ maybe_gc(Reply, S = #state{eof_position = Eof,
lager:debug("The file is still small; not trying GC (Eof, MaxFileSize) = (~p, ~p)~n",
[Eof, MaxFileSize]),
{reply, Reply, S};
maybe_gc(Reply, S = #state{fluname=FluName,
data_filehandle = FHd,
maybe_gc(Reply, S = #state{data_filehandle = FHd,
data_dir = DataDir,
filename = Filename,
eof_position = Eof,
csum_table=CsumTable}) ->
case machi_csum_table:all_trimmed(CsumTable, ?MINIMUM_OFFSET, Eof) of
true ->
lager:debug("GC? Let's do it: ~p.~n", [Filename]),
%% Before unlinking a file, we should inform
%% machi_flu_filename_mgr that this file is
%% deleted and mark it as "trimmed" to avoid
%% filename reuse and resurrection. Garbage may
%% remain if a process crashes, but it should
%% be recovered at filename_mgr startup.

%% Also, filename_mgr must be informed *before* the file proxy
%% deletes the files.
ok = machi_flu_metadata_mgr:trim_file(FluName, {file, Filename}),
lager:debug("GC? Let's try it: ~p.~n", [Filename]),

case machi_csum_table:maybe_trim_file(CsumTable, iolist_to_binary(Filename), Eof) of
{ok, trimmed} ->
%% Checksum table entries are all trimmed now, unlinking
%% file from operating system
ok = file:close(FHd),
{_, DPath} = machi_util:make_data_filename(DataDir, Filename),
ok = file:delete(DPath),
machi_csum_table:delete(CsumTable),
{stop, normal, Reply,
S#state{data_filehandle=undefined,
csum_table=undefined}};
false ->
lager:info("File ~s has been unlinked as all chunks"
" were trimmed.", [Filename]),
{stop, normal, Reply, S#state{data_filehandle=undefined}};
{ok, not_trimmed} ->
{reply, Reply, S};
{error, _} = Error ->
lager:error("machi_csum_table:maybe_trim_file/4 has been "
"unexpectedly failed: ~p", [Error]),
{reply, Reply, S}
end.
3 changes: 2 additions & 1 deletion src/machi_file_proxy_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ start_link(FluName) ->
supervisor:start_link({local, make_proxy_name(FluName)}, ?MODULE, []).

start_proxy(FluName, DataDir, Filename) ->
{ok, CsumTable} = machi_flu_filename_mgr:get_csum_table(FluName),
supervisor:start_child(make_proxy_name(FluName),
[FluName, Filename, DataDir]).
[Filename, DataDir, CsumTable]).

init([]) ->
SupFlags = {simple_one_for_one, 1000, 10},
Expand Down
19 changes: 8 additions & 11 deletions src/machi_flu1_net_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -392,17 +392,14 @@ do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluNa
do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})->
case sanitize_file_string(File) of
ok ->
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
{ok, Pid} ->
case machi_file_proxy:read(Pid, Offset, Size, Opts) of
%% XXX FIXME
%% For now we are omitting the checksum data because it blows up
%% protobufs.
{ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
Other -> Other
end;
{error, trimmed} = Error ->
Error
{ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
case machi_file_proxy:read(Pid, Offset, Size, Opts) of
%% XXX FIXME
%% For now we are omitting the checksum data because it blows up
%% protobufs.
{ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
Other ->
Other
end;
_ ->
{error, bad_arg}
Expand Down
Loading