Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BanyanDB: Support update the Schema when OAP starting. #12808

Merged
merged 8 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/skywalking.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ jobs:
- name: Log ES 7.17.10
config: test/e2e-v2/cases/log/es/e2e.yaml
env: ES_VERSION=7.17.10
- name: Log ES 8.8.1 Shardng
- name: Log ES 8.8.1 Sharding
config: test/e2e-v2/cases/log/es/es-sharding/e2e.yaml
env: ES_VERSION=8.8.1
- name: Log BanyanDB
Expand Down
2 changes: 2 additions & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
* Bump up netty to 4.1.115, grpc to 1.68.1, boringssl to 2.0.69.
* BanyanDB: Support update the Group settings when OAP starting.
* BanyanDB: Introduce index mode and refactor banyandb group settings.
* BanyanDB: Support update the Schema when OAP starting.
wu-sheng marked this conversation as resolved.
Show resolved Hide resolved
* BanyanDB: Speed up OAP booting while initializing BanyanDB.

#### UI

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public List<SelectedRecord> sortMetrics(TopNCondition condition, String valueCol
// 2) additional conditions are all group by tags
if (CollectionUtils.isEmpty(additionalConditions) ||
additionalConditions.stream().map(KeyValue::getKey).collect(Collectors.toSet())
.equals(ImmutableSet.copyOf(schema.getTopNSpec().getGroupByTagNames()))) {
.equals(ImmutableSet.copyOf(schema.getTopNSpec().getGroupByTagNamesList()))) {
return serverSideTopN(condition, schema, spec, timestampRange, additionalConditions);
}
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,17 @@

package org.apache.skywalking.oap.server.storage.plugin.banyandb;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonObject;
import io.grpc.Status;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.Singular;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Catalog;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.IntervalRule;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Metadata;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.ResourceOpts;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.CompressionMethod;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.EncodingMethod;
Expand All @@ -50,11 +42,7 @@
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
Expand All @@ -77,7 +65,6 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -90,9 +77,6 @@
public enum MetadataRegistry {
INSTANCE;

private static final ObjectMapper MAPPER = new ObjectMapper();
// BanyanDB group setting aligned with the OAP settings
private static final Set<String> GROUP_ALIGNED = new HashSet<>();
private final Map<String, Schema> registry = new HashMap<>();

public StreamModel registerStreamModel(Model model, BanyanDBStorageConfig config, DownSamplingConfigService configService) {
Expand Down Expand Up @@ -187,13 +171,13 @@ public MeasureModel registerMeasureModel(Model model, BanyanDBStorageConfig conf
schemaBuilder.field(field.getName());
}
// parse TopN
schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.name()));
schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.group, schemaMetadata.name()));

registry.put(schemaMetadata.name(), schemaBuilder.build());
return new MeasureModel(builder.build(), indexRules);
}

private TopNSpec parseTopNSpec(final Model model, final String measureName)
private TopNAggregation parseTopNSpec(final Model model, final String group, final String measureName)
throws StorageException {
if (model.getBanyanDBModelExtension().getTopN() == null) {
return null;
Expand All @@ -208,14 +192,16 @@ private TopNSpec parseTopNSpec(final Model model, final String measureName)
if (CollectionUtils.isEmpty(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())) {
throw new StorageException("invalid groupBy tags: " + model.getBanyanDBModelExtension().getTopN().getGroupByTagNames());
}
return TopNSpec.builder()
.name(measureName + "_topn")
.lruSize(model.getBanyanDBModelExtension().getTopN().getLruSize())
.countersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber())
.fieldName(valueColumnOpt.get().getValueCName())
.groupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())
.sort(BanyandbModel.Sort.SORT_UNSPECIFIED) // include both TopN and BottomN
.build();
return TopNAggregation.newBuilder()
.setMetadata(
Metadata.newBuilder().setGroup(group).setName(Schema.formatTopNName(measureName)))
.setSourceMeasure(Metadata.newBuilder().setGroup(group).setName(measureName))
.setFieldValueSort(BanyandbModel.Sort.SORT_UNSPECIFIED) // include both TopN and BottomN
.setFieldName(valueColumnOpt.get().getValueCName())
.addAllGroupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())
.setCountersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber())
.setLruSize(model.getBanyanDBModelExtension().getTopN().getLruSize())
.build();
}

