Skip to content

Commit

Permalink
Merge pull request #260 from kate-goldenring/use-emqx-broker
Browse files Browse the repository at this point in the history
fix: switch to using EMQX MQTT broker for e2e tests
  • Loading branch information
kate-goldenring authored Jan 13, 2025
2 parents b60027b + d23f485 commit e3e5b93
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 13 deletions.
2 changes: 1 addition & 1 deletion images/spin/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=${BUILDPLATFORM} rust:1.79 AS build
FROM --platform=${BUILDPLATFORM} rust:1.81 AS build
WORKDIR /opt/build
COPY . .
RUN rustup target add wasm32-wasi && cargo build --target wasm32-wasi --release
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[toolchain]
channel = "1.79.0"
channel = "1.81.0"
components = ["clippy", "rustfmt"]
targets = ["wasm32-wasi", "wasm32-unknown-unknown"]
profile = "default"
13 changes: 13 additions & 0 deletions scripts/deploy-workloads.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,29 @@ if ! command -v kubectl &> /dev/null; then
sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl;
fi

update_mqtt_workload_with_broker_cluster_ip() {
local dir=$1
echo "Waiting for emqx pod to be ready"
kubectl wait --for=condition=ready --timeout=20s pod/emqx
# The MQTT trigger cannot do DNS resolution, so we need to use the IP address of the MQTT broker
# Replace "EMQX_CLUSTER_IP" with the actual ClusterIP of the EMQX service
local cluster_ip=$(kubectl get svc emqx -o jsonpath='{.spec.clusterIP}')
sed -i "s/EMQX_CLUSTER_IP/$cluster_ip/g" $dir/workloads.yaml
echo "Updated workloads.yaml with ClusterIP: $cluster_ip"
}

# apply the workloads
echo ">>> apply workloads"
kubectl apply -f tests/workloads-common
# wait for all the pods to be ready
kubectl wait --for=condition=ready --timeout=120s pod --all

if [ "$1" == "workloads-pushed-using-spin-registry-push" ]; then
update_mqtt_workload_with_broker_cluster_ip "tests/workloads-pushed-using-spin-registry-push"
echo "deploying spin apps pushed to registry using 'spin registry push' command"
kubectl apply -f tests/workloads-pushed-using-spin-registry-push
else
update_mqtt_workload_with_broker_cluster_ip "tests/workloads-pushed-using-docker-build-push"
echo "deploying spin apps pushed to registry using 'docker build && k3d image import' command"
kubectl apply -f tests/workloads-pushed-using-docker-build-push
fi
Expand Down
25 changes: 16 additions & 9 deletions tests/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ mod test {
anyhow::bail!("kubectl is not installed");
}

let forward_port = port_forward_redis(redis_port).await?;
let forward_port = port_forward_svc(redis_port, "redis").await?;

let client = redis::Client::open(format!("redis://localhost:{}", forward_port))?;
let mut con = client.get_multiplexed_async_connection().await?;
Expand Down Expand Up @@ -145,13 +145,14 @@ mod test {
anyhow::bail!("kubectl is not installed");
}

let forward_port = port_forward_redis(redis_port).await?;
let forward_port = port_forward_svc(redis_port, "redis").await?;

let client = redis::Client::open(format!("redis://localhost:{}", forward_port))
.context("connecting to redis")?;
let mut con = client.get_multiplexed_async_connection().await?;

con.publish("testchannel", "some-payload").await?;
con.publish::<_, _, ()>("testchannel", "some-payload")
.await?;

let one_sec = time::Duration::from_secs(1);
thread::sleep(one_sec);
Expand All @@ -174,8 +175,11 @@ mod test {
anyhow::bail!("kubectl is not installed");
}

// Publish a message to the MQTT broker
let mut mqttoptions = rumqttc::MqttOptions::new("123", "test.mosquitto.org", mqtt_port);
// Port forward the emqx mqtt broker
let forward_port = port_forward_svc(mqtt_port, "emqx").await?;

// Publish a message to the emqx broker
let mut mqttoptions = rumqttc::MqttOptions::new("123", "127.0.0.1", forward_port);
mqttoptions.set_keep_alive(std::time::Duration::from_secs(1));

let (client, mut eventloop) = rumqttc::AsyncClient::new(mqttoptions, 10);
Expand Down Expand Up @@ -253,15 +257,18 @@ mod test {
}
}

async fn port_forward_redis(redis_port: u16) -> Result<u16> {
async fn port_forward_svc(svc_port: u16, svc_name: &str) -> Result<u16> {
let port = get_random_port()?;

println!(" >>> kubectl portforward redis {}:{} ", port, redis_port);
println!(
" >>> kubectl portforward svc {} {}:{} ",
svc_name, port, svc_port
);

Command::new("kubectl")
.arg("port-forward")
.arg("redis")
.arg(format!("{}:{}", port, redis_port))
.arg(svc_name)
.arg(format!("{}:{}", port, svc_port))
.spawn()?;
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
Ok(port)
Expand Down
25 changes: 25 additions & 0 deletions tests/workloads-common/mqtt-broker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
apiVersion: v1
kind: Pod
metadata:
name: emqx
labels:
app: emqx
spec:
containers:
- name: emqx
image: emqx/emqx
ports:
- containerPort: 1883
---
apiVersion: v1
kind: Service
metadata:
name: emqx
spec:
selector:
app: emqx
ports:
- protocol: TCP
port: 1883
targetPort: 1883
type: ClusterIP
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ spec:
value: containerd-shim-spin/mqtt-test-17h24d
# The MQTT trigger cannot do DNS resolution, so we need to use the IP address of the MQTT broker
- name: SPIN_VARIABLE_MQTT_BROKER_URI
value: "mqtt://test.mosquitto.org"
value: "mqtt://EMQX_CLUSTER_IP:1883"
---
apiVersion: v1
kind: Service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ spec:
value: containerd-shim-spin/mqtt-test-17h24d
# The MQTT trigger cannot do DNS resolution, so we need to use the IP address of the MQTT broker
- name: SPIN_VARIABLE_MQTT_BROKER_URI
value: "mqtt://test.mosquitto.org"
value: "mqtt://EMQX_CLUSTER_IP:1883"
---
apiVersion: v1
kind: Service
Expand Down

0 comments on commit e3e5b93

Please sign in to comment.