Skip to content

Commit

Permalink
Merge branch 'master' into fix/async-profiler-time-stamp
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-sheng authored Dec 2, 2024
2 parents 1d1c74d + eb4d38c commit 46d10eb
Show file tree
Hide file tree
Showing 9 changed files with 462 additions and 218 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/skywalking.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ jobs:
- name: Log ES 7.17.10
config: test/e2e-v2/cases/log/es/e2e.yaml
env: ES_VERSION=7.17.10
- name: Log ES 8.8.1 Shardng
- name: Log ES 8.8.1 Sharding
config: test/e2e-v2/cases/log/es/es-sharding/e2e.yaml
env: ES_VERSION=8.8.1
- name: Log BanyanDB
Expand Down
2 changes: 2 additions & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
* Bump up netty to 4.1.115, grpc to 1.68.1, boringssl to 2.0.69.
* BanyanDB: Support update the Group settings when OAP starting.
* BanyanDB: Introduce index mode and refactor banyandb group settings.
* BanyanDB: Support update the Schema when OAP starting.
* BanyanDB: Speed up OAP booting while initializing BanyanDB.

#### UI

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public List<SelectedRecord> sortMetrics(TopNCondition condition, String valueCol
// 2) additional conditions are all group by tags
if (CollectionUtils.isEmpty(additionalConditions) ||
additionalConditions.stream().map(KeyValue::getKey).collect(Collectors.toSet())
.equals(ImmutableSet.copyOf(schema.getTopNSpec().getGroupByTagNames()))) {
.equals(ImmutableSet.copyOf(schema.getTopNSpec().getGroupByTagNamesList()))) {
return serverSideTopN(condition, schema, spec, timestampRange, additionalConditions);
}
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,17 @@

package org.apache.skywalking.oap.server.storage.plugin.banyandb;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonObject;
import io.grpc.Status;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.Singular;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Catalog;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.IntervalRule;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Metadata;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.ResourceOpts;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.CompressionMethod;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.EncodingMethod;
Expand All @@ -50,11 +42,7 @@
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
Expand All @@ -77,7 +65,6 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -90,9 +77,6 @@
public enum MetadataRegistry {
INSTANCE;

private static final ObjectMapper MAPPER = new ObjectMapper();
// BanyanDB group setting aligned with the OAP settings
private static final Set<String> GROUP_ALIGNED = new HashSet<>();
private final Map<String, Schema> registry = new HashMap<>();

public StreamModel registerStreamModel(Model model, BanyanDBStorageConfig config, DownSamplingConfigService configService) {
Expand Down Expand Up @@ -187,13 +171,13 @@ public MeasureModel registerMeasureModel(Model model, BanyanDBStorageConfig conf
schemaBuilder.field(field.getName());
}
// parse TopN
schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.name()));
schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.group, schemaMetadata.name()));

registry.put(schemaMetadata.name(), schemaBuilder.build());
return new MeasureModel(builder.build(), indexRules);
}

private TopNSpec parseTopNSpec(final Model model, final String measureName)
private TopNAggregation parseTopNSpec(final Model model, final String group, final String measureName)
throws StorageException {
if (model.getBanyanDBModelExtension().getTopN() == null) {
return null;
Expand All @@ -208,14 +192,16 @@ private TopNSpec parseTopNSpec(final Model model, final String measureName)
if (CollectionUtils.isEmpty(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())) {
throw new StorageException("invalid groupBy tags: " + model.getBanyanDBModelExtension().getTopN().getGroupByTagNames());
}
return TopNSpec.builder()
.name(measureName + "_topn")
.lruSize(model.getBanyanDBModelExtension().getTopN().getLruSize())
.countersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber())
.fieldName(valueColumnOpt.get().getValueCName())
.groupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())
.sort(BanyandbModel.Sort.SORT_UNSPECIFIED) // include both TopN and BottomN
.build();
return TopNAggregation.newBuilder()
.setMetadata(
Metadata.newBuilder().setGroup(group).setName(Schema.formatTopNName(measureName)))
.setSourceMeasure(Metadata.newBuilder().setGroup(group).setName(measureName))
.setFieldValueSort(BanyandbModel.Sort.SORT_UNSPECIFIED) // include both TopN and BottomN
.setFieldName(valueColumnOpt.get().getValueCName())
.addAllGroupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())
.setCountersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber())
.setLruSize(model.getBanyanDBModelExtension().getTopN().getLruSize())
.build();
}

