Skip to content

Commit

Permalink
Adapt BanyanDB Java Client 0.7.0. (#12621)
Browse files Browse the repository at this point in the history
  • Loading branch information
wankai123 authored Sep 14, 2024
1 parent f8716b4 commit ddbed6d
Show file tree
Hide file tree
Showing 13 changed files with 391 additions and 174 deletions.
1 change: 1 addition & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
* Fix the previous analysis result missing in the ALS `k8s-mesh` analyzer.
* Fix `findEndpoint` query require `keyword` when using BanyanDB.
* Support to analysis the ztunnel mapped IP address in eBPF Access Log Receiver.
* Adapt BanyanDB Java Client 0.7.0-rc3.

#### UI

Expand Down
2 changes: 1 addition & 1 deletion oap-server-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<httpcore.version>4.4.13</httpcore.version>
<httpasyncclient.version>4.1.5</httpasyncclient.version>
<commons-compress.version>1.21</commons-compress.version>
<banyandb-java-client.version>0.7.0-rc2</banyandb-java-client.version>
<banyandb-java-client.version>0.7-rc3</banyandb-java-client.version>
<kafka-clients.version>3.4.0</kafka-clients.version>
<spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
<consul.client.version>1.5.3</consul.client.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;

@Slf4j
public class BanyanDBIndexInstaller extends ModelInstaller {
Expand Down Expand Up @@ -58,20 +60,20 @@ public boolean isExists(Model model) throws StorageException {
final boolean resourceExist = metadata.checkResourceExistence(c);
if (!resourceExist) {
return false;
}

// then check entity schema
if (metadata.findRemoteSchema(c).isPresent()) {
// register models only locally but not remotely
} else {
// register models only locally(Schema cache) but not remotely
if (model.isRecord()) { // stream
MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
} else { // measure
MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
}
// pre-load remote schema for java client
MetadataCache.EntityMetadata remoteMeta = metadata.updateRemoteSchema(c);
if (remoteMeta == null) {
throw new IllegalStateException("inconsistent state: metadata:" + metadata + ", remoteMeta: null");
}
return true;
}

throw new IllegalStateException("inconsistent state:" + metadata);
} catch (BanyanDBException ex) {
throw new StorageException("fail to check existence", ex);
}
Expand All @@ -84,11 +86,17 @@ public void createTable(Model model) throws StorageException {
.provider()
.getService(ConfigService.class);
if (model.isRecord()) { // stream
Stream stream = MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
StreamModel streamModel = MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
Stream stream = streamModel.getStream();
if (stream != null) {
log.info("install stream schema {}", model.getName());
final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client;
try {
((BanyanDBStorageClient) client).define(stream);
if (CollectionUtils.isNotEmpty(streamModel.getIndexRules())) {
client.define(stream, streamModel.getIndexRules());
} else {
client.define(stream);
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info(
Expand All @@ -102,12 +110,17 @@ public void createTable(Model model) throws StorageException {
}
}
} else { // measure
Measure measure = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
MeasureModel measureModel = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
Measure measure = measureModel.getMeasure();
if (measure != null) {
log.info("install measure schema {}", measure.name());
final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client;
log.info("install measure schema {}", model.getName());
final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client;
try {
c.define(measure);
if (CollectionUtils.isNotEmpty(measureModel.getIndexRules())) {
client.define(measure, measureModel.getIndexRules());
} else {
client.define(measure);
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("Measure schema {}_{} already created by another OAP node",
Expand All @@ -119,7 +132,7 @@ public void createTable(Model model) throws StorageException {
}
final MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
try {
schema.installTopNAggregation(c);
schema.installTopNAggregation(client);
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("Measure schema {}_{} TopN({}) already created by another OAP node",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void insert(Model model, NoneStream noneStream) throws IOException {
if (schema == null) {
throw new IOException(model.getName() + " is not registered");
}
StreamWrite streamWrite = getClient().client.createStreamWrite(
StreamWrite streamWrite = getClient().createStreamWrite(
schema.getMetadata().getGroup(), // group name
schema.getMetadata().name(), // stream-name
noneStream.id().build() // identity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;

import io.grpc.Status;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
Expand All @@ -30,14 +31,15 @@
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
import org.apache.skywalking.banyandb.v1.client.TopNQuery;
import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.ApplyRequest.Strategy;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.DeleteResponse;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.AlreadyExistsException;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Group;
import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
import org.apache.skywalking.banyandb.v1.client.metadata.Property;
import org.apache.skywalking.banyandb.v1.client.metadata.PropertyStore;
import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
Expand Down Expand Up @@ -103,9 +105,9 @@ public Property queryProperty(String group, String name, String id) throws IOExc
}
}

public PropertyStore.DeleteResult deleteProperty(String group, String name, String id, String... tags) throws IOException {
public DeleteResponse deleteProperty(String group, String name, String id, String... tags) throws IOException {
try {
PropertyStore.DeleteResult result = this.client.deleteProperty(group, name, id, tags);
DeleteResponse result = this.client.deleteProperty(group, name, id, tags);
this.healthChecker.health();
return result;
} catch (BanyanDBException ex) {
Expand Down Expand Up @@ -158,7 +160,7 @@ public TopNQueryResponse query(TopNQuery q) throws IOException {
}

/**
* PropertyStore.Strategy is default to {@link PropertyStore.Strategy#MERGE}
* PropertyStore.Strategy is default to {@link Strategy#STRATEGY_MERGE}
*/
public void define(Property property) throws IOException {
try {
Expand All @@ -170,7 +172,7 @@ public void define(Property property) throws IOException {
}
}

public void define(Property property, PropertyStore.Strategy strategy) throws IOException {
public void define(Property property, Strategy strategy) throws IOException {
try {
this.client.apply(property, strategy);
this.healthChecker.health();
Expand All @@ -190,6 +192,16 @@ public void define(Stream stream) throws BanyanDBException {
}
}

public void define(Stream stream, List<BanyandbDatabase.IndexRule> indexRules) throws BanyanDBException {
try {
this.client.define(stream, indexRules);
this.healthChecker.health();
} catch (BanyanDBException ex) {
healthChecker.unHealth(ex);
throw ex;
}
}

public void define(Measure measure) throws BanyanDBException {
try {
this.client.define(measure);
Expand All @@ -200,6 +212,16 @@ public void define(Measure measure) throws BanyanDBException {
}
}

public void define(Measure measure, List<BanyandbDatabase.IndexRule> indexRules) throws BanyanDBException {
try {
this.client.define(measure, indexRules);
this.healthChecker.health();
} catch (BanyanDBException ex) {
healthChecker.unHealth(ex);
throw ex;
}
}

public void defineIfEmpty(Group group) throws IOException {
try {
try {
Expand All @@ -223,12 +245,20 @@ public void define(TopNAggregation topNAggregation) throws IOException {
}
}

public StreamWrite createStreamWrite(String group, String name, String elementId) {
return this.client.createStreamWrite(group, name, elementId);
public StreamWrite createStreamWrite(String group, String name, String elementId) throws IOException {
try {
return this.client.createStreamWrite(group, name, elementId);
} catch (BanyanDBException e) {
throw new IOException("fail to create stream write", e);
}
}

public MeasureWrite createMeasureWrite(String group, String name, long timestamp) {
return this.client.createMeasureWrite(group, name, timestamp);
public MeasureWrite createMeasureWrite(String group, String name, long timestamp) throws IOException {
try {
return this.client.createMeasureWrite(group, name, timestamp);
} catch (BanyanDBException e) {
throw new IOException("fail to create measure write", e);
}
}

public void write(StreamWrite streamWrite) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.skywalking.oap.server.storage.plugin.banyandb;

import org.apache.skywalking.banyandb.v1.client.metadata.Group;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
Expand Down Expand Up @@ -175,7 +175,13 @@ public void start() throws ServiceNotProvidedException, ModuleStartException {
this.client.registerChecker(healthChecker);
try {
this.client.connect();
this.client.defineIfEmpty(Group.create(BanyanDBUITemplateManagementDAO.GROUP));
this.client.defineIfEmpty(BanyandbCommon.Group.newBuilder()
.setMetadata(
BanyandbCommon.Metadata.newBuilder()
.setName(
BanyanDBUITemplateManagementDAO.GROUP))
.setCatalog(BanyandbCommon.Catalog.CATALOG_UNSPECIFIED)
.build());
this.modelInstaller.start();

getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(modelInstaller);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;

import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.banyandb.v1.client.metadata.Property;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
import org.apache.skywalking.oap.server.core.management.ui.menu.UIMenu;
import org.apache.skywalking.oap.server.core.storage.management.UIMenuManagementDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
Expand All @@ -46,17 +49,27 @@ public UIMenu getMenu(String id) throws IOException {

@Override
public void saveMenu(UIMenu menu) throws IOException {
this.getClient().define(Property.create(GROUP, UIMenu.INDEX_NAME, menu.id().build())
.addTag(TagAndValue.newStringTag(UIMenu.CONFIGURATION, menu.getConfigurationJson()))
.addTag(TagAndValue.newLongTag(UIMenu.UPDATE_TIME, menu.getUpdateTime()))
.build());
Property property = Property.newBuilder()
.setMetadata(BanyandbProperty.Metadata.newBuilder().setId(menu.getMenuId())
.setContainer(
BanyandbCommon.Metadata.newBuilder()
.setGroup(GROUP)
.setName(
UIMenu.INDEX_NAME)))

.addTags(TagAndValue.newStringTag(UIMenu.CONFIGURATION, menu.getConfigurationJson())
.build())
.addTags(TagAndValue.newLongTag(UIMenu.UPDATE_TIME, menu.getUpdateTime()).build())
.build();
this.getClient().define(property);
}

public UIMenu parse(Property property) {
UIMenu menu = new UIMenu();
menu.setMenuId(property.id());
menu.setMenuId(property.getMetadata().getId());

for (TagAndValue<?> tagAndValue : property.tags()) {
for (BanyandbModel.Tag tag : property.getTagsList()) {
TagAndValue<?> tagAndValue = TagAndValue.fromProtobuf(tag);
if (tagAndValue.getTagName().equals(UIMenu.CONFIGURATION)) {
menu.setConfigurationJson((String) tagAndValue.getValue());
} else if (tagAndValue.getTagName().equals(UIMenu.UPDATE_TIME)) {
Expand Down
Loading

0 comments on commit ddbed6d

Please sign in to comment.