
feat: Implement storage job limiter #2123

Merged
27 commits merged Nov 20, 2024

Conversation

Matovidlo
Contributor

@Matovidlo Matovidlo commented Oct 23, 2024

Jira: PSGO-783

Changes:

  • Notes https://keboolaglobal.slack.com/archives/C055WKPHYSD/p1730796121823789
  • Include adjustments to the DD tags env for k8s benchmarks.
  • Add a new condition, sinkThrottled, which prevents file rotation.
  • Add 2 new configuration options.
    • Basically sinkLimit inherits from jobLimit, but it is configurable for filerotation tests.
  • Add a new collection, OnSinkDelete, which is connected to the storage job repository module.
    • Use the collection within the storage job repository plugin. When a sink is deleted, remove all jobs under that sinkKey.
  • Add new mocks for the Keboola bridge for inspecting jobs and the import-async POST endpoint interaction.
  • Add 2 new schemas:
  • Keboola storage job schema, without repository. Represents jobs currently running in Keboola.
    • This has to be stored in etcd, so we do not waste repeated calls to the Storage API.
  • Storage job schema, including repository. It represents the limit of a sink.
    • It was created because I needed to mirror it within the filerotation module, which is independent of the keboolasink/bridge module.
    • This could possibly be refactored as follows:
      • Aggregate Keboola storage jobs as sinkLimit when created under a sinkKey (the mirror could be removed).
      • Change the naming? (But the file is within the keboolasink and model modules.)
  • Extend metadata cleanup, which cleans both Keboola storage jobs and Storage jobs.
    • Added the bridge into the metadata cleanup module, as I need to check whether a job has already succeeded and can be removed from etcd.
  • Add metadata cleanup to the keboola_test.go E2E test. We need it to unblock the sink imports, and we want to test that the storage job limiter works correctly.
    • TODO: When sink deletion is performed, the metadata cleanup should clear only keboolaBridge jobs. Other jobs are already removed from etcd.

Questions:

  • Currently the storage jobs are stored under the active prefix. It could be omitted, but I don't know whether we would like to inspect it?
  • I don't know whether it is useful to store sinkLimit also under the file entity. It will probably be used by the next PR, when we would like to propagate the error into the UI.
  • Check metadata cleanup. Now it needs support for Bridge and PublicAPI. Possibly it could use only the bridge, when using some bridge functionality.

@Matovidlo Matovidlo force-pushed the mv-PSGO-783-implement-storage-job-rate-limiter branch 5 times, most recently from 69affdf to c1ac629 Compare November 8, 2024 11:38
@Matovidlo Matovidlo marked this pull request as ready for review November 8, 2024 12:22
Used in the job repository: when deleting a sink, delete all jobs under the
sinkKey
The job is deleted once a success status is returned from the GET jobs Connection API endpoint
When exceeding the job limit, do not rotate the file; instead return and
accumulate slices further. When the storage job is deleted by the metadata
cleanup module, the import is executed
When not present, the test never ends due to the set limit
@Matovidlo Matovidlo force-pushed the mv-PSGO-783-implement-storage-job-rate-limiter branch from 60557bb to 1b280ed Compare November 11, 2024 14:24
Contributor

@hosekpeter hosekpeter left a comment


LGTM

Create a mirror on top of the bridge component. Add a new throttle plugin and
evaluate it in the filerotation module. Adjust tests
Call it in the coordinator. Fix the filerotation operator to evaluate sink
throttling after import conditions are satisfied
@Matovidlo Matovidlo force-pushed the mv-PSGO-783-implement-storage-job-rate-limiter branch from 0332f7e to c78fba5 Compare November 19, 2024 11:50
@Matovidlo Matovidlo force-pushed the mv-PSGO-783-implement-storage-job-rate-limiter branch from c78fba5 to 41c0053 Compare November 19, 2024 12:34

Stream Kubernetes Diff [CI]

Between base e375dc2 ⬅️ head 665e97a.