public Schema findMetadata(final Model model) {
Expand Down Expand Up @@ -306,8 +292,8 @@ Duration downSamplingDuration(DownSampling downSampling) {

IndexRule indexRule(String group, String tagName, BanyanDB.MatchQuery.AnalyzerType analyzer) {
IndexRule.Builder builder = IndexRule.newBuilder()
.setMetadata(Metadata.newBuilder().setName(tagName).setGroup(group))
.setType(IndexRule.Type.TYPE_INVERTED).addTags(tagName);
.setMetadata(Metadata.newBuilder().setName(tagName).setGroup(group))
.setType(IndexRule.Type.TYPE_INVERTED).addTags(tagName);
if (analyzer != null) {
switch (analyzer) {
case KEYWORD:
Expand Down Expand Up @@ -553,36 +539,6 @@ static String formatName(String modelName, DownSampling downSampling) {
return modelName + "_" + downSampling.getName();
}

public Optional<Object> findRemoteSchema(BanyanDBClient client) throws BanyanDBException {
try {
switch (kind) {
case STREAM:
return Optional.ofNullable(client.findStream(this.group, this.name()));
case MEASURE:
return Optional.ofNullable(client.findMeasure(this.group, this.name()));
default:
throw new IllegalStateException("should not reach here");
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.NOT_FOUND)) {
return Optional.empty();
}

throw ex;
}
}

public MetadataCache.EntityMetadata updateRemoteSchema(BanyanDBClient client) throws BanyanDBException {
switch (kind) {
case STREAM:
return client.updateStreamMetadataCacheFromSever(this.group, this.name());
case MEASURE:
return client.updateMeasureMetadataCacheFromSever(this.group, this.name());
default:
throw new IllegalStateException("should not reach here");
}
}

private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataList, boolean shouldAddID) {
final String indexFamily = SchemaMetadata.this.indexFamily();
final String nonIndexFamily = SchemaMetadata.this.nonIndexFamily();
Expand All @@ -603,95 +559,6 @@ private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataLi
return tagFamilySpecs;
}

/**
* Check if the group settings need to be updated
*/
private boolean checkGroupUpdate(BanyanDBClient client) throws BanyanDBException {
Group g = client.findGroup(this.group);
return g.getResourceOpts().getShardNum() != this.shard
|| g.getResourceOpts().getSegmentInterval().getNum() != this.segmentIntervalDays
|| g.getResourceOpts().getTtl().getNum() != this.ttlDays;
}

public boolean checkResourceExistence(BanyanDBClient client) throws BanyanDBException {
ResourceExist resourceExist;
Group.Builder gBuilder
= Group.newBuilder()
.setMetadata(Metadata.newBuilder().setName(this.group))
.setResourceOpts(ResourceOpts.newBuilder()
.setShardNum(this.shard)
.setSegmentInterval(
IntervalRule.newBuilder()
.setUnit(
IntervalRule.Unit.UNIT_DAY)
.setNum(
this.segmentIntervalDays))
.setTtl(
IntervalRule.newBuilder()
.setUnit(
IntervalRule.Unit.UNIT_DAY)
.setNum(
this.ttlDays)));
switch (kind) {
case STREAM:
resourceExist = client.existStream(this.group, this.name());
gBuilder.setCatalog(Catalog.CATALOG_STREAM).build();
if (!GROUP_ALIGNED.contains(this.group)) {
// create the group if not exist
if (!resourceExist.hasGroup()) {
try {
Group g = client.define(gBuilder.build());
if (g != null) {
log.info("group {} created", g.getMetadata().getName());
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("group {} already created by another OAP node", this.group);
} else {
throw ex;
}
}
} else {
// update the group if necessary
if (this.checkGroupUpdate(client)) {
client.update(gBuilder.build());
log.info("group {} updated", this.group);
}
}
// mark the group as aligned
GROUP_ALIGNED.add(this.group);
}
return resourceExist.hasResource();
case MEASURE:
resourceExist = client.existMeasure(this.group, this.name());
gBuilder.setCatalog(Catalog.CATALOG_MEASURE).build();
if (!GROUP_ALIGNED.contains(this.group)) {
if (!resourceExist.hasGroup()) {
try {
Group g = client.define(gBuilder.build());
if (g != null) {
log.info("group {} created", g.getMetadata().getName());
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("group {} already created by another OAP node", this.group);
} else {
throw ex;
}
}
} else {
if (this.checkGroupUpdate(client)) {
client.update(gBuilder.build());
log.info("group {} updated", this.group);
}
}
}
return resourceExist.hasResource();
default:
throw new IllegalStateException("should not reach here");
}
}

/**
* @return name of the Stream/Measure in the BanyanDB
*/
Expand Down Expand Up @@ -761,52 +628,17 @@ public static class Schema {

@Getter
@Nullable
private final TopNSpec topNSpec;
private final TopNAggregation topNSpec;

public ColumnSpec getSpec(String columnName) {
return this.specs.get(columnName);
}

public void installTopNAggregation(BanyanDBClient client) throws BanyanDBException {
if (this.getTopNSpec() == null) {
if (this.metadata.kind == Kind.MEASURE) {
log.debug("skip null TopN Schema for [{}]", metadata.getModelName());
}
return;
}
TopNAggregation.Builder builder
= TopNAggregation.newBuilder()
.setMetadata(Metadata.newBuilder()
.setGroup(getMetadata().getGroup())
.setName(this.getTopNSpec().getName()))

.setSourceMeasure(Metadata.newBuilder()
.setGroup(getMetadata().getGroup())
.setName(getMetadata().name()))
.setFieldValueSort(this.getTopNSpec().getSort())
.setFieldName(this.getTopNSpec().getFieldName())
.addAllGroupByTagNames(this.getTopNSpec().getGroupByTagNames())
.setCountersNumber(this.getTopNSpec().getCountersNumber())
.setLruSize(this.getTopNSpec().getLruSize());
client.define(builder.build());
log.info("installed TopN schema for measure {}", getMetadata().name());
public static String formatTopNName(String measureName) {
return measureName + "_topn";
}
}

@Builder
@EqualsAndHashCode
@Getter
@ToString
public static class TopNSpec {
private final String name;
@Singular
private final List<String> groupByTagNames;
private final String fieldName;
private final BanyandbModel.Sort sort;
private final int lruSize;
private final int countersNumber;
}

@RequiredArgsConstructor
@Getter
@ToString
Expand All @@ -818,11 +650,4 @@ public static class ColumnSpec {
public enum ColumnType {
TAG, FIELD;
}

@Getter
@Setter
@NoArgsConstructor
public static class GroupSetting {
private int segmentIntervalDays;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ public List<EBPFProfilingSchedule> querySchedules(String taskId) throws IOExcept
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EBPFProfilingScheduleRecord.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
TAGS,
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(EBPFProfilingScheduleRecord.TASK_ID, taskId));
query.setOrderBy(new AbstractQuery.OrderBy(EBPFProfilingScheduleRecord.START_TIME, AbstractQuery.Sort.DESC));
}
});
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(EBPFProfilingScheduleRecord.TASK_ID, taskId));
query.setOrderBy(new AbstractQuery.OrderBy(EBPFProfilingScheduleRecord.START_TIME, AbstractQuery.Sort.DESC));
}
}
);

