Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-7936][Manager] Support issued pulsar subscriptions to sort #7937

Merged
merged 3 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ public static PulsarExtractNode createExtractNode(PulsarSource pulsarSource) {
final String primaryKey = pulsarSource.getPrimaryKey();
final String serviceUrl = pulsarSource.getServiceUrl();
final String adminUrl = pulsarSource.getAdminUrl();
final String scanStartupSubStartOffset =
StringUtils.isNotBlank(pulsarSource.getSubscription()) ? PulsarScanStartupMode.EARLIEST.getValue()
: null;
Map<String, String> properties = parseProperties(pulsarSource.getProperties());

return new PulsarExtractNode(pulsarSource.getSourceName(),
pulsarSource.getSourceName(),
fieldInfos,
Expand All @@ -246,7 +250,9 @@ public static PulsarExtractNode createExtractNode(PulsarSource pulsarSource) {
serviceUrl,
format,
startupMode.getValue(),
primaryKey);
primaryKey,
pulsarSource.getSubscription(),
scanStartupSubStartOffset);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class PulsarSource extends StreamSource {
@ApiModelProperty("Pulsar topic")
private String topic;

@ApiModelProperty("Pulsar subscription")
private String subscription;

@ApiModelProperty("Pulsar adminUrl")
private String adminUrl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public class PulsarSourceDTO {
@ApiModelProperty("Pulsar topic")
private String topic;

@ApiModelProperty("Pulsar subscription")
private String subscription;

@ApiModelProperty("Pulsar adminUrl")
private String adminUrl;

Expand Down Expand Up @@ -83,6 +86,7 @@ public static PulsarSourceDTO getFromRequest(PulsarSourceRequest request) {
.tenant(request.getTenant())
.namespace(request.getNamespace())
.topic(request.getTopic())
.subscription(request.getSubscription())
.primaryKey(request.getPrimaryKey())
.scanStartupMode(request.getScanStartupMode())
.properties(request.getProperties())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public class PulsarSourceRequest extends SourceRequest {
@ApiModelProperty("Pulsar topic")
private String topic;

@ApiModelProperty("Pulsar subscription")
private String subscription;

@ApiModelProperty("Pulsar adminUrl")
private String adminUrl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
/**
* The name rule for Pulsar subscription: clusterTag_topicName_sinkId_consumer_group
*/
private static final String PULSAR_SUBSCRIPTION = "%s_%s_%s_consumer_group";
public static final String PULSAR_SUBSCRIPTION = "%s_%s_%s_consumer_group";

@Autowired
private InlongClusterService clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
Expand All @@ -50,6 +52,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.inlong.manager.service.resource.queue.pulsar.PulsarResourceOperator.PULSAR_SUBSCRIPTION;

/**
* Pulsar stream source operator
*/
Expand All @@ -66,6 +70,8 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
private ObjectMapper objectMapper;
@Autowired
private InlongClusterService clusterService;
@Autowired
private StreamSinkEntityMapper sinkMapper;

@Override
public Boolean accept(String sourceType) {
Expand Down Expand Up @@ -131,6 +137,10 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
pulsarSource.setAdminUrl(adminUrl);
pulsarSource.setServiceUrl(serviceUrl);
pulsarSource.setInlongComponent(true);
if (StringUtils.isNotBlank(streamInfo.getDataType())) {
String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType();
pulsarSource.setSerializationType(serializationType);
}
pulsarSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
pulsarSource.setIgnoreParseError(streamInfo.getIgnoreParseError());

Expand All @@ -145,6 +155,15 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
if (!Objects.equal(streamId, sourceInfo.getInlongStreamId())) {
continue;
}
List<StreamSinkEntity> sinkEntityList = sinkMapper.selectByRelatedId(groupInfo.getInlongGroupId(),
streamId);
// Issued pulsar subscriptions to sort only supports a stream with only one source and one sink
if (sinkEntityList.size() == 1) {
String sub = String.format(PULSAR_SUBSCRIPTION, groupInfo.getInlongClusterTag(),
pulsarSource.getTopic(),
sinkEntityList.get(0).getId());
pulsarSource.setSubscription(sub);
}

pulsarSource.setSerializationType(getSerializationType(sourceInfo, streamInfo.getDataType()));

Expand All @@ -164,7 +183,11 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
pulsarSource.setDataSeparator(String.valueOf((int) ','));
}
}
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
if (StringUtils.isNotBlank(pulsarSource.getSubscription())) {
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EXTERNAL_SUBSCRIPTION.getValue());
} else {
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
}
pulsarSource.setFieldList(streamInfo.getFieldList());
sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric {
@JsonProperty("primaryKey")
private String primaryKey;

@JsonProperty("scanStartupSubName")
private String scanStartupSubName;

@JsonProperty("scanStartupSubStartOffset")
private String scanStartupSubStartOffset;

@JsonCreator
public PulsarExtractNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
Expand All @@ -71,7 +77,9 @@ public PulsarExtractNode(@JsonProperty("id") String id,
@Nonnull @JsonProperty("serviceUrl") String serviceUrl,
@Nonnull @JsonProperty("format") Format format,
@Nonnull @JsonProperty("scanStartupMode") String scanStartupMode,
@JsonProperty("primaryKey") String primaryKey) {
@JsonProperty("primaryKey") String primaryKey,
@JsonProperty("scanStartupSubName") String scanStartupSubName,
@JsonProperty("scanStartupSubStartOffset") String scanStartupSubStartOffset) {
super(id, name, fields, watermarkField, properties);
this.topic = Preconditions.checkNotNull(topic, "pulsar topic is null.");
this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar serviceUrl is null.");
Expand All @@ -80,6 +88,8 @@ public PulsarExtractNode(@JsonProperty("id") String id,
"pulsar scanStartupMode is null.");
this.adminUrl = adminUrl;
this.primaryKey = primaryKey;
this.scanStartupSubName = scanStartupSubName;
this.scanStartupSubStartOffset = scanStartupSubStartOffset;
}

/**
Expand All @@ -104,7 +114,10 @@ public Map<String, String> tableOptions() {
options.put("service-url", serviceUrl);
options.put("topic", topic);
options.put("scan.startup.mode", scanStartupMode);

if (scanStartupSubName != null) {
options.put("scan.startup.sub-name", scanStartupSubName);
options.put("scan.startup.sub-start-offset", scanStartupSubStartOffset);
}
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public Node getTestObject() {
"pulsar://localhost:6650",
format,
"earliest",
null,
null,
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public PulsarExtractNode buildPulsarExtractNode() {
"pulsar://localhost:6650",
format,
"earliest",
null,
null,
null);
}

Expand Down