Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stream extension supported #422

Merged
merged 5 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.xiaomi.mone.log.manager.domain;

import com.xiaomi.mone.log.common.Constant;
import com.xiaomi.mone.log.manager.bootstrap.LogStoragePlugin;
import com.xiaomi.mone.log.manager.common.context.MoneUserContext;
import com.xiaomi.mone.log.manager.mapper.MilogEsClusterMapper;
import com.xiaomi.mone.log.manager.model.pojo.MilogEsClusterDO;
Expand All @@ -37,6 +38,9 @@ public class EsCluster {
@Resource
private MilogEsClusterMapper esClusterMapper;

@Resource
private LogStoragePlugin logStoragePlugin;

private StoreExtensionService storeExtensionService;

public void init() {
Expand All @@ -53,11 +57,23 @@ public EsService getEsService(Long esClusterId) {
if (esClusterId == null) {
return null;
}
if (Ioc.ins().containsBean(Constant.LOG_STORAGE_SERV_BEAN_PRE + esClusterId)) {
return Ioc.ins().getBean(Constant.LOG_STORAGE_SERV_BEAN_PRE + esClusterId);
} else {
return null;

String beanName = Constant.LOG_STORAGE_SERV_BEAN_PRE + esClusterId;

if (isBeanInitialized(esClusterId, beanName)) {
return Ioc.ins().getBean(beanName);
}

return null;
}

private boolean isBeanInitialized(Long esClusterId, String beanName) {
if (Ioc.ins().containsBean(beanName)) {
return true;
}

logStoragePlugin.initializeLogStorage(getById(esClusterId));
return Ioc.ins().containsBean(beanName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ private SearchSourceBuilder assembleSearchSourceBuilder(LogQuery logQuery, List<
private void transformSearchResponse(SearchResponse searchResponse, final LogDTO logDTO, List<String> keyList) {
SearchHit[] hits = searchResponse.getHits().getHits();
if (hits == null || hits.length == 0) {
log.info("es query result is empty,es response:{}", searchResponse);
return;
}
List<LogDataDTO> logDataList = Lists.newArrayList();
Expand Down Expand Up @@ -640,9 +641,13 @@ private LogDataDTO hit2DTO(SearchHit hit, List<String> keyList) {
Map<String, Object> ferry = hit.getSourceAsMap();
long time = 0;
if (ferry.containsKey("time") && null != ferry.get("time") && StringUtils.isNotBlank(ferry.get("time").toString())) {
time = DateUtil.parse(ferry.get("time").toString()).toTimestamp().getTime();
try {
time = DateUtil.parse(ferry.get("time").toString()).toTimestamp().getTime();
} catch (Exception e) {
log.error("Log query error, log context error,time:{}", ferry.get("time"), e);
}
}
if (null == ferry.get(LogParser.esKeyMap_timestamp)) {
if (!ferry.containsKey(LogParser.esKeyMap_timestamp) || null == ferry.get(LogParser.esKeyMap_timestamp)) {
logData.setValue(LogParser.esKeyMap_timestamp, time);
} else {
logData.setValue(LogParser.esKeyMap_timestamp, ferry.get(LogParser.esKeyMap_timestamp));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class LogStreamConstants {

public static final String DEFAULT_MESSAGE_LIFECYCLE_MANAGER = "defaultMessageLifecycleManager";

public static final String DEFAULT_COMMON_STREAM_EXTENSION = "defaultCommonStreamExtension";


public static final String LOG_STREAM_SPACE_ID = "spaceId";
public static final String LOG_STREAM_STORE_ID = "storeId";
public static final String LOG_STREAM_TAIL_ID = "tailId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

import com.alibaba.nacos.api.config.listener.Listener;
import com.google.gson.Gson;
import com.xiaomi.mone.log.common.Config;
import com.xiaomi.mone.log.model.MiLogStreamConfig;
import com.xiaomi.mone.log.model.MilogSpaceData;
import com.xiaomi.mone.log.stream.common.util.StreamUtils;
import com.xiaomi.mone.log.stream.exception.StreamException;
import com.xiaomi.mone.log.stream.job.extension.StreamCommonExtension;
import com.xiaomi.youpin.docean.Ioc;
import com.xiaomi.youpin.docean.anno.Service;
import com.xiaomi.youpin.docean.common.StringUtils;
import com.xiaomi.youpin.docean.plugin.config.anno.Value;
Expand All @@ -40,6 +43,7 @@
import java.util.concurrent.locks.ReentrantLock;

import static com.xiaomi.mone.log.common.Constant.*;
import static com.xiaomi.mone.log.stream.common.LogStreamConstants.DEFAULT_COMMON_STREAM_EXTENSION;

@Service
@Slf4j
Expand All @@ -48,7 +52,7 @@ public class ConfigManager {
private NacosConfig nacosConfig;


@Value("${hera.stream.monitor_space_data_id}")
@Value("$hera.stream.monitor_space_data_id")
private String spaceDataId;

//final String spaceDataId = LOG_MANAGE_PREFIX + NAMESPACE_CONFIG_DATA_ID;
Expand Down Expand Up @@ -158,18 +162,26 @@ private void handleMiLogStreamConfig(MiLogStreamConfig milogStreamConfig) {
}

private void processConfigForUniqueMark(String uniqueMark, Map<String, Map<Long, String>> config) {
if (config.containsKey(uniqueMark)) {
Map<Long, String> dataIdMap = config.get(uniqueMark);
try {
spaceLock.tryLock();
stopUnusefulListenerAndJob(dataIdMap);
startNewListenerAndJob(dataIdMap);
} finally {
spaceLock.unlock();
}
StreamCommonExtension extensionInstance = getStreamCommonExtensionInstance();
if (!extensionInstance.checkUniqueMarkExists(uniqueMark, config)) {
log.warn("listen dataID:{},groupId:{},but receive config is empty", spaceDataId, DEFAULT_GROUP_ID);
return;
}
Map<Long, String> dataIdMap = extensionInstance.getConfigMapByUniqueMark(config, uniqueMark);
try {
spaceLock.tryLock();
stopUnusefulListenerAndJob(dataIdMap);
startNewListenerAndJob(dataIdMap);
} finally {
spaceLock.unlock();
}
}

private StreamCommonExtension getStreamCommonExtensionInstance() {
String factualServiceName = Config.ins().get("common.stream.extension", DEFAULT_COMMON_STREAM_EXTENSION);
return Ioc.ins().getBean(factualServiceName);
}

/**
* The new {spaceid,dataid} A is compared to {spaceid,dataid} B in memory to filter out the sets A-B
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
import com.alibaba.nacos.api.config.listener.Listener;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.xiaomi.mone.log.common.Config;
import com.xiaomi.mone.log.common.Constant;
import com.xiaomi.mone.log.model.LogtailConfig;
import com.xiaomi.mone.log.model.MilogSpaceData;
import com.xiaomi.mone.log.model.SinkConfig;
import com.xiaomi.mone.log.stream.job.JobManager;
import com.xiaomi.mone.log.stream.job.extension.StreamCommonExtension;
import com.xiaomi.youpin.docean.Ioc;
import com.xiaomi.youpin.docean.anno.Component;
import com.xiaomi.youpin.docean.common.StringUtils;
import com.xiaomi.youpin.docean.plugin.nacos.NacosConfig;
Expand All @@ -42,6 +45,7 @@
import java.util.stream.Collectors;

import static com.xiaomi.mone.log.common.Constant.GSON;
import static com.xiaomi.mone.log.stream.common.LogStreamConstants.DEFAULT_COMMON_STREAM_EXTENSION;

@Component
@Slf4j
Expand All @@ -66,6 +70,8 @@ public class MilogConfigListener {

private ReentrantLock buildDataLock = new ReentrantLock();

private StreamCommonExtension streamCommonExtension;

public MilogConfigListener(Long spaceId, String dataId, String group, MilogSpaceData milogSpaceData, NacosConfig nacosConfig) {
this.spaceId = spaceId;
this.dataId = dataId;
Expand All @@ -75,6 +81,12 @@ public MilogConfigListener(Long spaceId, String dataId, String group, MilogSpace
this.jobManager = new JobManager();
this.listener = getListener(dataId, milogSpaceData);
nacosConfig.addListener(dataId, group, listener);
streamCommonExtension = getStreamCommonExtensionInstance();
}

private StreamCommonExtension getStreamCommonExtensionInstance() {
String factualServiceName = Config.ins().get("common.stream.extension", DEFAULT_COMMON_STREAM_EXTENSION);
return Ioc.ins().getBean(factualServiceName);
}

private void handleNacosConfigDataJob(MilogSpaceData newMilogSpaceData) {
Expand Down Expand Up @@ -248,7 +260,12 @@ private void initNewJob(MilogSpaceData newMilogSpaceData) {

private void startTailPer(SinkConfig sinkConfig, LogtailConfig logTailConfig, Long logSpaceId) {
if (null == logSpaceId) {
log.error("startTailPer error,logSpaceId is null,LogtailConfig:{}", gson.toJson(logTailConfig), new RuntimeException());
log.warn("startTailPer error,logSpaceId is null,LogTailConfig:{}", gson.toJson(logTailConfig));
return;
}
Boolean isStart = streamCommonExtension.preCheckTaskExecution(sinkConfig, logTailConfig, logSpaceId);
if (!isStart) {
log.warn("preCheckTaskExecution error,preCheckTaskExecution is false,LogTailConfig:{}", gson.toJson(logTailConfig));
return;
}
log.info("【Listen tail】Initialize the new task, tail configuration:{},index:{},cluster information:{},spaceId:{}", gson.toJson(logTailConfig), sinkConfig.getEsIndex(), gson.toJson(sinkConfig.getEsInfo()), logSpaceId);
Expand All @@ -271,9 +288,10 @@ public void receiveConfigInfo(String dataValue) {
try {
log.info("listen tail received a configuration request:{},a configuration that already exists:storeMap:{},tailMap:{}", dataValue, gson.toJson(oldSinkConfigMap), gson.toJson(oldLogTailConfigMap));
if (StringUtils.isNotEmpty(dataValue) && !Constant.NULLVALUE.equals(dataValue)) {
dataValue = streamCommonExtension.dataPreProcess(dataValue);
MilogSpaceData newMilogSpaceData = GSON.fromJson(dataValue, MilogSpaceData.class);
if (null == newMilogSpaceData || CollectionUtils.isEmpty(newMilogSpaceData.getSpaceConfig())) {
log.error("Listen tail received configuration error,dataId:{},spaceId:{}", dataId, milogSpaceData.getMilogSpaceId());
log.warn("Listen tail received configuration error,dataId:{},spaceId:{}", dataId, milogSpaceData.getMilogSpaceId());
return;
}
handleNacosConfigDataJob(newMilogSpaceData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,15 @@ public void closeJobs(MilogSpaceData milogSpaceData) {

private void sinkJobsShutDown(LogtailConfig logtailConfig) {
Map<SinkJobEnum, SinkJob> sinkJobs = jobs.get(logtailConfig.getLogtailId());
sinkJobs.values().forEach(sinkJob -> {
try {
sinkJob.shutdown();
} catch (Exception e) {
log.error("[JobManager.shutdown] closeJobs.shutdown error,logTailID:{}", logtailConfig.getLogtailId(), e);
}
});
if (null != sinkJobs && !sinkJobs.isEmpty()) {
sinkJobs.values().forEach(sinkJob -> {
try {
sinkJob.shutdown();
} catch (Exception e) {
log.error("[JobManager.shutdown] closeJobs.shutdown error,logTailID:{}", logtailConfig.getLogtailId(), e);
}
});
}
jobs.remove(logtailConfig.getLogtailId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,16 @@ public void handleMessage(String type, String msg, String time) {
}

private void toSendMessage(Map<String, Object> dataMap) throws Exception {
if (sendMsgNumber.get() % COUNT_NUM == 0 || sendMsgNumber.get() == 1) {
log.info(jobType.name() + " send msg:{}", dataMap);
}
if (SinkJobEnum.NORMAL_JOB == jobType) {
if (null != dataMap && !sinkChain.execute(dataMap)) {
sendMessage(dataMap);
}
} else {
sendMessage(dataMap);
}
if (sendMsgNumber.get() % COUNT_NUM == 0 || sendMsgNumber.get() == 1) {
log.info(jobType.name() + " send msg:{}", dataMap);
}
}

private Map<String, Object> parseMessage(LineMessage lineMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@
import com.xiaomi.mone.log.stream.job.extension.impl.EsMessageSender;
import com.xiaomi.mone.log.stream.job.extension.impl.RocketMqMessageProduct;
import com.xiaomi.mone.log.stream.plugin.es.EsPlugin;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import static com.xiaomi.mone.log.common.Constant.GSON;

/**
* @author wtt
* @version 1.0
* @description
* @date 2023/11/14 15:09
*/
@Slf4j
public class MessageSenderFactory {

public static MessageSender getMessageSender(SinkJobConfig sinkJobConfig) {
Expand All @@ -50,8 +55,12 @@ public static MessageSender getMessageSender(SinkJobConfig sinkJobConfig) {

private static MessageSender getEsMessageSender(SinkJobConfig sinkJobConfig, MqMessageProduct mqMessageProduct) {
String index = sinkJobConfig.getIndex();
if (StringUtils.isEmpty(index)) {
log.error("es index is null,sinkJobConfig:{}", GSON.toJson(sinkJobConfig));
throw new RuntimeException("es index is null");
}
EsMessageSender esMessageSender = new EsMessageSender(index, mqMessageProduct);
EsProcessor esProcessor = EsPlugin.getEsProcessor(sinkJobConfig.getStorageInfo(), mqMessageDTO -> esMessageSender.compensateSend(mqMessageDTO));
EsProcessor esProcessor = EsPlugin.getEsProcessor(sinkJobConfig.getStorageInfo(), esMessageSender::compensateSend);
esMessageSender.setEsProcessor(esProcessor);
return esMessageSender;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2020 Xiaomi Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.xiaomi.mone.log.stream.job.extension;

import com.xiaomi.mone.log.model.LogtailConfig;
import com.xiaomi.mone.log.model.SinkConfig;

import java.util.Map;

/**
* @author wtt
* @version 1.0
* @description
* @date 2024/8/20 14:40
*/
public interface StreamCommonExtension {
String dataPreProcess(String data);

Boolean checkUniqueMarkExists(String uniqueMark, Map<String, Map<Long, String>> config);

Map<Long, String> getConfigMapByUniqueMark(Map<String, Map<Long, String>> config, String uniqueMark);

Boolean preCheckTaskExecution(SinkConfig sinkConfig, LogtailConfig logTailConfig, Long logSpaceId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (C) 2020 Xiaomi Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.xiaomi.mone.log.stream.job.extension.impl;

import com.xiaomi.mone.log.model.LogtailConfig;
import com.xiaomi.mone.log.model.SinkConfig;
import com.xiaomi.mone.log.stream.job.extension.StreamCommonExtension;
import com.xiaomi.youpin.docean.anno.Service;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

import static com.xiaomi.mone.log.stream.common.LogStreamConstants.DEFAULT_COMMON_STREAM_EXTENSION;

/**
* @author wtt
* @version 1.0
* @description
* @date 2024/8/20 14:45
*/
@Service(name = DEFAULT_COMMON_STREAM_EXTENSION)
@Slf4j
public class DefaultStreamCommonExtension implements StreamCommonExtension {
@Override
public String dataPreProcess(String data) {
return data;
}

public Boolean checkUniqueMarkExists(String uniqueMark, Map<String, Map<Long, String>> config) {
return config.containsKey(uniqueMark);
}

public Map<Long, String> getConfigMapByUniqueMark(Map<String, Map<Long, String>> config, String uniqueMark) {
return config.get(uniqueMark);
}

@Override
public Boolean preCheckTaskExecution(SinkConfig sinkConfig, LogtailConfig logTailConfig, Long logSpaceId) {
return true;
}
}
Loading
Loading