Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-8311][DataProxy] Add event handling support for FlumeEvent type #8312

Merged
merged 3 commits into from
Jun 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.MainChannelFullException;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -277,7 +278,8 @@ public void processEvent(Event event) {
}
}
if (!success) {
if (MessageUtils.isSyncSendForOrder(event)) {
if (MessageUtils.isSyncSendForOrder(event)
|| event instanceof ProxyPackEvent) {
throw new MainChannelFullException(errMsg);
}
List<Channel> optionalChannels = selector.getOptionalChannels(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,13 @@ private boolean reloadDataProxyConfig(String clusterName, String clusterTag, Str
}
httpPost.setEntity(HttpUtils.getEntity(request));
// request with post
LOG.info("Start to request {} to get config info, with params {}", url, request);
LOG.info("Start to request {} to get config info, with params: {}, headers: {}",
url, request, httpPost.getAllHeaders());
CloseableHttpResponse response = httpClient.execute(httpPost);
String returnStr = EntityUtils.toString(response.getEntity());
if (response.getStatusLine().getStatusCode() != 200) {
LOG.warn("Failed to request {}, with params {}, the response is {}",
url, request, returnStr);
LOG.warn("Failed to request {}, with params: {}, headers: {}, the response is {}",
url, request, httpPost.getAllHeaders(), returnStr);
return false;
}
LOG.info("End to request {} to get config info:{}", url, returnStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class StatConstants {
public static final java.lang.String EVENT_MSG_V1_POST_SUCCESS = "msg.post.v1.success";
public static final java.lang.String EVENT_MSG_V1_POST_DROPPED = "msg.post.v1.dropped";
// sink
public static final java.lang.String EVENT_SINK_EVENT_V1_MALFORMED = "sink.event.v1.malformed";
public static final java.lang.String EVENT_SINK_EVENT_V1_FILE = "sink.event.v1.file";
public static final java.lang.String EVENT_SINK_EVENT_V0_FILE = "sink.event.v1.file";
public static final java.lang.String EVENT_SINK_CONFIG_TOPIC_MISSING = "sink.topic.missing";
public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_MISSING = "default.topic.empty";
public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_USED = "default.topic.used";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.utils.BufferQueue;
import org.apache.inlong.sdk.commons.protocol.EventConstants;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;

import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
Expand Down Expand Up @@ -209,8 +213,33 @@ public Status process() throws EventDeliveryException {
tx.commit();
return Status.READY;
}
// file event
if (StringUtils.isEmpty(event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER))) {
String groupId = event.getHeaders().get(EventConstants.INLONG_GROUP_ID);
String streamId = event.getHeaders().get(EventConstants.INLONG_STREAM_ID);
String msgTimeStr = event.getHeaders().get(EventConstants.HEADER_KEY_MSG_TIME);
String sourceIp = event.getHeaders().get(EventConstants.HEADER_KEY_SOURCE_IP);
String sourceTimeStr = event.getHeaders().get(EventConstants.HEADER_KEY_SOURCE_TIME);
if (groupId != null
&& streamId != null
&& msgTimeStr != null
&& sourceIp != null
&& sourceTimeStr != null) {
ProxyEvent proxyEvent = new ProxyEvent(groupId, streamId, msgTimeStr,
sourceIp, sourceTimeStr, event.getHeaders(), event.getBody());
this.dispatchManager.addEvent(proxyEvent);
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V1_FILE);
} else {
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V1_MALFORMED);
}
} else {
SimpleEvent simpleEvent = new SimpleEvent();
simpleEvent.setBody(event.getBody());
simpleEvent.setHeaders(event.getHeaders());
this.dispatchManager.addSimpleEvent(simpleEvent);
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V0_FILE);
}
tx.commit();
this.context.addSendFailMetric();
return Status.READY;
} catch (Throwable t) {
if (logCounter.shouldPrint()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;

import org.apache.commons.lang3.math.NumberUtils;

import java.util.Map;

/**
Expand All @@ -39,11 +41,11 @@ public ProxyEvent() {
/**
* Constructor
*
* @param inlongGroupId
* @param inlongStreamId
* @param body
* @param msgTime
* @param sourceIp
* @param inlongGroupId the group id
* @param inlongStreamId the stream id
* @param body the body content
* @param msgTime the message time
* @param sourceIp the source ip
*/
public ProxyEvent(String inlongGroupId, String inlongStreamId, byte[] body, long msgTime, String sourceIp) {
this.inlongGroupId = inlongGroupId;
Expand All @@ -65,9 +67,9 @@ public ProxyEvent(String inlongGroupId, String inlongStreamId, byte[] body, long
/**
* Constructor
*
* @param inlongGroupId
* @param inlongStreamId
* @param obj
* @param inlongGroupId the group id
* @param inlongStreamId the stream id
* @param obj the pb message object
*/
public ProxyEvent(String inlongGroupId, String inlongStreamId, MessageObj obj) {
this.inlongGroupId = inlongGroupId;
Expand All @@ -86,6 +88,29 @@ public ProxyEvent(String inlongGroupId, String inlongStreamId, MessageObj obj) {
this.getHeaders().put(EventConstants.HEADER_KEY_SOURCE_TIME, String.valueOf(sourceTime));
}

/**
* ReBuild ProxyEvent object
*
* @param groupId the group id
* @param streamId the stream id
* @param msgTimeStr the message time
* @param sourceIp the source ip
* @param sourceTimeStr the source time
* @param headers the rebuild headers, include required headers
* @param body the rebuild body
*/
public ProxyEvent(String groupId, String streamId, String msgTimeStr, String sourceIp,
String sourceTimeStr, Map<String, String> headers, byte[] body) {
this.inlongGroupId = groupId;
this.inlongStreamId = streamId;
this.sourceIp = sourceIp;
this.uid = InlongId.generateUid(this.inlongGroupId, this.inlongStreamId);
this.msgTime = NumberUtils.toLong(msgTimeStr, System.currentTimeMillis());
this.sourceTime = NumberUtils.toLong(sourceTimeStr, System.currentTimeMillis());
super.setBody(body);
super.setHeaders(headers);
}

/**
* get sourceTime
*
Expand Down