4141import org .apache .beam .sdk .io .solace .data .Solace .SolaceRecordMapper ;
4242import org .apache .beam .sdk .io .solace .read .UnboundedSolaceSource ;
4343import org .apache .beam .sdk .io .solace .write .SolaceOutput ;
44+ import org .apache .beam .sdk .io .solace .write .UnboundedBatchedSolaceWriter ;
45+ import org .apache .beam .sdk .io .solace .write .UnboundedSolaceWriter ;
46+ import org .apache .beam .sdk .io .solace .write .UnboundedStreamingSolaceWriter ;
4447import org .apache .beam .sdk .options .PipelineOptions ;
4548import org .apache .beam .sdk .schemas .NoSuchSchemaException ;
49+ import org .apache .beam .sdk .transforms .MapElements ;
4650import org .apache .beam .sdk .transforms .PTransform ;
51+ import org .apache .beam .sdk .transforms .ParDo ;
4752import org .apache .beam .sdk .transforms .SerializableFunction ;
53+ import org .apache .beam .sdk .transforms .windowing .BoundedWindow ;
54+ import org .apache .beam .sdk .transforms .windowing .GlobalWindows ;
55+ import org .apache .beam .sdk .transforms .windowing .Window ;
56+ import org .apache .beam .sdk .values .KV ;
4857import org .apache .beam .sdk .values .PBegin ;
4958import org .apache .beam .sdk .values .PCollection ;
59+ import org .apache .beam .sdk .values .PCollectionTuple ;
5060import org .apache .beam .sdk .values .TupleTag ;
61+ import org .apache .beam .sdk .values .TupleTagList ;
5162import org .apache .beam .sdk .values .TypeDescriptor ;
63+ import org .apache .beam .sdk .values .WindowingStrategy ;
5264import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .annotations .VisibleForTesting ;
65+ import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions ;
5366import org .checkerframework .checker .nullness .qual .Nullable ;
5467import org .joda .time .Duration ;
5568import org .joda .time .Instant ;
@@ -805,7 +818,8 @@ private Queue initializeQueueForTopicIfNeeded(
805818
/** Submission modes that can be selected for the Solace writer. */
public enum SubmissionMode {
  /** Mode named for favoring higher throughput. */
  HIGHER_THROUGHPUT,

  /** Mode named for favoring lower latency. */
  LOWER_LATENCY,

  /**
   * Sends acks one by one. This will be very slow; never use this in an actual pipeline!
   */
  TESTING
}
810824
811825 public enum WriterType {
@@ -816,6 +830,8 @@ public enum WriterType {
816830 @ AutoValue
817831 public abstract static class Write <T > extends PTransform <PCollection <T >, SolaceOutput > {
818832
833+ private static final Logger LOG = LoggerFactory .getLogger (Write .class );
834+
819835 public static final TupleTag <Solace .PublishResult > FAILED_PUBLISH_TAG =
820836 new TupleTag <Solace .PublishResult >() {};
821837 public static final TupleTag <Solace .PublishResult > SUCCESSFUL_PUBLISH_TAG =
@@ -961,6 +977,21 @@ public Write<T> withWriterType(WriterType writerType) {
961977 return toBuilder ().setWriterType (writerType ).build ();
962978 }
963979
980+ /**
981+ * Set the format function for your custom data type, and/or for dynamic destinations.
982+ *
983+ * <p>If you are using a custom data class, this function should return a {@link Solace.Record}
984+ * corresponding to your custom data class instance.
985+ *
986+ * <p>If you are using this formatting function with dynamic destinations, you must ensure that
987+ * you set the right value in the destination value of the {@link Solace.Record} messages.
988+ *
989+ * <p>In any other case, this format function is optional.
990+ */
991+ public Write <T > withFormatFunction (SerializableFunction <T , Solace .Record > formatFunction ) {
992+ return toBuilder ().setFormatFunction (formatFunction ).build ();
993+ }
994+
964995 /**
965996 * Set the provider used to obtain the properties to initialize a new session in the broker.
966997 *
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
10261057
10271058 @ Override
10281059 public SolaceOutput expand (PCollection <T > input ) {
1029- // TODO: will be sent in upcoming PR
1030- return SolaceOutput .in (input .getPipeline (), null , null );
1060+ Class <? super T > pcollClass = checkNotNull (input .getTypeDescriptor ()).getRawType ();
1061+ boolean usingSolaceRecord =
1062+ pcollClass
1063+ .getTypeName ()
1064+ .equals ("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record" )
1065+ || pcollClass .isAssignableFrom (Solace .Record .class );
1066+
1067+ validateWriteTransform (usingSolaceRecord );
1068+
1069+ boolean usingDynamicDestinations = getDestination () == null ;
1070+ SerializableFunction <Solace .Record , Destination > destinationFn ;
1071+ if (usingDynamicDestinations ) {
1072+ destinationFn = x -> SolaceIO .convertToJcsmpDestination (checkNotNull (x .getDestination ()));
1073+ } else {
1074+ // Constant destination for all messages (same topic or queue)
1075+ // This should not be non-null, as nulls would have been flagged by the
1076+ // validateWriteTransform method
1077+ destinationFn = x -> checkNotNull (getDestination ());
1078+ }
1079+
1080+ @ SuppressWarnings ("unchecked" )
1081+ PCollection <Solace .Record > records =
1082+ getFormatFunction () == null
1083+ ? (PCollection <Solace .Record >) input
1084+ : input .apply (
1085+ "Format records" ,
1086+ MapElements .into (TypeDescriptor .of (Solace .Record .class ))
1087+ .via (checkNotNull (getFormatFunction ())));
1088+
1089+ // Store the current window used by the input
1090+ PCollection <Solace .PublishResult > captureWindow =
1091+ records .apply (
1092+ "Capture window" , ParDo .of (new UnboundedSolaceWriter .RecordToPublishResultDoFn ()));
1093+
1094+ @ SuppressWarnings ("unchecked" )
1095+ WindowingStrategy <Solace .PublishResult , BoundedWindow > windowingStrategy =
1096+ (WindowingStrategy <Solace .PublishResult , BoundedWindow >)
1097+ captureWindow .getWindowingStrategy ();
1098+
1099+ PCollection <Solace .Record > withGlobalWindow =
1100+ records .apply ("Global window" , Window .into (new GlobalWindows ()));
1101+
1102+ PCollection <KV <Integer , Solace .Record >> withShardKeys =
1103+ withGlobalWindow .apply (
1104+ "Add shard key" ,
1105+ ParDo .of (new UnboundedSolaceWriter .AddShardKeyDoFn (getMaxNumOfUsedWorkers ())));
1106+
1107+ String label =
1108+ getWriterType () == WriterType .STREAMING ? "Publish (streaming)" : "Publish (batched)" ;
1109+
1110+ PCollectionTuple solaceOutput = withShardKeys .apply (label , getWriterTransform (destinationFn ));
1111+
1112+ SolaceOutput output ;
1113+ if (getDeliveryMode () == DeliveryMode .PERSISTENT ) {
1114+ PCollection <Solace .PublishResult > failedPublish = solaceOutput .get (FAILED_PUBLISH_TAG );
1115+ PCollection <Solace .PublishResult > successfulPublish =
1116+ solaceOutput .get (SUCCESSFUL_PUBLISH_TAG );
1117+ output =
1118+ rewindow (
1119+ SolaceOutput .in (input .getPipeline (), failedPublish , successfulPublish ),
1120+ windowingStrategy );
1121+ } else {
1122+ LOG .info (
1123+ String .format (
1124+ "Solace.Write: omitting writer output because delivery mode is %s" ,
1125+ getDeliveryMode ()));
1126+ output = SolaceOutput .in (input .getPipeline (), null , null );
1127+ }
1128+
1129+ return output ;
1130+ }
1131+
1132+ private ParDo .MultiOutput <KV <Integer , Solace .Record >, Solace .PublishResult > getWriterTransform (
1133+ SerializableFunction <Solace .Record , Destination > destinationFn ) {
1134+
1135+ ParDo .SingleOutput <KV <Integer , Solace .Record >, Solace .PublishResult > writer =
1136+ ParDo .of (
1137+ getWriterType () == WriterType .STREAMING
1138+ ? new UnboundedStreamingSolaceWriter .WriterDoFn (
1139+ destinationFn ,
1140+ checkNotNull (getSessionServiceFactory ()),
1141+ getDeliveryMode (),
1142+ getDispatchMode (),
1143+ getNumberOfClientsPerWorker (),
1144+ getPublishLatencyMetrics ())
1145+ : new UnboundedBatchedSolaceWriter .WriterDoFn (
1146+ destinationFn ,
1147+ checkNotNull (getSessionServiceFactory ()),
1148+ getDeliveryMode (),
1149+ getDispatchMode (),
1150+ getNumberOfClientsPerWorker (),
1151+ getPublishLatencyMetrics ()));
1152+
1153+ return writer .withOutputTags (FAILED_PUBLISH_TAG , TupleTagList .of (SUCCESSFUL_PUBLISH_TAG ));
1154+ }
1155+
1156+ private SolaceOutput rewindow (
1157+ SolaceOutput solacePublishResult ,
1158+ WindowingStrategy <Solace .PublishResult , BoundedWindow > strategy ) {
1159+ PCollection <Solace .PublishResult > correct = solacePublishResult .getSuccessfulPublish ();
1160+ PCollection <Solace .PublishResult > failed = solacePublishResult .getFailedPublish ();
1161+
1162+ PCollection <Solace .PublishResult > correctWithWindow = null ;
1163+ PCollection <Solace .PublishResult > failedWithWindow = null ;
1164+
1165+ if (correct != null ) {
1166+ correctWithWindow = applyOriginalWindow (correct , strategy , "Rewindow correct" );
1167+ }
1168+
1169+ if (failed != null ) {
1170+ failedWithWindow = applyOriginalWindow (failed , strategy , "Rewindow failed" );
1171+ }
1172+
1173+ return SolaceOutput .in (
1174+ solacePublishResult .getPipeline (), failedWithWindow , correctWithWindow );
1175+ }
1176+
1177+ private static PCollection <Solace .PublishResult > applyOriginalWindow (
1178+ PCollection <Solace .PublishResult > pcoll ,
1179+ WindowingStrategy <Solace .PublishResult , BoundedWindow > strategy ,
1180+ String label ) {
1181+ Window <Solace .PublishResult > originalWindow = captureWindowDetails (strategy );
1182+
1183+ if (strategy .getMode () == WindowingStrategy .AccumulationMode .ACCUMULATING_FIRED_PANES ) {
1184+ originalWindow = originalWindow .accumulatingFiredPanes ();
1185+ } else {
1186+ originalWindow = originalWindow .discardingFiredPanes ();
1187+ }
1188+
1189+ return pcoll .apply (label , originalWindow );
1190+ }
1191+
1192+ private static Window <Solace .PublishResult > captureWindowDetails (
1193+ WindowingStrategy <Solace .PublishResult , BoundedWindow > strategy ) {
1194+ return Window .<Solace .PublishResult >into (strategy .getWindowFn ())
1195+ .withAllowedLateness (strategy .getAllowedLateness ())
1196+ .withOnTimeBehavior (strategy .getOnTimeBehavior ())
1197+ .withTimestampCombiner (strategy .getTimestampCombiner ())
1198+ .triggering (strategy .getTrigger ());
1199+ }
1200+
1201+ /**
1202+ * Called before running the Pipeline to verify this transform is fully and correctly specified.
1203+ */
1204+ private void validateWriteTransform (boolean usingSolaceRecords ) {
1205+ if (!usingSolaceRecords ) {
1206+ Preconditions .checkArgument (
1207+ getFormatFunction () != null ,
1208+ "SolaceIO.Write: If you are not using Solace.Record as the input type, you"
1209+ + " must set a format function using withFormatFunction()." );
1210+ }
1211+
1212+ Preconditions .checkArgument (
1213+ getMaxNumOfUsedWorkers () > 0 ,
1214+ "SolaceIO.Write: The number of used workers must be positive." );
1215+ Preconditions .checkArgument (
1216+ getNumberOfClientsPerWorker () > 0 ,
1217+ "SolaceIO.Write: The number of clients per worker must be positive." );
1218+ Preconditions .checkArgument (
1219+ getDeliveryMode () == DeliveryMode .DIRECT || getDeliveryMode () == DeliveryMode .PERSISTENT ,
1220+ String .format (
1221+ "SolaceIO.Write: Delivery mode must be either DIRECT or PERSISTENT. %s"
1222+ + " not supported" ,
1223+ getDeliveryMode ()));
1224+ if (getPublishLatencyMetrics ()) {
1225+ Preconditions .checkArgument (
1226+ getDeliveryMode () == DeliveryMode .PERSISTENT ,
1227+ "SolaceIO.Write: Publish latency metrics can only be enabled for PERSISTENT"
1228+ + " delivery mode." );
1229+ }
1230+ Preconditions .checkArgument (
1231+ getSessionServiceFactory () != null ,
1232+ "SolaceIO: You need to pass a session service factory. For basic"
1233+ + " authentication, you can use BasicAuthJcsmpSessionServiceFactory." );
10311234 }
10321235 }
10331236}
0 commit comments