public Schema findMetadata(final Model model) {
Expand Down Expand Up @@ -306,8 +292,8 @@ Duration downSamplingDuration(DownSampling downSampling) {

IndexRule indexRule(String group, String tagName, BanyanDB.MatchQuery.AnalyzerType analyzer) {
IndexRule.Builder builder = IndexRule.newBuilder()
.setMetadata(Metadata.newBuilder().setName(tagName).setGroup(group))
.setType(IndexRule.Type.TYPE_INVERTED).addTags(tagName);
.setMetadata(Metadata.newBuilder().setName(tagName).setGroup(group))
.setType(IndexRule.Type.TYPE_INVERTED).addTags(tagName);
if (analyzer != null) {
switch (analyzer) {
case KEYWORD:
Expand Down Expand Up @@ -553,36 +539,6 @@ static String formatName(String modelName, DownSampling downSampling) {
return modelName + "_" + downSampling.getName();
}

public Optional<Object> findRemoteSchema(BanyanDBClient client) throws BanyanDBException {
try {
switch (kind) {
case STREAM:
return Optional.ofNullable(client.findStream(this.group, this.name()));
case MEASURE:
return Optional.ofNullable(client.findMeasure(this.group, this.name()));
default:
throw new IllegalStateException("should not reach here");
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.NOT_FOUND)) {
return Optional.empty();
}

throw ex;
}
}

public MetadataCache.EntityMetadata updateRemoteSchema(BanyanDBClient client) throws BanyanDBException {
switch (kind) {
case STREAM:
return client.updateStreamMetadataCacheFromSever(this.group, this.name());
case MEASURE:
return client.updateMeasureMetadataCacheFromSever(this.group, this.name());
default:
throw new IllegalStateException("should not reach here");
}
}

private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataList, boolean shouldAddID) {
final String indexFamily = SchemaMetadata.this.indexFamily();
final String nonIndexFamily = SchemaMetadata.this.nonIndexFamily();
Expand All @@ -603,95 +559,6 @@ private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataLi
return tagFamilySpecs;
}

/**
* Check if the group settings need to be updated
*/
private boolean checkGroupUpdate(BanyanDBClient client) throws BanyanDBException {
Group g = client.findGroup(this.group);
return g.getResourceOpts().getShardNum() != this.shard
|| g.getResourceOpts().getSegmentInterval().getNum() != this.segmentIntervalDays
|| g.getResourceOpts().getTtl().getNum() != this.ttlDays;
}

public boolean checkResourceExistence(BanyanDBClient client) throws BanyanDBException {
ResourceExist resourceExist;
Group.Builder gBuilder
= Group.newBuilder()
.setMetadata(Metadata.newBuilder().setName(this.group))
.setResourceOpts(ResourceOpts.newBuilder()
.setShardNum(this.shard)
.setSegmentInterval(
IntervalRule.newBuilder()
.setUnit(
IntervalRule.Unit.UNIT_DAY)
.setNum(
this.segmentIntervalDays))
.setTtl(
IntervalRule.newBuilder()
.setUnit(
IntervalRule.Unit.UNIT_DAY)
.setNum(
this.ttlDays)));
switch (kind) {
case STREAM:
resourceExist = client.existStream(this.group, this.name());
gBuilder.setCatalog(Catalog.CATALOG_STREAM).build();
if (!GROUP_ALIGNED.contains(this.group)) {
// create the group if not exist
if (!resourceExist.hasGroup()) {
try {
Group g = client.define(gBuilder.build());
if (g != null) {
log.info("group {} created", g.getMetadata().getName());
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("group {} already created by another OAP node", this.group);
} else {
throw ex;
}
}
} else {
// update the group if necessary
if (this.checkGroupUpdate(client)) {
client.update(gBuilder.build());
log.info("group {} updated", this.group);
}
}
// mark the group as aligned
GROUP_ALIGNED.add(this.group);
}
return resourceExist.hasResource();
case MEASURE:
resourceExist = client.existMeasure(this.group, this.name());
gBuilder.setCatalog(Catalog.CATALOG_MEASURE).build();
if (!GROUP_ALIGNED.contains(this.group)) {
if (!resourceExist.hasGroup()) {
try {
Group g = client.define(gBuilder.build());
if (g != null) {
log.info("group {} created", g.getMetadata().getName());
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("group {} already created by another OAP node", this.group);
} else {
throw ex;
}
}
} else {
if (this.checkGroupUpdate(client)) {
client.update(gBuilder.build());
log.info("group {} updated", this.group);
}
}
}
return resourceExist.hasResource();
default:
throw new IllegalStateException("should not reach here");
}
}

/**
* @return name of the Stream/Measure in the BanyanDB
*/
Expand Down Expand Up @@ -761,52 +628,17 @@ public static class Schema {

@Getter
@Nullable
private final TopNSpec topNSpec;
private final TopNAggregation topNSpec;

public ColumnSpec getSpec(String columnName) {
return this.specs.get(columnName);
}

public void installTopNAggregation(BanyanDBClient client) throws BanyanDBException {
if (this.getTopNSpec() == null) {
if (this.metadata.kind == Kind.MEASURE) {
log.debug("skip null TopN Schema for [{}]", metadata.getModelName());
}
return;
}
TopNAggregation.Builder builder
= TopNAggregation.newBuilder()
.setMetadata(Metadata.newBuilder()
.setGroup(getMetadata().getGroup())
.setName(this.getTopNSpec().getName()))

.setSourceMeasure(Metadata.newBuilder()
.setGroup(getMetadata().getGroup())
.setName(getMetadata().name()))
.setFieldValueSort(this.getTopNSpec().getSort())
.setFieldName(this.getTopNSpec().getFieldName())
.addAllGroupByTagNames(this.getTopNSpec().getGroupByTagNames())
.setCountersNumber(this.getTopNSpec().getCountersNumber())
.setLruSize(this.getTopNSpec().getLruSize());
client.define(builder.build());
log.info("installed TopN schema for measure {}", getMetadata().name());
public static String formatTopNName(String measureName) {
return measureName + "_topn";
}
}

@Builder
@EqualsAndHashCode
@Getter
@ToString
public static class TopNSpec {
private final String name;
@Singular
private final List<String> groupByTagNames;
private final String fieldName;
private final BanyandbModel.Sort sort;
private final int lruSize;
private final int countersNumber;
}

@RequiredArgsConstructor
@Getter
@ToString
Expand All @@ -818,11 +650,4 @@ public static class ColumnSpec {
public enum ColumnType {
TAG, FIELD;
}

@Getter
@Setter
@NoArgsConstructor
public static class GroupSetting {
private int segmentIntervalDays;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ public List<EBPFProfilingSchedule> querySchedules(String taskId) throws IOExcept
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EBPFProfilingScheduleRecord.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
TAGS,
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(EBPFProfilingScheduleRecord.TASK_ID, taskId));
query.setOrderBy(new AbstractQuery.OrderBy(EBPFProfilingScheduleRecord.START_TIME, AbstractQuery.Sort.DESC));
}
});
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(EBPFProfilingScheduleRecord.TASK_ID, taskId));
query.setOrderBy(new AbstractQuery.OrderBy(EBPFProfilingScheduleRecord.START_TIME, AbstractQuery.Sort.DESC));
}
}
);

