Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,21 @@
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.io.solace.data.Solace.Record;
import org.apache.beam.sdk.io.solace.data.Solace.SolaceRecordMapper;
import org.apache.beam.sdk.io.solace.read.AckMessageDoFn;
import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource;
import org.apache.beam.sdk.io.solace.write.AddShardKeyDoFn;
import org.apache.beam.sdk.io.solace.write.SolaceOutput;
import org.apache.beam.sdk.io.solace.write.UnboundedBatchedSolaceWriter;
import org.apache.beam.sdk.io.solace.write.UnboundedStreamingSolaceWriter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Redistribute;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
Expand Down Expand Up @@ -412,6 +416,7 @@ public class SolaceIO {
}
};
private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;
private static final int DEFAULT_ACK_DEADLINE_SECONDS = 30;
private static final Duration DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD =
Duration.standardSeconds(30);
public static final int DEFAULT_WRITER_NUM_SHARDS = 20;
Expand Down Expand Up @@ -461,6 +466,7 @@ public static Read<Solace.Record> read() {
.setParseFn(SolaceRecordMapper::map)
.setTimestampFn(SENDER_TIMESTAMP_FUNCTION)
.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)
.setAckDeadlineSeconds(DEFAULT_ACK_DEADLINE_SECONDS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we set this to a higher default value? If there is fused processing that is slow this coudl be exceeded and we'd never ack anything. Another idea would be to measure the latency between pulling and successful finalization calls to notice if we're nacking too aggressively.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like the idea of measuring latency! At this moment I would keep it as a configuration item where nack happen after 30s by default. After 60s solace invalidates session. afair sessions were expensive in solace so they have to be recycled quickly.

.setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD));
}

Expand Down Expand Up @@ -490,6 +496,7 @@ public static <T> Read<T> read(
.setParseFn(parseFn)
.setTimestampFn(timestampFn)
.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)
.setAckDeadlineSeconds(DEFAULT_ACK_DEADLINE_SECONDS)
.setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD));
}

Expand Down Expand Up @@ -587,6 +594,16 @@ public Read<T> withDeduplicateRecords(boolean deduplicateRecords) {
return this;
}

/**
* Optional, default: 30, max less than 60. Set to ack deadline after which {@link
* org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader} will start to reject outstanding
* messages that were not successfully checkpointed.
*/
public Read<T> withAckDeadlineSeconds(int ackDeadlineSeconds) {
configurationBuilder.setAckDeadlineSeconds(ackDeadlineSeconds);
return this;
}

