Skip to content

Commit

Permalink
canary-cloud: Use Redpanda Cloud with AWS PrivateLink
Browse files Browse the repository at this point in the history
  • Loading branch information
def- committed Aug 27, 2024
1 parent 5a12775 commit 47e0e1a
Show file tree
Hide file tree
Showing 6 changed files with 432 additions and 27 deletions.
3 changes: 3 additions & 0 deletions bin/ci-builder
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ case "$cmd" in
--env GIT_AUTHOR_NAME
--env GIT_COMMITTER_EMAIL
--env GIT_COMMITTER_NAME
# For cloud canary
--env REDPANDA_CLOUD_CLIENT_ID
--env REDPANDA_CLOUD_CLIENT_SECRET
)

if [[ $detach_container == "true" ]]; then
Expand Down
121 changes: 121 additions & 0 deletions misc/python/materialize/redpanda_cloud.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import os
import time
from typing import Any

import requests


def get_result(response: requests.Response) -> dict[str, Any]:
if response.status_code not in (200, 201, 202):
raise ValueError(
f"Redpanda API call failed: {response.status_code} {response.text}"
)
result = response.json()
print(result)
return result


class RedpandaCluster:
def __init__(self, token: str, dataplane_api_url: str) -> None:
self.token = token
self.dataplane_api_url = dataplane_api_url

def headers(self) -> dict[str, str]:
return {"Authorization": f"Bearer {self.token}"}

def create(self, object: str, content: dict[str, Any]) -> dict[str, Any]:
return get_result(
requests.post(
f"{self.dataplane_api_url}/v1alpha1/{object}",
json=content,
headers=self.headers(),
)
)


class RedpandaCloud:
def __init__(self) -> None:
client_id = os.environ["REDPANDA_CLOUD_CLIENT_ID"]
client_secret = os.environ["REDPANDA_CLOUD_CLIENT_SECRET"]

result = get_result(
requests.post(
"https://auth.prd.cloud.redpanda.com/oauth/token",
json={
"client_id": client_id,
"client_secret": client_secret,
"audience": "cloudv2-production.redpanda.cloud",
"grant_type": "client_credentials",
},
)
)
# Can't finish our test otherwise
assert result["expires_in"] >= 3600, result
self.token = result["access_token"]
self.controlplane_api_url = "https://api.redpanda.com"

def headers(self) -> dict[str, str]:
return {"Authorization": f"Bearer {self.token}"}

def wait(self, result: dict[str, Any]) -> dict[str, Any]:
operation_id = result["operation"]["id"]
while True:
time.sleep(10)
result = get_result(
requests.get(
f"{self.controlplane_api_url}/v1beta2/operations/{operation_id}",
headers=self.headers(),
)
)
if result["operation"]["state"] == "STATE_COMPLETED":
return result["operation"]
if result["operation"]["state"] == "STATE_FAILED":
raise ValueError(result)
if result["operation"]["state"] != "STATE_IN_PROGRESS":
raise ValueError(result)

def create(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]:
return get_result(
requests.post(
f"{self.controlplane_api_url}/v1beta2/{object}",
json=content,
headers=self.headers(),
)
)

def patch(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]:
return get_result(
requests.patch(
f"{self.controlplane_api_url}/v1beta2/{object}",
json=content,
headers=self.headers(),
)
)

def get(self, object: str) -> dict[str, Any]:
return get_result(
requests.get(
f"{self.controlplane_api_url}/v1beta2/{object}",
headers=self.headers(),
)
)

def delete(self, object: str, id: str) -> dict[str, Any]:
return get_result(
requests.delete(
f"{self.controlplane_api_url}/v1beta2/{object}/{id}",
headers=self.headers(),
)
)

def get_cluster(self, cluster_info: dict[str, Any]) -> RedpandaCluster:
return RedpandaCluster(self.token, cluster_info["dataplane_api"]["url"])
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
$ kafka-ingest format=bytes key-terminator=: key-format=bytes topic=bytes repeat=100
abc:abc

> DROP SOURCE IF EXISTS bytes CASCADE;
> DROP SOURCE IF EXISTS kafka_bytes CASCADE;
> DROP CONNECTION IF EXISTS kafka_conn;
> DROP SECRET IF EXISTS confluent_username
> DROP SECRET IF EXISTS confluent_password
Expand All @@ -30,17 +30,15 @@ abc:abc
SASL PASSWORD = SECRET confluent_password
);

