Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9523][Manager] Support querying all audit information based on IP address #9537

Merged
merged 3 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,23 @@ public List<AuditVO> list(AuditRequest request) {
return response.getData();
}

/**
* Query audit data for list by condition
*
* @param request The audit request of query condition
* @return The result of query
*/
public List<AuditVO> listAll(AuditRequest request) {
Preconditions.expectNotNull(request, "request cannot be null");
Preconditions.expectNotBlank(request.getInlongGroupId(), ErrorCodeEnum.INVALID_PARAMETER,
"inlong group id cannot be empty");
Preconditions.expectNotBlank(request.getInlongStreamId(), ErrorCodeEnum.INVALID_PARAMETER,
"inlong stream id cannot be empty");
Response<List<AuditVO>> response = ClientUtils.executeHttpCall(auditApi.listAll(request));
ClientUtils.assertRespSuccess(response);
return response.getData();
}

/**
* Refresh the base item of audit cache.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public interface AuditApi {
@POST("audit/list")
Call<Response<List<AuditVO>>> list(@Body AuditRequest auditRequest);

@POST("audit/listAll")
Call<Response<List<AuditVO>>> listAll(@Body AuditRequest auditRequest);

@POST("audit/refreshCache")
Call<Response<Boolean>> refreshCache();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,37 @@ public interface AuditEntityMapper {
* @param groupId The groupId of inlong
* @param streamId The streamId of inlong
* @param auditId The auditId of inlong
* @param sDate The start date
* @param eDate The end date
* @param startDate The start date
* @param endData The end date
* @param format The format such as '%Y-%m-%d %H:%i:00'
* @return The result of query
*/
List<Map<String, Object>> sumByLogTs(@Param(value = "groupId") String groupId,
@Param(value = "streamId") String streamId,
@Param(value = "auditId") String auditId,
@Param(value = "sDate") String sDate,
@Param(value = "eDate") String eDate,
@Param(value = "startDate") String startDate,
@Param(value = "endData") String endData,
@Param(value = "format") String format);

/**
* sumByLogTsAndIp
*
* @param ip ip
* @param auditId The auditId of inlong
* @param sDate The start date
* @param eDate The end date
* @param startDate The start date
* @param endData The end date
* @param format The format such as '%Y-%m-%d %H:%i:00'
* @return The result of query
*/
List<Map<String, Object>> sumByLogTsAndIp(@Param(value = "ip") String ip,
@Param(value = "auditId") String auditId,
@Param(value = "sDate") String sDate,
@Param(value = "eDate") String eDate,
@Param(value = "startDate") String startDate,
@Param(value = "endData") String endData,
@Param(value = "format") String format);

List<Map<String, Object>> sumGroupByIp(@Param(value = "groupId") String groupId,
@Param(value = "streamId") String streamId,
@Param(value = "auditId") String auditId,
@Param(value = "startDate") String startDate,
@Param(value = "endData") String endData);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,28 @@
<result column="log_ts" property="logTs" jdbcType="VARCHAR"/>
<result column="total" property="total" jdbcType="BIGINT"/>
<result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
<result column="total_size" property="totalSize" jdbcType="BIGINT"/>
</resultMap>

<resultMap id="SumGroupByIdResultMap" type="java.util.Map">
<result column="inlong_group_id" property="inlongGroupId" jdbcType="VARCHAR"/>
<result column="inlong_stream_id" property="inlongStreamId" jdbcType="VARCHAR"/>
<result column="log_ts" property="logTs" jdbcType="VARCHAR"/>
<result column="ip" property="ip" jdbcType="VARCHAR"/>
<result column="total" property="total" jdbcType="BIGINT"/>
<result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
</resultMap>

