User Behavior Insights implementation for Apache Solr #2452

Draft · wants to merge 62 commits into base: main

Commits (62):
1a2afd0
address @link not working with precommit, now it does
epugh May 8, 2024
80f5c80
commented code is confusing to reader...
epugh May 8, 2024
e5450b8
address some IntelliJ-prompted warnings
epugh May 8, 2024
74b395b
Remove deprecated BlobRepository
epugh May 8, 2024
79863b4
checkpoint
epugh May 8, 2024
d9f3702
Merge remote-tracking branch 'upstream/main' into ubi
epugh May 8, 2024
4fbfa9d
better name, but I don't have the output pattern working yet
epugh May 8, 2024
80123eb
write the docids to the console, and add a hard-coded query_id
epugh May 8, 2024
f6cb356
now handling a passed-in query_id instead of an internally generated one, and…
epugh May 8, 2024
861f922
now logging user_query as a map (hash) to our jsonl log file
epugh May 9, 2024
eaa56e1
Log wasn't really working, we want complex nested data, so let's not t…
epugh May 9, 2024
aa45a83
actually track the doc_ids in our jsonl file
epugh May 9, 2024
9523c18
tidy
epugh May 9, 2024
c7a939e
provide more context on how to use UBI
epugh May 15, 2024
a6d33a9
techproducts gives a better example because of the inStock filter
epugh May 20, 2024
34a04bf
Working on trying to get streaming expressions to provide a pluggable…
epugh May 21, 2024
43475c9
precommit failures
epugh May 21, 2024
f0d2cb6
cut n paste error
epugh May 21, 2024
9168998
policeman failures fixed
epugh May 21, 2024
6364427
argh, let the precommit pass
epugh May 21, 2024
01ee6c7
tidy!
epugh May 21, 2024
994bd34
making user_query a string, and then introducing query_attributes …
epugh May 21, 2024
b1fc34e
Streaming expression dummy expression is now running
epugh May 22, 2024
607588b
tidy
epugh May 22, 2024
7d10403
allow ubi to run in non-SolrCloud mode.. boo
epugh May 22, 2024
7a365c1
Introduce an explicit UBIQuery class to wrap up the data required..
epugh May 25, 2024
0962c19
Merge remote-tracking branch 'upstream/main' into ubi
epugh Aug 16, 2024
cecd075
some weirdness in merging...
epugh Aug 16, 2024
cb8a2d1
deal with merge changes..
epugh Aug 16, 2024
3f79110
we don't need this as part of UBI, so don't add it...
epugh Aug 16, 2024
2450734
we aren't using ubi with this minimal example
epugh Aug 16, 2024
d3b5cf3
lint
epugh Aug 16, 2024
81faf19
clean up the tests...
epugh Aug 16, 2024
7c31300
Strip out writing to a log file, we have a general purpose streaming..
epugh Aug 19, 2024
af8d3f0
Properly deal with solrClientCache and object tracking. Cleaning up …
epugh Aug 19, 2024
9b47b0b
let's use streaming for local
epugh Oct 8, 2024
7277248
Track progress so I don't lose it
epugh Oct 10, 2024
64948b9
Track changes
epugh Oct 12, 2024
a646b68
Introduce better test of the LogStream
epugh Oct 12, 2024
9c98ab6
Polishing up the rendering of the expression
epugh Oct 12, 2024
17f05ed
Handle exception when loading streams.
epugh Oct 12, 2024
d198f7e
Log --> Logging to avoid confusion with "logarithmic"
epugh Oct 12, 2024
b93457f
Lint
epugh Oct 12, 2024
e0a59bf
Manage streamcontexts better
epugh Oct 12, 2024
aaa3b06
Clean up unused code
epugh Oct 12, 2024
133c213
Language improvement
epugh Oct 12, 2024
c1dd697
lint
epugh Oct 12, 2024
49d9279
Merge branch 'main' into ubi
epugh Oct 12, 2024
ef90a7d
Rollback the idea of logging via log4j, it's too awkward...
epugh Oct 13, 2024
228bb99
Remove whitespace
epugh Oct 13, 2024
2e639c5
track the changes
epugh Oct 13, 2024
e15cf65
Merge remote-tracking branch 'upstream/main' into ubi
epugh Nov 13, 2024
d1e215e
Lint
epugh Nov 13, 2024
47d57ea
Integration test of simple UBI setup.
epugh Nov 13, 2024
9e045e6
Auditing this test to try and remember what it is for
epugh Nov 13, 2024
96da29c
Lint
epugh Nov 13, 2024
8d928fa
Simplify naming
epugh Nov 13, 2024
5e691ae
Lint
epugh Nov 13, 2024
d555b40
Now able to stream updates
epugh Nov 14, 2024
95880d6
Merge remote-tracking branch 'upstream/main' into ubi
epugh Nov 14, 2024
e539669
Source UBI events from the component
epugh Nov 14, 2024
fa209fe
Be clearer
epugh Nov 14, 2024
Files changed: LoggingStream.java (new file)
@@ -0,0 +1,343 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.component;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Writes tuples emitted by a wrapped {@link TupleStream} to a log file. The log file is created in
* the "userfiles" directory and formatted as JSON Lines (JSONL): one JSON object per line.
*
* <p>I really want to call this the DogStream, as it matches the CatStream.
*
* <p>Open question: is this generically useful enough to be added to the streaming jar and Lang?
*
* <p>Naming candidates: WriterStream? LoggingStream? FileOutputStream? JsonOutputStream?
*
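* <p>Example usage (a sketch: this assumes the stream is registered with the StreamFactory under
* the function name "logging"; the file name and inner search parameters are illustrative only):
*
* <pre>
* logging(events.jsonl,
*   search(collection1, q="*:*", fl="id,score", sort="id asc", qt="/export")
* )
* </pre>
*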
* @since 9.8.0
*/
public class LoggingStream extends TupleStream implements Expressible {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

// field name in the summary tuple for the number of tuples logged in the batch
public static final String BATCH_LOGGED_FIELD_NAME = "batchLogged";

private Path chroot;

/**
* The name of the log file to write to. It is resolved relative to the same "userfiles" directory
* that the CatStream reads from.
*/
private String filepath;

private int batchNumber;
private long totalDocsIndex;
private TupleStream tupleSource;

private OutputStream fos;
private final CharArr charArr = new CharArr(1024 * 2);
private final JSONWriter jsonWriter = new JSONWriter(charArr, -1);
private Writer writer;

public LoggingStream(StreamExpression expression, StreamFactory factory) throws IOException {

filepath = factory.getValueOperand(expression, 0);
if (filepath == null) {
throw new IllegalArgumentException("No filepath provided to log stream to");
}
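// The operand may arrive with surrounding quotes from the expression parser; strip them.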
final String filepathWithoutSurroundingQuotes = stripSurroundingQuotesIfTheyExist(filepath);
if (StrUtils.isNullOrEmpty(filepathWithoutSurroundingQuotes)) {
throw new IllegalArgumentException("No filepath provided to stream");
}

this.filepath = filepathWithoutSurroundingQuotes;

// Extract underlying TupleStream.
List<StreamExpression> streamExpressions =
factory.getExpressionOperandsRepresentingTypes(
expression, Expressible.class, TupleStream.class);
if (1 != streamExpressions.size()) {
throw new IOException(
String.format(
Locale.ROOT,
"Invalid expression %s - expecting a single stream but found %d",
expression,
streamExpressions.size()));
}
StreamExpression sourceStreamExpression = streamExpressions.get(0);
init(filepathWithoutSurroundingQuotes, factory.constructStream(sourceStreamExpression));
}

public LoggingStream(String filepath, TupleStream tupleSource) throws IOException {
init(filepath, tupleSource);
}

private void init(String filepath, TupleStream tupleSource) {
this.filepath = filepath;
this.tupleSource = tupleSource;
}

/** The path of the log file being written to */
protected String getFilePath() {
return filepath;
}

@Override
public void open() throws IOException {
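// Resolve the user-supplied path against the userfiles root and normalize it, then verify it
// does not escape that root (guards against "../" path traversal).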
Path filePath = chroot.resolve(filepath).normalize();
if (!filePath.startsWith(chroot)) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST, "file to log to must be under " + chroot);
}

// Open the file in append mode so repeated runs accumulate in the same log file.
fos = new FileOutputStream(filePath.toFile(), true);
writer = new OutputStreamWriter(fos, StandardCharsets.UTF_8);

tupleSource.open();
}

@Override
public Tuple read() throws IOException {
Tuple tuple = tupleSource.read();
if (tuple.EOF) {
return tuple;
}
// Tuples are currently written through one at a time as they are read.
writeTuple(tuple);
return createBatchSummaryTuple(1);
}

@Override
public void close() throws IOException {
if (writer != null) {
writer.flush();
}
if (fos != null) {
fos.flush();
fos.close();
}
tupleSource.close();
}

@Override
public StreamComparator getStreamSort() {
return tupleSource.getStreamSort();
}

@Override
public List<TupleStream> children() {
ArrayList<TupleStream> sourceList = new ArrayList<>(1);
sourceList.add(tupleSource);
return sourceList;
}

@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
return toExpression(factory, true);
}

private StreamExpression toExpression(StreamFactory factory, boolean includeStreams)
throws IOException {
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
expression.addParameter(filepath);

// When includeStreams is false (explanation rendering), a placeholder is emitted instead of
// serializing the wrapped stream.
if (includeStreams) {
if (tupleSource != null) {
expression.addParameter(((Expressible) tupleSource).toExpression(factory));
} else {
throw new IOException(
"This LoggingStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
} else {
expression.addParameter("<stream>");
}

return expression;
}

@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {

StreamExplanation explanation = new StreamExplanation(getStreamNodeId() + "-datastore");

explanation.setFunctionName(String.format(Locale.ROOT, "logging (%s)", filepath));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.DATASTORE);
explanation.setExpression("Log tuples into " + filepath);

// child is a datastore so add it at this point
StreamExplanation child = new StreamExplanation(getStreamNodeId().toString());
child.setFunctionName(factory.getFunctionName(getClass()));
child.setImplementingClass(getClass().getName());
child.setExpressionType(ExpressionType.DATASTORE);
child.setExpression(toExpression(factory, false).toString());
child.addChild(tupleSource.toExplanation(factory));

explanation.addChild(child);

return explanation;
}

@Override
public void setStreamContext(StreamContext context) {
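// The SolrCore is needed to locate this node's "userfiles" directory.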
Object solrCoreObj = context.get("solr-core");
if (solrCoreObj == null || !(solrCoreObj instanceof SolrCore)) {
throw new SolrException(
SolrException.ErrorCode.INVALID_STATE,
"StreamContext must have SolrCore in solr-core key");
}
final SolrCore core = (SolrCore) context.get("solr-core");

this.chroot = core.getCoreContainer().getUserFilesPath();
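// Create the userfiles directory on first use so a fresh node can be logged to.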
if (!Files.exists(chroot)) {
try {
Files.createDirectories(this.chroot);
} catch (IOException ioe) {
throw new SolrException(
SolrException.ErrorCode.INVALID_STATE,
chroot + " directory used for logging must exist but couldn't be created!");
}
}

// Pass down the stream context.
this.tupleSource.setStreamContext(context);
}

/**
* Called for every tuple read from the wrapped stream: serializes the tuple as a single line of
* JSON and appends it to the log file.
*/
protected void writeTuple(Tuple tuple) throws IOException {
charArr.reset();
jsonWriter.write(tuple);
writer.write(charArr.getArray(), charArr.getStart(), charArr.getEnd());
writer.append('\n');
}

/** Builds the summary tuple returned to the caller for each logged tuple. */
private Tuple createBatchSummaryTuple(int batchSize) {
assert batchSize > 0;
Tuple tuple = new Tuple();
this.totalDocsIndex += batchSize;
++batchNumber;
tuple.put(BATCH_LOGGED_FIELD_NAME, batchSize);
tuple.put("totalIndexed", this.totalDocsIndex);
tuple.put("batchNumber", batchNumber);
return tuple;
}

private String stripSurroundingQuotesIfTheyExist(String value) {
if (value.length() < 2) return value;
if ((value.startsWith("\"") && value.endsWith("\""))
|| (value.startsWith("'") && value.endsWith("'"))) {
return value.substring(1, value.length() - 1);
}

return value;
}
}