/**
* Set a factory that creates a {@link org.apache.beam.sdk.io.solace.broker.SempClientFactory}.
*
Expand Down Expand Up @@ -689,6 +706,8 @@ abstract static class Configuration<T> {

abstract Duration getWatermarkIdleDurationThreshold();

abstract int getAckDeadlineSeconds();

public static <T> Builder<T> builder() {
Builder<T> builder =
new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read_Configuration.Builder<T>();
Expand Down Expand Up @@ -719,6 +738,8 @@ abstract Builder<T> setParseFn(

abstract Builder<T> setWatermarkIdleDurationThreshold(Duration idleDurationThreshold);

abstract Builder<T> setAckDeadlineSeconds(int seconds);

abstract Configuration<T> build();
}
}
Expand All @@ -745,18 +766,28 @@ public PCollection<T> expand(PBegin input) {

Coder<T> coder = inferCoder(input.getPipeline(), configuration.getTypeDescriptor());

return input.apply(
org.apache.beam.sdk.io.Read.from(
new UnboundedSolaceSource<>(
initializedQueue,
sempClientFactory,
sessionServiceFactory,
configuration.getMaxNumConnections(),
configuration.getDeduplicateRecords(),
coder,
configuration.getTimestampFn(),
configuration.getWatermarkIdleDurationThreshold(),
configuration.getParseFn())));
PCollection<KV<Long, T>> messages =
input.apply(
org.apache.beam.sdk.io.Read.from(
new UnboundedSolaceSource<>(
initializedQueue,
sempClientFactory,
sessionServiceFactory,
configuration.getMaxNumConnections(),
configuration.getDeduplicateRecords(),
coder,
configuration.getTimestampFn(),
configuration.getWatermarkIdleDurationThreshold(),
configuration.getParseFn(),
configuration.getAckDeadlineSeconds())));

messages
.apply("Keys", Keys.create())
.apply("Reshuffle", Redistribute.arbitrarily())
.apply(
"Ack", ParDo.of(new AckMessageDoFn(initializedQueue.getName(), sempClientFactory)));

return messages.apply("Values", Values.create());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public long getBacklogBytes(String queueName) throws IOException {
return sempBasicAuthClientExecutor.getBacklogBytes(queueName);
}

@Override
public void ack(String queueName, Long msgId) throws IOException {
sempBasicAuthClientExecutor.ack(queueName, msgId);
}

private void createQueue(String queueName) throws IOException {
LOG.info("SolaceIO.Read: Creating new queue {}.", queueName);
sempBasicAuthClientExecutor.createQueueResponse(queueName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.XMLMessage;
import com.solacesystems.jcsmp.XMLMessageProducer;
import java.io.IOException;
import java.util.Objects;
Expand Down Expand Up @@ -143,6 +144,8 @@ private MessageReceiver createFlowReceiver() throws JCSMPException, IOException

ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
flowProperties.setEndpoint(queue);
flowProperties.addRequiredSettlementOutcomes(
XMLMessage.Outcome.FAILED, XMLMessage.Outcome.REJECTED);
flowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);

EndpointProperties endpointProperties = new EndpointProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,17 @@
import java.io.UnsupportedEncodingException;
import java.net.CookieManager;
import java.net.HttpCookie;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.solace.data.Semp.Queue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.UrlEscapers;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A class to execute requests to SEMP v2 with Basic Auth authentication.
Expand All @@ -59,6 +60,7 @@
public class SempBasicAuthClientExecutor implements Serializable {
// Every request will be repeated 2 times in case of abnormal connection failures.
private static final int REQUEST_NUM_RETRIES = 2;
private static final Logger LOG = LoggerFactory.getLogger(SempBasicAuthClientExecutor.class);
private static final Map<CookieManagerKey, CookieManager> COOKIE_MANAGER_MAP =
new ConcurrentHashMap<CookieManagerKey, CookieManager>();
private static final String COOKIES_HEADER = "Set-Cookie";
Expand Down Expand Up @@ -102,6 +104,13 @@ private static String getQueueEndpoint(String messageVpn, String queueName)
"/monitor/msgVpns/%s/queues/%s", urlEncode(messageVpn), urlEncode(queueName));
}

private static String getAckEndpoint(String messageVpn, String queueName, Long msgId)
throws UnsupportedEncodingException {
return String.format(
"/action/msgVpns/%s/queues/%s/msgs/%d/delete",
urlEncode(messageVpn), urlEncode(queueName), msgId);
}

private static String createQueueEndpoint(String messageVpn) throws UnsupportedEncodingException {
return String.format("/config/msgVpns/%s/queues", urlEncode(messageVpn));
}
Expand Down Expand Up @@ -158,6 +167,13 @@ private HttpResponse executePost(GenericUrl url, ImmutableMap<String, Object> pa
return execute(request);
}

private HttpResponse executePut(GenericUrl url, ImmutableMap<String, Object> parameters)
throws IOException {
HttpContent content = new JsonHttpContent(GsonFactory.getDefaultInstance(), parameters);
HttpRequest request = requestFactory.buildPutRequest(url, content);
return execute(request);
}

private HttpResponse execute(HttpRequest request) throws IOException {
request.setNumberOfRetries(REQUEST_NUM_RETRIES);
HttpHeaders httpHeaders = new HttpHeaders();
Expand Down Expand Up @@ -210,8 +226,8 @@ private void storeCookiesInCookieManager(HttpHeaders headers) {
}
}

private static String urlEncode(String queueName) throws UnsupportedEncodingException {
return URLEncoder.encode(queueName, StandardCharsets.UTF_8.name());
private static String urlEncode(String path) {
return UrlEscapers.urlPathSegmentEscaper().escape(path);
}

private <T> T mapJsonToClass(String content, Class<T> mapSuccessToClass)
Expand All @@ -228,6 +244,17 @@ public long getBacklogBytes(String queueName) throws IOException {
return q.data().msgSpoolUsage();
}

public void ack(String queueName, Long msgId) throws IOException {
String queryUrl = getAckEndpoint(messageVpn, queueName, msgId);
ImmutableMap<String, Object> params = ImmutableMap.<String, Object>builder().build();
try {
HttpResponse response = executePut(new GenericUrl(baseUrl + queryUrl), params);
BrokerResponse.fromHttpResponse(response);
} catch (HttpResponseException e) {
LOG.error("Failed to ack message", e);
}
}

private static class CookieManagerKey implements Serializable {
private final String baseUrl;
private final String username;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,6 @@ public interface SempClient extends Serializable {
* the amount of data in messages that are waiting to be delivered to consumers.
*/
long getBacklogBytes(String queueName) throws IOException;

void ack(String queueName, Long msgId) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.read;

import java.io.IOException;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.solace.broker.SempClient;
import org.apache.beam.sdk.io.solace.broker.SempClientFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class AckMessageDoFn extends DoFn<Long, Void> {
private static final Logger LOG = LoggerFactory.getLogger(AckMessageDoFn.class);
private String queueName;
private final SempClientFactory sempClientFactory;
@Nullable SempClient sempClient;

public AckMessageDoFn(String queueName, SempClientFactory sempClientFactory) {
this.queueName = queueName;
this.sempClientFactory = sempClientFactory;
}

@StartBundle
public void startBundle() {
sempClient = sempClientFactory.create();
}

@Teardown
public void tearDown() {
if (sempClient != null) {
sempClient = null;
}
}

@ProcessElement
public void processElement(@Element Long msgId) throws IOException {
Preconditions.checkStateNotNull(sempClient).ack(queueName, msgId);
}

@FinishBundle
public void finishBundle() {
if (sempClient != null) {
sempClient = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.beam.sdk.io.solace.read;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
Expand All @@ -38,7 +38,7 @@
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
private transient List<BytesXMLMessage> safeToAck;
private transient Queue<BytesXMLMessage> safeToAck;

@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}
Expand All @@ -48,24 +48,12 @@ private SolaceCheckpointMark() {}
*
* @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
*/
SolaceCheckpointMark(List<BytesXMLMessage> safeToAck) {
SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
this.safeToAck = safeToAck;
}

@Override
public void finalizeCheckpoint() {
for (BytesXMLMessage msg : safeToAck) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
LOG.error(
"SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
}
}
}
public void finalizeCheckpoint() {}

@Override
public boolean equals(@Nullable Object o) {
Expand Down
Loading
Loading