--- /tmp/artifacts/test-k8s-state.old.json.processed.kv	2024-11-20 10:33:47.730683078 +0000
+++ /tmp/artifacts/test-k8s-state.new.json.processed.kv	2024-11-20 10:33:48.210681617 +0000
@@ -200 +200 @@
-<Deployment/stream-api>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<Deployment/stream-api>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -364 +364 @@
-<Deployment/stream-http-source>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<Deployment/stream-http-source>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -525 +525 @@
-<Deployment/stream-storage-coordinator>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<Deployment/stream-storage-coordinator>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -602 +602 @@
-<Endpoints/stream-etcd-headless>.subsets[0].addresses[0].hostname = "stream-etcd-1";
+<Endpoints/stream-etcd-headless>.subsets[0].addresses[0].hostname = "stream-etcd-2";
@@ -606 +606 @@
-<Endpoints/stream-etcd-headless>.subsets[0].addresses[0].targetRef.name = "stream-etcd-1";
+<Endpoints/stream-etcd-headless>.subsets[0].addresses[0].targetRef.name = "stream-etcd-2";
@@ -609 +609 @@
-<Endpoints/stream-etcd-headless>.subsets[0].addresses[1].hostname = "stream-etcd-2";
+<Endpoints/stream-etcd-headless>.subsets[0].addresses[1].hostname = "stream-etcd-1";
@@ -613 +613 @@
-<Endpoints/stream-etcd-headless>.subsets[0].addresses[1].targetRef.name = "stream-etcd-2";
+<Endpoints/stream-etcd-headless>.subsets[0].addresses[1].targetRef.name = "stream-etcd-1";
@@ -653 +653 @@
-<Endpoints/stream-etcd>.subsets[0].addresses[0].targetRef.name = "stream-etcd-1";
+<Endpoints/stream-etcd>.subsets[0].addresses[0].targetRef.name = "stream-etcd-2";
@@ -659 +659 @@
-<Endpoints/stream-etcd>.subsets[0].addresses[1].targetRef.name = "stream-etcd-2";
+<Endpoints/stream-etcd>.subsets[0].addresses[1].targetRef.name = "stream-etcd-1";
@@ -717 +717 @@
-<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[0].hostname = "stream-storage-writer-reader-0";
+<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[0].hostname = "stream-storage-writer-reader-1";
@@ -721 +721 @@
-<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[0].targetRef.name = "stream-storage-writer-reader-0";
+<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[0].targetRef.name = "stream-storage-writer-reader-1";
@@ -724 +724 @@
-<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[1].hostname = "stream-storage-writer-reader-1";
+<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[1].hostname = "stream-storage-writer-reader-0";
@@ -728 +728 @@
-<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[1].targetRef.name = "stream-storage-writer-reader-1";
+<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[1].targetRef.name = "stream-storage-writer-reader-0";
@@ -1214,2 +1214,2 @@
-<Pod/stream-api-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
-<Pod/stream-api-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<Pod/stream-api-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
+<Pod/stream-api-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -1534 +1534 @@
-<Pod/stream-etcd-0>.spec.containers[0].env[21].value = "new";
+<Pod/stream-etcd-0>.spec.containers[0].env[21].value = "existing";
@@ -1780 +1780 @@
-<Pod/stream-etcd-1>.spec.containers[0].env[21].value = "new";
+<Pod/stream-etcd-1>.spec.containers[0].env[21].value = "existing";
@@ -2026 +2026 @@
-<Pod/stream-etcd-2>.spec.containers[0].env[21].value = "new";
+<Pod/stream-etcd-2>.spec.containers[0].env[21].value = "existing";
@@ -2350,2 +2350,2 @@
-<Pod/stream-http-source-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
-<Pod/stream-http-source-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<Pod/stream-http-source-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
+<Pod/stream-http-source-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -2742,2 +2742,2 @@
-<Pod/stream-storage-coordinator-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
-<Pod/stream-storage-coordinator-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<Pod/stream-storage-coordinator-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
+<Pod/stream-storage-coordinator-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -2978 +2978 @@
-<Pod/stream-storage-writer-reader-0>.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<Pod/stream-storage-writer-reader-0>.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -3061 +3061 @@
-<Pod/stream-storage-writer-reader-0>.spec.containers[1].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<Pod/stream-storage-writer-reader-0>.spec.containers[1].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -3231 +3231 @@
-<Pod/stream-storage-writer-reader-1>.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<Pod/stream-storage-writer-reader-1>.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -3314 +3314 @@
-<Pod/stream-storage-writer-reader-1>.spec.containers[1].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<Pod/stream-storage-writer-reader-1>.spec.containers[1].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -3552 +3552 @@
-<ReplicaSet/stream-api-<hash>>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<ReplicaSet/stream-api-<hash>>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -3723 +3723 @@
-<ReplicaSet/stream-http-source-<hash>>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<ReplicaSet/stream-http-source-<hash>>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -3891 +3891 @@
-<ReplicaSet/stream-storage-coordinator-<hash>>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<ReplicaSet/stream-storage-coordinator-<hash>>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -3932,0 +3933,12 @@
+<Secret/sh.helm.release.v1.stream-etcd.v2> = {};
+<Secret/sh.helm.release.v1.stream-etcd.v2>.apiVersion = "v1";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.data = {};
+<Secret/sh.helm.release.v1.stream-etcd.v2>.kind = "Secret";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata = {};
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata.labels = {};
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata.labels.name = "stream-etcd";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata.labels.owner = "helm";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata.labels.version = "2";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata.name = "sh.helm.release.v1.stream-etcd.v2";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata.namespace = "stream";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.type = "helm.sh/release.v1";
@@ -4228 +4240 @@
-<StatefulSet/stream-etcd>.spec.template.spec.containers[0].env[21].value = "new";
+<StatefulSet/stream-etcd>.spec.template.spec.containers[0].env[21].value = "existing";
@@ -4478 +4490 @@
-<StatefulSet/stream-storage-writer-reader>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<StatefulSet/stream-storage-writer-reader>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:665e97a-1732098303";
@@ -4558 +4570 @@
-<StatefulSet/stream-storage-writer-reader>.spec.template.spec.containers[1].image = "docker.io/keboola/stream-api:e375dc2-1732097778";
+<StatefulSet/stream-storage-writer-reader>.spec.template.spec.containers[1].image = "docker.io/keboola/stream-api:665e97a-1732098303";


(see artifacts in the GitHub Action for more information)

@Matovidlo Matovidlo merged commit e81023a into main Nov 20, 2024
13 checks passed
@Matovidlo Matovidlo deleted the mv-PSGO-783-implement-storage-job-rate-limiter branch November 20, 2024 15:11
3 participants