Skip to content

Commit

Permalink
WIP: a repro with an artificial inconsistency
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Vasilyev <nolar@nolar.info>
  • Loading branch information
nolar committed Oct 3, 2021
1 parent 6199777 commit e856f7d
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 7 deletions.
85 changes: 82 additions & 3 deletions examples/01-minimal/example.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,86 @@
import logging
import kopf
import dataclasses


# @kopf.on.login()
# def delayed_k3s(**_):
# conn = kopf.login_via_pykube(logger=logging.getLogger('xxx'))
# if conn:
# return dataclasses.replace(conn, server=conn.server.rsplit(':', 1)[0] + ':11223')


@kopf.on.create('kopfexamples')
def create_fn(spec, **kwargs):
    """React to a new KopfExample object and report a status patch."""
    print(f"And here we are! Creating: {spec}")
    # The returned mapping is written by kopf into the object's status.
    status_patch = {'message': 'hello world'}
    return status_patch
@kopf.on.update('kopfexamples')
def update_fn(meta, spec, reason, logger, **kwargs):
    """Log the resourceVersion and cause reason on every object update.

    Renamed from ``create_fn``: the original reused the create handler's
    function name, which shadowed the module-level ``create_fn`` binding and
    gave both kopf handlers the same default handler id — a collision in
    kopf's persisted handler-progress state.
    """
    rv = meta.get('resourceVersion')
    logger.warning(f">>> {rv=} And here we are! {reason=}: {spec}")


# @kopf.on.create('kopfexamples')
# def create_fn2(spec, **kwargs):
# print(f"And here we are! Creating2: {spec}")

