Skip to content

Commit

Permalink
feat: sanitizing store keys - store path v2 (#918)
Browse files Browse the repository at this point in the history
## Description

This PR Migrates the Store to a v2 data path, migrates keys from
previous version before a read of the CR, patches them to v2, then
starts a watch. It also adds fuzzing, property-based, more unit and
journey tests.

<img width="690" alt="image"
src="https://github.com/defenseunicorns/pepr/assets/1096507/9b9522ad-2724-4579-866d-f73424e34991">


## Related Issue

Fixes #915 
<!-- or -->
Relates to PR
defenseunicorns/pepr-excellent-examples#46
updating the e2e test to expect the keys base64 encoded

## Type of change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Other (security config, docs update, etc)

## Checklist before merging

- [x] Test, docs, adr added or updated as needed
- [x] [Contributor Guide
Steps](https://docs.pepr.dev/main/contribute/#submitting-a-pull-request)
followed

---------

Signed-off-by: Case Wylie <cmwylie19@defenseunicorns.com>
Co-authored-by: Barrett <81570928+btlghrants@users.noreply.github.com>
  • Loading branch information
cmwylie19 and btlghrants authored Jul 23, 2024
1 parent 035d097 commit f78666a
Show file tree
Hide file tree
Showing 12 changed files with 353 additions and 38 deletions.
8 changes: 8 additions & 0 deletions journey/k8s.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ export async function waitForDeploymentReady(namespace: string, name: string) {
}
}

export async function noWaitPeprStoreKey(name: string, matchKey: string) {
const store = await K8s(PeprStore).InNamespace("pepr-system").Get(name);
if (store.data[matchKey]) {
return store.data[matchKey];
}
}


export async function waitForPeprStoreKey(name: string, matchKey: string) {
try {
const store = await K8s(PeprStore).InNamespace("pepr-system").Get(name);
Expand Down
63 changes: 55 additions & 8 deletions journey/pepr-deploy.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: 2023-Present The Pepr Authors

import { beforeAll, jest, afterAll, describe, expect, it } from "@jest/globals";
import { execSync, spawnSync, spawn, ChildProcess } from "child_process";
import { describe, expect, it } from "@jest/globals";
import { execSync, spawnSync, spawn } from "child_process";
import { K8s, kind } from "kubernetes-fluent-client";
import { resolve } from "path";

import { destroyModule } from "../src/lib/assets/destroy";
import { cwd } from "./entrypoint.test";
import {
deleteConfigMap,
noWaitPeprStoreKey,
waitForConfigMap,
waitForDeploymentReady,
waitForNamespace,
Expand All @@ -23,7 +23,18 @@ export function peprDeploy() {
// Purge the Pepr module from the cluster before running the tests
destroyModule("pepr-static-test");


it("should deploy the Pepr controller into the test cluster", async () => {
// Apply the store crd and pepr-system ns
await applyStoreCRD();

// Apply the store
await applyLegacyStoreResource();

/*
* when controller starts up, it will migrate the store
* and later on the keys will be tested to validate the migration
*/
execSync("npx pepr deploy -i pepr:dev --confirm", { cwd, stdio: "inherit" });

// Wait for the deployments to be ready
Expand All @@ -50,7 +61,7 @@ export function peprDeploy() {

it("npx pepr monitor should display validation results to console", async () => {
await testValidate();

const cmd = ['pepr', 'monitor', 'static-test']

const proc = spawn('npx', cmd, { shell: true })
Expand All @@ -68,7 +79,7 @@ export function peprDeploy() {
proc.stderr.destroy()
}
})

proc.on('exit', () => state.done = true);

await until(() => state.done)
Expand Down Expand Up @@ -164,6 +175,7 @@ function testIgnore() {
expect(cm.metadata?.labels?.["pepr"]).toBeUndefined();
});
}

async function testValidate() {
// Apply the sample yaml for the HelloPepr capability
const applyOut = spawnSync("kubectl apply -f hello-pepr.samples.yaml", {
Expand Down Expand Up @@ -260,22 +272,57 @@ function testMutate() {
});
}


function testStore() {
it("should create the PeprStore", async () => {
const resp = await waitForPeprStoreKey("pepr-static-test-store", "__pepr_do_not_delete__");
expect(resp).toBe("k-thx-bye");
});

it("should write the correct data to the PeprStore", async () => {
const key1 = await waitForPeprStoreKey("pepr-static-test-store", "hello-pepr-example-1");
const key1 = await waitForPeprStoreKey("pepr-static-test-store", `hello-pepr-v2-example-1`);
expect(key1).toBe("was-here");

const key2 = await waitForPeprStoreKey("pepr-static-test-store", "hello-pepr-example-1-data");
// Should have been migrated and removed
const nullKey1 = await noWaitPeprStoreKey("pepr-static-test-store", `hello-pepr-example-1`);
expect(nullKey1).toBeUndefined();

const key2 = await waitForPeprStoreKey("pepr-static-test-store", `hello-pepr-v2-example-1-data`);
expect(key2).toBe(JSON.stringify({ key: "ex-1-val" }));

// Should have been migrated and removed
const nullKey2 = await noWaitPeprStoreKey("pepr-static-test-store", `hello-pepr-example-1-data`);
expect(nullKey1).toBeUndefined();
});

it("should write the correct data to the PeprStore from a Watch Action", async () => {
const key = await waitForPeprStoreKey("pepr-static-test-store", "hello-pepr-watch-data");
const key = await waitForPeprStoreKey("pepr-static-test-store", `hello-pepr-v2-watch-data`);
expect(key).toBe("This data was stored by a Watch Action.");
});
}



async function applyStoreCRD() {
// Apply the store crd
const appliedStoreCRD = spawnSync("kubectl apply -f journey/resources/pepr-store-crd.yaml", {
shell: true, // Run command in a shell
encoding: "utf-8", // Encode result as string
cwd: resolve(cwd, ".."),
});
const { stdout } = appliedStoreCRD;

expect(stdout).toContain("customresourcedefinition.apiextensions.k8s.io/peprstores.pepr.dev");
}

async function applyLegacyStoreResource() {
// Apply the store
const appliedStore = spawnSync("kubectl apply -f journey/resources/non-migrated-peprstore.yaml", {
shell: true, // Run command in a shell
encoding: "utf-8", // Encode result as string
cwd: resolve(cwd, ".."),
});
const { stdout } = appliedStore;

expect(stdout).toContain("peprstore.pepr.dev/pepr-static-test-store");
}
5 changes: 2 additions & 3 deletions journey/pepr-dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { ChildProcessWithoutNullStreams, spawn } from "child_process";
import { Agent } from "https";
import { fetch } from "kubernetes-fluent-client";
import { RequestInit } from "node-fetch";

import { cwd } from "./entrypoint.test";
import { sleep } from "./k8s";

Expand All @@ -25,8 +24,8 @@ let expectedLines = [
"Validate Action configured for CREATE",
"Server listening on port 3000",
"Controller startup complete",
`"hello-pepr-example-1-data": "{\\"key\\":\\"ex-1-val\\"}"`,
`"hello-pepr-watch-data": "This data was stored by a Watch Action."`,
`"hello-pepr-v2-example-1-data": "{\\"key\\":\\"ex-1-val\\"}"`,
`"hello-pepr-v2-watch-data": "This data was stored by a Watch Action."`,
];

export function peprDev() {
Expand Down
1 change: 1 addition & 0 deletions journey/resources/clusterrole.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
Expand Down
12 changes: 12 additions & 0 deletions journey/resources/non-migrated-peprstore.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
apiVersion: pepr.dev/v1
data:
__pepr_do_not_delete__: k-thx-bye
hello-pepr-example-1: was-here
hello-pepr-example-1-data: '{"key":"ex-1-val"}'
hello-pepr-watch-data: "This data was stored by a Watch Action."
kind: PeprStore
metadata:
name: pepr-static-test-store
namespace: pepr-system

36 changes: 36 additions & 0 deletions journey/resources/pepr-store-crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: peprstores.pepr.dev
spec:
conversion:
strategy: None
group: pepr.dev
names:
kind: PeprStore
listKind: PeprStoreList
plural: peprstores
singular: peprstore
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
properties:
data:
additionalProperties:
type: string
type: object
type: object
served: true
storage: true
---
apiVersion: v1
kind: Namespace
metadata:
labels:
kubernetes.io/metadata.name: pepr-system
name: pepr-system
spec: {}

24 changes: 24 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"@types/ramda": "0.30.1",
"express": "4.19.2",
"fast-json-patch": "3.1.1",
"json-pointer": "^0.6.2",
"kubernetes-fluent-client": "2.6.5",
"pino": "9.3.1",
"pino-pretty": "11.2.1",
Expand All @@ -48,6 +49,7 @@
"@jest/globals": "29.7.0",
"@types/eslint": "8.56.10",
"@types/express": "4.17.21",
"@types/json-pointer": "^1.0.34",
"@types/node": "18.x.x",
"@types/node-forge": "1.3.11",
"@types/prompts": "2.4.9",
Expand Down
90 changes: 88 additions & 2 deletions src/lib/controller/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ export class PeprControllerStore {
K8s(PeprStore)
.InNamespace(namespace)
.Get(this.#name)
// If the get succeeds, setup the watch
.then(this.#setupWatch)
// If the get succeeds, migrate and setup the watch
.then(async (store: PeprStore) => await this.#migrateAndSetupWatch(store))
// Otherwise, create the resource
.catch(this.#createStoreResource),
Math.random() * 3000,
Expand All @@ -74,6 +74,91 @@ export class PeprControllerStore {
watcher.start().catch(e => Log.error(e, "Error starting Pepr store watch"));
};

#migrateAndSetupWatch = async (store: PeprStore) => {
Log.debug(store, "Pepr Store migration");
const data: DataStore = store.data || {};
const migrateCache: Record<string, Operation> = {};

// Send the cached updates to the cluster
const flushCache = async () => {
const indexes = Object.keys(migrateCache);
const payload = Object.values(migrateCache);

// Loop over each key in the cache and delete it to avoid collisions with other sender calls
for (const idx of indexes) {
delete migrateCache[idx];
}

try {
// Send the patch to the cluster
await K8s(PeprStore, { namespace, name: this.#name }).Patch(payload);
} catch (err) {
Log.error(err, "Pepr store update failure");

if (err.status === 422) {
Object.keys(migrateCache).forEach(key => delete migrateCache[key]);
} else {
// On failure to update, re-add the operations to the cache to be retried
for (const idx of indexes) {
migrateCache[idx] = payload[Number(idx)];
}
}
}
};

const fillCache = (name: string, op: DataOp, key: string[], val?: string) => {
if (op === "add") {
// adjust the path for the capability
const path = `/data/${name}-v2-${key}`;
const value = val || "";
const cacheIdx = [op, path, value].join(":");

// Add the operation to the cache
migrateCache[cacheIdx] = { op, path, value };

return;
}

if (op === "remove") {
if (key.length < 1) {
throw new Error(`Key is required for REMOVE operation`);
}

for (const k of key) {
const path = `/data/${name}-${k}`;
const cacheIdx = [op, path].join(":");

// Add the operation to the cache
migrateCache[cacheIdx] = { op, path };
}

return;
}

// If we get here, the operation is not supported
throw new Error(`Unsupported operation: ${op}`);
};

for (const name of Object.keys(this.#stores)) {
// Get the prefix offset for the keys
const offset = `${name}-`.length;

// Loop over each key in the store
for (const key of Object.keys(data)) {
// Match on the capability name as a prefix for non v2 keys
if (startsWith(name, key) && !startsWith(`${name}-v2`, key)) {
// populate migrate cache
fillCache(name, "remove", [key.slice(offset)], data[key]);
fillCache(name, "add", [key.slice(offset)], data[key]);
}
}

// await K8s(PeprStore, { namespace, name: this.#name }).Patch(payload);
}
await flushCache();
this.#setupWatch();
};

#receive = (store: PeprStore) => {
Log.debug(store, "Pepr Store update");

Expand Down Expand Up @@ -121,6 +206,7 @@ export class PeprControllerStore {
// Load the sendCache with patch operations
const fillCache = (op: DataOp, key: string[], val?: string) => {
if (op === "add") {
// adjust the path for the capability
const path = `/data/${capabilityName}-${key}`;
const value = val || "";
const cacheIdx = [op, path, value].join(":");
Expand Down
Loading

0 comments on commit f78666a

Please sign in to comment.