Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Commit

Permalink
Use java api proto inlet (#1071)
Browse files Browse the repository at this point in the history
  • Loading branch information
RayRoestenburg authored Jul 21, 2021
1 parent 449b0e6 commit 120cefa
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,36 @@ final case class ProtoInlet[T <: GeneratedMessageV3](
}

object ProtoInlet {

/**
* Creates a ProtoInlet.
* @param name The name of the inlet
* @param clazz The type of message that can be consumed from this inlet
*/
def create[T <: GeneratedMessageV3](name: String, clazz: Class[T]): ProtoInlet[T] =
ProtoInlet[T](name, clazz)

/**
* Creates a ProtoInlet.
* @param name The name of the inlet
* @param clazz The type of message that can be consumed from this inlet
* @param hasUniqueGroupId Specifies if the inlet should have a unique group Id across streamlet instances.
* Setting hasUniqueGroupId to true, when a streamlet is scaled, will result in the inlet to receive all messsages of the topic (similar to a broadcast).
* Setting hasUniqueGroupId to false, when a streamlet is scaled, will result in the inlet to receive only part of the messages of the topic (partitioned).
*
*/
def create[T <: GeneratedMessageV3](name: String, clazz: Class[T], hasUniqueGroupId: Boolean): ProtoInlet[T] =
ProtoInlet[T](name, clazz, hasUniqueGroupId)

/**
* Creates a ProtoInlet.
* @param name The name of the inlet
* @param clazz The type of message that can be consumed from this inlet
* @param hasUniqueGroupId Specifies if the inlet should have a unique group Id across streamlet instances.
* Setting hasUniqueGroupId to true, when a streamlet is scaled, will result in the inlet to receive all messsages of the topic (similar to a broadcast).
* Setting hasUniqueGroupId to false, when a streamlet is scaled, will result in the inlet to receive only part of the messages of the topic (partitioned).
* @param errorHandler The errorHandler, which is invoked when a message could not be deserialized.
*/
def create[T <: GeneratedMessageV3](
name: String,
clazz: Class[T],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,24 @@ final case class ProtoOutlet[T <: GeneratedMessageV3](name: String, partitioner:
*/
override def withPartitioner(partitioner: T => String): ProtoOutlet[T] = copy(partitioner = partitioner)
}

object ProtoOutlet {

/**
* Creates a ProtoOutlet.
* @param name The name of the outlet
* @param partitioner The partitioner function for the outlet. The String result of the partitioner is used as a key for partitioning.
* @param clazz The type of message that can be produced to this outlet
*/
def create[T <: GeneratedMessageV3](name: String, partitioner: T => String, clazz: Class[T]): ProtoOutlet[T] =
ProtoOutlet[T](name, partitioner, clazz)

/**
* Creates a ProtoOutlet.
* Messages produced to the outlet are distributed in round-robin fashion to topic partitions.
* @param name The name of the outlet
* @param clazz The type of message that can be produced to this outlet
*/
def create[T <: GeneratedMessageV3](name: String, clazz: Class[T]): ProtoOutlet[T] =
ProtoOutlet[T](name, RoundRobinPartitioner, clazz)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@
import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.proto.javadsl.ProtoInlet;

import scala.Option;
import java.util.Optional;
import sensordata.grpc.SensorData;

public class Logger extends AkkaStreamlet {
private final ProtoInlet<SensorData> inlet = (ProtoInlet<SensorData>) ProtoInlet.create("in", SensorData.class)
.withErrorHandler((inBytes, throwable) -> {
context().system().log().error(String.format("an exception occurred on inlet: %s -> (hex string) %h", throwable.getMessage(), inBytes));
return Option.apply(null); // skip the element
}
);

private final ProtoInlet<SensorData> inlet = (ProtoInlet<SensorData>) ProtoInlet.create(
"in",
SensorData.class,
false,
(inBytes, throwable) -> {
context().system().log().error(String.format("an exception occurred on inlet: %s -> (hex string) %h", throwable.getMessage(), inBytes));
return Optional.empty();
}
);

@Override
public StreamletShape shape() {
return StreamletShape.createWithInlets(inlet);
Expand Down

0 comments on commit 120cefa

Please sign in to comment.