return resp.getDataPoints().stream().map(this::buildEBPFProfilingSchedule).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,11 @@ protected void apply(MeasureQuery query) {
query.limit(page.getLimit());
query.offset(page.getFrom());
if (queryOrder == Order.ASC) {
query.setOrderBy(new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.ASC));
query.setOrderBy(
new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.ASC));
} else {
query.setOrderBy(new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.DESC));
query.setOrderBy(
new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.DESC));
}
for (final EventQueryCondition condition : conditionList) {
List<PairQueryCondition<?>> queryConditions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;

import com.google.gson.Gson;
import java.util.Objects;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.v1.client.AbstractCriteria;
import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
Expand Down Expand Up @@ -183,9 +184,10 @@ private TopNQueryResponse topNQuery(MetadataRegistry.Schema schema,
AbstractQuery.Sort sort,
List<KeyValue> additionalConditions,
List<AttrCondition> attributes) throws IOException {
final TopNQuery q = new TopNQuery(List.of(schema.getMetadata().getGroup()), schema.getTopNSpec().getName(),
timestampRange,
number, sort);
final TopNQuery q = new TopNQuery(List.of(schema.getMetadata().getGroup()), Objects.requireNonNull(
schema.getTopNSpec()).getMetadata().getName(),
timestampRange,
number, sort);
q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN);
List<PairQueryCondition<?>> conditions = new ArrayList<>();
if (CollectionUtils.isNotEmpty(additionalConditions)) {
Expand Down
Loading

0 comments on commit 46d10eb

Please sign in to comment.