Skip to content

Commit

Permalink
Merge pull request #340 from weaviate/add-support-for-headers
Browse files Browse the repository at this point in the history
Add support for headers in async client
  • Loading branch information
antas-marcin authored Nov 26, 2024
2 parents 44d46b5 + 41a84f2 commit b991dbf
Show file tree
Hide file tree
Showing 55 changed files with 227 additions and 154 deletions.
14 changes: 13 additions & 1 deletion src/main/java/io/weaviate/client/base/AsyncBaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import io.weaviate.client.Config;
import io.weaviate.client.base.http.async.ResponseParser;
import io.weaviate.client.base.http.async.WeaviateResponseConsumer;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
Expand All @@ -15,10 +17,12 @@ public abstract class AsyncBaseClient<T> {
protected final CloseableHttpAsyncClient client;
private final Config config;
private final Serializer serializer;
private final AccessTokenProvider tokenProvider;

public AsyncBaseClient(CloseableHttpAsyncClient client, Config config) {
public AsyncBaseClient(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider) {
this.client = client;
this.config = config;
this.tokenProvider = tokenProvider;
this.serializer = new Serializer();
}

Expand Down Expand Up @@ -78,6 +82,14 @@ protected SimpleHttpRequest getRequest(String endpoint, Object payload, String m
SimpleHttpRequest req = new SimpleHttpRequest(method, String.format("%s%s", config.getBaseURL(), endpoint));
req.addHeader(HttpHeaders.ACCEPT, "*/*");
req.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
if (config.getHeaders() != null) {
for (Map.Entry<String, String> h : config.getHeaders().entrySet()) {
req.addHeader(h.getKey(), h.getValue());
}
}
if (tokenProvider != null) {
req.addHeader("Authorization", String.format("Bearer %s", tokenProvider.getAccessToken()));
}
if (payload != null) {
req.setBody(serializer.toJsonString(payload), ContentType.APPLICATION_JSON);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

import io.weaviate.client.Config;
import io.weaviate.client.base.http.async.WeaviateGraphQLTypedResponseConsumer;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.graphql.model.GraphQLTypedResponse;
import java.util.concurrent.Future;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;

public class AsyncBaseGraphQLClient<T> extends AsyncBaseClient<T> {
public AsyncBaseGraphQLClient(CloseableHttpAsyncClient client, Config config) {
super(client, config);
public AsyncBaseGraphQLClient(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider) {
super(client, config, tokenProvider);
}

protected <C> Future<Result<GraphQLTypedResponse<C>>> sendGraphQLTypedRequest(Object payload, Class<C> classOfC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,35 +41,35 @@ public WeaviateAsyncClient(Config config, AccessTokenProvider tokenProvider) {
}

public Misc misc() {
return new Misc(client, config);
return new Misc(client, config, tokenProvider);
}

public Schema schema() {
return new Schema(client, config, dbVersionSupport);
return new Schema(client, config, tokenProvider, dbVersionSupport);
}

public Data data() {
return new Data(client, config, dbVersionSupport);
return new Data(client, config, tokenProvider, dbVersionSupport);
}

public Batch batch() {
return new Batch(client, config, dbVersionSupport, grpcVersionSupport, tokenProvider, data());
}

public Cluster cluster() {
return new Cluster(client, config);
return new Cluster(client, config, tokenProvider);
}

public Classifications classifications() {
return new Classifications(client, config);
return new Classifications(client, config, tokenProvider);
}

public Backup backup() {
return new Backup(client, config);
return new Backup(client, config, tokenProvider);
}

public GraphQL graphQL() {
return new GraphQL(client, config);
return new GraphQL(client, config, tokenProvider);
}

private DbVersionProvider initDbVersionProvider() {
Expand All @@ -83,7 +83,7 @@ private DbVersionProvider initDbVersionProvider() {

private Result<Meta> getMeta() {
try {
return new Misc(client, config).metaGetter().run().get();
return new Misc(client, config, tokenProvider).metaGetter().run().get();
} catch (InterruptedException | ExecutionException e) {
// we can't connect to Weaviate, metaResult will be null
return null;
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/io/weaviate/client/v1/async/backup/Backup.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.weaviate.client.v1.async.backup.api.BackupGetter;
import io.weaviate.client.v1.async.backup.api.BackupRestoreStatusGetter;
import io.weaviate.client.v1.async.backup.api.BackupRestorer;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import lombok.RequiredArgsConstructor;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;

Expand All @@ -17,37 +18,37 @@ public class Backup {

private final CloseableHttpAsyncClient client;
private final Config config;

private final AccessTokenProvider tokenProvider;

public BackupCreator creator() {
return creator(null);
}

public BackupCreator creator(Executor executor) {
return new BackupCreator(client, config, createStatusGetter(), executor);
return new BackupCreator(client, config, tokenProvider, createStatusGetter(), executor);
}

public BackupCreateStatusGetter createStatusGetter() {
return new BackupCreateStatusGetter(client, config);
return new BackupCreateStatusGetter(client, config, tokenProvider);
}

public BackupRestorer restorer() {
return restorer(null);
}

public BackupRestorer restorer(Executor executor) {
return new BackupRestorer(client, config, restoreStatusGetter(), executor);
return new BackupRestorer(client, config, tokenProvider, restoreStatusGetter(), executor);
}

public BackupRestoreStatusGetter restoreStatusGetter() {
return new BackupRestoreStatusGetter(client, config);
return new BackupRestoreStatusGetter(client, config, tokenProvider);
}

public BackupCanceler canceler() {
return new BackupCanceler(client, config);
return new BackupCanceler(client, config, tokenProvider);
}

public BackupGetter getter() { // TODO: add test
return new BackupGetter(client, config);
return new BackupGetter(client, config, tokenProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Result;
import io.weaviate.client.base.util.UrlEncoder;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;

Expand All @@ -23,8 +24,8 @@ public class BackupCanceler extends AsyncBaseClient<Void>
private String backupId;


public BackupCanceler(CloseableHttpAsyncClient client, Config config) {
super(client, config);
public BackupCanceler(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider) {
super(client, config, tokenProvider);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Result;
import io.weaviate.client.base.util.UrlEncoder;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.backup.model.BackupCreateStatusResponse;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;
Expand All @@ -18,8 +19,8 @@ public class BackupCreateStatusGetter extends AsyncBaseClient<BackupCreateStatus
private String backupId;


public BackupCreateStatusGetter(CloseableHttpAsyncClient client, Config config) {
super(client, config);
public BackupCreateStatusGetter(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider) {
super(client, config, tokenProvider);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.weaviate.client.base.WeaviateErrorResponse;
import io.weaviate.client.base.util.Futures;
import io.weaviate.client.base.util.UrlEncoder;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.backup.model.BackupCreateResponse;
import io.weaviate.client.v1.backup.model.BackupCreateStatusResponse;
import io.weaviate.client.v1.backup.model.CreateStatus;
Expand Down Expand Up @@ -41,8 +42,8 @@ public class BackupCreator extends AsyncBaseClient<BackupCreateResponse>
private final Executor executor;


public BackupCreator(CloseableHttpAsyncClient client, Config config, BackupCreateStatusGetter statusGetter, Executor executor) {
super(client, config);
public BackupCreator(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider, BackupCreateStatusGetter statusGetter, Executor executor) {
super(client, config, tokenProvider);
this.statusGetter = statusGetter;
this.executor = executor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Result;
import io.weaviate.client.base.util.UrlEncoder;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.backup.model.BackupCreateResponse;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;
Expand All @@ -17,8 +18,8 @@ public class BackupGetter extends AsyncBaseClient<BackupCreateResponse[]>
private String backend;


public BackupGetter(CloseableHttpAsyncClient client, Config config) {
super(client, config);
public BackupGetter(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider) {
super(client, config, tokenProvider);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Result;
import io.weaviate.client.base.util.UrlEncoder;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.backup.model.BackupRestoreStatusResponse;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;
Expand All @@ -18,8 +19,8 @@ public class BackupRestoreStatusGetter extends AsyncBaseClient<BackupRestoreStat
private String backupId;


public BackupRestoreStatusGetter(CloseableHttpAsyncClient client, Config config) {
super(client, config);
public BackupRestoreStatusGetter(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider) {
super(client, config, tokenProvider);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.weaviate.client.base.WeaviateErrorResponse;
import io.weaviate.client.base.util.Futures;
import io.weaviate.client.base.util.UrlEncoder;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.backup.model.BackupRestoreResponse;
import io.weaviate.client.v1.backup.model.BackupRestoreStatusResponse;
import io.weaviate.client.v1.backup.model.RestoreStatus;
Expand Down Expand Up @@ -41,8 +42,8 @@ public class BackupRestorer extends AsyncBaseClient<BackupRestoreResponse>
private final Executor executor;


public BackupRestorer(CloseableHttpAsyncClient client, Config config, BackupRestoreStatusGetter statusGetter, Executor executor) {
super(client, config);
public BackupRestorer(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider, BackupRestoreStatusGetter statusGetter, Executor executor) {
super(client, config, tokenProvider);
this.statusGetter = statusGetter;
this.executor = executor;
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/weaviate/client/v1/async/batch/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public ObjectsBatcher objectsAutoBatcher(ObjectsBatcher.BatchRetriesConfig batch
}

public ObjectsBatchDeleter objectsBatchDeleter() {
return new ObjectsBatchDeleter(client, config, objectsPath);
return new ObjectsBatchDeleter(client, config, tokenProvider, objectsPath);
}

public ReferencePayloadBuilder referencePayloadBuilder() {
Expand All @@ -143,7 +143,7 @@ public ReferencesBatcher referencesBatcher(ReferencesBatcher.BatchRetriesConfig

public ReferencesBatcher referencesBatcher(ReferencesBatcher.BatchRetriesConfig batchRetriesConfig,
Executor executor) {
return ReferencesBatcher.create(client, config, referencesPath, batchRetriesConfig, executor);
return ReferencesBatcher.create(client, config, tokenProvider, referencesPath, batchRetriesConfig, executor);
}

public ReferencesBatcher referencesAutoBatcher() {
Expand Down Expand Up @@ -208,6 +208,6 @@ public ReferencesBatcher referencesAutoBatcher(ReferencesBatcher.BatchRetriesCon
public ReferencesBatcher referencesAutoBatcher(ReferencesBatcher.BatchRetriesConfig batchRetriesConfig,
ReferencesBatcher.AutoBatchConfig autoBatchConfig,
Executor executor) {
return ReferencesBatcher.createAuto(client, config, referencesPath, batchRetriesConfig, autoBatchConfig, executor);
return ReferencesBatcher.createAuto(client, config, tokenProvider, referencesPath, batchRetriesConfig, autoBatchConfig, executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.weaviate.client.base.AsyncBaseClient;
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Result;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.batch.model.BatchDeleteResponse;
import io.weaviate.client.v1.batch.util.ObjectsPath;
import io.weaviate.client.v1.filters.WhereFilter;
Expand All @@ -20,8 +21,8 @@ public class ObjectsBatchDeleter extends AsyncBaseClient<BatchDeleteResponse> im
private String output;
private Boolean dryRun;

public ObjectsBatchDeleter(CloseableHttpAsyncClient client, Config config, ObjectsPath objectsPath) {
super(client, config);
public ObjectsBatchDeleter(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider, ObjectsPath objectsPath) {
super(client, config, tokenProvider);
this.objectsPath = objectsPath;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private ObjectsBatcher(CloseableHttpAsyncClient client, Config config, Data data
AccessTokenProvider tokenProvider, GrpcVersionSupport grpcVersionSupport,
ObjectsBatcher.BatchRetriesConfig batchRetriesConfig, ObjectsBatcher.AutoBatchConfig autoBatchConfig,
Executor executor) {
super(client, config);
super(client, config, tokenProvider);
this.config = config;
this.tokenProvider = tokenProvider;
this.data = data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.weaviate.client.base.WeaviateErrorResponse;
import io.weaviate.client.base.util.Assert;
import io.weaviate.client.base.util.Futures;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.batch.model.BatchReference;
import io.weaviate.client.v1.batch.model.BatchReferenceResponse;
import io.weaviate.client.v1.batch.util.ReferencesPath;
Expand Down Expand Up @@ -52,10 +53,10 @@ public class ReferencesBatcher extends AsyncBaseClient<BatchReferenceResponse[]>
private String consistencyLevel;


private ReferencesBatcher(CloseableHttpAsyncClient client, Config config, ReferencesPath referencesPath,
private ReferencesBatcher(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider, ReferencesPath referencesPath,
BatchRetriesConfig batchRetriesConfig, AutoBatchConfig autoBatchConfig,
Executor executor) {
super(client, config);
super(client, config, tokenProvider);
this.referencesPath = referencesPath;
this.futures = Collections.synchronizedList(new ArrayList<>());
this.references = Collections.synchronizedList(new ArrayList<>());
Expand All @@ -71,18 +72,18 @@ private ReferencesBatcher(CloseableHttpAsyncClient client, Config config, Refere
}
}

public static ReferencesBatcher create(CloseableHttpAsyncClient client, Config config, ReferencesPath referencesPath,
public static ReferencesBatcher create(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider, ReferencesPath referencesPath,
BatchRetriesConfig batchRetriesConfig, Executor executor) {
Assert.requiredNotNull(batchRetriesConfig, "batchRetriesConfig");
return new ReferencesBatcher(client, config, referencesPath, batchRetriesConfig, null, executor);
return new ReferencesBatcher(client, config, tokenProvider, referencesPath, batchRetriesConfig, null, executor);
}

public static ReferencesBatcher createAuto(CloseableHttpAsyncClient client, Config config, ReferencesPath referencesPath,
public static ReferencesBatcher createAuto(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider, ReferencesPath referencesPath,
BatchRetriesConfig batchRetriesConfig, AutoBatchConfig autoBatchConfig,
Executor executor) {
Assert.requiredNotNull(batchRetriesConfig, "batchRetriesConfig");
Assert.requiredNotNull(autoBatchConfig, "autoBatchConfig");
return new ReferencesBatcher(client, config, referencesPath, batchRetriesConfig, autoBatchConfig, executor);
return new ReferencesBatcher(client, config, tokenProvider, referencesPath, batchRetriesConfig, autoBatchConfig, executor);
}


Expand Down
Loading

0 comments on commit b991dbf

Please sign in to comment.