Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure operator's peering via centralised settings #572

Merged
merged 5 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 77 additions & 20 deletions docs/peering.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ To set the operator's priority, use :option:`--priority`:

kopf run --priority=100 ...

Or:

.. code-block:: python

import kopf

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.peering.priority = 100

As a shortcut, there is a :option:`--dev` option, which sets
the priority to ``666``, and is intended for the development mode.

Expand Down Expand Up @@ -59,25 +69,11 @@ Create the peering objects as needed with one of:

.. note::

Previously, ``KopfPeering`` was the only CRD, and it was cluster-scoped.
Now, it is namespaced. For the new users, it all will be fine and working.

If the old ``KopfPeering`` CRD is already deployed to your cluster,
it will also continue to work as before without re-configuration:
though there will be no namespace isolation as documented here ---
it will be cluster peering regardless of :option:`--namespace`
(as it was before the changes).

When possible (but strictly after the Kopf's version upgrade),
please delete the old CRD, and re-create from scratch:

.. code-block:: bash

kubectl delete crd kopfpeerings.zalando.org
# give it 1-2 minutes to cleanup, or repeat until succeeded:
kubectl create -f peering.yaml

Then re-deploy your custom peering objects of your apps.
In ``kopf<0.11`` (until May'2019), ``KopfPeering`` was the only CRD,
and it was cluster-scoped. In ``kopf>=0.11,<0.29`` (until Oct'2020),
this mode was deprecated but supported if the old CRD existed.
Since ``kopf>=0.29`` (Nov'2020), it is not supported anymore.
To upgrade, delete and re-create the peering CRDs to the new ones.


Custom peering
Expand All @@ -88,11 +84,27 @@ The operator can be instructed to use alternative peering objects::
kopf run --peering=example ...
kopf run --peering=example --namespace=some-ns ...

Or:

.. code-block:: python

import kopf

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.peering.name = "example"
settings.peering.mandatory = True

Depending on :option:`--namespace`, either ``ClusterKopfPeering``
or ``KopfPeering`` will be used (in the operator's namespace).

If the peering object does not exist, the operator will fail to start.
Using :option:`--peering` assumes that the peering is required.
Using :option:`--peering` assumes that the peering is mandatory.

Please note that in the startup handler, this is not exactly the same:
the mandatory mode must be set explicitly. Otherwise, the operator will try
to auto-detect the presence of the custom peering object, but will not fail
if it is absent -- unlike with the ``--peering=`` CLI option.

The operators from different peering objects do not see each other.

Expand All @@ -108,6 +120,16 @@ the standalone mode can be enabled::

kopf run --standalone ...

Or:

.. code-block:: python

import kopf

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.peering.standalone = True

In that case, the operator will not freeze if other operators with
the higher priority will start handling the objects, which may lead
to the conflicting changes and reactions from multiple operators
Expand Down Expand Up @@ -145,10 +167,45 @@ operator in the deployment or replicaset:

kopf run --priority=$RANDOM ...

Or:

.. code-block:: python

import random
import kopf

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.peering.priority = random.randint(0, 32767)

``$RANDOM`` is a feature of bash
(if you use another shell, see its man page for an equivalent).
It returns a random integer in the range 0..32767.
With high probability, 2-3 pods will get their unique priorities.

You can also use the pod's IP address in its numeric form as the priority,
or any other source of integers.


Stealth keep-alives
===================

Every few seconds (60 by default), the operator will send a keep-alive update
to the chosen peering, showing that it is still functioning. Other operators
will notice that and make decisions on their freezing or resuming.

The operator also logs a keep-alive activity to its own logs. This can be
distracting. To disable:

.. code-block:: python

import random
import kopf

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.peering.stealth = True

There is no equivalent CLI option for that.

Please note that it only affects logging. The keep-alive are sent anyway.
2 changes: 1 addition & 1 deletion docs/walkthrough/starting.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ The output looks like this:
.. code-block:: none

[2019-05-31 10:42:11,870] kopf.config [DEBUG ] configured via kubeconfig file
[2019-05-31 10:42:11,913] kopf.reactor.peering [WARNING ] Default peering object not found, falling back to the standalone mode.
[2019-05-31 10:42:11,913] kopf.reactor.peering [WARNING ] Default peering object is not found, falling back to the standalone mode.
[2019-05-31 10:42:12,037] kopf.reactor.handlin [DEBUG ] [default/my-claim] First appearance: {'apiVersion': 'zalando.org/v1', 'kind': 'EphemeralVolumeClaim', 'metadata': {'annotations': {'kubectl.kubernetes.io/last-applied-configuration': '{"apiVersion":"zalando.org/v1","kind":"EphemeralVolumeClaim","metadata":{"annotations":{},"name":"my-claim","namespace":"default"}}\n'}, 'creationTimestamp': '2019-05-29T00:41:57Z', 'generation': 1, 'name': 'my-claim', 'namespace': 'default', 'resourceVersion': '47720', 'selfLink': '/apis/zalando.org/v1/namespaces/default/ephemeralvolumeclaims/my-claim', 'uid': '904c2b9b-81aa-11e9-a202-a6e6b278a294'}}
[2019-05-31 10:42:12,038] kopf.reactor.handlin [DEBUG ] [default/my-claim] Adding the finalizer, thus preventing the actual deletion.
[2019-05-31 10:42:12,038] kopf.reactor.handlin [DEBUG ] [default/my-claim] Patching with: {'metadata': {'finalizers': ['KopfFinalizerMarker']}}
Expand Down
2 changes: 1 addition & 1 deletion examples/12-embedded/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Starting the main app.

[DEBUG ] Pykube is configured via kubeconfig file.
[DEBUG ] Client is configured via kubeconfig file.
[WARNING ] Default peering object not found, falling back to the standalone mode.
[WARNING ] Default peering object is not found, falling back to the standalone mode.
[WARNING ] OS signals are ignored: running not in the main thread.

Do the main app activity here. Step 1/3.
Expand Down
45 changes: 24 additions & 21 deletions kopf/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ def main() -> None:
@main.command()
@logging_options
@click.option('-n', '--namespace', default=None)
@click.option('--standalone', is_flag=True, default=False)
@click.option('--standalone', is_flag=True, default=None)
@click.option('--dev', 'priority', type=int, is_flag=True, flag_value=666)
@click.option('-L', '--liveness', 'liveness_endpoint', type=str)
@click.option('-P', '--peering', 'peering_name', type=str, default=None, envvar='KOPF_RUN_PEERING')
@click.option('-p', '--priority', type=int, default=0)
@click.option('-P', '--peering', 'peering_name', type=str, envvar='KOPF_RUN_PEERING')
@click.option('-p', '--priority', type=int)
@click.option('-m', '--module', 'modules', multiple=True)
@click.argument('paths', nargs=-1)
@click.make_pass_decorator(CLIControls, ensure=True)
Expand All @@ -77,8 +77,8 @@ def run(
paths: List[str],
modules: List[str],
peering_name: Optional[str],
priority: int,
standalone: bool,
priority: Optional[int],
standalone: Optional[bool],
namespace: Optional[str],
liveness_endpoint: Optional[str],
) -> None:
Expand Down Expand Up @@ -108,7 +108,7 @@ def run(
@click.option('-n', '--namespace', default=None)
@click.option('-i', '--id', type=str, default=None)
@click.option('--dev', 'priority', flag_value=666)
@click.option('-P', '--peering', 'peering_name', type=str, required=True, envvar='KOPF_FREEZE_PEERING')
@click.option('-P', '--peering', 'peering_name', required=True, envvar='KOPF_FREEZE_PEERING')
@click.option('-p', '--priority', type=int, default=100, required=True)
@click.option('-t', '--lifetime', type=int, required=True)
@click.option('-m', '--message', type=str)
Expand All @@ -121,46 +121,49 @@ def freeze(
priority: int,
) -> None:
""" Freeze the resource handling in the cluster. """
ourserlves = peering.Peer(
id=id or peering.detect_own_id(manual=True),
name=peering_name,
namespace=namespace,
priority=priority,
lifetime=lifetime,
)
identity = peering.Identity(id) if id else peering.detect_own_id(manual=True)
registry = registries.SmartOperatorRegistry()
settings = configuration.OperatorSettings()
settings.peering.name = peering_name
settings.peering.priority = priority
vault = credentials.Vault()
auth.vault_var.set(vault)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait({
activities.authenticate(registry=registry, settings=settings, vault=vault),
ourserlves.keepalive(),
peering.touch(
identity=identity,
settings=settings,
namespace=namespace,
lifetime=lifetime,
),
}))


@main.command()
@logging_options
@click.option('-n', '--namespace', default=None)
@click.option('-i', '--id', type=str, default=None)
@click.option('-P', '--peering', 'peering_name', type=str, required=True, envvar='KOPF_RESUME_PEERING')
@click.option('-P', '--peering', 'peering_name', required=True, envvar='KOPF_RESUME_PEERING')
def resume(
id: Optional[str],
namespace: Optional[str],
peering_name: str,
) -> None:
""" Resume the resource handling in the cluster. """
ourselves = peering.Peer(
id=id or peering.detect_own_id(manual=True),
name=peering_name,
namespace=namespace,
)
identity = peering.Identity(id) if id else peering.detect_own_id(manual=True)
registry = registries.SmartOperatorRegistry()
settings = configuration.OperatorSettings()
settings.peering.name = peering_name
vault = credentials.Vault()
auth.vault_var.set(vault)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait({
activities.authenticate(registry=registry, settings=settings, vault=vault),
ourselves.disappear(),
peering.touch(
identity=identity,
settings=settings,
namespace=namespace,
lifetime=0,
),
}))
24 changes: 0 additions & 24 deletions kopf/clients/fetching.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,6 @@ class _UNSET(enum.Enum):
token = enum.auto()


@auth.reauthenticated_request
async def read_crd(
*,
resource: resources.Resource,
default: Union[_T, _UNSET] = _UNSET.token,
context: Optional[auth.APIContext] = None, # injected by the decorator
) -> Union[bodies.RawBody, _T]:
if context is None:
raise RuntimeError("API instance is not injected by the decorator.")

try:
response = await context.session.get(
url=CRD_CRD.get_url(server=context.server, name=resource.name),
)
await errors.check_response(response)
respdata = await response.json()
return cast(bodies.RawBody, respdata)

except aiohttp.ClientResponseError as e:
if e.status in [403, 404] and not isinstance(default, _UNSET):
return default
raise


@auth.reauthenticated_request
async def read_obj(
*,
Expand Down
Loading