Skip to content

Commit

Permalink
captures/rescues 410 Gone inside chunk plus some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Ruoss committed May 12, 2022
1 parent 6a74b12 commit f287479
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 59 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Request BOOKMARK events and process them when watching resource collectons.
- Fix 410 Gone not rescued in `watch_and_stream/2`
- Request BOOKMARK events and process them when watching resource collections.

## [1.1.4] - 2022-03-15

Expand Down
122 changes: 64 additions & 58 deletions lib/k8s/client/runner/watch/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,44 +33,12 @@ defmodule K8s.Client.Runner.Watch.Stream do
@spec resource(K8s.Conn.t(), K8s.Operation.t(), keyword()) :: Enumerable.t() | {:error, any()}
def resource(conn, operation, http_opts) do
Stream.resource(
fn -> {:start, %{conn: conn, operation: operation, http_opts: http_opts}} end,
fn -> {:start, %__MODULE__{conn: conn, operation: operation, http_opts: http_opts}} end,
&next_fun/1,
fn _state -> :ok end
)
end

@spec get_latest_rv_and_watch(K8s.Conn.t(), K8s.Operation.t(), keyword()) ::
{:ok, t()} | {:error, any()}
defp get_latest_rv_and_watch(conn, operation, http_opts) do
with {:ok, resource_version} <- Watch.get_resource_version(conn, operation) do
watch(conn, operation, resource_version, http_opts)
end
end

@spec watch(K8s.Conn.t(), K8s.Operation.t(), binary(), keyword()) ::
{:ok, t()} | {:error, any()}
defp watch(conn, operation, resource_version, http_opts) do
http_opts =
http_opts
|> Keyword.put_new(:params, [])
|> put_in([:params, :resourceVersion], resource_version)
|> put_in([:params, :allowWatchBookmarks], true)
|> put_in([:params, :watch], true)
|> Keyword.put(:stream_to, self())
|> Keyword.put(:async, :once)

with {:ok, ref} <- Base.run(conn, operation, http_opts) do
{:ok,
%__MODULE__{
resp: %HTTPoison.AsyncResponse{id: ref},
conn: conn,
operation: operation,
resource_version: resource_version,
http_opts: http_opts
}}
end
end

@docp """
Producing the next elements in the stream.
* If the accumulator is {:recv, state}, receives and processes events from the HTTPoison process
Expand All @@ -85,7 +53,8 @@ defmodule K8s.Client.Runner.Watch.Stream do
library: :k8s
)

{[], {:start, state}}
new_state = struct!(state, resource_version: nil)
{[], {:start, new_state}}

%HTTPoison.AsyncHeaders{} ->
HTTPoison.stream_next(state.resp)
Expand All @@ -97,11 +66,12 @@ defmodule K8s.Client.Runner.Watch.Stream do

%HTTPoison.AsyncStatus{code: 410} ->
Logger.warn(
@log_prefix <> "410 Gone received from watcher - trying to restart",
@log_prefix <> "410 Gone received from watcher - tryin to restart watcher",
library: :k8s
)

{[], {:start, state}}
new_state = struct!(state, resource_version: nil)
{[], {:start, new_state}}

%HTTPoison.AsyncStatus{code: _} = error ->
Logger.warn(
Expand All @@ -125,16 +95,7 @@ defmodule K8s.Client.Runner.Watch.Stream do
library: :k8s
)

%{
conn: conn,
operation: operation,
http_opts: http_opts,
resource_version: resource_version
} = state

new_http_opts = http_opts |> put_in([:params, :resourceVersion], resource_version)
{:ok, ref} = Base.run(conn, operation, new_http_opts)
{[], {:recv, %{state | resp: %HTTPoison.AsyncResponse{id: ref}}}}
{[], {:start, state}}

other ->
Logger.debug(
Expand All @@ -152,10 +113,41 @@ defmodule K8s.Client.Runner.Watch.Stream do
Tries to make new HTTPoison watcher request (self-healing)
"""
@spec next_fun({:start, map()}) :: {[map()], {:recv, t()}} | {:halt, nil}
defp next_fun({:start, %{conn: conn, operation: operation, http_opts: http_opts}}) do
case get_latest_rv_and_watch(conn, operation, http_opts) do
{:ok, state} ->
{[], {:recv, state}}
defp next_fun({:start, state}) do
%{conn: conn, operation: operation, http_opts: http_opts} = state

#  get latest resource version if it is missing in the state
resource_version =
case Map.get(state, :resource_version) do
nil ->
{:ok, resource_version} = Watch.get_resource_version(conn, operation)
resource_version

rv ->
rv
end

#  prepare http_opts
http_opts =
http_opts
|> Keyword.put_new(:params, [])
|> put_in([:params, :resourceVersion], resource_version)
|> put_in([:params, :allowWatchBookmarks], true)
|> put_in([:params, :watch], true)
|> Keyword.put(:stream_to, self())
|> Keyword.put(:async, :once)

#  start the watcher
case Base.run(conn, operation, http_opts) do
{:ok, ref} ->
new_state =
struct!(
state,
resp: %HTTPoison.AsyncResponse{id: ref},
resource_version: resource_version
)

{[], {:recv, new_state}}

error ->
Logger.error(
Expand Down Expand Up @@ -213,6 +205,28 @@ defmodule K8s.Client.Runner.Watch.Stream do

{events, acc}

{:ok, %{"type" => "ERROR", "object" => %{"message" => message, "code" => 410} = object}},
{events, {_, state}} ->
Logger.notice(
@log_prefix <> "#{message} - tryin to restart watcher",
library: :k8s,
object: object
)

new_state = struct!(state, resource_version: nil)
{events, {:start, new_state}}

{:ok, %{"object" => %{"message" => message} = object}}, {events, {_, state}} ->
Logger.error(
@log_prefix <>
"Erronous event received from watcher: #{message} - tryin to restart watcher",
library: :k8s,
object: object
)

new_state = struct!(state, resource_version: nil)
{events, {:start, new_state}}

{:ok, %{"type" => "BOOKMARK", "object" => object}}, {events, {:recv, state}} ->
{events,
{:recv, %__MODULE__{state | resource_version: object["metadata"]["resourceVersion"]}}}
Expand All @@ -226,14 +240,6 @@ defmodule K8s.Client.Runner.Watch.Stream do
# new resource_version => append new event to the stream
{events ++ [new_event],
{:recv, %__MODULE__{state | resource_version: object["metadata"]["resourceVersion"]}}}

{:ok, %{"object" => %{"message" => message}}}, {events, {_, state}} ->
Logger.error(
@log_prefix <> "Erronous event received from watcher: #{message}",
library: :k8s
)

{events, {:start, state}}
end)
end
end

0 comments on commit f287479

Please sign in to comment.