Commit
Merge branch 'develop' into 1.7
Conflicts:
	rebar.config
engelsanchez committed Jun 19, 2015
2 parents fc0c8ee + 1b5ad68 commit f3b4352
Showing 5 changed files with 370 additions and 205 deletions.
9 changes: 5 additions & 4 deletions rebar.config
@@ -1,15 +1,16 @@
{erl_opts, [debug_info, warn_untyped_record, warnings_as_errors]}.
{erl_opts, [debug_info, warn_untyped_record, warnings_as_errors,
{platform_define, "^[0-9]+", namespaced_types}]}.
{port_specs, [{"priv/bitcask.so", ["c_src/*.c"]}]}.

{deps, [
{meck, "0.8.1", {git, "git://github.com/basho/meck.git", {tag, "0.8.1"}}},
{cuttlefish, ".*", {git, "git://github.com/basho/cuttlefish.git", {branch, "2.0"}}}
{meck, "0.8.2", {git, "git://github.com/basho/meck.git", {tag, "0.8.2"}}},
{cuttlefish, ".*", {git, "git://github.com/basho/cuttlefish.git", {tag, "2.0.1"}}}
]}.

{port_env,
[
{"DRV_CFLAGS",
"-g -Wall -fPIC -errors $ERL_CFLAGS"},
"-g -Wall -fPIC $ERL_CFLAGS"},

%% Solaris specific flags
{"solaris.*-64$", "CFLAGS", "-D_REENTRANT -m64"},
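A note on the new platform_define entry: pre-17 Erlang/OTP releases report versions like "R16B03", while 17.0 and later report "17", "18", and so on, so the "^[0-9]+" regex sets the namespaced_types macro only on OTP 17+, where built-in types such as set() and dict() moved into their modules. A minimal sketch of how a source file consumes the define (module and function names are hypothetical, for illustration only):

-module(ns_types_example).
-export([new_set/0]).

%% Pick the type name that the running OTP release understands.
-ifdef(namespaced_types).
-type example_set() :: sets:set().
-else.
-type example_set() :: set().
-endif.

-spec new_set() -> example_set().
new_set() ->
    sets:new().

The bitcask_set() alias added in src/bitcask.erl below follows exactly this pattern.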
229 changes: 200 additions & 29 deletions src/bitcask.erl
@@ -87,11 +87,17 @@
tombstone_version = 2 :: 0 | 2
}).

-ifdef(namespaced_types).
-type bitcask_set() :: sets:set().
-else.
-type bitcask_set() :: set().
-endif.

-record(mstate, { dirname :: string(),
merge_lock :: reference(),
max_file_size :: integer(),
input_files :: [#filestate{}],
input_file_ids :: set(),
input_file_ids :: bitcask_set(),
min_file_id :: non_neg_integer(),
tombstone_write_files :: [#filestate{}],
out_file :: 'fresh' | #filestate{},
@@ -398,11 +404,19 @@ fold(State, Fun, Acc0, MaxAge, MaxPut, SeeTombstonesP) ->
{ok, Files, FoldEpoch} ->
ExpiryTime = expiry_time(State#bc_state.opts),
SubFun = fun(K0,V,TStamp,{_FN,FTS,Offset,_Sz},Acc) ->
K = KT(K0),
case (TStamp < ExpiryTime) of
true ->
K = try
KT(K0)
catch
KeyTxErr ->
{key_tx_error, {K0, KeyTxErr}}
end,
case {K, (TStamp < ExpiryTime)} of
{{key_tx_error, TxErr}, _} ->
error_logger:error_msg("Error converting key ~p: ~p", [K0, TxErr]),
Acc;
false ->
{_, true} ->
Acc;
{_, false} ->
case bitcask_nifs:keydir_get(
State#bc_state.keydir, K,
FoldEpoch) of
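Every call site touched by this commit wraps the user-supplied key transform in the same try/catch, so a throw from the transform becomes a tagged tuple that is logged and skipped instead of crashing the fold. A standalone sketch of the pattern (module and function names are hypothetical):

-module(safe_kt_example).
-export([safe_transform/2]).

%% Apply the user-supplied key transform KT to the raw key K0.
%% A pattern-only 'catch Pattern ->' clause traps thrown terms, so a
%% misbehaving transform yields {key_tx_error, ...} rather than an exit.
safe_transform(KT, K0) ->
    try
        KT(K0)
    catch
        KeyTxErr ->
            {key_tx_error, {K0, KeyTxErr}}
    end.

Note that a pattern-only catch clause matches class throw only; errors and exits from the transform would still propagate. That matches the throw-based test added at the bottom of this diff.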
@@ -1175,17 +1189,34 @@ scan_key_files([Filename | Rest], KeyDir, Acc, CloseFile, KT) ->
%% tombstones or data errors. Otherwise we risk
%% reusing the file id for new data.
_ = bitcask_nifs:increment_file_id(KeyDir, FileTstamp),
F = fun({tombstone, K}, _Tstamp, {_Offset, _TotalSz}, _) ->
_ = bitcask_nifs:keydir_remove(KeyDir, KT(K));
(K, Tstamp, {Offset, TotalSz}, _) ->
bitcask_nifs:keydir_put(KeyDir,
KT(K),
FileTstamp,
TotalSz,
Offset,
Tstamp,
bitcask_time:tstamp(),
false)
F = fun({tombstone, K0}, _Tstamp, {_Offset, _TotalSz}, _) ->
K = try KT(K0) catch TxErr -> {key_tx_error, TxErr} end,
case K of
{key_tx_error, KeyTxErr} ->
error_logger:error_msg("Invalid key on load ~p: ~p",
[K0, KeyTxErr]),
ok;
_ ->
bitcask_nifs:keydir_remove(KeyDir, K)
end,
ok;
(K0, Tstamp, {Offset, TotalSz}, _) ->
K = try KT(K0) catch TxErr -> {key_tx_error, TxErr} end,
case K of
{key_tx_error, KeyTxErr} ->
error_logger:error_msg("Invalid key on load ~p: ~p",
[K0, KeyTxErr]);
_ ->
bitcask_nifs:keydir_put(KeyDir,
K,
FileTstamp,
TotalSz,
Offset,
Tstamp,
bitcask_time:tstamp(),
false)
end,
ok
end,
bitcask_fileops:fold_keys(File, F, undefined, recovery),
if CloseFile == true ->
@@ -1346,8 +1377,21 @@ merge_files(#mstate { dirname = Dirname,
key_transform = KT
} = State) ->
FileId = bitcask_fileops:file_tstamp(File),
F = fun(K, V, Tstamp, Pos, State0) ->
merge_single_entry(KT(K), V, Tstamp, FileId, Pos, State0)
F = fun(K0, V, Tstamp, Pos, State0) ->
K = try
KT(K0)
catch
Err ->
{key_tx_error, Err}
end,
case K of
{key_tx_error, TxErr} ->
error_logger:error_msg("Invalid key on merge ~p: ~p",
[K0, TxErr]),
State0;
_ ->
merge_single_entry(K, V, Tstamp, FileId, Pos, State0)
end
end,
State2 = try bitcask_fileops:fold(File, F, State) of
#mstate{delete_files = DelFiles} = State1 ->
@@ -1924,9 +1968,16 @@ expiry_merge([File | Files], LiveKeyDir, KT, Acc0) ->
FileId = bitcask_fileops:file_tstamp(File),
Fun = fun({tombstone, _}, _, _, Acc) ->
Acc;
(K, Tstamp, {Offset, _TotalSz}, Acc) ->
bitcask_nifs:keydir_remove(LiveKeyDir, KT(K), Tstamp, FileId,
Offset),
(K0, Tstamp, {Offset, _TotalSz}, Acc) ->
K = try KT(K0) catch TxErr -> {key_tx_error, TxErr} end,
case K of
{key_tx_error, KeyTxErr} ->
error_logger:error_msg("Invalid key on merge ~p: ~p",
[K0, KeyTxErr]);
_ ->
bitcask_nifs:keydir_remove(LiveKeyDir, K, Tstamp,
FileId, Offset)
end,
Acc
end,
case bitcask_fileops:fold_keys(File, Fun, ok, default) of
@@ -2116,35 +2167,69 @@ fold_corrupt_file_test_() ->

fold_corrupt_file_test2() ->
TestDir = "/tmp/bc.test.fold_corrupt_file_test",
TestDataFile = TestDir ++ "/1.bitcask.data",

os:cmd("rm -rf " ++ TestDir),
B = bitcask:open(TestDir, [read_write]),
ok = bitcask:put(B,<<"k">>,<<"v">>),
close(B),

DataList = [{<<"k">>, <<"v">>}],
CreateFun = fun(KVList) ->
os:cmd("rm -rf " ++ TestDir),
B = bitcask:open(TestDir, [read_write]),
[ begin
ok = bitcask:put(B, K, V)
end || {K, V} <- KVList],
close(B),
bitcask:readable_files(TestDir)
end,

[File1] = CreateFun(DataList),
FoldFun = fun (K, V, Acc) -> [{K, V} | Acc] end,

B2 = bitcask:open(TestDir),
?assertEqual(DataList, bitcask:fold(B2, FoldFun,[])),
close(B2),

% Incomplete header
{ok, F} = file:open(TestDataFile, [append, raw, binary]),
{ok, File1Before} = file:read_file(File1),
{ok, F} = file:open(File1, [append, raw, binary]),
ok = file:write(F, <<100:32, 100:32, 100:16>>),
file:close(F),
{ok, File1After} = file:read_file(File1),
?assert(File1Before /= File1After),
B3 = bitcask:open(TestDir),
?assertEqual(DataList, bitcask:fold(B3, FoldFun,[])),
close(B3),

% Re-create before trying next corruption
[File2] = CreateFun(DataList),
{ok, File2Before} = file:read_file(File2),
% Header without any data
{ok, F2} = file:open(TestDataFile, [append, raw, binary]),
{ok, F2} = file:open(File2, [append, raw, binary]),
ok = file:write(F2, <<100:32>>),
file:close(F2),
{ok, File2After} = file:read_file(File2),
?assert(File2Before /= File2After),
B4 = bitcask:open(TestDir),
?assertEqual(DataList, bitcask:fold(B4, FoldFun,[])),
close(B4),

%% If a record is corrupted, its key should not be loaded.
DataList2 = [{<<"k1">>, <<"v1">>}, {<<"k2">>, <<"v2">>}],
[File3] = CreateFun(DataList2),
{ok, File3Before} = file:read_file(File3),
Hintfile = bitcask_fileops:hintfile_name(File3),
?assertEqual(true, filelib:is_regular(File3)),
?assertEqual(true, filelib:is_regular(Hintfile)),
ok = file:delete(Hintfile),
?assertEqual(false, filelib:is_regular(Hintfile)),
%% Change last byte of value for k2 to invalidate its CRC.
{ok, F3} = file:open(File3, [binary, read, write]),
ok = file:pwrite(F3, {eof, -1}, <<"3">>),
ok = file:close(F3),
{ok, File3After} = file:read_file(File3),
?assert(File3Before /= File3After),
B5 = bitcask:open(TestDir),
LoadedKeys = bitcask:list_keys(B5),
?assertEqual([<<"k1">>], LoadedKeys),
bitcask:close(B5),

ok.

% Issue puts until the entries hash in the keydir cannot be updated anymore
@@ -2750,6 +2835,73 @@ delete_keydir_test2() ->
bitcask:close(KDB),
ok.

retry_until_true(_, _, N) when N =< 0 ->
false;
retry_until_true(F, Delay, Retries) ->
case F() of
true ->
true;
_ ->
timer:sleep(Delay),
retry_until_true(F, Delay, Retries - 1)
end.
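A usage sketch for the new helper, assuming a test that needs to poll an asynchronous condition (wait_for_delete is hypothetical, not part of this commit):

%% Wait up to ~5 seconds (10 tries, 500 ms apart) for Fname to vanish.
wait_for_delete(Fname) ->
    retry_until_true(fun() -> not filelib:is_regular(Fname) end, 500, 10).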

no_pending_delete_bottleneck_test_() ->
{timeout, 30, fun no_pending_delete_bottleneck_test2/0}.

no_pending_delete_bottleneck_test2() ->
% Populate B1 and B2, then reopen both and put until each is frozen.
% Delete all keys in both, merge both, and unfreeze B2. With the bug,
% B2's files would never be deleted.

Dir1 = "/tmp/bc.test.del.block.1",
Dir2 = "/tmp/bc.test.del.block.2",

Data = default_dataset(),
bitcask:close(init_dataset(Dir1, [{max_file_size, 1}], Data)),
bitcask:close(init_dataset(Dir2, [{max_file_size, 1}], Data)),

B1 = bitcask:open(Dir1, [read_write]),
B2 = bitcask:open(Dir2, [read_write]),

Files2 = readable_files(Dir2),

FileDeleted = fun(Fname) ->
not filelib:is_regular(Fname)
end,

FilesDeleted = fun() ->
lists:all(FileDeleted, Files2)
end,

try
bitcask:iterator(B1, -1, -1),
put_till_frozen(B1),
[begin
?assertEqual(ok, bitcask:delete(B1, K))
end || {K, _} <- Data],
bitcask:merge(Dir1),

bitcask:iterator(B2, -1, -1),
put_till_frozen(B2),
[begin
?assertEqual(ok, bitcask:delete(B2, K))
end || {K, _} <- Data],
bitcask:merge(Dir2),
bitcask:iterator_release(B2),

%% Timing is everything. This test will time out after 30 seconds.
%% The merge delete worker runs on a one-second tick, so ten seconds
%% should give it ample opportunity to delete the files.
%% Famous last words.
?assert(retry_until_true(FilesDeleted, 1000, 10))
after
catch bitcask:close(B1),
catch bitcask:close(B2)
end.



frag_status_test_() ->
{timeout, 60, fun frag_status_test2/0}.

@@ -3306,6 +3458,25 @@ scan_err_test_() ->
ok = bitcask:close(B)
end]}.

%% Make sure that an error in the user-provided key transformation function
%% does not bring the whole thing down. It's possible to hit this in Riak
%% when encountering keys left over from a downgrade.
no_crash_on_key_transform_test() ->
Dir = "/tmp/bc.key.tx.crash",
CrashTx = fun(_K) ->
throw(naughty_key_transform_failure)
end,
B = init_dataset(Dir, [read_write, {key_transform, CrashTx}],
default_dataset()),
bitcask:list_keys(B),
bitcask:fold(B, fun(K, _V, Acc0) -> [K|Acc0] end, [], -1, -1, true),
bitcask:merge(Dir, [{key_transform, CrashTx}]),
ok = bitcask:close(B),
B2 = bitcask:open(Dir, [{key_transform, CrashTx}]),
ok = bitcask:close(B2),
ok.
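For contrast with CrashTx above, a hedged sketch of a well-behaved key transform of the kind Riak might supply, stripping a one-byte version prefix while passing plain keys through (strip_version is hypothetical, not part of this commit):

%% Strip a leading version byte if present; pass other binaries through.
strip_version(<<1:8, K/binary>>) -> K;
strip_version(K) when is_binary(K) -> K.

It would be passed the same way the test passes CrashTx, e.g. bitcask:open(Dir, [{key_transform, fun strip_version/1}]).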


total_byte_stats_test_() ->
{timeout, 60, fun total_byte_stats_test2/0}.

