Skip to content

Commit

Permalink
updated easybus to create exchanges if events dont have handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
TareqK committed Nov 7, 2022
1 parent 0b792c0 commit 8da7fd6
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 42 deletions.
9 changes: 7 additions & 2 deletions easybus-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
<modelVersion>4.0.0</modelVersion>
<groupId>me.kisoft</groupId>
<artifactId>easybus-core</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>
<packaging>jar</packaging>
<parent>
<groupId>me.kisoft</groupId>
<artifactId>easybus-parent</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>
</parent>

<name>Easybus Core</name>
Expand Down Expand Up @@ -38,6 +38,11 @@
<scope>test</scope>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.3</version>
</dependency>
</dependencies>

</project>
26 changes: 13 additions & 13 deletions easybus-core/src/main/java/me/kisoft/easybus/EasyBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@
package me.kisoft.easybus;

import java.lang.reflect.InvocationTargetException;
import java.util.logging.Level;
import lombok.extern.java.Log;
import org.reflections.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* @author tareq
*/
@Log
public class EasyBus {


private final Logger log = LoggerFactory.getLogger(EasyBus.class);
private static final String NO_EVENT_CLASS_ERROR = "Error in Class : %s : No Event Class Specified.";
private static final String EVENT_CLASS_NOT_ANNOTATED = "Error in Class : %s : Event Class : %s : Not annotated with @Event";
private static final String NO_METHOD_DEFINED_ERROR = "Error in Class : %s : 'handle' method for Specified Event type : %s : not defined";

private final Bus bus;

/**
Expand All @@ -29,7 +29,7 @@ public class EasyBus {
public EasyBus() {
bus = new MemoryBusImpl();
}

public EasyBus(Bus bus) {
this.bus = bus;
}
Expand All @@ -48,7 +48,7 @@ public void removeHandlers() {
*/
public void post(Object event) {
if (event != null) {
log.log(Level.FINE, "Event Thrown : {0}", event.getClass().getCanonicalName());
log.debug(String.format("Event Thrown : %s", event.getClass().getCanonicalName()));
bus.post(event);
}
}
Expand Down Expand Up @@ -91,7 +91,7 @@ public final EasyBus search(ClassLoader loader) {
*/
public final EasyBus search(Reflections r) {
for (Class clazz : r.getTypesAnnotatedWith(Handle.class)) {

try {
Object o = clazz.getConstructor().newInstance();
if (o.getClass().getAnnotation(Handle.class).event() == null) {
Expand All @@ -106,13 +106,13 @@ public final EasyBus search(Reflections r) {
}
}
this.addHandler(new EventHandler(o));
log.log(Level.INFO, "Added Event Handler {0}", clazz.getSimpleName());
log.info(String.format("Added Event Handler %s", clazz.getSimpleName()));
} catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
log.log(Level.SEVERE, null, ex);
log.error(ex.getMessage());
throw new RuntimeException(ex);
}
}

return this;
}

Expand All @@ -133,9 +133,9 @@ public void addHandler(EventHandler handler) {
public void removeHandler(EventHandler handler) {
bus.removeHandler(handler);
}

public void close() throws Exception {
bus.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@
import java.lang.reflect.Method;
import java.util.Objects;
import lombok.Getter;
import lombok.extern.java.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* @author tareq
*/
@Log
public class EventHandler {

private final Logger log = LoggerFactory.getLogger(EventHandler.class);
private static final String NO_ANNOTATION = "Handler of type %s Must have the @Handle annotation";
private static final String NO_METHOD = "Handler of type %s Must have the public method 'handle' with parameter type %s";
private static final String NO_CLASS = "Handler of type %s is missing the target event class";
Expand Down Expand Up @@ -66,7 +67,7 @@ public final void handle(Object event) throws RuntimeException {
try {
this.handlerMethod.invoke(this.handler, event);
} catch (Throwable ex) {
log.severe(ex.getMessage());
log.error(ex.getMessage());
throw new RuntimeException(ex);
}

Expand Down Expand Up @@ -108,7 +109,5 @@ public boolean equals(Object obj) {
}
return true;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

/**
*
Expand Down
9 changes: 7 additions & 2 deletions easybus-mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
<parent>
<groupId>me.kisoft</groupId>
<artifactId>easybus-parent</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>
</parent>
<groupId>me.kisoft</groupId>
<artifactId>easybus-mongodb</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>
<packaging>jar</packaging>
<name>Easybus Mongodb</name>
<description>Simple, No Frills Event Bus for Java - Mongodb Backing Bus</description>
Expand Down Expand Up @@ -59,6 +59,11 @@
<version>1.17.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.3</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
import me.kisoft.easybus.Bus;
import me.kisoft.easybus.EventHandler;
import org.jongo.Jongo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* @author tareq
*/
public class MongodbBusImpl implements Bus {

private final Logger log = LoggerFactory.getLogger(MongodbBusImpl.class);
private final ScheduledExecutorService pool;
private final long pollTime;
private final Map<EventHandler, ScheduledFuture> futureMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Date;
import java.util.logging.Level;
import lombok.extern.java.Log;
import me.kisoft.easybus.EventHandler;
import org.jongo.Jongo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* @author tareq
*/
@Log
public class MongodbCollectionPollRunnable implements Runnable {

private final Logger log = LoggerFactory.getLogger(MongodbCollectionPollRunnable.class);
private final EventHandler handler;
private final Jongo jongo;
private final ObjectMapper mapper = new ObjectMapper();
Expand Down Expand Up @@ -63,17 +63,17 @@ public void run() {
.as(MongodbEvent.class);

} catch (RuntimeException ex) {
log.fine(ex.getMessage());
log.debug(ex.getMessage());
event = this.jongo.getCollection(handler.getEventClassName())
.findAndModify("{eventId:#}", event.getEventId())
.with("{$set:{processing:false,handled:false,lastAccess:#}}", new Date())
.as(MongodbEvent.class);
log.log(Level.FINE, "Re-Submitting event with id : {0}", event.getEventId());
log.debug(String.format("Re-Submitting event with id : %s", event.getEventId()));
}

}
} catch (IllegalArgumentException ex) {
log.severe(ex.getMessage());
log.error(ex.getMessage());
}
}

Expand Down
9 changes: 7 additions & 2 deletions easybus-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
<parent>
<groupId>me.kisoft</groupId>
<artifactId>easybus-parent</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>
</parent>
<artifactId>easybus-rabbitmq</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>
<packaging>jar</packaging>
<name>Easybus RabbitMQ</name>
<description>Simple, No Frills Event Bus for Java Backed by RabbitMQ</description>
Expand Down Expand Up @@ -57,5 +57,10 @@
<version>1.17.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.3</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package me.kisoft.easybus.rabbitmq;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
Expand All @@ -25,22 +24,24 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import lombok.extern.java.Log;
import me.kisoft.easybus.Bus;
import me.kisoft.easybus.EventHandler;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* @author tareq
*/
@Log
public class RabbitMQBusImpl implements Bus {

private final Logger log = LoggerFactory.getLogger(RabbitMQBusImpl.class);
private final Connection connection;
private final ObjectMapper mapper = new ObjectMapper();
private final Map<EventHandler, String> tagMap = new HashMap<>();
private final Map<EventHandler, Channel> channelMap = new HashMap<>();
private final Map<String, Boolean> exchangeExistanceMap = new HashMap<>();

public RabbitMQBusImpl(Connection connection) {
this.connection = connection;
Expand All @@ -59,6 +60,11 @@ public RabbitMQBusImpl() {
@Override
public void post(Object object) {
try ( Channel channel = this.connection.createChannel()) {
String exchangeName = getExcahngeName(object.getClass());
if (!exchangeExistanceMap.getOrDefault(exchangeName, Boolean.FALSE)) {
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);
exchangeExistanceMap.put(exchangeName, Boolean.TRUE);
}
channel.basicPublish(getExcahngeName(object.getClass()), "all", null, mapper.writer().writeValueAsBytes(object));
} catch (IOException | TimeoutException ex) {
throw new RuntimeException(ex);
Expand Down Expand Up @@ -92,14 +98,14 @@ public void clear() {
try {
channel.basicCancel(tag);
} catch (IOException ex) {
log.severe(ex.getMessage());
log.debug(ex.getMessage());
}
});
channelMap.values().forEach(usedChannel -> {
try {
usedChannel.close();
} catch (IOException | TimeoutException ex) {
log.severe(ex.getMessage());
log.error(ex.getMessage());
}
});
} catch (IOException | TimeoutException ex) {
Expand All @@ -117,7 +123,7 @@ public void addHandler(EventHandler handler) {
channel.queueDeclare(queueName, false, false, false, null).getQueue();
channel.queueBind(queueName, exchangeName, RandomStringUtils.randomAlphabetic(30));
String tag = channel.basicConsume(getQueueName(handler.getHandler()), (consumerTag, delivery) -> {
log.fine(String.format("Received Message from Exchange %s Queue %s with Delivery Tag %s", exchangeName, queueName, String.valueOf(delivery.getEnvelope().getDeliveryTag())));
log.debug(String.format("Received Message from Exchange %s Queue %s with Delivery Tag %s", exchangeName, queueName, String.valueOf(delivery.getEnvelope().getDeliveryTag())));
handler.handle(mapper.reader().forType(handler.getEventClass()).readValue(delivery.getBody()));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {
Expand All @@ -133,11 +139,14 @@ public void addHandler(EventHandler handler) {
public void removeHandler(EventHandler handler) {
try {
String consumerTag = tagMap.get(handler);
Channel channel = channelMap.get(handler);
channel.basicCancel(consumerTag);
channel.close();
try ( Channel channel = channelMap.get(handler)) {
channel.basicCancel(consumerTag);
}
} catch (IOException | TimeoutException ex) {
throw new RuntimeException(ex);
} finally {
channelMap.remove(handler);
tagMap.remove(handler);
}
}

Expand All @@ -147,7 +156,7 @@ public void close() throws IOException {
try {
channel.close();
} catch (IOException | TimeoutException ex) {
log.severe(ex.getMessage());
log.error(ex.getMessage());
}
});
connection.close();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>me.kisoft</groupId>
<artifactId>easybus-parent</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>
<name>EasyBus</name>
<packaging>pom</packaging>
<properties>
Expand Down

0 comments on commit 8da7fd6

Please sign in to comment.