"""
=======================================================================================================================
Trigger with (delete the object first!):
$ kubectl apply -f examples/obj.yaml && sleep 1 && kubectl patch -f examples/obj.yaml --type merge -p '{"spec": {"field": 2}}'
=======================================================================================================================
Timeline with the watch-stream latency 3s:
/-- kubectl create, object is created (a=s0)
| ... sleep 1s
| /-- kubectl patch, object is patched (b=s0+p1)
| | /-- c=s0+p1+p2
↓ ↓ |
----+-//-aaaaabbbbbbbccccccccccccccccccc-> which state is stored in kubernetes
↓ ↓ ↑↓
| | |\----3s----\
| | | |
| \----3s+---\ |
| | | |
\----3s----\| | |
↓↑ ↓ ↓
----+-//------------aaaaabbbbbbbbcccccc-> which state is seen by the operator
↓ ↓↑ ↓ ↓
| || | |
| || | \-- operator gets a watch-event (patched2)
| || |
| || \-- operator gets a watch-event of state "b" (patched1)
| || !BUG!: "c" (p2) is not seen yet, though implied; executes AGAIN!
| ||
| |\-- handlers execute, insta-patches as done (p2), created state "c"
| \-- operator gets a watch-event (state a)
\-- watching started
Proposed fix:
/-- kubectl create, object is created (a=s0)
| ... sleep 1s
| /-- kubectl patch, object is patched (b=s0+p1)
| | /-- c=s0+p1+p2
↓ ↓ |
----+-//-aaaaabbbbbbbccccccccccccccccccc-> which state is stored in kubernetes
↓ ↓ ↑↓
| | |\----3s----\
| | | |
| \----3s+---\ |
| | | |
\----3s----\| | |
↓↑⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶↓ (patching p2 sets the consistency expectation for N seconds)
----+-//------------aaaaabbbbbbbbcccccc-> which state does the operator see
↓ ↓↑ ⌇ ↓
| || ⌇ |
| || ⌇ \-- operator gets a consistent watch-event (patched1+patched2), execute everything
| || ⌇
| || \-- operator executes ONLY the low-level handlers (seen as patched1)
| || \~~~~~~⨳ inconsistency mode: wait until a new event (then discard it) OR timeout (then process it)
| |
| |\-- operator reacts, starts patching (p2)
| \-- operator gets a watch-event (state a)
\-- watching started
"""
3 changes: 3 additions & 0 deletions kopf/_core/reactor/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ async def process_resource_causes(
# If the wait exceeds its time and no new consistent events arrive, then fake the consistency.
# However, if a patch is accumulated by now, skip waiting and apply it instantly (by exiting).
# In that case, we are guaranteed to be inconsistent, so also skip the state-dependent handlers.
unslept = 'unset'
consistency_is_required = changing_cause is not None
consistency_is_achieved = consistency_time is None # i.e. preexisting consistency
if consistency_is_required and not consistency_is_achieved and not patch and consistency_time:
Expand All @@ -316,6 +317,8 @@ async def process_resource_causes(
consistency_is_achieved = unslept is None # "woke up" vs. "timed out"
if consistency_is_required and not consistency_is_achieved:
changing_cause = None # exit to PATCHing and/or re-iterating over new events.
rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
local_logger.debug(f'>>> {rv=} {consistency_is_required=} {consistency_is_achieved=} {unslept=} {stream_pressure.is_set()=} {changing_cause=}')

# Now, the consistency is either pre-proven (by receiving or not expecting any resource version)
# or implied (by exceeding the allowed consistency-waiting timeout while getting no new events).
Expand Down
27 changes: 23 additions & 4 deletions kopf/_core/reactor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def exception_handler(exc: BaseException) -> None:
exception_handler=exception_handler)
streams: Streams = {}

loop = asyncio.get_running_loop()
try:
# Either use the existing object's queue, or create a new one together with the per-object job.
# "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done.
Expand All @@ -192,12 +193,21 @@ def exception_handler(exc: BaseException) -> None:
if isinstance(raw_event, watching.Bookmark):
continue

# TODO: REMOVE: only for debugging!
rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
fld = raw_event.get('object', {}).get('spec', {}).get('field')
knd = raw_event.get('object', {}).get('kind')
nam = raw_event.get('object', {}).get('metadata', {}).get('name')
logger.debug(f'STREAM GOT {knd=} {nam=} {rv=} // {fld=} ')

# Multiplex the raw events to per-resource workers/queues. Start the new ones if needed.
key: ObjectRef = (resource, get_uid(raw_event))
try:
# Feed the worker, as fast as possible, no extra activities.
streams[key].pressure.set() # interrupt current sleeps, if any.
await streams[key].backlog.put(raw_event)
loop.call_later(3.0, streams[key].pressure.set)
loop.call_later(3.0, streams[key].backlog.put_nowait, raw_event)
# streams[key].pressure.set() # interrupt current sleeps, if any.
# await streams[key].backlog.put(raw_event)
except KeyError:

# Block the operator's readiness for individual resource's index handlers.
Expand All @@ -211,8 +221,10 @@ def exception_handler(exc: BaseException) -> None:

# Start the worker, and feed it initially. Starting can be moderately slow.
streams[key] = Stream(backlog=asyncio.Queue(), pressure=asyncio.Event())
streams[key].pressure.set() # interrupt current sleeps, if any.
await streams[key].backlog.put(raw_event)
# streams[key].pressure.set() # interrupt current sleeps, if any.
# await streams[key].backlog.put(raw_event)
loop.call_later(3.0, streams[key].pressure.set)
loop.call_later(3.0, streams[key].backlog.put_nowait, raw_event)
await scheduler.spawn(
name=f'worker for {key}',
coro=worker(
Expand Down Expand Up @@ -316,6 +328,13 @@ async def worker(
if isinstance(raw_event, EOS):
break # out of the worker.

# TODO: REMOVE: only for debugging!
rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
fld = raw_event.get('object', {}).get('spec', {}).get('field')
knd = raw_event.get('object', {}).get('kind')
nam = raw_event.get('object', {}).get('metadata', {}).get('name')
logger.debug(f'QUEUED GOT {knd=} {nam=} {rv=} exp={expected_version!r} // {fld=} ')

# Keep track of the resource's consistency for high-level (state-dependent) handlers.
# See `settings.persistence.consistency_timeout` for the explanation of consistency.
if expected_version is not None and expected_version == get_version(raw_event):
Expand Down

0 comments on commit e856f7d

Please sign in to comment.