> CREATE CLUSTER canary_sources SIZE '3xsmall';

> CREATE SOURCE bytes
> CREATE SOURCE kafka_bytes
IN CLUSTER canary_sources
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-bytes-${testdrive.seed}')
FORMAT BYTES
ENVELOPE NONE;

> CREATE MATERIALIZED VIEW bytes_view AS SELECT COUNT(*) AS cnt FROM bytes;
> CREATE MATERIALIZED VIEW kafka_bytes_view AS SELECT COUNT(*) AS cnt FROM kafka_bytes;

> CREATE DEFAULT INDEX ON bytes_view;
> CREATE DEFAULT INDEX ON kafka_bytes_view;

> SELECT cnt > 0 from bytes_view
> SELECT cnt > 0 from kafka_bytes_view
true
44 changes: 44 additions & 0 deletions test/cloud-canary/canary-redpanda-privatelink-sources.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ kafka-create-topic topic=bytes-privatelink replication-factor=3

$ kafka-ingest format=bytes key-terminator=: key-format=bytes topic=bytes-privatelink repeat=100
abc:abc

> DROP SOURCE IF EXISTS redpanda_privatelink_bytes CASCADE;
> DROP CONNECTION IF EXISTS redpanda_privatelink_conn CASCADE;
> DROP SECRET IF EXISTS redpanda_privatelink_password CASCADE;

> CREATE SECRET redpanda_privatelink_password AS '${arg.redpanda-password}';

> VALIDATE CONNECTION privatelink_conn;

# When run immediately: ERROR: Meta data fetch error: BrokerTransportFailure
$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=60s

> CREATE CONNECTION redpanda_privatelink_conn TO KAFKA (
AWS PRIVATELINK privatelink_conn (PORT 30292),
SASL MECHANISMS = 'SCRAM-SHA-512',
SASL USERNAME = '${arg.redpanda-username}',
SASL PASSWORD = SECRET redpanda_privatelink_password
);

> CREATE SOURCE redpanda_privatelink_bytes
IN CLUSTER canary_sources
FROM KAFKA CONNECTION redpanda_privatelink_conn (TOPIC 'testdrive-bytes-privatelink-${testdrive.seed}')
FORMAT BYTES
ENVELOPE NONE;

> CREATE MATERIALIZED VIEW redpanda_privatelink_bytes_view AS SELECT COUNT(*) AS cnt FROM redpanda_privatelink_bytes;

> CREATE DEFAULT INDEX ON redpanda_privatelink_bytes_view;

> SELECT cnt from redpanda_privatelink_bytes_view
100
39 changes: 39 additions & 0 deletions test/cloud-canary/canary-redpanda-sources.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ kafka-create-topic topic=bytes replication-factor=3

$ kafka-ingest format=bytes key-terminator=: key-format=bytes topic=bytes repeat=100
abc:abc

> DROP SOURCE IF EXISTS redpanda_bytes CASCADE;
> DROP CONNECTION IF EXISTS redpanda_conn CASCADE;
> DROP SECRET IF EXISTS redpanda_password CASCADE;

> CREATE SECRET redpanda_password AS '${arg.redpanda-password}';

> CREATE CONNECTION redpanda_conn TO KAFKA (
BROKER '${testdrive.kafka-addr}',
SASL MECHANISMS = 'SCRAM-SHA-512',
SASL USERNAME = '${arg.redpanda-username}',
SASL PASSWORD = SECRET redpanda_password
);

> CREATE SOURCE redpanda_bytes
IN CLUSTER canary_sources
FROM KAFKA CONNECTION redpanda_conn (TOPIC 'testdrive-bytes-${testdrive.seed}')
FORMAT BYTES
ENVELOPE NONE;

> CREATE MATERIALIZED VIEW redpanda_bytes_view AS SELECT COUNT(*) AS cnt FROM redpanda_bytes;

> CREATE DEFAULT INDEX ON redpanda_bytes_view;

> SELECT cnt from redpanda_bytes_view
100
Loading

0 comments on commit 47e0e1a

Please sign in to comment.