Skip to content

Commit

Permalink
feat(examples): Add examples which show the usage of the `HeaderInfor…
Browse files Browse the repository at this point in the history
…mation` API.

Co-authored-by: cb0s <cedric.boes@online.de>
  • Loading branch information
fussel178 and cb0s committed Jan 29, 2022
1 parent 9bc61d3 commit 636def2
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package de.wuespace.telestion.examples.header;

import de.wuespace.telestion.api.message.HeaderInformation;
import io.vertx.core.MultiMap;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;

@SuppressWarnings("unused")
public class DelayCounterInformation extends HeaderInformation {

public static final String DELAY_KEY = "delay";
public static final String COUNTER_KEY = "counter";

public static final int DELAY_DEFAULT_VALUE = -1;
public static final int COUNTER_DEFAULT_VALUE = -1;

public static DelayCounterInformation from(MultiMap headers) {
return new DelayCounterInformation(headers);
}

public static DelayCounterInformation from(Message<?> message) {
return new DelayCounterInformation(message);
}

public static DelayCounterInformation from(DeliveryOptions options) {
return new DelayCounterInformation(options);
}

public DelayCounterInformation() {
this(DELAY_DEFAULT_VALUE, COUNTER_DEFAULT_VALUE);
}

public DelayCounterInformation(int delay, int counter) {
setDelay(delay);
setCounter(counter);
}

public DelayCounterInformation(DelayCounterInformation other) {
this(other.getDelay(), other.getCounter());
}

public DelayCounterInformation(MultiMap headers) {
super(headers);
}

public DelayCounterInformation(Message<?> message) {
super(message);
}

public DelayCounterInformation(DeliveryOptions options) {
super(options);
}

public DelayCounterInformation setDelay(int delay) {
add(DELAY_KEY, delay);
return this;
}

public DelayCounterInformation setCounter(int counter) {
add(COUNTER_KEY, counter);
return this;
}

public int getDelay() {
return getInt(DELAY_KEY, DELAY_DEFAULT_VALUE);
}

public int getCounter() {
return getInt(COUNTER_KEY, COUNTER_DEFAULT_VALUE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package de.wuespace.telestion.examples.header;

import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.message.HeaderInformation;
import de.wuespace.telestion.api.verticle.TelestionConfiguration;
import de.wuespace.telestion.api.verticle.TelestionVerticle;
import de.wuespace.telestion.api.verticle.trait.WithEventBus;
import de.wuespace.telestion.api.verticle.trait.WithTiming;
import io.vertx.core.Vertx;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

@SuppressWarnings("unused")
public class Publisher extends TelestionVerticle<Publisher.Configuration> implements WithTiming, WithEventBus {

public static void main(String[] args) {
var vertx = Vertx.vertx();

vertx.deployVerticle(new Publisher());
vertx.deployVerticle(new Receiver());
}

public record Configuration(
@JsonProperty String outAddress,
@JsonProperty int delay
) implements TelestionConfiguration {
public Configuration() {
this("publish-channel", 1);
}
}

@Override
public void onStart() {
var delay = Duration.ofSeconds(getConfig().delay());
var counter = new AtomicInteger();

interval(delay, id -> {
var infos = new HeaderInformation()
.add("delay", getConfig().delay())
.add("counter", counter.getAndIncrement());

publish(getConfig().outAddress(), "Hello from Publisher", infos);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package de.wuespace.telestion.examples.header;

import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.message.HeaderInformation;
import de.wuespace.telestion.api.verticle.TelestionConfiguration;
import de.wuespace.telestion.api.verticle.TelestionVerticle;
import de.wuespace.telestion.api.verticle.trait.WithEventBus;

@SuppressWarnings("unused")
public class Receiver extends TelestionVerticle<Receiver.Configuration> implements WithEventBus {

public static void main(String[] args) {
Publisher.main(args);
}

public record Configuration(@JsonProperty String inAddress) implements TelestionConfiguration {
public Configuration() {
this("publish-channel");
}
}

@Override
public void onStart() {
register(getConfig().inAddress(), message -> {
var infos = HeaderInformation.from(message);
var delay = infos.getInt("delay", -1);
var counter = infos.getInt("counter", -1);

logger.info("Received message: {}", message.body());
logger.info("Publisher delay: {}", delay);
logger.info("Publisher counter: {}", counter);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package de.wuespace.telestion.examples.header;

import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.verticle.TelestionConfiguration;
import de.wuespace.telestion.api.verticle.TelestionVerticle;
import de.wuespace.telestion.api.verticle.trait.WithEventBus;
import de.wuespace.telestion.api.verticle.trait.WithTiming;
import io.vertx.core.Vertx;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

@SuppressWarnings("unused")
public class Requester extends TelestionVerticle<Requester.Configuration> implements WithTiming, WithEventBus {

public static void main(String[] args) {
var vertx = Vertx.vertx();

vertx.deployVerticle(new Requester());
vertx.deployVerticle(new Responder());
}

public record Configuration(
@JsonProperty String requestAddress,
@JsonProperty int delay
) implements TelestionConfiguration {
public Configuration() {
this("request-channel", 1);
}
}

@Override
public void onStart() {
var delay = Duration.ofSeconds(getConfig().delay());
var requestCounter = new AtomicInteger();

// Send a "Ping" message with custom headers periodically and request pong signal with custom headers
interval(delay, id -> {
var requestTime = System.currentTimeMillis();

var requestInfos = new DelayCounterInformation(getConfig().delay(), requestCounter.getAndIncrement());
var requestTimes = new TimeInformation(requestTime, requestTime);

request(getConfig().requestAddress(), "Ping", requestInfos, requestTimes).onSuccess(message -> {
var responseInfos = DelayCounterInformation.from(message);
var responseTimes = TimeInformation.from(message);

logger.info("Response body: {}", message.body());
logger.info("Response counter: {}", responseInfos.getCounter());
logger.info("Message Received on: {}", responseTimes.getReceiveTime());
});
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package de.wuespace.telestion.examples.header;

import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.message.HeaderInformation;
import de.wuespace.telestion.api.verticle.TelestionConfiguration;
import de.wuespace.telestion.api.verticle.TelestionVerticle;
import de.wuespace.telestion.api.verticle.trait.WithEventBus;

import java.util.concurrent.atomic.AtomicInteger;

@SuppressWarnings("unused")
public class Responder extends TelestionVerticle<Responder.Configuration> implements WithEventBus {

public static void main(String[] args) {
Requester.main(args);
}

public record Configuration(@JsonProperty String respondAddress) implements TelestionConfiguration {
public Configuration() {
this("request-channel");
}
}

@Override
public void onStart() {
var responseCounter = new AtomicInteger();

register(getConfig().respondAddress(), message -> {
var requestInfos = DelayCounterInformation.from(message);
var requestTimes = TimeInformation.from(message);

logger.info("Request body: {}", message.body());
logger.info("Request delay: {}", requestInfos.getDelay());
logger.info("Request counter: {}", requestInfos.getCounter());
logger.info("Request send time: {}", requestTimes.getSendTime());

var sendTime = System.currentTimeMillis();
var responseInfos = new DelayCounterInformation(
DelayCounterInformation.DELAY_DEFAULT_VALUE,
responseCounter.getAndIncrement());
var responseTimes = new TimeInformation(requestTimes).setSendTime(sendTime);

message.reply("Pong", HeaderInformation.merge(responseInfos, responseTimes).toOptions());
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package de.wuespace.telestion.examples.header;

import de.wuespace.telestion.api.message.HeaderInformation;
import io.vertx.core.MultiMap;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;

public class TimeInformation extends HeaderInformation {

public static final String RECEIVE_TIME = "receive-time";
public static final String SEND_TIME = "send-time";

public static final long NO_TIME = -1L;

public static TimeInformation from(MultiMap headers) {
return new TimeInformation(headers);
}

public static TimeInformation from(Message<?> message) {
return new TimeInformation(message);
}

public static TimeInformation from(DeliveryOptions options) {
return new TimeInformation(options);
}

public TimeInformation() {
this(System.currentTimeMillis());
}

public TimeInformation(long sendTime) {
this(NO_TIME, sendTime);
}

public TimeInformation(long receiveTime, long sendTime) {
setReceiveTime(receiveTime);
setSendTime(sendTime);
}

public TimeInformation(TimeInformation other) {
this(other.getReceiveTime(), other.getSendTime());
}
public TimeInformation(MultiMap headers) {
super(headers);
}

public TimeInformation(Message<?> message) {
super(message);
}

public TimeInformation(DeliveryOptions options) {
super(options);
}

public TimeInformation setReceiveTime(long receiveTime) {
add(RECEIVE_TIME, receiveTime);
return this;
}

public TimeInformation setSendTime(long sendTime) {
add(SEND_TIME, sendTime);
return this;
}

public long getReceiveTime() {
return getLong(RECEIVE_TIME, NO_TIME);
}

public long getSendTime() {
return getLong(SEND_TIME, NO_TIME);
}
}

0 comments on commit 636def2

Please sign in to comment.