Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Clean up shape folders and remove unrecoverable shapes #2051

Merged
merged 8 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/hungry-apples-sell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@core/sync-service": patch
---

Clean up directories when removing shapes.
Remove corrupted shapes from store when recovery fails.
7 changes: 6 additions & 1 deletion packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,12 @@ defmodule Electric.ShapeCache do
try do
{:ok, _pid, _snapshot_xmin, _latest_offset} = start_shape(shape_handle, shape, state)
rescue
e -> Logger.error("Failed to recover shape #{shape_handle}: #{inspect(e)}")
e ->
Logger.error("Failed to recover shape #{shape_handle}: #{inspect(e)}")

# clean up corrupted data to avoid persisting bad state
Electric.ShapeCache.Storage.for_shape(shape_handle, state.storage)
|> Electric.ShapeCache.Storage.unsafe_cleanup!()
end
end)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ defmodule Electric.ShapeCache.CrashingFileStorage do
defdelegate get_log_stream(offset, max_offset, opts), to: FileStorage
defdelegate get_chunk_end_log_offset(offset, opts), to: FileStorage
defdelegate cleanup!(opts), to: FileStorage
defdelegate unsafe_cleanup!(opts), to: FileStorage

def shared_opts(opts) do
opts
Expand Down
35 changes: 24 additions & 11 deletions packages/sync-service/lib/electric/shape_cache/file_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ defmodule Electric.ShapeCache.FileStorage do
:base_path,
:shape_handle,
:db,
:data_dir,
:cubdb_dir,
:shape_definition_dir,
:snapshot_dir,
:stack_id,
:extra_opts,
Expand All @@ -48,13 +48,15 @@ defmodule Electric.ShapeCache.FileStorage do
shape_handle,
%{base_path: base_path, stack_id: stack_id} = opts
) do
data_dir = Path.join([base_path, shape_handle])

%FS{
base_path: base_path,
shape_handle: shape_handle,
db: name(stack_id, shape_handle),
cubdb_dir: Path.join([base_path, shape_handle, "cubdb"]),
snapshot_dir: Path.join([base_path, shape_handle, "snapshots"]),
shape_definition_dir: Path.join([base_path, shape_handle]),
data_dir: data_dir,
cubdb_dir: Path.join([data_dir, "cubdb"]),
snapshot_dir: Path.join([data_dir, "snapshots"]),
stack_id: stack_id,
extra_opts: Map.get(opts, :extra_opts, %{})
}
Expand All @@ -81,7 +83,7 @@ defmodule Electric.ShapeCache.FileStorage do
end

defp initialise_filesystem(opts) do
with :ok <- File.mkdir_p(opts.shape_definition_dir),
with :ok <- File.mkdir_p(opts.data_dir),
:ok <- File.mkdir_p(opts.cubdb_dir),
:ok <- File.mkdir_p(opts.snapshot_dir) do
:ok
Expand All @@ -95,7 +97,7 @@ defmodule Electric.ShapeCache.FileStorage do
if stored_version != opts.version || snapshot_xmin(opts) == nil ||
not File.exists?(shape_definition_path(opts)) ||
not CubDB.has_key?(opts.db, @snapshot_meta_key) do
cleanup!(opts)
cleanup_internals!(opts)
end

