From eb4d38c0bb6e31affa9ec0fb3a49709b0fc5e2b7 Mon Sep 17 00:00:00 2001 From: Wan Kai Date: Mon, 2 Dec 2024 12:49:20 +0800 Subject: [PATCH] BanyanDB: Support update the Schema when OAP starting. (#12808) --- .github/workflows/skywalking.yaml | 2 +- docs/en/changes/changes.md | 2 + .../banyandb/BanyanDBAggregationQueryDAO.java | 2 +- .../banyandb/BanyanDBIndexInstaller.java | 434 +++++++++++++++++- .../plugin/banyandb/MetadataRegistry.java | 209 +-------- ...BanyanDBEBPFProfilingScheduleQueryDAO.java | 15 +- .../measure/BanyanDBEventQueryDAO.java | 6 +- .../banyandb/stream/AbstractBanyanDBDAO.java | 8 +- .../ebpf/access_log/banyandb/e2e.yaml | 2 +- 9 files changed, 462 insertions(+), 218 deletions(-) diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml index 8edb000dc007..3e1e66e40ec1 100644 --- a/.github/workflows/skywalking.yaml +++ b/.github/workflows/skywalking.yaml @@ -436,7 +436,7 @@ jobs: - name: Log ES 7.17.10 config: test/e2e-v2/cases/log/es/e2e.yaml env: ES_VERSION=7.17.10 - - name: Log ES 8.8.1 Shardng + - name: Log ES 8.8.1 Sharding config: test/e2e-v2/cases/log/es/es-sharding/e2e.yaml env: ES_VERSION=8.8.1 - name: Log BanyanDB diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 9d021e01fccc..4dda6f327cc0 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -30,6 +30,8 @@ * Bump up netty to 4.1.115, grpc to 1.68.1, boringssl to 2.0.69. * BanyanDB: Support update the Group settings when OAP starting. * BanyanDB: Introduce index mode and refactor banyandb group settings. +* BanyanDB: Support update the Schema when OAP starting. +* BanyanDB: Speed up OAP booting while initializing BanyanDB. #### UI diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java index 5f524006303f..9c8c97f723b8 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java @@ -70,7 +70,7 @@ public List sortMetrics(TopNCondition condition, String valueCol // 2) additional conditions are all group by tags if (CollectionUtils.isEmpty(additionalConditions) || additionalConditions.stream().map(KeyValue::getKey).collect(Collectors.toSet()) - .equals(ImmutableSet.copyOf(schema.getTopNSpec().getGroupByTagNames()))) { + .equals(ImmutableSet.copyOf(schema.getTopNSpec().getGroupByTagNamesList()))) { return serverSideTopN(condition, schema, spec, timestampRange, additionalConditions); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java index 74036fcf3384..0b3cad0463ff 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java @@ -19,13 +19,28 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; import io.grpc.Status; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.banyandb.common.v1.BanyandbCommon; +import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group; +import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.IntervalRule; +import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream; +import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule; +import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRuleBinding; +import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation; import org.apache.skywalking.banyandb.v1.client.BanyanDBClient; import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache; +import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist; import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.RunningMode; import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService; import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.model.Model; @@ -36,6 +51,9 @@ @Slf4j public class BanyanDBIndexInstaller extends ModelInstaller { + // BanyanDB group setting aligned with the OAP settings + private static final Set GROUP_ALIGNED = new HashSet<>(); + private static final Map> GROUP_INDEX_RULES = new HashMap<>(); private final BanyanDBStorageConfig config; public BanyanDBIndexInstaller(Client client, ModuleManager moduleManager, BanyanDBStorageConfig config) { @@ -56,18 +74,37 @@ public boolean isExists(Model model) throws StorageException { try { final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client; // first check resource existence and create group if necessary - final boolean resourceExist = metadata.checkResourceExistence(c); + final boolean resourceExist = checkResourceExistence(metadata, c); if (!resourceExist) { return false; } else { // register models only locally(Schema cache) but not remotely if (model.isRecord()) { // stream - MetadataRegistry.INSTANCE.registerStreamModel(model, config, downSamplingConfigService); + StreamModel streamModel = MetadataRegistry.INSTANCE.registerStreamModel( + model, config, downSamplingConfigService); + if (!RunningMode.isNoInitMode()) { + checkStream(streamModel.getStream(), c); + checkIndexRules(model.getName(), streamModel.getIndexRules(), c); + checkIndexRuleBinding( + streamModel.getIndexRules(), metadata.getGroup(), metadata.name(), + BanyandbCommon.Catalog.CATALOG_STREAM, c + ); + // Stream not support server side TopN pre-aggregation + } } else { // measure - MetadataRegistry.INSTANCE.registerMeasureModel(model, config, downSamplingConfigService); + MeasureModel measureModel = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, downSamplingConfigService); + if (!RunningMode.isNoInitMode()) { + checkMeasure(measureModel.getMeasure(), c); + checkIndexRules(model.getName(), measureModel.getIndexRules(), c); + checkIndexRuleBinding( + measureModel.getIndexRules(), metadata.getGroup(), metadata.name(), + BanyandbCommon.Catalog.CATALOG_MEASURE, c + ); + checkTopNAggregation(model, c); + } } // pre-load remote schema for java client - MetadataCache.EntityMetadata remoteMeta = metadata.updateRemoteSchema(c); + MetadataCache.EntityMetadata remoteMeta = updateSchemaFromServer(metadata, c); if (remoteMeta == null) { throw new IllegalStateException("inconsistent state: metadata:" + metadata + ", remoteMeta: null"); } @@ -91,10 +128,15 @@ public void createTable(Model model) throws StorageException { log.info("install stream schema {}", model.getName()); final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client; try { + client.define(stream); if (CollectionUtils.isNotEmpty(streamModel.getIndexRules())) { - client.define(stream, streamModel.getIndexRules()); - } else { - client.define(stream); + for (IndexRule indexRule : streamModel.getIndexRules()) { + defineIndexRule(model.getName(), indexRule, client); + } + defineIndexRuleBinding( + streamModel.getIndexRules(), stream.getMetadata().getGroup(), stream.getMetadata().getName(), + BanyandbCommon.Catalog.CATALOG_STREAM, client + ); } } catch (BanyanDBException ex) { if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { @@ -115,10 +157,15 @@ public void createTable(Model model) throws StorageException { log.info("install measure schema {}", model.getName()); final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client; try { + client.define(measure); if (CollectionUtils.isNotEmpty(measureModel.getIndexRules())) { - client.define(measure, measureModel.getIndexRules()); - } else { - client.define(measure); + for (IndexRule indexRule : measureModel.getIndexRules()) { + defineIndexRule(model.getName(), indexRule, client); + } + defineIndexRuleBinding( + measureModel.getIndexRules(), measure.getMetadata().getGroup(), measure.getMetadata().getName(), + BanyandbCommon.Catalog.CATALOG_MEASURE, client + ); } } catch (BanyanDBException ex) { if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { @@ -131,7 +178,7 @@ public void createTable(Model model) throws StorageException { } final MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); try { - schema.installTopNAggregation(client); + defineTopNAggregation(schema, client); } catch (BanyanDBException ex) { if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { log.info("Measure schema {}_{} TopN({}) already created by another OAP node", @@ -148,4 +195,369 @@ public void createTable(Model model) throws StorageException { throw new StorageException("fail to create schema " + model.getName(), ex); } } + + /** + * Check if the group settings need to be updated + */ + private boolean checkGroup(MetadataRegistry.SchemaMetadata metadata, BanyanDBClient client) throws BanyanDBException { + Group g = client.findGroup(metadata.getGroup()); + return g.getResourceOpts().getShardNum() != metadata.getShard() + || g.getResourceOpts().getSegmentInterval().getNum() != metadata.getSegmentIntervalDays() + || g.getResourceOpts().getTtl().getNum() != metadata.getTtlDays(); + } + + private boolean checkResourceExistence(MetadataRegistry.SchemaMetadata metadata, + BanyanDBClient client) throws BanyanDBException { + ResourceExist resourceExist; + Group.Builder gBuilder + = Group.newBuilder() + .setMetadata(BanyandbCommon.Metadata.newBuilder().setName(metadata.getGroup())) + .setResourceOpts(BanyandbCommon.ResourceOpts.newBuilder() + .setShardNum(metadata.getShard()) + .setSegmentInterval( + IntervalRule.newBuilder() + .setUnit( + IntervalRule.Unit.UNIT_DAY) + .setNum( + metadata.getSegmentIntervalDays())) + .setTtl( + IntervalRule.newBuilder() + .setUnit( + IntervalRule.Unit.UNIT_DAY) + .setNum( + metadata.getTtlDays()))); + switch (metadata.getKind()) { + case STREAM: + resourceExist = client.existStream(metadata.getGroup(), metadata.name()); + gBuilder.setCatalog(BanyandbCommon.Catalog.CATALOG_STREAM).build(); + break; + case MEASURE: + resourceExist = client.existMeasure(metadata.getGroup(), metadata.name()); + gBuilder.setCatalog(BanyandbCommon.Catalog.CATALOG_MEASURE).build(); + break; + default: + throw new IllegalStateException("unknown metadata kind: " + metadata.getKind()); + } + if (!RunningMode.isNoInitMode()) { + if (!GROUP_ALIGNED.contains(metadata.getGroup())) { + // create the group if not exist + if (!resourceExist.hasGroup()) { + try { + Group g = client.define(gBuilder.build()); + if (g != null) { + log.info("group {} created", g.getMetadata().getName()); + } + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("group {} already created by another OAP node", metadata.getGroup()); + } else { + throw ex; + } + } + } else { + // update the group if necessary + if (this.checkGroup(metadata, client)) { + client.update(gBuilder.build()); + log.info("group {} updated", metadata.getGroup()); + } + } + // mark the group as aligned + GROUP_ALIGNED.add(metadata.getGroup()); + } + } + return resourceExist.hasResource(); + } + + /** + * Update the schema from the banyanDB server side for the java client cache + */ + private MetadataCache.EntityMetadata updateSchemaFromServer(MetadataRegistry.SchemaMetadata metadata, BanyanDBClient client) throws BanyanDBException { + switch (metadata.getKind()) { + case STREAM: + return client.updateStreamMetadataCacheFromSever(metadata.getGroup(), metadata.name()); + case MEASURE: + return client.updateMeasureMetadataCacheFromSever(metadata.getGroup(), metadata.name()); + default: + throw new IllegalStateException("unknown metadata kind: " + metadata.getKind()); + } + } + + private void defineTopNAggregation(MetadataRegistry.Schema schema, BanyanDBClient client) throws BanyanDBException { + if (schema.getTopNSpec() == null) { + if (schema.getMetadata().getKind() == MetadataRegistry.Kind.MEASURE) { + log.debug("skip null TopN Schema for [{}]", schema.getMetadata().name()); + } + return; + } + try { + client.define(schema.getTopNSpec()); + log.info("installed TopN schema for measure {}", schema.getMetadata().name()); + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("TopNAggregation {} already created by another OAP node", schema.getTopNSpec()); + } else { + throw ex; + } + } + } + + /** + * Check if the index rule conflicts with the exist one. + */ + private void checkIndexRuleConflicts(String modelName, IndexRule indexRule, IndexRule existRule) { + if (!existRule.equals(indexRule)) { + throw new IllegalStateException( + "conflict index rule in model: " + modelName + ": " + indexRule + " vs exist rule: " + existRule); + } + } + + /** + * Check if the index rule has been processed. + * If the index rule has been processed, return true. + * Otherwise, return false and mark the index rule as processed. + */ + private boolean checkIndexRuleProcessed(String modelName, IndexRule indexRule) { + Map rules = GROUP_INDEX_RULES.computeIfAbsent( + indexRule.getMetadata().getGroup(), k -> new HashMap<>()); + IndexRule existRule = rules.get(indexRule.getMetadata().getName()); + if (existRule != null) { + checkIndexRuleConflicts(modelName, indexRule, existRule); + return true; + } else { + rules.put(indexRule.getMetadata().getName(), indexRule); + return false; + } + } + + /** + * Define the index rule if not exist and no conflict. + */ + private void defineIndexRule(String modelName, + IndexRule indexRule, + BanyanDBClient client) throws BanyanDBException { + if (checkIndexRuleProcessed(modelName, indexRule)) { + return; + } + try { + client.define(indexRule); + log.info("new IndexRule created: {}", indexRule.getMetadata().getName()); + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("IndexRule {} already created by another OAP node", indexRule.getMetadata().getName()); + } else { + throw ex; + } + } + } + + private void defineIndexRuleBinding(List indexRules, + String group, + String name, + BanyandbCommon.Catalog catalog, + BanyanDBClient client) throws BanyanDBException { + List indexRuleNames = indexRules.stream().map(indexRule -> indexRule.getMetadata().getName()).collect( + Collectors.toList()); + try { + client.define(IndexRuleBinding.newBuilder() + .setMetadata(BanyandbCommon.Metadata.newBuilder() + .setGroup(group) + .setName(name)) + .setSubject(BanyandbDatabase.Subject.newBuilder() + .setName(name) + .setCatalog(catalog)) + .addAllRules(indexRuleNames) + .build()); + log.info("new IndexRuleBinding created: {}", name); + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("IndexRuleBinding {} already created by another OAP node", name); + } else { + throw ex; + } + } + } + + /** + * Check if the measure exists and update it if necessary + */ + private void checkMeasure(Measure measure, BanyanDBClient client) throws BanyanDBException { + Measure hisMeasure = client.findMeasure(measure.getMetadata().getGroup(), measure.getMetadata().getName()); + if (hisMeasure == null) { + throw new IllegalStateException("Measure: " + measure.getMetadata().getName() + " exist but can't find it from BanyanDB server"); + } else { + boolean equals = hisMeasure.toBuilder() + .clearUpdatedAt() + .clearMetadata() + .build() + .equals(measure.toBuilder().clearMetadata().build()); + if (!equals) { + client.update(measure); + log.info("update Measure: {} from: {} to: {}", hisMeasure.getMetadata().getName(), hisMeasure, measure); + } + } + } + + /** + * Check if the stream exists and update it if necessary + */ + private void checkStream(Stream stream, BanyanDBClient client) throws BanyanDBException { + Stream hisStream = client.findStream(stream.getMetadata().getGroup(), stream.getMetadata().getName()); + if (hisStream == null) { + throw new IllegalStateException("Stream: " + stream.getMetadata().getName() + " exist but can't find it from BanyanDB server"); + } else { + boolean equals = hisStream.toBuilder() + .clearUpdatedAt() + .clearMetadata() + .build() + .equals(stream.toBuilder().clearUpdatedAt().clearMetadata().build()); + if (!equals) { + client.update(stream); + log.info("update Stream: {} from: {} to: {}", hisStream.getMetadata().getName(), hisStream, stream); + } + } + } + + /** + * Check if the index rules exist and update them if necessary + */ + private void checkIndexRules(String modelName, List indexRules, BanyanDBClient client) throws BanyanDBException { + for (IndexRule indexRule : indexRules) { + if (checkIndexRuleProcessed(modelName, indexRule)) { + return; + } + IndexRule hisIndexRule = client.findIndexRule( + indexRule.getMetadata().getGroup(), indexRule.getMetadata().getName()); + if (hisIndexRule == null) { + try { + client.define(indexRule); + log.info("new IndexRule created: {}", indexRule); + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("IndexRule {} already created by another OAP node", indexRule); + } else { + throw ex; + } + } + } else { + boolean equals = hisIndexRule.toBuilder() + .clearUpdatedAt() + .clearMetadata() + .build() + .equals(indexRule.toBuilder().clearUpdatedAt().clearMetadata().build()); + if (!equals) { + client.update(indexRule); + log.info( + "update IndexRule: {} from: {} to: {}", hisIndexRule.getMetadata().getName(), hisIndexRule, + indexRule + ); + } + } + } + } + + /** + * Check if the index rule binding exists and update it if necessary. + * If the old index rule is not in the index rule binding, delete it. + */ + private void checkIndexRuleBinding(List indexRules, + String group, + String name, + BanyandbCommon.Catalog catalog, + BanyanDBClient client) throws BanyanDBException { + if (indexRules.isEmpty()) { + return; + } + List indexRuleNames = indexRules.stream().map(indexRule -> indexRule.getMetadata().getName()).collect( + Collectors.toList()); + + IndexRuleBinding indexRuleBinding = IndexRuleBinding.newBuilder() + .setMetadata(BanyandbCommon.Metadata.newBuilder() + .setGroup( + group) + .setName(name)) + .setSubject(BanyandbDatabase.Subject.newBuilder() + .setName(name) + .setCatalog( + catalog)) + .addAllRules(indexRuleNames).build(); + IndexRuleBinding hisIndexRuleBinding = client.findIndexRuleBinding(group, name); + if (hisIndexRuleBinding == null) { + try { + client.define(indexRuleBinding); + log.info("new IndexRuleBinding created: {}", indexRuleBinding); + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("IndexRuleBinding {} already created by another OAP node", indexRuleBinding); + } else { + throw ex; + } + } + } else { + boolean equals = hisIndexRuleBinding.toBuilder() + .clearUpdatedAt() + .clearMetadata() + .clearBeginAt() + .clearExpireAt() + .build() + .equals(indexRuleBinding.toBuilder().clearMetadata().build()); + if (!equals) { + // update binding and use the same begin expire time + client.update(indexRuleBinding.toBuilder() + .setBeginAt(hisIndexRuleBinding.getBeginAt()) + .setExpireAt(hisIndexRuleBinding.getExpireAt()) + .build()); + log.info( + "update IndexRuleBinding: {} from: {} to: {}", hisIndexRuleBinding.getMetadata().getName(), + hisIndexRuleBinding, indexRuleBinding + ); + } + } + } + + /** + * Check if the TopN aggregation exists and update it if necessary. + * If the old TopN aggregation is not in the schema, delete it. + */ + private void checkTopNAggregation(Model model, BanyanDBClient client) throws BanyanDBException { + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); + String topNName = MetadataRegistry.Schema.formatTopNName(schema.getMetadata().name()); + TopNAggregation hisTopNAggregation = client.findTopNAggregation(schema.getMetadata().getGroup(), topNName); + + if (schema.getTopNSpec() != null) { + TopNAggregation topNAggregation = schema.getTopNSpec(); + if (hisTopNAggregation == null) { + try { + client.define(topNAggregation); + log.info("new TopNAggregation created: {}", topNAggregation); + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("TopNAggregation {} already created by another OAP node", topNAggregation); + } else { + throw ex; + } + } + } else { + boolean equals = hisTopNAggregation.toBuilder() + .clearUpdatedAt() + .clearMetadata() + .build() + .equals(topNAggregation.toBuilder().clearMetadata().build()); + if (!equals) { + client.update(topNAggregation); + log.info( + "update TopNAggregation: {} from: {} to: {}", hisTopNAggregation.getMetadata().getName(), + hisTopNAggregation, topNAggregation + ); + } + } + } else { + if (hisTopNAggregation != null) { + client.deleteTopNAggregation(schema.getMetadata().getGroup(), topNName); + log.info( + "delete deprecated TopNAggregation: {} from group: {}", hisTopNAggregation.getMetadata().getName(), + schema.getMetadata().getGroup() + ); + } + } + } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index 8813773386c8..c5214b986c8f 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -18,25 +18,17 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.JsonObject; -import io.grpc.Status; import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.NoArgsConstructor; import lombok.RequiredArgsConstructor; -import lombok.Setter; import lombok.Singular; import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.banyandb.common.v1.BanyandbCommon; -import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Catalog; -import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group; -import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.IntervalRule; import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Metadata; -import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.ResourceOpts; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.CompressionMethod; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.EncodingMethod; @@ -50,11 +42,7 @@ import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation; import org.apache.skywalking.banyandb.model.v1.BanyandbModel; -import org.apache.skywalking.banyandb.v1.client.BanyanDBClient; -import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; import org.apache.skywalking.banyandb.v1.client.metadata.Duration; -import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache; -import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist; import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.metrics.IntList; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; @@ -77,7 +65,6 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -90,9 +77,6 @@ public enum MetadataRegistry { INSTANCE; - private static final ObjectMapper MAPPER = new ObjectMapper(); - // BanyanDB group setting aligned with the OAP settings - private static final Set GROUP_ALIGNED = new HashSet<>(); private final Map registry = new HashMap<>(); public StreamModel registerStreamModel(Model model, BanyanDBStorageConfig config, DownSamplingConfigService configService) { @@ -187,13 +171,13 @@ public MeasureModel registerMeasureModel(Model model, BanyanDBStorageConfig conf schemaBuilder.field(field.getName()); } // parse TopN - schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.name())); + schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.group, schemaMetadata.name())); registry.put(schemaMetadata.name(), schemaBuilder.build()); return new MeasureModel(builder.build(), indexRules); } - private TopNSpec parseTopNSpec(final Model model, final String measureName) + private TopNAggregation parseTopNSpec(final Model model, final String group, final String measureName) throws StorageException { if (model.getBanyanDBModelExtension().getTopN() == null) { return null; @@ -208,14 +192,16 @@ private TopNSpec parseTopNSpec(final Model model, final String measureName) if (CollectionUtils.isEmpty(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())) { throw new StorageException("invalid groupBy tags: " + model.getBanyanDBModelExtension().getTopN().getGroupByTagNames()); } - return TopNSpec.builder() - .name(measureName + "_topn") - .lruSize(model.getBanyanDBModelExtension().getTopN().getLruSize()) - .countersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber()) - .fieldName(valueColumnOpt.get().getValueCName()) - .groupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames()) - .sort(BanyandbModel.Sort.SORT_UNSPECIFIED) // include both TopN and BottomN - .build(); + return TopNAggregation.newBuilder() + .setMetadata( + Metadata.newBuilder().setGroup(group).setName(Schema.formatTopNName(measureName))) + .setSourceMeasure(Metadata.newBuilder().setGroup(group).setName(measureName)) + .setFieldValueSort(BanyandbModel.Sort.SORT_UNSPECIFIED) // include both TopN and BottomN + .setFieldName(valueColumnOpt.get().getValueCName()) + .addAllGroupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames()) + .setCountersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber()) + .setLruSize(model.getBanyanDBModelExtension().getTopN().getLruSize()) + .build(); } public Schema findMetadata(final Model model) { @@ -306,8 +292,8 @@ Duration downSamplingDuration(DownSampling downSampling) { IndexRule indexRule(String group, String tagName, BanyanDB.MatchQuery.AnalyzerType analyzer) { IndexRule.Builder builder = IndexRule.newBuilder() - .setMetadata(Metadata.newBuilder().setName(tagName).setGroup(group)) - .setType(IndexRule.Type.TYPE_INVERTED).addTags(tagName); + .setMetadata(Metadata.newBuilder().setName(tagName).setGroup(group)) + .setType(IndexRule.Type.TYPE_INVERTED).addTags(tagName); if (analyzer != null) { switch (analyzer) { case KEYWORD: @@ -553,36 +539,6 @@ static String formatName(String modelName, DownSampling downSampling) { return modelName + "_" + downSampling.getName(); } - public Optional findRemoteSchema(BanyanDBClient client) throws BanyanDBException { - try { - switch (kind) { - case STREAM: - return Optional.ofNullable(client.findStream(this.group, this.name())); - case MEASURE: - return Optional.ofNullable(client.findMeasure(this.group, this.name())); - default: - throw new IllegalStateException("should not reach here"); - } - } catch (BanyanDBException ex) { - if (ex.getStatus().equals(Status.Code.NOT_FOUND)) { - return Optional.empty(); - } - - throw ex; - } - } - - public MetadataCache.EntityMetadata updateRemoteSchema(BanyanDBClient client) throws BanyanDBException { - switch (kind) { - case STREAM: - return client.updateStreamMetadataCacheFromSever(this.group, this.name()); - case MEASURE: - return client.updateMeasureMetadataCacheFromSever(this.group, this.name()); - default: - throw new IllegalStateException("should not reach here"); - } - } - private List extractTagFamilySpec(List tagMetadataList, boolean shouldAddID) { final String indexFamily = SchemaMetadata.this.indexFamily(); final String nonIndexFamily = SchemaMetadata.this.nonIndexFamily(); @@ -603,95 +559,6 @@ private List extractTagFamilySpec(List tagMetadataLi return tagFamilySpecs; } - /** - * Check if the group settings need to be updated - */ - private boolean checkGroupUpdate(BanyanDBClient client) throws BanyanDBException { - Group g = client.findGroup(this.group); - return g.getResourceOpts().getShardNum() != this.shard - || g.getResourceOpts().getSegmentInterval().getNum() != this.segmentIntervalDays - || g.getResourceOpts().getTtl().getNum() != this.ttlDays; - } - - public boolean checkResourceExistence(BanyanDBClient client) throws BanyanDBException { - ResourceExist resourceExist; - Group.Builder gBuilder - = Group.newBuilder() - .setMetadata(Metadata.newBuilder().setName(this.group)) - .setResourceOpts(ResourceOpts.newBuilder() - .setShardNum(this.shard) - .setSegmentInterval( - IntervalRule.newBuilder() - .setUnit( - IntervalRule.Unit.UNIT_DAY) - .setNum( - this.segmentIntervalDays)) - .setTtl( - IntervalRule.newBuilder() - .setUnit( - IntervalRule.Unit.UNIT_DAY) - .setNum( - this.ttlDays))); - switch (kind) { - case STREAM: - resourceExist = client.existStream(this.group, this.name()); - gBuilder.setCatalog(Catalog.CATALOG_STREAM).build(); - if (!GROUP_ALIGNED.contains(this.group)) { - // create the group if not exist - if (!resourceExist.hasGroup()) { - try { - Group g = client.define(gBuilder.build()); - if (g != null) { - log.info("group {} created", g.getMetadata().getName()); - } - } catch (BanyanDBException ex) { - if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { - log.info("group {} already created by another OAP node", this.group); - } else { - throw ex; - } - } - } else { - // update the group if necessary - if (this.checkGroupUpdate(client)) { - client.update(gBuilder.build()); - log.info("group {} updated", this.group); - } - } - // mark the group as aligned - GROUP_ALIGNED.add(this.group); - } - return resourceExist.hasResource(); - case MEASURE: - resourceExist = client.existMeasure(this.group, this.name()); - gBuilder.setCatalog(Catalog.CATALOG_MEASURE).build(); - if (!GROUP_ALIGNED.contains(this.group)) { - if (!resourceExist.hasGroup()) { - try { - Group g = client.define(gBuilder.build()); - if (g != null) { - log.info("group {} created", g.getMetadata().getName()); - } - } catch (BanyanDBException ex) { - if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { - log.info("group {} already created by another OAP node", this.group); - } else { - throw ex; - } - } - } else { - if (this.checkGroupUpdate(client)) { - client.update(gBuilder.build()); - log.info("group {} updated", this.group); - } - } - } - return resourceExist.hasResource(); - default: - throw new IllegalStateException("should not reach here"); - } - } - /** * @return name of the Stream/Measure in the BanyanDB */ @@ -761,52 +628,17 @@ public static class Schema { @Getter @Nullable - private final TopNSpec topNSpec; + private final TopNAggregation topNSpec; public ColumnSpec getSpec(String columnName) { return this.specs.get(columnName); } - public void installTopNAggregation(BanyanDBClient client) throws BanyanDBException { - if (this.getTopNSpec() == null) { - if (this.metadata.kind == Kind.MEASURE) { - log.debug("skip null TopN Schema for [{}]", metadata.getModelName()); - } - return; - } - TopNAggregation.Builder builder - = TopNAggregation.newBuilder() - .setMetadata(Metadata.newBuilder() - .setGroup(getMetadata().getGroup()) - .setName(this.getTopNSpec().getName())) - - .setSourceMeasure(Metadata.newBuilder() - .setGroup(getMetadata().getGroup()) - .setName(getMetadata().name())) - .setFieldValueSort(this.getTopNSpec().getSort()) - .setFieldName(this.getTopNSpec().getFieldName()) - .addAllGroupByTagNames(this.getTopNSpec().getGroupByTagNames()) - .setCountersNumber(this.getTopNSpec().getCountersNumber()) - .setLruSize(this.getTopNSpec().getLruSize()); - client.define(builder.build()); - log.info("installed TopN schema for measure {}", getMetadata().name()); + public static String formatTopNName(String measureName) { + return measureName + "_topn"; } } - @Builder - @EqualsAndHashCode - @Getter - @ToString - public static class TopNSpec { - private final String name; - @Singular - private final List groupByTagNames; - private final String fieldName; - private final BanyandbModel.Sort sort; - private final int lruSize; - private final int countersNumber; - } - @RequiredArgsConstructor @Getter @ToString @@ -818,11 +650,4 @@ public static class ColumnSpec { public enum ColumnType { TAG, FIELD; } - - @Getter - @Setter - @NoArgsConstructor - public static class GroupSetting { - private int segmentIntervalDays; - } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java index 9b68a1d1fef0..33164a5e9bc8 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java @@ -55,13 +55,14 @@ public List querySchedules(String taskId) throws IOExcept MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EBPFProfilingScheduleRecord.INDEX_NAME, DownSampling.Minute); MeasureQueryResponse resp = query(schema, TAGS, - Collections.emptySet(), new QueryBuilder() { - @Override - protected void apply(MeasureQuery query) { - query.and(eq(EBPFProfilingScheduleRecord.TASK_ID, taskId)); - query.setOrderBy(new AbstractQuery.OrderBy(EBPFProfilingScheduleRecord.START_TIME, AbstractQuery.Sort.DESC)); - } - }); + Collections.emptySet(), new QueryBuilder() { + @Override + protected void apply(MeasureQuery query) { + query.and(eq(EBPFProfilingScheduleRecord.TASK_ID, taskId)); + query.setOrderBy(new AbstractQuery.OrderBy(EBPFProfilingScheduleRecord.START_TIME, AbstractQuery.Sort.DESC)); + } + } + ); return resp.getDataPoints().stream().map(this::buildEBPFProfilingSchedule).collect(Collectors.toList()); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java index 09174d399ef3..cc85537b0bdf 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java @@ -120,9 +120,11 @@ protected void apply(MeasureQuery query) { query.limit(page.getLimit()); query.offset(page.getFrom()); if (queryOrder == Order.ASC) { - query.setOrderBy(new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.ASC)); + query.setOrderBy( + new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.ASC)); } else { - query.setOrderBy(new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.DESC)); + query.setOrderBy( + new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.DESC)); } for (final EventQueryCondition condition : conditionList) { List> queryConditions = new ArrayList<>(); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java index b33335976435..9f6cef7b1b42 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import com.google.gson.Gson; +import java.util.Objects; import org.apache.skywalking.banyandb.model.v1.BanyandbModel; import org.apache.skywalking.banyandb.v1.client.AbstractCriteria; import org.apache.skywalking.banyandb.v1.client.AbstractQuery; @@ -183,9 +184,10 @@ private TopNQueryResponse topNQuery(MetadataRegistry.Schema schema, AbstractQuery.Sort sort, List additionalConditions, List attributes) throws IOException { - final TopNQuery q = new TopNQuery(List.of(schema.getMetadata().getGroup()), schema.getTopNSpec().getName(), - timestampRange, - number, sort); + final TopNQuery q = new TopNQuery(List.of(schema.getMetadata().getGroup()), Objects.requireNonNull( + schema.getTopNSpec()).getMetadata().getName(), + timestampRange, + number, sort); q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN); List> conditions = new ArrayList<>(); if (CollectionUtils.isNotEmpty(additionalConditions)) { diff --git a/test/e2e-v2/cases/profiling/ebpf/access_log/banyandb/e2e.yaml b/test/e2e-v2/cases/profiling/ebpf/access_log/banyandb/e2e.yaml index 110dad60f93d..044a3ab993b0 100644 --- a/test/e2e-v2/cases/profiling/ebpf/access_log/banyandb/e2e.yaml +++ b/test/e2e-v2/cases/profiling/ebpf/access_log/banyandb/e2e.yaml @@ -83,4 +83,4 @@ verify: interval: 10s cases: - includes: - - ../accesslog-cases.yaml \ No newline at end of file + - ../accesslog-cases.yaml