return resp.getDataPoints().stream().map(this::buildEBPFProfilingSchedule).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,11 @@ protected void apply(MeasureQuery query) {
query.limit(page.getLimit());
query.offset(page.getFrom());
if (queryOrder == Order.ASC) {
query.setOrderBy(new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.ASC));
query.setOrderBy(
new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.ASC));
} else {
query.setOrderBy(new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.DESC));
query.setOrderBy(
new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.DESC));
}
for (final EventQueryCondition condition : conditionList) {
List<PairQueryCondition<?>> queryConditions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;

import com.google.gson.Gson;
import java.util.Objects;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.v1.client.AbstractCriteria;
import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
Expand Down Expand Up @@ -183,9 +184,10 @@ private TopNQueryResponse topNQuery(MetadataRegistry.Schema schema,
AbstractQuery.Sort sort,
List<KeyValue> additionalConditions,
List<AttrCondition> attributes) throws IOException {
final TopNQuery q = new TopNQuery(List.of(schema.getMetadata().getGroup()), schema.getTopNSpec().getName(),
timestampRange,
number, sort);
final TopNQuery q = new TopNQuery(List.of(schema.getMetadata().getGroup()), Objects.requireNonNull(
schema.getTopNSpec()).getMetadata().getName(),
timestampRange,
number, sort);
q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN);
List<PairQueryCondition<?>> conditions = new ArrayList<>();
if (CollectionUtils.isNotEmpty(additionalConditions)) {
Expand Down
Loading
Loading