Skip to content

Commit 8862c08

Browse files
committed
Add hierarchical routing processors for document co-location
Implements ingest and search pipeline processors to co-locate related documents based on hierarchical paths (e.g., folder structures). Documents with same path prefix are routed to same shard, enabling efficient search within hierarchies. - HierarchicalRoutingProcessor: Routes documents during indexing - HierarchicalRoutingSearchProcessor: Routes queries to relevant shards - Consistent MurmurHash3-based routing between ingest and search - Configurable anchor depth, path separators, and field mapping Signed-off-by: Atri Sharma <atri.jiit@gmail.com> Fix test asserts Signed-off-by: Atri Sharma <atri.jiit@gmail.com> Fix failing tests and update per comments Signed-off-by: Atri Sharma <atri.jiit@gmail.com> Yet more test fixes Signed-off-by: Atri Sharma <atri.jiit@gmail.com> Fix more comments Signed-off-by: Atri Sharma <atri.jiit@gmail.com> Mark flaky test for now Signed-off-by: Atri Sharma <atri.jiit@gmail.com> Add more javadocs Signed-off-by: Atri Sharma <atri.jiit@gmail.com>
1 parent 04bf3cb commit 8862c08

File tree

10 files changed

+1838
-3
lines changed

10 files changed

+1838
-3
lines changed
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest.common;
10+
11+
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
12+
import org.opensearch.action.bulk.BulkRequest;
13+
import org.opensearch.action.bulk.BulkResponse;
14+
import org.opensearch.action.get.GetResponse;
15+
import org.opensearch.action.index.IndexRequest;
16+
import org.opensearch.action.search.SearchResponse;
17+
import org.opensearch.common.document.DocumentField;
18+
import org.opensearch.core.common.bytes.BytesReference;
19+
import org.opensearch.core.xcontent.MediaTypeRegistry;
20+
import org.opensearch.index.query.PrefixQueryBuilder;
21+
import org.opensearch.plugins.Plugin;
22+
import org.opensearch.search.SearchHit;
23+
import org.opensearch.search.builder.SearchSourceBuilder;
24+
import org.opensearch.test.OpenSearchIntegTestCase;
25+
26+
import java.util.Arrays;
27+
import java.util.Collection;
28+
import java.util.Map;
29+
30+
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
31+
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.notNullValue;
33+
34+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
35+
public class HierarchicalRoutingProcessorIT extends OpenSearchIntegTestCase {
36+
37+
@Override
38+
protected Collection<Class<? extends Plugin>> nodePlugins() {
39+
return Arrays.asList(IngestCommonModulePlugin.class);
40+
}
41+
42+
public void testHierarchicalRoutingProcessor() throws Exception {
43+
// Create ingest pipeline with hierarchical routing processor
44+
String pipelineId = "hierarchical-routing-test";
45+
BytesReference pipelineConfig = BytesReference.bytes(
46+
jsonBuilder().startObject()
47+
.startArray("processors")
48+
.startObject()
49+
.startObject("hierarchical_routing")
50+
.field("path_field", "file_path")
51+
.field("anchor_depth", 2)
52+
.field("path_separator", "/")
53+
.endObject()
54+
.endObject()
55+
.endArray()
56+
.endObject()
57+
);
58+
59+
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();
60+
61+
// Create index with multiple shards
62+
String indexName = "test-hierarchical-routing";
63+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(
64+
Map.of("number_of_shards", 3, "number_of_replicas", 0, "index.default_pipeline", pipelineId)
65+
)
66+
.mapping(
67+
jsonBuilder().startObject()
68+
.startObject("properties")
69+
.startObject("file_path")
70+
.field("type", "keyword")
71+
.endObject()
72+
.startObject("content")
73+
.field("type", "text")
74+
.endObject()
75+
.endObject()
76+
.endObject()
77+
);
78+
79+
client().admin().indices().create(createIndexRequest).get();
80+
81+
// Index documents with same hierarchical prefix
82+
BulkRequest bulkRequest = new BulkRequest();
83+
84+
// Engineering documents (should get same routing)
85+
bulkRequest.add(
86+
new IndexRequest(indexName).id("1")
87+
.source(
88+
jsonBuilder().startObject()
89+
.field("file_path", "/company/engineering/backend/api.pdf")
90+
.field("content", "API documentation")
91+
.endObject()
92+
)
93+
);
94+
95+
bulkRequest.add(
96+
new IndexRequest(indexName).id("2")
97+
.source(
98+
jsonBuilder().startObject()
99+
.field("file_path", "/company/engineering/frontend/ui.pdf")
100+
.field("content", "UI guidelines")
101+
.endObject()
102+
)
103+
);
104+
105+
// Marketing documents (should get different routing)
106+
bulkRequest.add(
107+
new IndexRequest(indexName).id("3")
108+
.source(
109+
jsonBuilder().startObject()
110+
.field("file_path", "/company/marketing/campaigns/q1.pdf")
111+
.field("content", "Q1 campaign")
112+
.endObject()
113+
)
114+
);
115+
116+
BulkResponse bulkResponse = client().bulk(bulkRequest).get();
117+
assertFalse("Bulk indexing should succeed", bulkResponse.hasFailures());
118+
119+
// Refresh to make documents searchable
120+
client().admin().indices().prepareRefresh(indexName).get();
121+
122+
// Verify documents were routed correctly
123+
// We need to calculate the expected routing values to retrieve the documents
124+
String engineeringRouting = computeRouting("company/engineering");
125+
String marketingRouting = computeRouting("company/marketing");
126+
127+
GetResponse doc1 = client().prepareGet(indexName, "1").setRouting(engineeringRouting).get();
128+
GetResponse doc2 = client().prepareGet(indexName, "2").setRouting(engineeringRouting).get();
129+
GetResponse doc3 = client().prepareGet(indexName, "3").setRouting(marketingRouting).get();
130+
131+
assertTrue("Document 1 should exist", doc1.isExists());
132+
assertTrue("Document 2 should exist", doc2.isExists());
133+
assertTrue("Document 3 should exist", doc3.isExists());
134+
135+
// Check that routing was applied
136+
DocumentField doc1Routing = doc1.getField("_routing");
137+
DocumentField doc2Routing = doc2.getField("_routing");
138+
DocumentField doc3Routing = doc3.getField("_routing");
139+
140+
assertThat("Document 1 should have routing", doc1Routing, notNullValue());
141+
assertThat("Document 2 should have routing", doc2Routing, notNullValue());
142+
assertThat("Document 3 should have routing", doc3Routing, notNullValue());
143+
144+
// Documents 1 and 2 should have same routing (same anchor: /company/engineering)
145+
assertThat("Documents with same anchor should have same routing", doc1Routing.getValue(), equalTo(doc2Routing.getValue()));
146+
147+
// Document 3 should have different routing (different anchor: /company/marketing)
148+
assertNotEquals("Documents with different anchors should have different routing", doc1Routing.getValue(), doc3Routing.getValue());
149+
150+
// Test search functionality
151+
SearchResponse searchResponse = client().prepareSearch(indexName)
152+
.setSource(new SearchSourceBuilder().query(new PrefixQueryBuilder("file_path", "/company/engineering")))
153+
.get();
154+
155+
assertThat("Should find engineering documents", searchResponse.getHits().getTotalHits().value(), equalTo(2L));
156+
157+
for (SearchHit hit : searchResponse.getHits().getHits()) {
158+
String filePath = (String) hit.getSourceAsMap().get("file_path");
159+
assertTrue("Found document should be in engineering folder", filePath.startsWith("/company/engineering"));
160+
}
161+
}
162+
163+
public void testHierarchicalRoutingWithCustomSeparator() throws Exception {
164+
// Create pipeline with custom separator
165+
String pipelineId = "hierarchical-routing-custom-sep";
166+
BytesReference pipelineConfig = BytesReference.bytes(
167+
jsonBuilder().startObject()
168+
.startArray("processors")
169+
.startObject()
170+
.startObject("hierarchical_routing")
171+
.field("path_field", "windows_path")
172+
.field("anchor_depth", 2)
173+
.field("path_separator", "\\")
174+
.endObject()
175+
.endObject()
176+
.endArray()
177+
.endObject()
178+
);
179+
180+
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();
181+
182+
String indexName = "test-custom-separator";
183+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(
184+
Map.of("number_of_shards", 2, "number_of_replicas", 0, "index.default_pipeline", pipelineId)
185+
);
186+
187+
client().admin().indices().create(createIndexRequest).get();
188+
189+
// Index document with Windows-style path
190+
IndexRequest indexRequest = new IndexRequest(indexName).id("win1")
191+
.source(
192+
jsonBuilder().startObject()
193+
.field("windows_path", "C:\\Users\\admin\\Documents\\file.txt")
194+
.field("content", "Windows document")
195+
.endObject()
196+
);
197+
198+
client().index(indexRequest).get();
199+
client().admin().indices().prepareRefresh(indexName).get();
200+
201+
// Calculate expected routing for Windows path
202+
String windowsRouting = computeRouting("C:\\Users", "\\");
203+
204+
GetResponse doc = client().prepareGet(indexName, "win1").setRouting(windowsRouting).get();
205+
assertTrue("Document should exist", doc.isExists());
206+
DocumentField routing = doc.getField("_routing");
207+
assertThat("Document should have routing", routing, notNullValue());
208+
}
209+
210+
public void testHierarchicalRoutingWithMissingField() throws Exception {
211+
// Create pipeline with ignore_missing = true
212+
String pipelineId = "hierarchical-routing-ignore-missing";
213+
BytesReference pipelineConfig = BytesReference.bytes(
214+
jsonBuilder().startObject()
215+
.startArray("processors")
216+
.startObject()
217+
.startObject("hierarchical_routing")
218+
.field("path_field", "nonexistent_field")
219+
.field("anchor_depth", 2)
220+
.field("ignore_missing", true)
221+
.endObject()
222+
.endObject()
223+
.endArray()
224+
.endObject()
225+
);
226+
227+
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();
228+
229+
String indexName = "test-ignore-missing";
230+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(
231+
Map.of("number_of_shards", 2, "number_of_replicas", 0, "index.default_pipeline", pipelineId)
232+
);
233+
234+
client().admin().indices().create(createIndexRequest).get();
235+
236+
// Index document without the required field
237+
IndexRequest indexRequest = new IndexRequest(indexName).id("missing1")
238+
.source(
239+
jsonBuilder().startObject().field("other_field", "some value").field("content", "Document without path field").endObject()
240+
);
241+
242+
client().index(indexRequest).get();
243+
client().admin().indices().prepareRefresh(indexName).get();
244+
245+
// Document without path field should not have routing, so no routing needed for get
246+
GetResponse doc = client().prepareGet(indexName, "missing1").get();
247+
assertTrue("Document should be indexed even with missing field", doc.isExists());
248+
// Routing should be null since field was missing and ignored
249+
}
250+
251+
public void testHierarchicalRoutingProcessorRegistration() throws Exception {
252+
// Verify processor is registered by attempting to create a pipeline
253+
String pipelineId = "test-processor-registration";
254+
BytesReference pipelineConfig = BytesReference.bytes(
255+
jsonBuilder().startObject()
256+
.startArray("processors")
257+
.startObject()
258+
.startObject("hierarchical_routing")
259+
.field("path_field", "path")
260+
.field("anchor_depth", 1)
261+
.endObject()
262+
.endObject()
263+
.endArray()
264+
.endObject()
265+
);
266+
267+
// This should succeed if processor is properly registered
268+
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();
269+
270+
// Verify pipeline was created
271+
var getPipelineResponse = client().admin().cluster().prepareGetPipeline(pipelineId).get();
272+
assertTrue("Pipeline should be created successfully", getPipelineResponse.isFound());
273+
274+
// Clean up
275+
client().admin().cluster().prepareDeletePipeline(pipelineId).get();
276+
}
277+
278+
// Helper method to compute expected routing (mirrors processor logic)
279+
private String computeRouting(String anchor) {
280+
return computeRouting(anchor, "/");
281+
}
282+
283+
private String computeRouting(String anchor, String separator) {
284+
// This mirrors the logic in HierarchicalRoutingProcessor
285+
byte[] anchorBytes = anchor.getBytes(java.nio.charset.StandardCharsets.UTF_8);
286+
long hash = org.opensearch.common.hash.MurmurHash3.hash128(
287+
anchorBytes,
288+
0,
289+
anchorBytes.length,
290+
0,
291+
new org.opensearch.common.hash.MurmurHash3.Hash128()
292+
).h1;
293+
return String.valueOf(hash == Long.MIN_VALUE ? 0L : (hash < 0 ? -hash : hash));
294+
}
295+
}

0 commit comments

Comments
 (0)