<select id="sumByLogTs" resultMap="SumByLogTsResultMap">
select inlong_group_id, inlong_stream_id, date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts, sum(`count`) as total, sum(`delay`) as total_delay, sum(`size`) as total_size
from (
select distinct ip, docker_id, thread_id, sdk_ts, packet_id, log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
from apache_inlong_audit.audit_data
where inlong_group_id = #{groupId,jdbcType=VARCHAR}
and inlong_stream_id = #{streamId,jdbcType=VARCHAR}
and audit_id = #{auditId,jdbcType=VARCHAR}
and log_ts &gt;= #{sDate, jdbcType=VARCHAR}
and log_ts &lt; #{eDate, jdbcType=VARCHAR}
) as sub
select distinct ip, docker_id, thread_id, sdk_ts, packet_id, log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
from apache_inlong_audit.audit_data
where inlong_group_id = #{groupId,jdbcType=VARCHAR}
and inlong_stream_id = #{streamId,jdbcType=VARCHAR}
and audit_id = #{auditId,jdbcType=VARCHAR}
and log_ts &gt;= #{startDate, jdbcType=VARCHAR}
and log_ts &lt; #{endData, jdbcType=VARCHAR}
) as sub
group by log_ts, inlong_group_id, inlong_stream_id
order by log_ts
</select>
Expand All @@ -66,10 +74,30 @@
from apache_inlong_audit.audit_data
where ip = #{ip,jdbcType=VARCHAR}
and audit_id = #{auditId,jdbcType=VARCHAR}
and log_ts &gt;= #{sDate, jdbcType=VARCHAR}
and log_ts &lt; #{eDate, jdbcType=VARCHAR}
and log_ts &gt;= #{startDate, jdbcType=VARCHAR}
and log_ts &lt; #{endData, jdbcType=VARCHAR}
) as sub
group by log_ts, inlong_group_id, inlong_stream_id
order by log_ts
</select>
<select id="sumGroupByIp" resultMap="SumGroupByIdResultMap">
select inlong_group_id, inlong_stream_id, ip, sum(`count`) as total, sum(`delay`) as total_delay, sum(`size`) as total_size
from (
select distinct ip, docker_id, thread_id, sdk_ts, packet_id, log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
from apache_inlong_audit.audit_data
<where>
audit_id = #{auditId,jdbcType=VARCHAR}
and log_ts &gt;= #{startDate, jdbcType=VARCHAR}
and log_ts &lt; #{endData, jdbcType=VARCHAR}
<if test="ip != null and ip != ''">
and ip = #{ip,jdbcType=VARCHAR}
</if>
<if test="ip == null or ip == ''">
and inlong_group_id = #{groupId,jdbcType=VARCHAR}
and inlong_stream_id = #{streamId,jdbcType=VARCHAR}
</if>
</where>
) as sub
group by inlong_group_id, inlong_stream_id, ip
</select>
</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class AuditInfo {
@ApiModelProperty(value = "inlong stream id")
private String inlongStreamId;

@ApiModelProperty(value = "ip")
private String ip;

@ApiModelProperty(value = "Audit log timestamp")
private String logTs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public interface AuditService {
*/
List<AuditVO> listByCondition(AuditRequest request) throws Exception;

/**
* Query audit data for list by condition
*
* @param request The audit request of query condition
* @return The result of query
*/
List<AuditVO> listAll(AuditRequest request) throws Exception;

List<AuditBaseResponse> getAuditBases();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public List<AuditVO> listByCondition(AuditRequest request) throws Exception {
}
} else if (AuditQuerySource.CLICKHOUSE == querySource) {
try (Connection connection = config.getCkConnection();
PreparedStatement statement = getAuditCkStatement(connection, groupId, streamId,
PreparedStatement statement = getAuditCkStatementGroupByLogTs(connection, groupId, streamId,
request.getIp(), auditId,
request.getStartDate(),
request.getEndDate());
Expand All @@ -358,6 +358,60 @@ public List<AuditVO> listByCondition(AuditRequest request) throws Exception {
return aggregateByTimeDim(result, request.getTimeStaticsDim());
}

@Override
public List<AuditVO> listAll(AuditRequest request) throws Exception {
List<AuditVO> result = new ArrayList<>();
AuditQuerySource querySource = AuditQuerySource.valueOf(auditQuerySource);
for (String auditId : request.getAuditIds()) {
AuditBaseEntity auditBaseEntity = auditItemMap.get(auditId);
String auditName = "";
if (auditBaseEntity != null) {
auditName = auditBaseEntity.getName();
}
if (AuditQuerySource.MYSQL == querySource) {
// Support min agg at now
DateTime endDate = DAY_DATE_FORMATTER.parseDateTime(request.getEndDate());
String endDateStr = endDate.plusDays(1).toString(DAY_DATE_FORMATTER);
List<Map<String, Object>> sumList = auditEntityMapper.sumGroupByIp(
request.getInlongGroupId(), request.getInlongStreamId(), auditId, request.getStartDate(),
endDateStr);
List<AuditInfo> auditSet = sumList.stream().map(s -> {
AuditInfo vo = new AuditInfo();
vo.setInlongGroupId((String) s.get("inlongGroupId"));
vo.setInlongStreamId((String) s.get("inlongStreamId"));
vo.setLogTs((String) s.get("logTs"));
vo.setIp((String) s.get("ip"));
vo.setCount(((BigDecimal) s.get("total")).longValue());
vo.setDelay(((BigDecimal) s.get("totalDelay")).longValue());
vo.setSize(((BigDecimal) s.get("totalSize")).longValue());
return vo;
}).collect(Collectors.toList());
result.add(new AuditVO(auditId, auditName, auditSet, null));
} else if (AuditQuerySource.CLICKHOUSE == querySource) {
try (Connection connection = config.getCkConnection();
PreparedStatement statement = getAuditCkStatementGroupByIp(connection,
request.getInlongGroupId(), request.getInlongStreamId(), request.getIp(), auditId,
request.getStartDate(), request.getEndDate());

ResultSet resultSet = statement.executeQuery()) {
List<AuditInfo> auditSet = new ArrayList<>();
while (resultSet.next()) {
AuditInfo vo = new AuditInfo();
vo.setInlongGroupId(resultSet.getString("inlong_group_id"));
vo.setInlongStreamId(resultSet.getString("inlong_stream_id"));
vo.setIp(resultSet.getString("ip"));
vo.setCount(resultSet.getLong("total"));
vo.setDelay(resultSet.getLong("total_delay"));
vo.setSize(resultSet.getLong("total_size"));
auditSet.add(vo);
}
result.add(new AuditVO(auditId, auditName, auditSet, null));
}
}
}
return result;
}

@Override
public List<AuditBaseResponse> getAuditBases() {
List<AuditBaseEntity> auditBaseEntityList = auditBaseMapper.selectAll();
Expand Down Expand Up @@ -430,7 +484,8 @@ private SearchRequest toAuditSearchRequest(String index, String groupId, String
* @param endDate The en datetime of request
* @return The clickhouse Statement
*/
private PreparedStatement getAuditCkStatement(Connection connection, String groupId, String streamId, String ip,
private PreparedStatement getAuditCkStatementGroupByLogTs(Connection connection, String groupId, String streamId,
String ip,
String auditId, String startDate, String endDate) throws SQLException {
String start = DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT);
String end = DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT);
Expand Down Expand Up @@ -466,6 +521,39 @@ private PreparedStatement getAuditCkStatement(Connection connection, String grou
return statement;
}

private PreparedStatement getAuditCkStatementGroupByIp(Connection connection, String groupId,
String streamId, String ip, String auditId, String startDate, String endDate) throws SQLException {

if (StringUtils.isNotBlank(ip)) {
return getAuditCkStatementByIpGroupByIp(connection, auditId, ip, startDate, endDate);
}
// Query results are duplicated according to all fields.
String subQuery = new SQL()
.SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", "log_ts", "inlong_group_id",
"inlong_stream_id", "audit_id", "count", "size", "delay")
.FROM("audit_data")
.WHERE("inlong_group_id = ?")
.WHERE("inlong_stream_id = ?")
.WHERE("audit_id = ?")
.WHERE("log_ts >= ?")
.WHERE("log_ts < ?")
.toString();

String sql = new SQL()
.SELECT("inlong_group_id", "inlong_stream_id", "sum(count) as total", "ip",
"sum(delay) as total_delay", "sum(size) as total_size")
.FROM("(" + subQuery + ") as sub")
.GROUP_BY("inlong_group_id", "inlong_stream_id", "ip")
.toString();
PreparedStatement statement = connection.prepareStatement(sql);
statement.setString(1, groupId);
statement.setString(2, streamId);
statement.setString(3, auditId);
statement.setString(4, startDate);
statement.setString(5, endDate);
return statement;
}

/**
* Aggregate by time dim
*/
Expand Down Expand Up @@ -578,4 +666,30 @@ private PreparedStatement getAuditCkStatementByIp(Connection connection, String
return statement;
}

private PreparedStatement getAuditCkStatementByIpGroupByIp(Connection connection, String auditId, String ip,
String startDate, String endDate) throws SQLException {
String subQuery = new SQL()
.SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", "log_ts", "inlong_group_id",
"inlong_stream_id", "audit_id", "count", "size", "delay")
.FROM("audit_data")
.WHERE("ip = ?")
.WHERE("audit_id = ?")
.WHERE("log_ts >= ?")
.WHERE("log_ts < ?")
.toString();

String sql = new SQL()
.SELECT("inlong_group_id", "inlong_stream_id", "ip", "sum(count) as total",
"sum(delay) as total_delay", "sum(size) as total_size")
.FROM("(" + subQuery + ") as sub")
.GROUP_BY("inlong_group_id", "inlong_stream_id", "ip")
.toString();
PreparedStatement statement = connection.prepareStatement(sql);
statement.setString(1, ip);
statement.setString(2, auditId);
statement.setString(3, startDate);
statement.setString(4, endDate);
return statement;
}

}