Skip to content

Commit

Permalink
Load search index asynchronously
Browse files Browse the repository at this point in the history
Depending on the computer and size of the project being indexed, it was
possible for indexing to take longer than the previous 20-second
timeout. Additionally, since indexing blocked the Store GenServer,
any calls made to the Store during a long indexing run were likely to
time out (the default GenServer call timeout is 5 seconds).

This commit updates the `Store.State` API to load asynchronously so that
the store can continue to respond while indexing occurs in a task, while
the indexing itself has had its timeout changed to `:infinity`.
  • Loading branch information
zachallaun committed Sep 16, 2023
1 parent df49aab commit ec30636
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ defmodule Lexical.RemoteControl.Search.Indexer do
# function takes around 4 seconds to reindex all Lexical's
# modules, making it 5x faster than Enum and 4x faster than
# Task.async_stream
defp async_chunks(data, processor, timeout \\ 20_000) do
defp async_chunks(data, processor, timeout \\ :infinity) do
data
|> Stream.chunk_every(System.schedulers_online())
|> Enum.map(fn chunk ->
Expand Down
46 changes: 27 additions & 19 deletions apps/remote_control/lib/lexical/remote_control/search/store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ defmodule Lexical.RemoteControl.Search.Store do
{:ok, new_entries} | {:error, term()})

@typedoc """
A function that takes existing entries and refreshes them if necessary
A function that takes existing entries and updates them if necessary
"""
@type refresh_index ::
@type update_index ::
(project :: Project.t(), entries :: existing_entries ->
{:ok, new_entries, paths_to_delete} | {:error, term()})

Expand All @@ -42,6 +42,10 @@ defmodule Lexical.RemoteControl.Search.Store do
GenServer.call(__MODULE__, :all)
end

@doc """
Asks the store process for the current value of its `loaded?` flag.

This is a synchronous `GenServer.call/2` to the process registered under
this module's name, so the default call timeout applies.
"""
def loaded?, do: GenServer.call(__MODULE__, :loaded?)

@doc """
Sends `entries` to the store process as a `{:replace, entries}` call and
returns whatever the store replies with.
"""
def replace(entries), do: GenServer.call(__MODULE__, {:replace, entries})
Expand All @@ -66,34 +70,34 @@ defmodule Lexical.RemoteControl.Search.Store do
GenServer.call(__MODULE__, {:unique_fields, fields})
end

@spec start_link(Project.t(), create_index, refresh_index) :: GenServer.on_start()
def start_link(%Project{} = project, create_index, refresh_index) do
start_link([project, create_index, refresh_index])
@spec start_link(Project.t(), create_index, update_index) :: GenServer.on_start()
def start_link(%Project{} = project, create_index, update_index) do
start_link([project, create_index, update_index])
end

def start_link([%Project{} = project, create_index, refresh_index]) do
GenServer.start_link(__MODULE__, [project, create_index, refresh_index], name: __MODULE__)
def start_link([%Project{} = project, create_index, update_index]) do
GenServer.start_link(__MODULE__, [project, create_index, update_index], name: __MODULE__)
end

def start_link([create_index, refresh_index]) do
start_link(Lexical.RemoteControl.get_project(), create_index, refresh_index)
def start_link([create_index, update_index]) do
start_link(Lexical.RemoteControl.get_project(), create_index, update_index)
end

def init([%Project{} = project, create_index, update_index]) do
state = State.new(project, create_index, update_index)
state =
project
|> State.new(create_index, update_index)
|> State.async_load()

{:ok, state, {:continue, :load}}
{:ok, state}
end

def handle_continue(:load, %State{} = state) do
case State.load(state) do
{:ok, state} ->
{:noreply, state}
# Receives the reply sent by the asynchronous load task. The message is only
# accepted when its reference is the one stored in `async_load_ref`; the
# `is_reference/1` guard keeps unrelated two-element tuples (for instance one
# whose first element is `nil` while no load is pending) out of this clause.
def handle_info({ref, result}, %State{async_load_ref: ref} = state) when is_reference(ref) do
  # The reply has arrived, so the monitor that `Task.async/1` set up on the
  # task is no longer needed. Drop it and flush a `:DOWN` message that may
  # already be queued, so it never reaches the mailbox handlers.
  Process.demonitor(ref, [:flush])

  {:noreply, State.async_load_complete(state, result)}
end

{:error, _} ->
Logger.warning("Could not load nor build search state")
{:noreply, state}
end
# Fallback for every message not matched by an earlier clause: the message is
# discarded and the state is left untouched.
def handle_info(_message, state), do: {:noreply, state}

def handle_call({:replace, entities}, _from, %State{} = state) do
Expand Down Expand Up @@ -164,4 +168,8 @@ defmodule Lexical.RemoteControl.Search.Store do
State.drop(state)
{:reply, :ok, state}
end

