Skip to content

Commit 3152541

Browse files
committed
HSEARCH-5464 Move around gson entity to keep the common code in the backend
1 parent b805101 commit 3152541

File tree

31 files changed

+651
-1411
lines changed

31 files changed

+651
-1411
lines changed

backend/elasticsearch-client/elasticsearch-rest5-client/src/main/java/org/hibernate/search/backend/elasticsearch/client/rest5/impl/ByteBufferDataStreamChannel.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

backend/elasticsearch-client/elasticsearch-rest5-client/src/main/java/org/hibernate/search/backend/elasticsearch/client/rest5/impl/ClientRest5ElasticsearchClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private CompletableFuture<Response> send(ElasticsearchRequest elasticsearchReque
9797

9898
HttpEntity entity;
9999
try {
100-
entity = GsonHttpEntity.toEntity( gson, elasticsearchRequest );
100+
entity = ClientRest5GsonHttpEntity.toEntity( gson, elasticsearchRequest );
101101
}
102102
catch (IOException | RuntimeException e) {
103103
completableFuture.completeExceptionally( e );
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright Red Hat Inc. and Hibernate Authors
4+
*/
5+
package org.hibernate.search.backend.elasticsearch.client.rest5.impl;
6+
7+
import java.io.IOException;
8+
import java.nio.ByteBuffer;
9+
import java.util.List;
10+
import java.util.Set;
11+
12+
import org.hibernate.search.backend.elasticsearch.client.common.gson.entity.spi.ContentEncoder;
13+
import org.hibernate.search.backend.elasticsearch.client.common.gson.entity.spi.GsonHttpEntityContentProvider;
14+
import org.hibernate.search.backend.elasticsearch.client.common.spi.ElasticsearchRequest;
15+
16+
import com.google.gson.Gson;
17+
import com.google.gson.JsonObject;
18+
19+
import org.apache.hc.core5.function.Supplier;
20+
import org.apache.hc.core5.http.ContentType;
21+
import org.apache.hc.core5.http.Header;
22+
import org.apache.hc.core5.http.HttpEntity;
23+
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
24+
import org.apache.hc.core5.http.nio.DataStreamChannel;
25+
26+
/**
27+
* Optimised adapter to encode GSON objects into HttpEntity instances.
28+
* The naive approach was using various StringBuilders; the objects we
29+
* need to serialise into JSON might get large and this was causing the
30+
* internal StringBuilder buffers to need frequent resizing and cause
31+
* problems with excessive allocations.
32+
*
33+
* Rather than trying to guess reasonable default sizes for these buffers,
34+
* we can defer the serialisation to write directly into the ByteBuffer
35+
* of the HTTP client, this has the additional benefit of making the
36+
* intermediary buffers short lived.
37+
*
38+
* The one complexity to watch for is flow control: when writing into
39+
* the output buffer chances are that not all bytes are accepted; in
40+
* this case we have to hold on the remaining portion of data to
41+
* be written when the flow control is re-enabled.
42+
*
43+
* A side effect of this strategy is that the total content length which
44+
* is being produced is not known in advance. Not reporting the length
45+
* in advance to the Apache Http client causes it to use chunked-encoding,
46+
* which is great for large blocks but not optimal for small messages.
47+
* For this reason we attempt to start encoding into a small buffer
48+
* upfront: if all data we need to produce fits into that then we can
49+
* report the content length; if not the encoding completion will be deferred
50+
* but not resetting so to avoid repeating encoding work.
51+
*
52+
* @author Sanne Grinovero (C) 2017 Red Hat Inc.
53+
*/
54+
final class ClientRest5GsonHttpEntity extends GsonHttpEntityContentProvider implements HttpEntity, AsyncEntityProducer {
55+
56+
private static final String CONTENT_TYPE = ContentType.APPLICATION_JSON.toString();
57+
58+
59+
public static HttpEntity toEntity(Gson gson, ElasticsearchRequest request) throws IOException {
60+
final List<JsonObject> bodyParts = request.bodyParts();
61+
if ( bodyParts.isEmpty() ) {
62+
return null;
63+
}
64+
return new ClientRest5GsonHttpEntity( gson, bodyParts );
65+
}
66+
67+
public ClientRest5GsonHttpEntity(Gson gson, List<JsonObject> bodyParts) throws IOException {
68+
super( gson, bodyParts );
69+
}
70+
71+
@Override
72+
public boolean isRepeatable() {
73+
return true;
74+
}
75+
76+
@Override
77+
public void failed(Exception cause) {
78+
79+
}
80+
81+
@Override
82+
public boolean isChunked() {
83+
return false;
84+
}
85+
86+
@Override
87+
public Set<String> getTrailerNames() {
88+
return Set.of();
89+
}
90+
91+
@Override
92+
public String getContentType() {
93+
return CONTENT_TYPE;
94+
}
95+
96+
@Override
97+
public String getContentEncoding() {
98+
//Apparently this is the correct value:
99+
return null;
100+
}
101+
102+
@Override
103+
public boolean isStreaming() {
104+
return false;
105+
}
106+
107+
@Override
108+
public Supplier<List<? extends Header>> getTrailers() {
109+
return null;
110+
}
111+
112+
@Override
113+
public int available() {
114+
return 0;
115+
}
116+
117+
@Override
118+
public void produce(DataStreamChannel channel) throws IOException {
119+
produceContent( new ClientRest5ContentEncoder( channel ) );
120+
}
121+
122+
@Override
123+
public void releaseResources() {
124+
close();
125+
}
126+
127+
128+
private static class ClientRest5ContentEncoder implements ContentEncoder {
129+
130+
private final DataStreamChannel channel;
131+
132+
public ClientRest5ContentEncoder(DataStreamChannel channel) {
133+
this.channel = channel;
134+
}
135+
136+
@Override
137+
public int write(ByteBuffer src) throws IOException {
138+
return channel.write( src );
139+
}
140+
141+
@Override
142+
public void complete() throws IOException {
143+
channel.endStream();
144+
}
145+
146+
@Override
147+
public boolean isCompleted() {
148+
return false;
149+
}
150+
}
151+
}

backend/elasticsearch-client/elasticsearch-rest5-client/src/main/java/org/hibernate/search/backend/elasticsearch/client/rest5/impl/ClientRest5HttpRequestInterceptor.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,28 @@
77
import java.io.IOException;
88
import java.io.InputStream;
99
import java.net.URISyntaxException;
10+
import java.nio.ByteBuffer;
1011
import java.util.HashMap;
1112
import java.util.List;
1213
import java.util.Map;
1314

15+
import org.hibernate.search.backend.elasticsearch.client.common.gson.entity.spi.ContentEncoder;
16+
import org.hibernate.search.backend.elasticsearch.client.common.gson.entity.spi.ContentProducer;
17+
import org.hibernate.search.backend.elasticsearch.client.common.gson.entity.spi.HttpAsyncContentProducerInputStream;
1418
import org.hibernate.search.backend.elasticsearch.client.common.spi.ElasticsearchRequestInterceptor;
1519
import org.hibernate.search.backend.elasticsearch.client.common.spi.ElasticsearchRequestInterceptorContext;
1620
import org.hibernate.search.util.common.AssertionFailure;
1721

1822
import org.apache.hc.client5.http.protocol.HttpClientContext;
1923
import org.apache.hc.core5.http.EntityDetails;
24+
import org.apache.hc.core5.http.Header;
2025
import org.apache.hc.core5.http.HttpEntity;
2126
import org.apache.hc.core5.http.HttpEntityContainer;
2227
import org.apache.hc.core5.http.HttpRequest;
2328
import org.apache.hc.core5.http.HttpRequestInterceptor;
2429
import org.apache.hc.core5.http.NameValuePair;
2530
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
31+
import org.apache.hc.core5.http.nio.DataStreamChannel;
2632
import org.apache.hc.core5.http.protocol.HttpContext;
2733
import org.apache.hc.core5.net.URIBuilder;
2834

@@ -68,7 +74,7 @@ else if ( request instanceof HttpEntityContainer entityContainer ) {
6874
if ( !producer.isRepeatable() ) {
6975
throw new AssertionFailure( "Cannot sign AWS requests with non-repeatable entities" );
7076
}
71-
return new HttpAsyncEntityProducerInputStream( producer, 1024 );
77+
return new HttpAsyncContentProducerInputStream( new ClientRest5GsonContentProducer( producer ), 1024 );
7278
}
7379
return null;
7480
}
@@ -140,4 +146,50 @@ public String toString() {
140146
return request.toString();
141147
}
142148
}
149+
150+
private static class GsonEncoderDataStreamChannel implements DataStreamChannel {
151+
private final ContentEncoder encoder;
152+
153+
public GsonEncoderDataStreamChannel(ContentEncoder encoder) {
154+
this.encoder = encoder;
155+
}
156+
157+
@Override
158+
public void requestOutput() {
159+
160+
}
161+
162+
@Override
163+
public int write(ByteBuffer src) throws IOException {
164+
return encoder.write( src );
165+
}
166+
167+
@Override
168+
public void endStream(List<? extends Header> trailers) throws IOException {
169+
encoder.complete();
170+
}
171+
172+
@Override
173+
public void endStream() throws IOException {
174+
encoder.complete();
175+
}
176+
}
177+
178+
private static class ClientRest5GsonContentProducer implements ContentProducer {
179+
private final AsyncEntityProducer producer;
180+
181+
public ClientRest5GsonContentProducer(AsyncEntityProducer producer) {
182+
this.producer = producer;
183+
}
184+
185+
@Override
186+
public void produceContent(ContentEncoder encoder) throws IOException {
187+
producer.produce( new GsonEncoderDataStreamChannel( encoder ) );
188+
}
189+
190+
@Override
191+
public void close() throws IOException {
192+
producer.releaseResources();
193+
}
194+
}
143195
}

0 commit comments

Comments
 (0)