Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

canary-cloud: Use Redpanda Cloud #29195

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bin/ci-builder
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ case "$cmd" in
--env GIT_AUTHOR_NAME
--env GIT_COMMITTER_EMAIL
--env GIT_COMMITTER_NAME
# For cloud canary
--env REDPANDA_CLOUD_CLIENT_ID
--env REDPANDA_CLOUD_CLIENT_SECRET
)

if [[ $detach_container == "true" ]]; then
Expand Down
121 changes: 121 additions & 0 deletions misc/python/materialize/redpanda_cloud.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import os
import time
from typing import Any

import requests


def get_result(response: requests.Response) -> dict[str, Any]:
if response.status_code not in (200, 201, 202):
raise ValueError(
f"Redpanda API call failed: {response.status_code} {response.text}"
)
result = response.json()
print(result)
return result


class RedpandaCluster:
def __init__(self, token: str, dataplane_api_url: str) -> None:
self.token = token
self.dataplane_api_url = dataplane_api_url

def headers(self) -> dict[str, str]:
return {"Authorization": f"Bearer {self.token}"}

def create(self, object: str, content: dict[str, Any]) -> dict[str, Any]:
return get_result(
requests.post(
f"{self.dataplane_api_url}/v1alpha1/{object}",
json=content,
headers=self.headers(),
)
)


class RedpandaCloud:
def __init__(self) -> None:
client_id = os.environ["REDPANDA_CLOUD_CLIENT_ID"]
client_secret = os.environ["REDPANDA_CLOUD_CLIENT_SECRET"]

result = get_result(
requests.post(
"https://auth.prd.cloud.redpanda.com/oauth/token",
json={
"client_id": client_id,
"client_secret": client_secret,
"audience": "cloudv2-production.redpanda.cloud",
"grant_type": "client_credentials",
},
)
)
# Can't finish our test otherwise
assert result["expires_in"] >= 3600, result
self.token = result["access_token"]
self.controlplane_api_url = "https://api.redpanda.com"

def headers(self) -> dict[str, str]:
return {"Authorization": f"Bearer {self.token}"}

def wait(self, result: dict[str, Any]) -> dict[str, Any]:
operation_id = result["operation"]["id"]
while True:
time.sleep(10)
result = get_result(
requests.get(
f"{self.controlplane_api_url}/v1beta2/operations/{operation_id}",
headers=self.headers(),
)
)
if result["operation"]["state"] == "STATE_COMPLETED":
return result["operation"]
if result["operation"]["state"] == "STATE_FAILED":
raise ValueError(result)
if result["operation"]["state"] != "STATE_IN_PROGRESS":
raise ValueError(result)

def create(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]:
return get_result(
requests.post(
f"{self.controlplane_api_url}/v1beta2/{object}",
json=content,
headers=self.headers(),
)
)

def patch(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]:
return get_result(
requests.patch(
f"{self.controlplane_api_url}/v1beta2/{object}",
json=content,
headers=self.headers(),
)
)

def get(self, object: str) -> dict[str, Any]:
return get_result(
requests.get(
f"{self.controlplane_api_url}/v1beta2/{object}",
headers=self.headers(),
)
)

def delete(self, object: str, id: str) -> dict[str, Any]:
return get_result(
requests.delete(
f"{self.controlplane_api_url}/v1beta2/{object}/{id}",
headers=self.headers(),
)
)

def get_cluster(self, cluster_info: dict[str, Any]) -> RedpandaCluster:
return RedpandaCluster(self.token, cluster_info["dataplane_api"]["url"])
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
$ kafka-ingest format=bytes key-terminator=: key-format=bytes topic=bytes repeat=100
abc:abc

> DROP SOURCE IF EXISTS bytes CASCADE;
> DROP SOURCE IF EXISTS kafka_bytes CASCADE;
> DROP CONNECTION IF EXISTS kafka_conn;
> DROP SECRET IF EXISTS confluent_username
> DROP SECRET IF EXISTS confluent_password
Expand All @@ -30,17 +30,15 @@ abc:abc
SASL PASSWORD = SECRET confluent_password
);