CubDB.put(opts.db, @version_key, @version)
Expand Down Expand Up @@ -129,7 +131,7 @@ defmodule Electric.ShapeCache.FileStorage do
Enum.reduce(shape_handles, %{}, fn shape_handle, acc ->
shape_def_path =
shape_definition_path(%{
shape_definition_dir: Path.join([opts.base_path, shape_handle])
data_dir: Path.join([opts.base_path, shape_handle])
})

with {:ok, shape_def_encoded} <- File.read(shape_def_path),
Expand Down Expand Up @@ -312,8 +314,7 @@ defmodule Electric.ShapeCache.FileStorage do
|> Enum.at(0)
end

@impl Electric.ShapeCache.Storage
def cleanup!(%FS{} = opts) do
defp cleanup_internals!(%FS{} = opts) do
[
@snapshot_meta_key,
@xmin_key,
Expand All @@ -330,8 +331,20 @@ defmodule Electric.ShapeCache.FileStorage do
:ok
end

defp shape_definition_path(%{shape_definition_dir: shape_definition_dir} = _opts) do
Path.join(shape_definition_dir, @shape_definition_file_name)
@impl Electric.ShapeCache.Storage
def cleanup!(%FS{} = opts) do
:ok = cleanup_internals!(opts)
{:ok, _} = File.rm_rf(opts.data_dir)
:ok
end

@impl Electric.ShapeCache.Storage
def unsafe_cleanup!(%FS{} = opts) do
{:ok, _} = File.rm_rf(opts.data_dir)
end

defp shape_definition_path(%{data_dir: data_dir} = _opts) do
Path.join(data_dir, @shape_definition_file_name)
end

defp keys_from_range(min_key, max_key, opts) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ defmodule Electric.ShapeCache.InMemoryStorage do
:ok
end

@impl Electric.ShapeCache.Storage
def unsafe_cleanup!(%MS{} = opts), do: cleanup!(opts)

# Turns a LogOffset into a tuple representation
# for storing in the ETS table
defp storage_offset(offset) do
Expand Down
13 changes: 13 additions & 0 deletions packages/sync-service/lib/electric/shape_cache/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ defmodule Electric.ShapeCache.Storage do
@doc "Clean up snapshots/logs for a shape handle"
@callback cleanup!(shape_opts()) :: :ok

@doc """
Clean up snapshots/logs for a shape handle by deleting whole directory.

Does not require any extra storage processes to be running, but should only
be used if the shape is known to not be in use to avoid concurrency issues.
"""
@callback unsafe_cleanup!(shape_opts()) :: :ok

@behaviour __MODULE__

@last_log_offset LogOffset.last()
Expand Down Expand Up @@ -187,4 +195,9 @@ defmodule Electric.ShapeCache.Storage do
def cleanup!({mod, shape_opts}) do
mod.cleanup!(shape_opts)
end

@impl __MODULE__
def unsafe_cleanup!({mod, shape_opts}) do
mod.unsafe_cleanup!(shape_opts)
end
end
2 changes: 1 addition & 1 deletion packages/sync-service/test/electric/plug/utils_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ defmodule Electric.Plug.UtilsTest do
|> Electric.Plug.Utils.hold_conn_until_stack_ready([])
end)

Process.sleep(50)
Process.sleep(100)

Electric.StackSupervisor.dispatch_stack_event(Registry.StackEvents, ctx.stack_id, :ready)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,13 +531,49 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do
Electric.ShapeCache.Storage.get_all_stored_shapes({storage, opts})
end
end

describe "#{module_name}.unsafe_cleanup/1" do
setup do
{:ok, %{module: unquote(module)}}
end

setup :start_storage

test "should remove entire data directory without requiring process to run", %{
module: storage,
opts: opts,
pid: pid
} do
lsn = Lsn.from_integer(1000)

log_items =
[
%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "123", "name" => "Test"},
log_offset: LogOffset.new(lsn, 0)
}
]
|> changes_to_log_items()

storage.append_to_log!(log_items, opts)

Process.exit(pid, :normal)

assert File.exists?(opts.data_dir)

storage.unsafe_cleanup!(opts)

refute File.exists?(opts.data_dir)
end
end
end

defp start_storage(%{module: module} = context) do
opts = module |> opts(context) |> module.shared_opts()
shape_opts = module.for_shape(@shape_handle, opts)
{:ok, _} = module.start_link(shape_opts)
{:ok, %{module: module, opts: shape_opts}}
{:ok, pid} = module.start_link(shape_opts)
{:ok, %{module: module, opts: shape_opts, pid: pid}}
end

defp opts(InMemoryStorage, %{stack_id: stack_id}) do
Expand Down
6 changes: 6 additions & 0 deletions packages/sync-service/test/support/test_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,10 @@ defmodule Support.TestStorage do
send(parent, {__MODULE__, :cleanup!, shape_handle})
Storage.cleanup!(storage)
end

@impl Electric.ShapeCache.Storage
def unsafe_cleanup!({parent, shape_handle, _, storage}) do
send(parent, {__MODULE__, :unsafe_cleanup!, shape_handle})
Storage.unsafe_cleanup!(storage)
end
end
Loading