Race Condition in Event Queuing When MODIFIED Arrive After CREATE but Before last-handled-configuration Was Written #729

Open
paxbit opened this issue Apr 1, 2021 · 14 comments
Labels
bug Something isn't working

Comments

paxbit commented Apr 1, 2021

Long story short

MODIFIED events for a just-CREATED resource might arrive before the last-handled-configuration annotation has been written. This leads to the MODIFIED event being treated as Reason.CREATE, because its old version is still empty.


Loading the (empty) old manifest is tried here:

old = settings.persistence.diffbase_storage.fetch(body=body)


Falsely setting the cause reason to CREATE as a result of the empty old manifest is done here:

if old is None:  # i.e. we have no essence stored
    kwargs['initial'] = False
    return ResourceChangingCause(reason=handlers.Reason.CREATE, **kwargs)


The handler is not being called because its reason does not match the resource-changing cause's reason:

if handler.reason is None or handler.reason == cause.reason:

Description

If the handler that creates a resource via third-party means like pykube still spends a small amount of time after creating the resource before returning, a quick update-after-create of the resource will queue up MODIFIED events before Kopf has had a chance to write its last-handled-configuration.

The following code snippet reproduces this. We had a situation where a two-container pod had one container crashing immediately after creation. When this happened quickly enough after the pod was created, the handler designated to deal with crashing containers was never called. Since I'm working from home via a DSL link to the data center where the cluster lives, the varying connection latency through the VPN gateway over the day is sometimes enough to trigger this. But only after today's lucky setting of a breakpoint (introducing a sufficient handler delay) right after the pod creation was I able to reliably reproduce it and find the root cause.

By the way, all the handler does after creating the pod is create an event about that fact and set Kopf's patch dict.

I believe this is broken at the event-queuing design level, and I have no good idea how to fix it. After looking at this I'm not sure the current implementation can be fixed for correctness without substantial rewrites (memories, maybe?). The assumptions currently made around last-handled-configuration can never be fully upheld as long as third parties other than Kopf (e.g. pykube, Kubernetes itself) also modify resources - which will of course always be true.
However, I'd be very happy to be proven wrong. Maybe the already-queued MODIFIED events sans Kopf's storage annotations can be augmented in-memory with the missing data by remembering the CREATED event long enough. IDK.

The following script:

  1. Creates a pod with two containers. One of them crashes after 1s.
  2. Then the handler time.sleeps for 2s.
  3. on_update(...) is never called and "wonky's status was updated: ..." is missing from the output.

To make it work:
Comment out the time.sleep(2) after the pod creation. The on_update(...) handler will then be called.

Note
Running this script for the first time might actually trigger on_update. That would be because the alpine image might need to be pulled. If pulling takes longer than the 2s sleep, there will be MODIFIED events after it, and Kopf might have had enough time to write a last-handled-configuration. If the image is already there, it should fail the first time - except maybe on very slow or loaded clusters where the container takes longer to crash. Simply increase the sleep to 3-4s then to still trigger it.

event_race_bug.py
import time

import kopf
import pykube

podspec = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "wonky", "namespace": "default"},
    "spec": {
        "containers": [
            {
                "args": [
                    "-c",
                    "\"echo 'Hello, sleeping for 1s'; sleep 1; echo 'Falling over now...'\"",
                ],
                "command": ["/bin/sh"],
                "image": "alpine:latest",
                "imagePullPolicy": "IfNotPresent",
                "name": "broken",
            },
            {
                "args": [
                    "-c",
                    "echo 'Hello, I will stay alive much longer'; sleep 3600; echo 'Falling over now...'",
                ],
                "command": ["/bin/sh"],
                "image": "alpine:latest",
                "imagePullPolicy": "IfNotPresent",
                "name": "sane",
            },
        ],
        "dnsPolicy": "ClusterFirst",
        "restartPolicy": "Never",
        "terminationGracePeriodSeconds": 30,
    },
}

k_api: pykube.HTTPClient = pykube.HTTPClient(pykube.KubeConfig.from_env())

@kopf.on.startup()
async def create_pod(**_):
    pod = pykube.Pod(k_api, podspec)
    # uncomment this if you're running the script multiple times and do not want to manually delete the pod each time
    # pod.delete() 
    pod.create()
    # comment out the following line to make the example work and allow on_update to be called
    time.sleep(2)


@kopf.on.update(
    "",
    "v1",
    "pods",
    field="status",
)
async def on_update(name, status, **_):
    print(f"{name}'s status was updated: {status.get('phase')}")
The exact command to reproduce the issue
kopf run event_race_bug.py

I hope somebody proves my analysis wrong, I really do, because if I'm correct it means that, by definition, I'll never be able to implement a correctly behaving operator using Kopf, as I would have to expect subtle errors like this one without any way to detect them through Kopf's API.

Environment

  • Kopf version: 1.30.3
  • Kubernetes version: 1.17
  • Python version: 3.9.2
  • OS/platform: Linux
@paxbit paxbit added the bug Something isn't working label Apr 1, 2021
@paxbit paxbit changed the title Race Condition in Event Queuing When Modified Arrive After Create but Before Last-Handled-Configuration Was Written Race Condition in Event Queuing When MODIFIED Arrive After CREATE but Before last-handled-configuration Was Written Apr 2, 2021

nolar commented Apr 3, 2021

Hello, @paxbit.

Yes, you might be right with this behaviour. Kopf has some protection measures against quickly arriving events — the "batch window" (settings.batching.batch_window, defaults to 0.1 seconds). All events that arrived within this time window are reduced only to the latest event, which means the latest object state. All intermediate events/states are ignored.

One sure problem that I see in your operator is using an async def handler for startup with a synchronous time.sleep() call inside. This would block the whole asyncio event loop for 2 seconds - not just this handler, and not only the operator. This goes against the rules of async programming and is documented in https://kopf.readthedocs.io/en/stable/async/.

Such blocking freeze of the whole process is functionally equivalent to the operator process/pod being terminated/suspended for 2 seconds (or, say, 2 minutes), and then restarted/resumed — to find out that the pod has changed.

A better and proper way would be to either do await asyncio.sleep(2) in the async def handler, or define the startup handler as def (not async).

# EITHER:
@kopf.on.startup()
async def create_pod(**_):   # <<< async!
    await asyncio.sleep(2)   # <<< async!

# OR:
@kopf.on.startup()
def create_pod(**_):         # <<< not async!
    time.sleep(2)            # <<< not async!

This will sufficiently unblock the event loop to react to the pod's creation fast enough, and to write the last-handled configuration fast enough too. As a result, the update handler will be called properly in the majority of cases.

Since pykube itself is synchronous, I recommend the sync-way: def & time.sleep().


However, there will be a short period when this issue can appear — maybe, as long as a fraction of a second on slow connections (operator<->cluster), and near-instant (but non-zero) on fast connections.

Specifically, if a pod is changed by external actors between its actual creation and the end of the above-mentioned "batch window". In that case, an already-changed pod will arrive to Kopf and will be seen as a creation with an already-modified state.

Almost nothing can be done in that case. Kopf implements the state-driven approach of Kubernetes, so the first time it "sees" the object's state is when it is already quick-modified. The initial non-modified state does not exist for Kopf, so there is nothing to compare against.

You can try setting settings.batching.batch_window to 0 to disable batching. But I cannot state it is a good idea and which side effects it will produce. Perhaps, some significant load on the operator side (because pods' statuses change a lot and often).
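For reference, that knob is tuned via the operator settings in a startup handler; a minimal sketch of what disabling the batching would look like (shown only to pin down the setting being discussed, not as a recommendation):

import kopf

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
    # Shrink the debounce window to zero, i.e. effectively disable batching.
    # The possible side effects (extra load, more handling cycles) are those mentioned above.
    settings.batching.batch_window = 0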


Blindly assuming the goals of your operator, I would rather recommend one of these 3 approaches instead:

Approach 1: React to all kinds of resource changes for that field, including its creation; or, better said, its first appearance.

@kopf.on.create('pods', field='status')
@kopf.on.update('pods', field='status')
#@kopf.on.resume('pods', field='status')  # might be a good idea
async def status_updated(name, status, **_):
    print(f"{name}'s status was updated: {status.get('phase')}")

Approach 2: React to the low-level events only. It is still only the latest state within the batch window, but it needs less code. A downside: such handlers are fire-and-forget; they are not retried in case of errors.

@kopf.on.event('pods', field='status')
async def status_updated(name, status, **_):
    print(f"{name}'s status was updated: {status.get('phase')}")

Approach 3: Use the on-field handlers, which is the equivalent of on-creation + on-update as above, with some subtle nuances.

@kopf.on.field('pods', field='status')
async def status_updated(name, status, old, new, **_):
    print(f"{name}'s status was updated: {status.get('phase')}")

In all approaches, on the first appearance of the object, it will be a change of .status from None to {...whatever-is-there..., "phase": ...}. On all further changes, it will be a change from {...} to {...} (from a dict to another dict).

paxbit commented Apr 6, 2021

Hello, @nolar,

One sure problem that I see in your operator is using an async def handler for startup with a synchronous time.sleep() call inside. This would block the whole asyncio event loop for 2 seconds - not just this handler, and not only the operator. This goes against the rules of async programming and is documented in https://kopf.readthedocs.io/en/stable/async/.

Such blocking freeze of the whole process is functionally equivalent to the operator process/pod being terminated/suspended for 2 seconds (or, say, 2 minutes), and then restarted/resumed — to find out that the pod has changed.

A better and proper way would be to either do await asyncio.sleep(2) in the async def handler, or define the startup handler as def (not async).

Errm, yes I'm quite aware of that and I actually thought about mentioning this but thought it'd be too obvious ;) But I guess I can understand your maintainer perspective and why pointing out loop blocking comes to mind first. Maybe I could have been clearer about that.
The code is not in my operator; it is in this example for reproduction purposes, to simulate "a delay". That's why I wrote about the actual situation we observe in the second paragraph of the description. The point is that, in the presence of async handler tasks running longer than an HTTP round-trip to the cluster API, Kopf does not guarantee the correctness of its own assumptions about state storage. Those two (the async handler and any mutation through the API) are always racing against each other in the current implementation. "Usually" Kopf wins. But when it does not, it fails in a very unpleasant way: there is no error, the handler is simply ignored.
The situations where this can happen are entirely non-esoteric and do happen in the real world "frequently". Imagine a load-balanced Kube API behind a VRRP domain. If one API endpoint behind the VIP(s) in the group fails, then within the fail-over window Kopf's storage patch might be delayed at best or lost at worst. Simple temporary network congestion, data-center fail-overs or partitions, and many other network-related things might do the same. Or some third-party software might simply be quicker in triggering a MODIFIED, maybe because the node where a Kopf-based operator is currently scheduled is CPU-stalled and swapping like crazy for some misconfiguration reason.

However, there will be a short period when this issue can appear — maybe, as long as a fraction of a second on slow connections (operator<->cluster), and near-instant (but non-zero) on fast connections.

Yes, this is what I see.

Specifically, if a pod is changed by external actors between its actual creation and the end of the above-mentioned "batch window". In that case, an already-changed pod will arrive to Kopf and will be seen as a creation with an already-modified state.

That's what I meant.

Almost nothing can be done in that case. Kopf implements the state-driven approach of Kubernetes, so the first time it "sees" the object's state is when it is already quick-modified. The initial non-modified state does not exist for Kopf, so there is nothing to compare against.

But it's still the case that Kopf sees the CREATE event before the MODIFIED event. It's just that all of Kopf's iterations necessary to patch a storage annotation into the resource have not happened yet. This can be detected, I think. The way I read Kopf's code, a MODIFIED event without a storage annotation is undefined in terms of triggering any discrete handling of that fact. Couldn't the batching (debounce) implementation maybe track event sequences until the pending storage patching is done? I mean, as soon as Kopf sees a resource event it could, while debouncing, decide whether the resource is managed by it. After doing that, it could remember the pending storage patch in-memory and implicitly apply it to every consecutive event for the resource until it sees the patch in the data of an incoming event. I think that would go a long way towards putting the two racers on separate tracks.

You can try setting settings.batching.batch_window to 0 to disable batching. But I cannot state it is a good idea and which side effects it will produce. Perhaps, some significant load on the operator side (because pods' statuses change a lot and often).

So, then, I'd rather not ;)

paxbit commented Apr 8, 2021

About the 3 approaches you suggested:

  1. Would collide with already existing discrete on.create handlers doing something entirely different. Also, the code would then "lie" to any future developer but me. I'd have to write a comment like: "No, this is not actually a CREATE handler but a workaround for Race Condition in Event Queuing When MODIFIED Arrive After CREATE but Before last-handled-configuration Was Written #729"
  2. I would not like to go with that one because of the impaired durability implication.
  3. How would I discern CREATED from MODIFIED events when using on.field(...)? Due to the nature of this bug I cannot rely on old is None to detect creation.

I would say we have not yet seen or imagined all possible side effects of this race. For instance, I think it should be possible not just to lose handler invocations but also to trigger more handler invocations than desired. If an in-flight handler progress patch is preceded by one or more MODIFIED events in quick succession, Kopf might not see that e.g. the handler was already started, or did already terminate, or has already exhausted all configured retries. Does that sound reasonable, or would that somehow be prevented?

paxbit commented Apr 8, 2021

Hi @nolar,

I had a look at the batching and storage implementations. I did not implement anything yet but what do you think about the following ideas?

  1. Make the debouncing optional by allowing settings.batching.batch_window = None, and then guard the entire try block below with an if settings.batching.batch_window is not None: check:

    try:
        while True:
            prev_event = raw_event
            next_event = await asyncio.wait_for(
                backlog.get(),
                timeout=settings.batching.batch_window)
            shouldstop = shouldstop or isinstance(next_event, EOS)
            raw_event = prev_event if isinstance(next_event, EOS) else next_event
    except asyncio.TimeoutError:
        pass

  2. For each of the ProgressStorage write method implementations, implement something like memory.put_storage_for(body, patch, some_version_counter), and for the fetch method do something like:

resource_storage = storage.get_from_body(body)
in_memory_storage = memory.for_resource(body)

if not in_memory_storage and not resource_storage:
  # no storage at all
  return empty

if in_memory_storage and not resource_storage:
  # 1st patch, might still be underway, or lost. Doesn't matter.
  return in_memory_storage

if not in_memory_storage and resource_storage:
  # patch "arrived" at the resource and memory was already invalidated.
  return resource_storage

if in_memory_storage is_older_or_equals resource_storage:
  # patch "arrived" at the resource. Invalidate memory.
  purge in_memory_storage
  return resource_storage

if in_memory_storage is_newer_than resource_storage:
  # patch might still be underway, or lost. Doesn't matter.
  return in_memory_storage

What do you think?

nolar commented Apr 8, 2021

Errm, yes I'm quite aware of that and I actually thought about mentioning this but thought it'd be too obvious ;)

Sorry, I didn't have any intention to question your skills. But this was an obvious problem, so indeed, I just reacted to that.

kopf might not see that e.g. the handler was already started
Does that sound reasonable or would that somehow be prevented?

On the one hand, while individual objects are parallelised (via the multiplexing logic), each individual object is always handled sequentially. It means the "worker" for that individual object is not doing anything because its task is somewhere in the handler currently, so it will not start a new handler until the previous one is finished.

The events are accumulated in the backlog queue for that worker. Once the handler is finished, the call stack for that individual resource goes back to the top, i.e. to the worker, where it depletes the whole backlog queue to get the latest seen state to be processed.

The "batch window" was added to ensure that even changes introduced by that handler are taken into account.


On the other hand, you are right. If the operator<->cluster latency is high and exceeds the batching window, it might be so that we have a sequence of states A B pause C D; the handlers for the state B are triggered after the first pause; while they work, states C D arrive into the queue. If the handlers do patch the resource, they create a new state E. In normal cases, it should end up with the sequence "A B pause C D E pause", and the new E state will be processed as desired.

However, if the latency is high, it might be so that the sequence looks like "A B pause C D latency E pause". As a result, the handlers will be invoked for state D when the resource is already in state E, and Kopf just does not know about that. And so, the double-execution is possible.

As a possible fix, patching the resource by Kopf should remember its resource version (r.v.) and return it back to the worker. The worker must deplete the backlog "at least" till that reported r.v. is seen — before switching to the batching mode. In other words, if it sees the backlog as depleted, but the r.v. is not yet seen, it should wait an extra time.

Since resource versions are documented as "opaque", we cannot even compare them with < or >. It should really be a binary seen/not-seen flag.

However, as a safety measure from skipping the states (I don't trust the K8s watch-stream either), a secondary time-window should be added: "deplete the backlog until a specific r.v. is seen, but at most for 5.0 seconds, before considering the batch as over; then, switch to regular batching with the 0.1s window". Something like this.
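A rough sketch of that depletion rule (hypothetical names, plain asyncio, not Kopf's actual worker code; raw watch-events are assumed to have the usual {"type": ..., "object": {...}} shape):

import asyncio
import time
from typing import Any, Optional

async def deplete(backlog: asyncio.Queue,
                  awaited_rv: Optional[str],
                  max_wait: float = 5.0,
                  batch_window: float = 0.1) -> Any:
    # Drain the backlog until the awaited resourceVersion is seen (or max_wait expires),
    # then fall back to the regular batching window. The r.v. is opaque, so only
    # equality is checked, never ordering.
    deadline = time.monotonic() + max_wait
    latest = None
    seen = awaited_rv is None
    while True:
        timeout = batch_window if seen else (deadline - time.monotonic())
        if timeout <= 0:
            break                       # the awaited r.v. never arrived within max_wait
        try:
            event = await asyncio.wait_for(backlog.get(), timeout=timeout)
        except asyncio.TimeoutError:
            if seen:
                break                   # the regular batch window has elapsed
            continue                    # the deadline is re-checked at the top of the loop
        latest = event
        if event['object']['metadata'].get('resourceVersion') == awaited_rv:
            seen = True                 # from now on, only the usual batching window applies
    return latest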


This issue seems complicated and deep, but very interesting 😅 I will try to reproduce the latency issues locally (I remember there were some tools to simulate slow TCP) — and to see how these cases can be optimized.

Sadly, I have to do my regular work in the daytime. So I can get to this problem only on the coming weekend at the earliest, maybe the next one. Luckily, there are no big major roadmap features to add now, so I can focus on stability & optimizations.

nolar commented Apr 8, 2021

  1. Yes, batch_window=None seems like a good option for those who want it. The docstring must explain the consequences briefly.

  2. I didn't fully get what is happening in this example, but I prefer to not touch memory at all. First of all, I am not happy that I had to introduce per-resource memories in the first place, as it explodes the RAM usage for large clusters — but that was needed for some non-serialisable mini-objects. Putting the whole body or even its state is too much (UPD: but it does so for daemons/timers — again, there was no other choice).

  3. However, a new class InMemoryProgressStorage(kopf.ProgressStorage): ... can be easily implemented and combined with kopf.AnnotationsProgressStorage via a kopf.MultiProgressStorage (there is an example for "Smart/Transitional" storage or something, which is now used by default). As I understand the intention, that might help, and it does not require changing the framework — it can be extended in the operator initially.


paxbit commented Apr 9, 2021

Hi @nolar,

Thanks for the answer and also for taking the time to look into it!

  1. In my opinion it is an error to generalize event debouncing for all operators using Kopf. So I think the docstring should explain both situations, and I think the default should be to not debounce at all. It should also explain the current situation and state the trade-off being made with the batch_window. Namely, the debouncing introduces lossiness into a resource's event stream, which might break some use-cases. It trades completeness for performance and implementation simplicity. I think it is wrong to assume that everybody, in every situation, is OK with receiving only the last event of a burst.
    I make the case that there should be a facility in Kopf allowing discrete decisions about debouncing, maybe per resource type. So one can say things like: I'm OK with only ever seeing the latest version of my ConfigMaps, but I definitely want each and every event of my Pods.
    Two years ago, for another client, I wrote an operator from scratch using rxJava. This made me familiar with situations like this and with the option to fully debounce in some situations while requiring FIFO buffering in others. That operator had, among other things, to observe ConfigMaps and Pods. The ConfigMaps were managed by CD pipelines. As their final steps, those pipelines ran a barrage of patch and apply calls for labels, annotations and data against the ConfigMaps. In that situation it did not hamper correctness to debounce those ConfigMaps in the operator for 10s and only ever use the "latest" one.
    For the Pods, however, it was not OK to ignore any event. They needed to be monitored for Ready > NotReady > Ready transitions. A crashing and quickly restarting container in a pod might do this. When using Kopf, you might never see those transitions.

  2. It was just some pseudo-code trying to explain that every writing ProgressStorage method unconditionally stores the patch it just applied in-memory as well. Then the fetch method checks whether that in-memory representation has appeared yet on the resource for which fetch is called. The if-statement chain was not supposed to be real code but to illustrate the order of precedence and the invalidation logic when dealing with the in-memory representation of a patch in conjunction with its representation in the actual resource manifest data.
    So if you don't want to use memory, OK. This was never meant to suggest using precisely that class. Also, just for this functionality, I don't think the memory footprint would be too large. The patch would only be kept in-memory for the time the actual patch is "underway" to the API and back to Kopf via the new watch-event the patch triggered. If only in-flight patches are stored, the memory footprint should be in the negligible megabytes region.

  3. Agreed. It would be easy to implement an InMemoryProgressStorage. However, I think the current design, with all of its storage implementations, should be hardened against the race condition this bug is about - so that it does not merely work in most cases, but works by definition.

nolar commented Apr 10, 2021

As a side-note: the latency can be added by using toxiproxy by Shopify.

toxiproxy-server
toxiproxy-cli create kopf_k3s -l localhost:11223 -u localhost:50016
toxiproxy-cli toxic add kopf_k3s -t latency -a latency=3000

where port 50016 comes from kubectl config view with K3d/K3s currently used, and port 11223 is our random choice.

$ toxiproxy-cli inspect kopf_k3s
Name: kopf_k3s	Listen: 127.0.0.1:11223	Upstream: localhost:50016
======================================================================
Upstream toxics:
Proxy has no Upstream toxics enabled.

Downstream toxics:
latency_downstream:	type=latency	stream=downstream	toxicity=1.00	attributes=[	jitter=0	latency=3000	]

When injected into kubeconfig by:

kubectl config set clusters.k3d-k3s-default.server https://0.0.0.0:11223

Then the communication with kubectl is slowed down:

$ time kubectl get pod
………
real	0m6.125s
user	0m0.111s
sys	0m0.051s

(At least, 2 requests are made: one for API resources available; one for the actual pod list; total 2*3s=6s; sometimes 18s for reasons unknown.)

Only the "downstream" (client->server) is slowed down, not the "upstream" (server->client).


To do this to Kopf or the operator only, without affecting kubectl:

import logging
import kopf
import dataclasses

@kopf.on.login()
def delayed_k3s(**_):
    conn = kopf.login_via_pykube(logger=logging.getLogger('xxx'))
    if conn:
        return dataclasses.replace(conn, server=conn.server.rsplit(':', 1)[0] + ':11223')

# from examples/01-minimal:
@kopf.on.create('kopfexamples')
def create_fn(spec, **kwargs):
    print(f"And here we are! Creating: {spec}")
    return {'message': 'hello world'}  # will be the new status

Then we get a 3-second delay on every "client->server" call (~0s on "server->client" streaming!):

[2021-04-10 11:26:58,853] kopf.reactor.activit [INFO    ] Initial authentication has been initiated.
[2021-04-10 11:26:58,854] kopf.activities.auth [DEBUG   ] Activity 'delayed_k3s' is invoked.
[2021-04-10 11:26:58,882] xxx                  [DEBUG   ] Pykube is configured via kubeconfig file.
[2021-04-10 11:26:58,886] kopf.activities.auth [INFO    ] Activity 'delayed_k3s' succeeded.
[2021-04-10 11:26:58,886] kopf.reactor.activit [INFO    ] Initial authentication has finished.

# here, scanning the API groups/versions/resources available (in parallel, but 2-3 levels of API hierarchy).

[2021-04-10 11:27:11,129] kopf.clients.watchin [DEBUG   ] Starting the watch-stream for customresourcedefinitions.v1.apiextensions.k8s.io cluster-wide.
[2021-04-10 11:27:11,131] kopf.clients.watchin [DEBUG   ] Starting the watch-stream for kopfexamples.v1.kopf.dev cluster-wide.

# here, starting the watch-streams for all involved resources/namespaces.

[2021-04-10 11:27:14,253] kopf.objects         [DEBUG   ] [kube-system/kopf-example-1] Creation is in progress: {'apiVersion': 'kopf.dev/v1', 'kind': 'KopfExample', ………
[2021-04-10 11:27:14,254] kopf.objects         [DEBUG   ] [kube-system/kopf-example-1] Handler 'create_fn' is invoked.
[2021-04-10 11:27:14,256] kopf.objects         [INFO    ] [kube-system/kopf-example-1] Handler 'create_fn' succeeded.
[2021-04-10 11:27:14,258] kopf.objects         [INFO    ] [kube-system/kopf-example-1] Creation is processed: 1 succeeded; 0 failed.
[2021-04-10 11:27:14,259] kopf.objects         [DEBUG   ] [kube-system/kopf-example-1] Patching with: {'status': {'create_fn': {'message': 'hello world'}}, 'metadata': {'annotations': {'kopf.zalando.org/last-handled-configuration': '{"spec":{"duration":"1m","field":"value","items":["item1","item2"]},"metadata":{"labels":{"somelabel":"somevalue"},"annotations":{"someannotation":"somevalue"}}}\n'}}}
And here we are! Creating: {'duration': '1m', 'field': 'value', 'items': ['item1', 'item2']}

# here, patching: the downstream call takes 3s; the upstream watch-events take ~0s to arrive back.

[2021-04-10 11:27:17,389] kopf.objects         [DEBUG   ] [kube-system/kopf-example-1] Something has changed, but we are not interested (the essence is the same).
[2021-04-10 11:27:17,389] kopf.objects         [DEBUG   ] [kube-system/kopf-example-1] Handling cycle is finished, waiting for new changes since now.

nolar commented Apr 10, 2021

In response to the comments:

To no.1.

I agree in principle. Though, I have some doubts about the implementation. Per-resource configuration looks questionable to me. The batching window is a low-level hack mostly for latency issues, not for operating logic.

Where I see the distinction line, is event-watching vs. change-detecting handlers (let's keep daemons & timers & indices aside for now).

The change-detecting handlers (on-create/update/delete/resume) are state-driven - the same as Kubernetes is often described to be. If you have a state change of Ready→NotReady→Ready, and the handler fails on Ready→NotReady, it will be retried, but the state will be (2nd)Ready again. More than that, Kopf will have to remember the whole chain of changes and process/retry the old changes while the resource can be in a much newer state for a long time. So, by design, these handlers should not see all events, only the differences between the last-processed and current states. It is worth mentioning that change-detecting handlers are not a Kubernetes feature; they are Kopf's own add-on, so there is no baseline to align to.

However, the event-watching handlers (on-event) give no promises on which events will or will not be seen. In fact, there might be an implicit promise and an expectation that ALL events are seen as sent by Kubernetes. And that implicit promise is obviously broken due to batching. That hasn't caused any problems so far, but I would not claim that this is how it should be forever. It could be changed. For those rare cases when every interim event counts, the operator developers can implement their own queues, put events into those queues in the on-event handlers, and process them elsewhere (in daemons?) - with the guarantee that the events are delivered precisely as K8s sends them (except for downtimes).
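A sketch of that hand-rolled-queue pattern under made-up names (and with the caveat just stated: with the current batching, on-event handlers may themselves still miss interim events, so this only shows the shape of the workaround):

import asyncio

import kopf

queues = {}   # per-object FIFO queues, keyed by (namespace, name)

@kopf.on.event('pods')
async def enqueue(namespace, name, event, **_):
    # Fire-and-forget: push every received watch-event into this object's own queue.
    queues.setdefault((namespace, name), asyncio.Queue()).put_nowait(event)

@kopf.daemon('pods')
async def consume(namespace, name, stopped, **_):
    # Drain the queue strictly in arrival order, independently of change-detection.
    queue = queues.setdefault((namespace, name), asyncio.Queue())
    while not stopped:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=1.0)
        except asyncio.TimeoutError:
            continue
        print(f"{namespace}/{name}: {event.get('type')}")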

A possible solution might be a redesign of the worker()->processor() interaction so that each and every watch-event is processed via on-event handlers (no batching!), while only the latest state of a batch is processed via on-create/update/delete/resume. This is still backwards compatible with the current promises & documentation about all of the mentioned handler types.

Would such a logic solve the limitations of Kopf for the use-cases you mentioned?

nolar commented Apr 10, 2021

To no.2:

A similar approach was once offered for some other unrelated problem. There are reasons why patches should not be handled any other way than by applying them to the K8s API.

Reason 1: No matter how much Kopf adheres to RFCs, there might be differences in how this patching is implemented in Kopf and how it is implemented in Kubernetes. I would not trust myself here in doing things correctly. Instead, I prefer to send it to the server-side always and get back the patched body of a resource.

Reason 2: Even if we were able to implement proper patching in-memory the same way as Kubernetes does, or even the comparison of multiple/combined patches to the resulting body, there are server-side changes outside of Kopf's control: e.g., mutating admission webhooks. Even in one single PATCH operation, the resulting resource can be different from what was in the payload of the same request - because some external admission webhooks decided so.

The latter is basically a showstopper: patches cannot be interpreted client-side in any way. The whole patching logic must be done server-side.

And so, to track the resource changes, we can only go for the "opaque" .metadata.resourceVersion — as I suggested above.

nolar commented Apr 10, 2021

Notes for myself:


It seems that ToxiProxy cannot help here. It slows down all requests, including patching. If patching is slowed down, the operator blocks for the whole duration of the request being sent and the response being received. As a result, all changes arriving in Kubernetes during the patch end up in a single batch on the operator side:

Timeline with upstream latency 0s, downstream latency 3s:

         /-- kubectl create, object is created (a=s0)
         | ... sleep 1s
         |    /-- kubectl patch, object is patched (b=s0+p1)
         |    |      /-- c=s0+p1+p2
         ↓    ↓      |
----+-//-aaaaabbbbbbbccccccccccccccccccc-> which state is stored in kubernetes
         ↓    ↓      ↑↓
         |    |      |\----3s----\
         |    |      |           |
         |    \----3s+---\       |
         |           |   |       |
         \----3s----\|   \------\|
                    ↓↑⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶↓ (blocked by patching, ends with batching)
----+-//------------aaaaabbbbbbbbcccccc-> which state does the operator see
    ↓               ↓↑   X       ↓
    |               ||   X       |
    |               ||   X       \-- operator gets a watch-event (batch: patched1+patched2)
    |               ||   X-- operator WOULD get a watch-event (patched1), but it does not
    |               |\-- operator reacts, starts patching (p2)
    |               \-- operator gets a watch-event (state a)
    \-- watching started

It is impossible to introduce latency only for the watching streams via ToxiProxy.


However, it is possible with strategic placement of sleeps:

# kopf.clients.watching::watch_objs()
...
            async with response:
                async for line in _iter_jsonlines(response.content):
                    await asyncio.sleep(3.0)  # <<<<<<<<<<<<<<<<<<<<<<<<<<<
                    raw_input = cast(bodies.RawInput, json.loads(line.decode("utf-8")))
                    yield raw_input
...

With that single-line hack, the double-execution is reproducible with a regular operator (examples/01-minimal) with no authentication handlers, and the following CLI script:

kubectl apply -f examples/obj.yaml && sleep 1 && kubectl patch -f examples/obj.yaml --type merge -p '{"spec": {"field": 2}}'

This happens 100% of the time, without freezing the event loop artificially.

The timeline looks like this (with patch-blocking removed):

Timeline with only the watch-stream latency 3s:


         /-- kubectl create, object is created (a=s0)
         | ... sleep 1s
         |    /-- kubectl patch, object is patched (b=s0+p1)
         |    |      /-- c=s0+p1+p2
         ↓    ↓      |
----+-//-aaaaabbbbbbbccccccccccccccccccc-> which state is stored in kubernetes
         ↓    ↓      ↑↓
         |    |      |\----3s----\
         |    |      |           |
         |    \----3s+---\       |
         |           |   |       |
         \----3s----\|   |       |
                    ↓↑   ↓       ↓
----+-//------------aaaaabbbbbbbbcccccc-> which state is seen by the operator
    ↓               ↓↑   ↓       ↓
    |               ||   |       |
    |               ||   |       \-- operator gets a watch-event (patched2)
    |               ||   |
    |               ||   \-- operator gets a watch-event of state "b" (patched1)
    |               ||       !BUG!: "c" (p2) is not seen yet, though implied; executes AGAIN!
    |               ||
    |               |\-- handlers execute, insta-patches as done (p2), creates state "c"
    |               \-- operator gets a watch-event (state a)
    \-- watching started

paxbit commented Apr 12, 2021

A possible solution might be a redesign of the worker()->processor() interaction so that each and every watch-event is processed via on-event handlers (no batching!), while only the latest state of a batch is processed via on-create/update/delete/resume. This is still backwards compatible with the current promises & documentation about all of the mentioned handler types.

Would such a logic solve the limitations of Kopf for the use-cases you mentioned?

Yes, I think it would. This is how I was thinking about the topic too. Leave the change detection via the old/new diff in place, while modifying the stream multiplexing so that it feeds every resource event to the change detection, so that it can make correct decisions about actual changes. Or, put differently, I think it is OK, actually desirable, to drop events in well-known circumstances. Detecting that nothing of the essence changed would be one. This of course presumes good knowledge about what is "of the essence", and that the assumptions Kopf makes about this ideally overlap 100% with the assumptions of an operator implementor.
The latter being a moving target makes me think of this as related to #715, because that one talks about what the diff change detection deems "essential". Do you think it would make sense to change the default, at least for pods, to always include status.* when detecting changes?

paxbit commented Apr 12, 2021

To no.2:
...
Reason 1: No matter how much Kopf adheres to RFCs, ...

Reason 2: Even if we were able to implement proper patching in-memory the same way as Kubernetes does, ...

The latter is basically a showstopper: patches cannot be interpreted client-side in any way. The whole patching logic must be done server-side.

I completely agree in principle. However, I think this small change to my proposal should make it work: simply never patch in-memory when a resource is still missing a storage structure, but use the in-memory representation in change detection and handler progress calculation. At the end of the day it cannot matter, by definition, whether Kopf got its progress/diffbase storage from the actual resource data or from a cached in-memory representation. Kopf today already expects the storage it patches onto the resource to come back verbatim. I think of it as a cache.
The idea to patch the resource in-memory came from my desire to fully encapsulate the proposal in the ProgressStorage/DiffBaseStorage classes and to make the change fully transparent to all existing parts of Kopf dealing with the resource - basically, to make it so that the resource looks like it always had the expected storage structure on it. But maybe that was never necessary. I did not actually check whether access to the storage structures on the resource leaks outside of the ProgressStorage/DiffBaseStorage classes. If it does not, and the only accessors are those two classes and all their implementations, then everything should be fine as-is, without patching in-memory.

About the resource-version thing: I never thought about using Kube's resourceVersion for this. I'm aware it cannot be used for anything other than !=. What I meant was "some_version_counter", controlled by Kopf. I think it should work to define an incrementing counter field, controlled by Kopf, which becomes part of the progress storage stanza. This way Kopf could decide whether the progress storage it sees on the actual resource is older than or equal to the in-memory representation it has for that resource.
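A tiny sketch of that counter idea, with made-up names, just to make it concrete:

import itertools

_patch_counter = itertools.count(1)   # controlled by Kopf, monotonically increasing

def stamp(record: dict) -> dict:
    # Embed the counter into the stanza whenever it is written (in-memory and onto the resource).
    return {**record, 'storage-version': next(_patch_counter)}

def in_memory_is_newer(in_memory: dict, on_resource: dict) -> bool:
    # Compare the cached copy with whatever is currently visible on the resource.
    return in_memory.get('storage-version', 0) > on_resource.get('storage-version', 0)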

What do you think? Does that fix the idea?

paxbit commented Apr 13, 2021

FYI, I just noticed a mistake I made. Of course it is not enough to just deal with ProgressStorage; DiffBaseStorage would need to be cached as well. The mistake was in my thinking of "the patch" as the entirety of all Kopf-managed resource stanzas, and my false memory that ProgressStorage actually covers all those stanzas - which it does not.
Sorry if that seemed confusing! I updated my previous post.