> CREATE CLUSTER canary_sources SIZE '3xsmall';

> CREATE SOURCE bytes
> CREATE SOURCE kafka_bytes
IN CLUSTER canary_sources
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-bytes-${testdrive.seed}')
FORMAT BYTES
ENVELOPE NONE;

> CREATE MATERIALIZED VIEW bytes_view AS SELECT COUNT(*) AS cnt FROM bytes;
> CREATE MATERIALIZED VIEW kafka_bytes_view AS SELECT COUNT(*) AS cnt FROM kafka_bytes;

> CREATE DEFAULT INDEX ON bytes_view;
> CREATE DEFAULT INDEX ON kafka_bytes_view;

> SELECT cnt > 0 from bytes_view
> SELECT cnt > 0 from kafka_bytes_view
true
44 changes: 44 additions & 0 deletions test/cloud-canary/canary-redpanda-privatelink-sources.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ kafka-create-topic topic=bytes-privatelink replication-factor=3

$ kafka-ingest format=bytes key-terminator=: key-format=bytes topic=bytes-privatelink repeat=100
abc:abc

> DROP SOURCE IF EXISTS redpanda_privatelink_bytes CASCADE;
> DROP CONNECTION IF EXISTS redpanda_privatelink_conn CASCADE;
> DROP SECRET IF EXISTS redpanda_privatelink_password CASCADE;

> CREATE SECRET redpanda_privatelink_password AS '${arg.redpanda-password}';

> VALIDATE CONNECTION privatelink_conn;

# When run immediately: ERROR: Meta data fetch error: BrokerTransportFailure
$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=60s

> CREATE CONNECTION redpanda_privatelink_conn TO KAFKA (
AWS PRIVATELINK privatelink_conn (PORT 30292),
SASL MECHANISMS = 'SCRAM-SHA-512',
SASL USERNAME = '${arg.redpanda-username}',
SASL PASSWORD = SECRET redpanda_privatelink_password
);

> CREATE SOURCE redpanda_privatelink_bytes
IN CLUSTER canary_sources
FROM KAFKA CONNECTION redpanda_privatelink_conn (TOPIC 'testdrive-bytes-privatelink-${testdrive.seed}')
FORMAT BYTES
ENVELOPE NONE;

> CREATE MATERIALIZED VIEW redpanda_privatelink_bytes_view AS SELECT COUNT(*) AS cnt FROM redpanda_privatelink_bytes;

> CREATE DEFAULT INDEX ON redpanda_privatelink_bytes_view;

> SELECT cnt from redpanda_privatelink_bytes_view
100
39 changes: 39 additions & 0 deletions test/cloud-canary/canary-redpanda-sources.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ kafka-create-topic topic=bytes replication-factor=3

$ kafka-ingest format=bytes key-terminator=: key-format=bytes topic=bytes repeat=100
abc:abc

> DROP SOURCE IF EXISTS redpanda_bytes CASCADE;
> DROP CONNECTION IF EXISTS redpanda_conn CASCADE;
> DROP SECRET IF EXISTS redpanda_password CASCADE;

> CREATE SECRET redpanda_password AS '${arg.redpanda-password}';

> CREATE CONNECTION redpanda_conn TO KAFKA (
BROKER '${testdrive.kafka-addr}',
SASL MECHANISMS = 'SCRAM-SHA-512',
SASL USERNAME = '${arg.redpanda-username}',
SASL PASSWORD = SECRET redpanda_password
);

> CREATE SOURCE redpanda_bytes
IN CLUSTER canary_sources
FROM KAFKA CONNECTION redpanda_conn (TOPIC 'testdrive-bytes-${testdrive.seed}')
FORMAT BYTES
ENVELOPE NONE;

> CREATE MATERIALIZED VIEW redpanda_bytes_view AS SELECT COUNT(*) AS cnt FROM redpanda_bytes;

> CREATE DEFAULT INDEX ON redpanda_bytes_view;

> SELECT cnt from redpanda_bytes_view
100
Loading
Loading