Skip to content

Commit

Permalink
[INLONG-7936][Manager] Support issued pulsar subscriptions to sort (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 authored and GanfengTan committed May 10, 2023
1 parent b88bf93 commit 4b330a7
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ public static PulsarExtractNode createExtractNode(PulsarSource pulsarSource) {
final String primaryKey = pulsarSource.getPrimaryKey();
final String serviceUrl = pulsarSource.getServiceUrl();
final String adminUrl = pulsarSource.getAdminUrl();
final String scanStartupSubStartOffset =
StringUtils.isNotBlank(pulsarSource.getSubscription()) ? PulsarScanStartupMode.EARLIEST.getValue()
: null;
Map<String, String> properties = parseProperties(pulsarSource.getProperties());

return new PulsarExtractNode(pulsarSource.getSourceName(),
pulsarSource.getSourceName(),
fieldInfos,
Expand All @@ -246,7 +250,9 @@ public static PulsarExtractNode createExtractNode(PulsarSource pulsarSource) {
serviceUrl,
format,
startupMode.getValue(),
primaryKey);
primaryKey,
pulsarSource.getSubscription(),
scanStartupSubStartOffset);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class PulsarSource extends StreamSource {
@ApiModelProperty("Pulsar topic")
private String topic;

@ApiModelProperty("Pulsar subscription")
private String subscription;

@ApiModelProperty("Pulsar adminUrl")
private String adminUrl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public class PulsarSourceDTO {
@ApiModelProperty("Pulsar topic")
private String topic;

@ApiModelProperty("Pulsar subscription")
private String subscription;

@ApiModelProperty("Pulsar adminUrl")
private String adminUrl;

Expand Down Expand Up @@ -83,6 +86,7 @@ public static PulsarSourceDTO getFromRequest(PulsarSourceRequest request) {
.tenant(request.getTenant())
.namespace(request.getNamespace())
.topic(request.getTopic())
.subscription(request.getSubscription())
.primaryKey(request.getPrimaryKey())
.scanStartupMode(request.getScanStartupMode())
.properties(request.getProperties())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public class PulsarSourceRequest extends SourceRequest {
@ApiModelProperty("Pulsar topic")
private String topic;

@ApiModelProperty("Pulsar subscription")
private String subscription;

@ApiModelProperty("Pulsar adminUrl")
private String adminUrl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
/**
* The name rule for Pulsar subscription: clusterTag_topicName_sinkId_consumer_group
*/
private static final String PULSAR_SUBSCRIPTION = "%s_%s_%s_consumer_group";
public static final String PULSAR_SUBSCRIPTION = "%s_%s_%s_consumer_group";

@Autowired
private InlongClusterService clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
Expand All @@ -50,6 +52,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.inlong.manager.service.resource.queue.pulsar.PulsarResourceOperator.PULSAR_SUBSCRIPTION;

/**
* Pulsar stream source operator
*/
Expand All @@ -66,6 +70,8 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
private ObjectMapper objectMapper;
@Autowired
private InlongClusterService clusterService;
@Autowired
private StreamSinkEntityMapper sinkMapper;

@Override
public Boolean accept(String sourceType) {
Expand Down Expand Up @@ -131,6 +137,10 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
pulsarSource.setAdminUrl(adminUrl);
pulsarSource.setServiceUrl(serviceUrl);
pulsarSource.setInlongComponent(true);
if (StringUtils.isNotBlank(streamInfo.getDataType())) {
String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType();
pulsarSource.setSerializationType(serializationType);
}
pulsarSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
pulsarSource.setIgnoreParseError(streamInfo.getIgnoreParseError());

Expand All @@ -145,6 +155,15 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
if (!Objects.equal(streamId, sourceInfo.getInlongStreamId())) {
continue;
}
List<StreamSinkEntity> sinkEntityList = sinkMapper.selectByRelatedId(groupInfo.getInlongGroupId(),
streamId);
// Issued pulsar subscriptions to sort only supports a stream with only one source and one sink
if (sinkEntityList.size() == 1) {
String sub = String.format(PULSAR_SUBSCRIPTION, groupInfo.getInlongClusterTag(),
pulsarSource.getTopic(),
sinkEntityList.get(0).getId());
pulsarSource.setSubscription(sub);
}

pulsarSource.setSerializationType(getSerializationType(sourceInfo, streamInfo.getDataType()));

Expand All @@ -164,7 +183,11 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
pulsarSource.setDataSeparator(String.valueOf((int) ','));
}
}
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
if (StringUtils.isNotBlank(pulsarSource.getSubscription())) {
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EXTERNAL_SUBSCRIPTION.getValue());
} else {
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
}
pulsarSource.setFieldList(streamInfo.getFieldList());
sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric {
@JsonProperty("primaryKey")
private String primaryKey;

@JsonProperty("scanStartupSubName")
private String scanStartupSubName;

@JsonProperty("scanStartupSubStartOffset")
private String scanStartupSubStartOffset;

@JsonCreator
public PulsarExtractNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
Expand All @@ -71,7 +77,9 @@ public PulsarExtractNode(@JsonProperty("id") String id,
@Nonnull @JsonProperty("serviceUrl") String serviceUrl,
@Nonnull @JsonProperty("format") Format format,
@Nonnull @JsonProperty("scanStartupMode") String scanStartupMode,
@JsonProperty("primaryKey") String primaryKey) {
@JsonProperty("primaryKey") String primaryKey,
@JsonProperty("scanStartupSubName") String scanStartupSubName,
@JsonProperty("scanStartupSubStartOffset") String scanStartupSubStartOffset) {
super(id, name, fields, watermarkField, properties);
this.topic = Preconditions.checkNotNull(topic, "pulsar topic is null.");
this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar serviceUrl is null.");
Expand All @@ -80,6 +88,8 @@ public PulsarExtractNode(@JsonProperty("id") String id,
"pulsar scanStartupMode is null.");
this.adminUrl = adminUrl;
this.primaryKey = primaryKey;
this.scanStartupSubName = scanStartupSubName;
this.scanStartupSubStartOffset = scanStartupSubStartOffset;
}

/**
Expand All @@ -104,7 +114,10 @@ public Map<String, String> tableOptions() {
options.put("service-url", serviceUrl);
options.put("topic", topic);
options.put("scan.startup.mode", scanStartupMode);

if (scanStartupSubName != null) {
options.put("scan.startup.sub-name", scanStartupSubName);
options.put("scan.startup.sub-start-offset", scanStartupSubStartOffset);
}
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public Node getTestObject() {
"pulsar://localhost:6650",
format,
"earliest",
null,
null,
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public PulsarExtractNode buildPulsarExtractNode() {
"pulsar://localhost:6650",
format,
"earliest",
null,
null,
null);
}

Expand Down

0 comments on commit 4b330a7

Please sign in to comment.