An event sourcing framework built on top of Apache Cassandra that provides horizontally scalable throughput and native cross-data-center replication.
- Exactly-once delivery.
- Horizontal Scalability.
- Backed by Apache Cassandra.
- Native cross data center replication.
Once you have set up a Cassandra cluster, deploy the necessary schema, located in the homeaway GitHub organization (old version) or in the Expedia Group GitHub organization (newer version).
You need to use a Java version compatible with your Cassandra version to run this project's unit tests. For example, if you use Cassandra 3, you need Java 8; higher Java versions will cause errors such as "NoHostAvailable" failures in the unit tests. You can switch JDKs by setting the JAVA_HOME environment variable in the shell where you run the tests.
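For example, in a POSIX shell, switching to a Java 8 JDK before running the tests might look like the sketch below. The JDK path is an example and will differ by system; `mvn test` assumes the standard Maven build this project uses.

```shell
# Point JAVA_HOME at a JDK 8 installation (example path; adjust for your machine)
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH="$JAVA_HOME/bin:$PATH"

# Verify the active Java version, then run the tests
java -version
mvn test
```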
Warning: some unit tests have been observed to fail occasionally and then succeed without any code changes.
Make sure you include the Photon dependency below in your pom.xml:
<dependencies>
    <dependency>
        <groupId>com.homeaway.datatools.photon</groupId>
        <artifactId>photon</artifactId>
        <version>0.1.8</version>
    </dependency>
</dependencies>
To create a producer or consumer, you will first need to define the properties used to build it. Below is an example of properties using the default Cassandra driver and the default JSON serializer.
Properties properties = new Properties();
properties.put(PHOTON_DRIVER_CLASS, "com.homeaway.datatools.photon.driver.CassandraPhotonDriver");
properties.put(PHOTON_SERIALIZER_CLASS, "com.homeaway.datatools.photon.serialization.JsonPhotonSerializer");
properties.put(SESSION_CONTACT_POINTS, "XX.XXX.X.XX,XX.XXX.XX.XXX,XX.XXX.XX.XXX");
properties.put(SESSION_USER_NAME,"{username}");
properties.put(SESSION_PASSWORD,"{password}");
properties.put(SESSION_KEYSPACE,"{keyspace}");
BeamProducer producer = Producers.newProducer(properties);
Properties properties = new Properties();
properties.put(CONSUMER_TYPE, "SINGLE_REGION");
properties.put(PHOTON_DRIVER_CLASS, "com.homeaway.datatools.photon.driver.CassandraPhotonDriver");
properties.put(PHOTON_DESERIALIZER_CLASS, "com.homeaway.datatools.photon.serialization.JsonPhotonDeserializer");
properties.put(SESSION_CONTACT_POINTS, "XX.XXX.X.XX,XX.XXX.XX.XXX,XX.XXX.XX.XXX");
properties.put(SESSION_USER_NAME,"{username}");
properties.put(SESSION_PASSWORD,"{password}");
properties.put(SESSION_KEYSPACE,"{keyspace}");
ConsumerFactory factory = Consumers.newConsumerFactory(properties);
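The examples below pass a `PayloadObject` to the producer. Photon does not define this class; with the default JSON serializer it can be any plain POJO. A minimal hypothetical version (the fields are illustrative, not part of Photon's API):

```java
// Hypothetical payload class; the default JSON serializer works with
// plain POJOs that have a no-arg constructor and getters/setters.
public class PayloadObject {
    private String id;
    private String description;

    public PayloadObject() {
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }
}
```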
PayloadObject obj = new PayloadObject();
producer.writeMessageToBeam("stream.name.here", "message.key.here", obj);
PayloadObject obj = new PayloadObject();
BeamFuture future = producer.writeMessageToBeamAsync("stream.name.here", "message.key.here", obj);
if (future.get())
{
//Successful write
}
The following examples write events that will be automatically deleted after 30 days.
Duration ttl = Duration.ofDays(30);
PayloadObject obj = new PayloadObject();
producer.writeMessageToBeam("stream.name.here", "message.key.here", obj, ttl);
Duration ttl = Duration.ofDays(30);
PayloadObject obj = new PayloadObject();
BeamFuture future = producer.writeMessageToBeamAsync("stream.name.here", "message.key.here", obj, ttl);
if (future.get())
{
//Successful write
}
The following examples write an event that will be received 5 days from now.
Instant writeTime = Instant.now().plus(5, ChronoUnit.DAYS);
PayloadObject obj = new PayloadObject();
producer.writeMessageToBeam("stream.name.here", "message.key.here", obj, writeTime);
Instant writeTime = Instant.now().plus(5, ChronoUnit.DAYS);
PayloadObject obj = new PayloadObject();
BeamFuture future = producer.writeMessageToBeamAsync("stream.name.here", "message.key.here", obj, writeTime);
if (future.get())
{
//Successful write
}
Below is an example of how to set up a standard consumer that will begin reading from the last point this client read to.
PhotonConsumer consumer = consumerFactory.getPhotonConsumer();
consumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
new PhotonMessageHandler() {
@Override
public void handleMessage(PhotonMessage message) {
//Perform some action based on the message received
}
@Override
public void handleException(BeamException beamException) {
//Handle exception thrown by the message
}
@Override
public void handleStaleMessage(PhotonMessage message) {
//If a message arrives here, it is because it arrived out of order
}
}, PhotonBeamReaderOffsetType.FROM_CURRENT);
try {
    consumer.start();
} catch (Exception e) {
    //Handle the exception thrown while starting the consumer
}
Events are processed in the order they were written, and a mechanism is provided for detecting events that arrive late.
This example sets the polling interval to 500 milliseconds.
PhotonConsumer consumer = consumerFactory.getPhotonConsumer();
consumer.setPollingInterval(500L);
try {
    consumer.start();
} catch (Exception e) {
    //Handle the exception thrown while starting the consumer
}
There are 3 offset options: FROM_BEGINNING, FROM_CURRENT, and FROM_OFFSET.
The example below would read from the beginning of a stream:
PhotonConsumer consumer = consumerFactory.getPhotonConsumer();
consumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
new PhotonMessageHandler() ..., PhotonBeamReaderOffsetType.FROM_BEGINNING);
try {
    consumer.start();
} catch (Exception e) {
    //Handle the exception thrown while starting the consumer
}
The example below will read from the last point where this client left off.
PhotonConsumer consumer = consumerFactory.getPhotonConsumer();
consumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
new PhotonMessageHandler() ..., PhotonBeamReaderOffsetType.FROM_CURRENT);
try {
    consumer.start();
} catch (Exception e) {
    //Handle the exception thrown while starting the consumer
}
The example below will read all messages starting from 3 hours ago.
PhotonConsumer consumer = consumerFactory.getPhotonConsumer();
consumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
new PhotonMessageHandler() ..., PhotonBeamReaderOffsetType.FROM_OFFSET,
Instant.now().minus(3, ChronoUnit.HOURS));
try {
    consumer.start();
} catch (Exception e) {
    //Handle the exception thrown while starting the consumer
}
The example below will read all messages starting from the Instant returned by the provided BiFunction.
PhotonConsumer consumer = consumerFactory.getPhotonConsumer();
consumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
new PhotonMessageHandler() ..., PhotonBeamReaderOffsetType.FROM_OFFSET,
(clientName, beamName) -> {
//Some logic here to determine where to begin reading from and return an Instant
});
try {
    consumer.start();
} catch (Exception e) {
    //Handle the exception thrown while starting the consumer
}
The async consumer processes events on multiple threads while still ensuring ordering and guaranteed delivery.
The following example will create an async processor that maps a PhotonMessage to a Widget and then executes the provided Consumer<Widget> on separate threads. The events will be processed in order by stream name and message key, and there will be no limit to the number of events that can be held in memory at any given time (this can be dangerous in high-throughput situations where there are memory constraints).
AsyncPhotonConsumer<Widget> asyncConsumer = consumerFactory.getAsyncPhotonConsumer();
asyncConsumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
(photonMessage) -> {
Widget widget = new Widget();
        //Some logic to map the PhotonMessage to a Widget
        return widget;
},
(widget) -> {
//Some logic that is performed on the provided widget
},
PhotonBeamReaderOffsetType.FROM_CURRENT);
try {
    asyncConsumer.start();
} catch (Exception e) {
    //Handle the exception thrown while starting the consumer
}
Configurable memory utilization for the async consumer (limit the number of events that are kept in memory at any given time).
The following example is the same as above, but it limits the number of events that can be held in memory to 500.
AsyncPhotonConsumer<Widget> asyncConsumer = consumerFactory.getAsyncPhotonConsumer(500);
asyncConsumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
(photonMessage) -> {
Widget widget = new Widget();
        //Some logic to map the PhotonMessage to a Widget
        return widget;
},
(widget) -> {
//Some logic that is performed on the provided widget
},
PhotonBeamReaderOffsetType.FROM_CURRENT);
try {
    asyncConsumer.start();
} catch (Exception e) {
    //Handle the exception thrown while starting the consumer
}