# Replies with the state's `loaded?` flag; the state itself is not modified.
def handle_call(:loaded?, _from, %State{} = state) do
  {:reply, state.loaded?, state}
end
end
138 changes: 90 additions & 48 deletions apps/remote_control/lib/lexical/remote_control/search/store/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ defmodule Lexical.RemoteControl.Search.Store.State do

@index_path Path.join("indexes", "source.index.ets")

defstruct [:project, :index_path, :create_index, :update_index, :loaded?, :fuzzy, :ets_table]
# Fields, as used by the functions in this module:
#   project        - the project handed to the `create_index` / `update_index` callbacks
#   index_path     - path given to `Ets.new/1` when loading starts
#   create_index   - function called with the project to build a fresh index
#   update_index   - function called with the project and the existing entries
#   loaded?        - set to `true` once an asynchronous load has completed
#   fuzzy          - value built with `Fuzzy.from_entries/1`
#   ets_table      - table name returned by `Ets.new/1`
#   async_load_ref - reference of the in-flight load task, `nil` when none is running
defstruct [
:project,
:index_path,
:create_index,
:update_index,
:loaded?,
:fuzzy,
:ets_table,
:async_load_ref
]

def new(%Project{} = project, create_index, update_index) do
%__MODULE__{
Expand All @@ -19,42 +28,58 @@ defmodule Lexical.RemoteControl.Search.Store.State do
|> ensure_index_directory_exists()
end

def drop(%__MODULE__{}) do
Ets.drop()
end

def metadata(%__MODULE__{} = state) do
with {:ok, state} <- load(state) do
{:ok, Ets.find_metadata(), state}
end
end

def unique_fields(%__MODULE__{} = state, fields) do
with {:ok, state} <- load(state) do
{:ok, Ets.select_unique_fields(fields), state}
end
end

def load(%__MODULE__{loaded?: true} = state) do
{:ok, state}
end

def load(%__MODULE__{} = state) do
def async_load(%__MODULE__{loaded?: false, async_load_ref: nil} = state) do
case Ets.new(state.index_path) do
{:ok, :empty, table_name} ->
new_state = %__MODULE__{state | ets_table: table_name}
reindex_and_load(new_state)
create_index_async(new_state)

{:ok, :stale, table_name} ->
new_state = %__MODULE__{state | ets_table: table_name}
refresh_entries_and_load(new_state)
update_index_async(new_state)

error ->
Logger.error("Could not initialize index due to #{inspect(error)}")
error
end
end

# Fallback clause: reached when the state is not in the "not loaded and no
# load in flight" shape handled by the clause above, so there is nothing to
# start.
#
# The state is returned bare (not wrapped in `{:ok, _}`) so that every
# successful path of `async_load/1` has the same shape. The clause above
# returns a plain struct when it starts a task, and the store's `init/1` uses
# the return value directly as the GenServer state.
def async_load(%__MODULE__{} = state) do
  state
end

@doc """
Finishes an asynchronous load.

Marks the state as loaded, clears the stored task reference, and then hands
the task's tagged result (`{:create_index, _}` or `{:update_index, _}`) to
the matching completion function. Returns the resulting state.
"""
def async_load_complete(%__MODULE__{} = state, result) do
  finished = %__MODULE__{state | loaded?: true, async_load_ref: nil}

  case result do
    {:create_index, outcome} -> create_index_complete(finished, outcome)
    {:update_index, outcome} -> update_index_complete(finished, outcome)
  end
end

@doc """
Delegates to `Ets.drop/0`. The state argument is only checked to be a
`#{inspect(__MODULE__)}` struct; none of its fields are read.
"""
def drop(%__MODULE__{}), do: Ets.drop()

@doc """
Returns `{:ok, metadata, state}` using `Ets.find_metadata/0` when the state
is loaded, and `{:error, :not_loaded}` otherwise.
"""
def metadata(%__MODULE__{} = state) do
  case state.loaded? do
    true -> {:ok, Ets.find_metadata(), state}
    _not_loaded -> {:error, :not_loaded}
  end
end

@doc """
Returns `{:ok, result, state}` where `result` comes from
`Ets.select_unique_fields/1` called with `fields`, provided the state is
loaded. Returns `{:error, :not_loaded}` otherwise.
"""
def unique_fields(%__MODULE__{} = state, fields) do
  case state.loaded? do
    true -> {:ok, Ets.select_unique_fields(fields), state}
    _not_loaded -> {:error, :not_loaded}
  end
end

@doc """
Resolves this module's relative index path (`@index_path`) against the
project's workspace via `Project.workspace_path/2`.
"""
def index_path(%Project{} = project), do: Project.workspace_path(project, @index_path)
Expand Down Expand Up @@ -120,35 +145,52 @@ defmodule Lexical.RemoteControl.Search.Store.State do
state
end

defp refresh_entries_and_load(%__MODULE__{} = state) do
entries = all(state)
# Starts a task that runs the `create_index` callback against the project and
# tags its result with `:create_index`. The task's reference is stored in
# `async_load_ref` so the reply can be recognized later. Only callable while
# no other load is in flight (`async_load_ref` must be `nil`).
defp create_index_async(%__MODULE__{async_load_ref: nil} = state) do
  %__MODULE__{create_index: create_index, project: project} = state

  %Task{ref: ref} = Task.async(fn -> {:create_index, create_index.(project)} end)

  %__MODULE__{state | async_load_ref: ref}
end

defp create_index_complete(%__MODULE__{} = state, {:ok, entries}) do
case replace(state, entries) do
{:ok, state} ->
state

with {:ok, updated_entries, deleted_paths} <- state.update_index.(state.project, entries) do
fuzzy = Fuzzy.from_entries(entries)
starting_state = %__MODULE__{state | fuzzy: fuzzy, loaded?: true}
{:error, _} ->
Logger.warning("Could not replace entries")
state
end
end

new_state =
updated_entries
|> Enum.group_by(& &1.path)
|> Enum.reduce(starting_state, fn {path, entry_list}, state ->
{:ok, new_state} = update_nosync(state, path, entry_list)
new_state
end)
# Index creation failed: log the error tuple and hand the state back as-is.
defp create_index_complete(%__MODULE__{} = state, {:error, _reason} = failure) do
  Logger.warning("Could not create index, got: #{inspect(failure)}")

  state
end

new_state =
Enum.reduce(deleted_paths, new_state, fn path, state ->
{:ok, new_state} = update_nosync(state, path, [])
new_state
end)
# Starts a task that reads the existing entries with `all/1`, passes them with
# the project to the `update_index` callback, and tags the result with
# `:update_index`. The task's reference is stored in `async_load_ref` so the
# reply can be recognized later. Only callable while no other load is in
# flight (`async_load_ref` must be `nil`).
defp update_index_async(%__MODULE__{async_load_ref: nil} = state) do
  run_update = fn ->
    # Entries are read inside the task, exactly where the callback runs.
    existing_entries = all(state)
    {:update_index, state.update_index.(state.project, existing_entries)}
  end

  %Task{ref: ref} = Task.async(run_update)

  %__MODULE__{state | async_load_ref: ref}
end

{:ok, %__MODULE__{new_state | loaded?: true}}
end
# Applies the successful result of an `update_index` task.
#
# `updated_entries` are the entries the callback reported as new or changed,
# and `deleted_paths` are the paths whose entries must be removed. Returns the
# state after every path has been processed with `update_nosync/3`.
defp update_index_complete(%__MODULE__{} = state, {:ok, updated_entries, deleted_paths}) do
  # Seed the fuzzy index from everything currently stored, not only from the
  # entries that changed. The synchronous code this replaced built it from
  # `all(state)` as well; seeding from `updated_entries` alone would leave the
  # fuzzy index without the entries of files that did not change.
  # NOTE(review): this relies on `update_nosync/3` only touching the given
  # path - confirm against its definition.
  fuzzy = state |> all() |> Fuzzy.from_entries()
  starting_state = %__MODULE__{state | fuzzy: fuzzy, loaded?: true}

  # Apply the changed entries one path at a time.
  with_updates =
    updated_entries
    |> Enum.group_by(& &1.path)
    |> Enum.reduce(starting_state, fn {path, entry_list}, acc ->
      {:ok, next} = update_nosync(acc, path, entry_list)
      next
    end)

  # A deleted path is expressed as an update with no entries.
  Enum.reduce(deleted_paths, with_updates, fn path, acc ->
    {:ok, next} = update_nosync(acc, path, [])
    next
  end)
end

defp reindex_and_load(%__MODULE__{} = state) do
with {:ok, entries} <- state.create_index.(state.project),
{:ok, state} <- replace(state, entries) do
{:ok, %__MODULE__{state | loaded?: true, fuzzy: Fuzzy.from_entries(entries)}}
end
# Index update failed: log the error tuple and hand the state back as-is.
defp update_index_complete(%__MODULE__{} = state, {:error, _reason} = failure) do
  Logger.warning("Could not update index, got: #{inspect(failure)}")

  state
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Lexical.RemoteControl.Dispatch.Handlers.IndexingTest do

import Api.Messages
import Lexical.Test.CodeSigil
import Lexical.Test.EventualAssertions
import Lexical.Test.Fixtures

use ExUnit.Case
Expand All @@ -20,6 +21,8 @@ defmodule Lexical.RemoteControl.Dispatch.Handlers.IndexingTest do
start_supervised!({Search.Store, [project, create_index, update_index]})
start_supervised!(Document.Store)

assert_eventually Search.Store.loaded?()

{:ok, state} = Indexing.init([])
{:ok, state: state, project: project}
end
Expand Down
Loading

0 comments on commit ec30636

Please sign in to comment.