Skip to content

Commit

Permalink
fix: simplify TicketResolver implementations
Browse files Browse the repository at this point in the history
Fixes a bug in SharedTicketResolver that was not noticed due to the duplicate logic needed to implement a TicketResolver.
  • Loading branch information
devinrsmith committed Oct 18, 2024
1 parent 22c20cf commit e208308
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.deephaven.server.session.TicketRouter;
import io.grpc.StatusRuntimeException;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.Flight.FlightDescriptor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -92,34 +93,14 @@ private <T> SessionState.ExportObject<T> resolve(final AppFieldId id, final Stri
}

@Override
public SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(
@Nullable final SessionState session, final Flight.FlightDescriptor descriptor, final String logId) {
protected Flight.Ticket getTicket(FlightDescriptor descriptor, String logId) {
final AppFieldId id = appFieldIdFor(descriptor, logId);
if (id.app == null) {
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': field does not belong to an application");
}

final Flight.FlightInfo info;
synchronized (id.app) {
Field<?> field = id.app.getField(id.fieldName);
if (field == null) {
throw newNotFoundSRE(logId, id);
}
Object value = field.value();
if (value instanceof Table) {
// may return null if the table is not authorized
value = authorization.transform(value);
}

if (value instanceof Table) {
info = TicketRouter.getFlightInfo((Table) value, descriptor, flightTicketForName(id.app, id.fieldName));
} else {
throw newNotFoundSRE(logId, id);
}
}
return flightTicketForName(id.app, id.fieldName);
}

return SessionState.wrapAsExport(info);
@Override
protected StatusRuntimeException notFound(FlightDescriptor descriptor, String logId) {
return newNotFoundSRE(logId, appFieldIdFor(descriptor, logId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ public void getSchema(
description, session == null ? null : session.getSessionId(), QueryPerformanceNugget.DEFAULT_FACTORY);

try (final SafeCloseable ignored = queryPerformanceRecorder.startQuery()) {
final SessionState.ExportObject<Flight.FlightInfo> export =
ticketRouter.flightInfoFor(session, request, "request");
final SessionState.ExportObject<ByteString> export =
ticketRouter.getSchema(session, request, "request");

if (session != null) {
session.nonExport()
Expand All @@ -275,7 +275,7 @@ public void getSchema(
.onError(responseObserver)
.submit(() -> {
responseObserver.onNext(Flight.SchemaResult.newBuilder()
.setSchema(export.get().getSchema())
.setSchema(export.get())
.build());
responseObserver.onCompleted();
});
Expand All @@ -287,15 +287,15 @@ public void getSchema(
try {
if (export.getState() == ExportNotification.State.EXPORTED) {
GrpcUtil.safelyOnNext(responseObserver, Flight.SchemaResult.newBuilder()
.setSchema(export.get().getSchema())
.setSchema(export.get())
.build());
GrpcUtil.safelyComplete(responseObserver);
}
} finally {
export.dropReference();
}
} else {
exception = Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find flight info");
exception = Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find schema");
responseObserver.onError(exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.deephaven.server.session.TicketRouter;
import io.grpc.StatusRuntimeException;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.Flight.FlightDescriptor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -48,28 +49,13 @@ public String getLogNameFor(final ByteBuffer ticket, final String logId) {
}

@Override
public SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(
@Nullable final SessionState session, final Flight.FlightDescriptor descriptor, final String logId) {
// there is no mechanism to wait for a scope variable to resolve; require that the scope variable exists now
final String scopeName = nameForDescriptor(descriptor, logId);

final QueryScope queryScope = ExecutionContext.getContext().getQueryScope();
final Object scopeVar = queryScope.unwrapObject(queryScope.readParamValue(scopeName, null));
if (scopeVar == null) {
throw newNotFoundSRE(logId, scopeName);
}
if (!(scopeVar instanceof Table)) {
throw newNotFoundSRE(logId, scopeName);
}

final Table transformed = authorization.transform((Table) scopeVar);
if (transformed == null) {
throw newNotFoundSRE(logId, scopeName);
}
final Flight.FlightInfo flightInfo =
TicketRouter.getFlightInfo(transformed, descriptor, flightTicketForName(scopeName));
protected Flight.Ticket getTicket(FlightDescriptor descriptor, String logId) {
return descriptorToTicket(descriptor, logId);
}

return SessionState.wrapAsExport(flightInfo);
@Override
protected StatusRuntimeException notFound(FlightDescriptor descriptor, String logId) {
return newNotFoundSRE(logId, nameForDescriptor(descriptor, logId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
package io.deephaven.server.session;

import com.google.rpc.Code;
import io.deephaven.engine.table.Table;
import io.deephaven.proto.flight.util.FlightExportTicketHelper;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.proto.util.ExportTicketHelper;
import io.deephaven.server.auth.AuthorizationProvider;
import io.grpc.StatusRuntimeException;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.Flight.FlightDescriptor;
import org.apache.arrow.flight.impl.Flight.Ticket;
import org.jetbrains.annotations.Nullable;

import javax.inject.Inject;
Expand Down Expand Up @@ -38,25 +40,14 @@ public String getLogNameFor(final ByteBuffer ticket, final String logId) {
}

@Override
public SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(
@Nullable final SessionState session, final Flight.FlightDescriptor descriptor, final String logId) {
if (session == null) {
throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED,
"Could not resolve '" + logId + "': no exports can exist without a session to search");
}

final SessionState.ExportObject<?> export = resolve(session, descriptor, logId);
return session.<Flight.FlightInfo>nonExport()
.require(export)
.submit(() -> {
if (export.get() instanceof Table) {
return TicketRouter.getFlightInfo((Table) export.get(), descriptor,
FlightExportTicketHelper.descriptorToFlightTicket(descriptor, logId));
}
protected Ticket getTicket(FlightDescriptor descriptor, String logId) {
return FlightExportTicketHelper.descriptorToFlightTicket(descriptor, logId);
}

throw Exceptions.statusRuntimeException(Code.NOT_FOUND,
"Could not resolve '" + logId + "': flight '" + descriptor + "' not found");
});
@Override
protected StatusRuntimeException notFound(FlightDescriptor descriptor, String logId) {
throw Exceptions.statusRuntimeException(Code.NOT_FOUND,
"Could not resolve '" + logId + "': flight '" + descriptor + "' not found");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;

import static io.deephaven.base.log.LogOutput.MILLIS_FROM_EPOCH_FORMATTER;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete;
Expand Down Expand Up @@ -117,7 +118,7 @@ public static <T> ExportObject<T> wrapAsExport(final T export) {
*/
public static <T> ExportObject<T> wrapAsFailedExport(final Exception caughtException) {
ExportObject<T> exportObject = new ExportObject<>(null);
exportObject.caughtException = caughtException;
exportObject.caughtException = Objects.requireNonNull(caughtException);
return exportObject;
}

Expand Down Expand Up @@ -633,6 +634,43 @@ public boolean isNonExport() {
return exportId == NON_EXPORT_ID;
}

/**
* Applies the function {@code f} to the results of {@code this} export.
*
* <p>
* In the case where {@code this} export was built with a {@link SessionState}, the function {@code f} will be
* invoked inside {@link ExportBuilder#submit(Callable) submit} with the same session.
*
* <p>
* In the case where {@code this} export was not built with a {@link SessionState}, the function {@code f} will
* be invoked on this thread.
*
* <p>
* In both cases, the resulting export is a "non-export".
*
* @param f the function
* @return the new export after applying
* @param <R> the new exported type
*/
public <R> ExportObject<R> map(Function<? super T, ? extends R> f) {
if (session == null) {
final T localResult = result;
if (localResult == null) {
return wrapAsFailedExport(caughtException);
}
final R r;
try {
r = f.apply(localResult);
} catch (RuntimeException e) {
return wrapAsFailedExport(e);
}
return wrapAsExport(r);
}
return session.<R>nonExport()
.require(this)
.submit(() -> f.apply(ExportObject.this.get()));
}

private synchronized void setQueryPerformanceRecorder(
final QueryPerformanceRecorder queryPerformanceRecorder) {
if (this.queryPerformanceRecorder != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.ByteStringAccess;
import com.google.rpc.Code;
import io.deephaven.engine.table.Table;
import io.deephaven.proto.backplane.grpc.ExportNotification;
import io.deephaven.proto.flight.util.FlightExportTicketHelper;
import io.deephaven.proto.flight.util.TicketRouterHelper;
import io.deephaven.proto.util.ByteHelper;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.proto.util.SharedTicketHelper;
import io.deephaven.server.auth.AuthorizationProvider;
import io.grpc.StatusRuntimeException;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.Flight.FlightDescriptor;
import org.apache.arrow.flight.impl.Flight.Ticket;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -53,34 +53,13 @@ public String getLogNameFor(ByteBuffer ticket, String logId) {
}

@Override
public SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(
@Nullable final SessionState session, final Flight.FlightDescriptor descriptor, final String logId) {
if (session == null) {
throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, String.format(
"Could not resolve '%s': no session to handoff to", logId));
}

final ByteString sharedId = idForDescriptor(descriptor, logId);

SessionState.ExportObject<?> export = sharedVariables.get(sharedId);
if (export == null) {
throw newNotFoundSRE(logId, toHexString(sharedId));
}

return session.<Flight.FlightInfo>nonExport()
.require(export)
.submit(() -> {
Object result = export.get();
if (result instanceof Table) {
result = authorization.transform(result);
}
if (result instanceof Table) {
return TicketRouter.getFlightInfo((Table) result, descriptor,
FlightExportTicketHelper.descriptorToFlightTicket(descriptor, logId));
}
protected Ticket getTicket(FlightDescriptor descriptor, String logId) {
return descriptorToTicket(descriptor, logId);
}

throw newNotFoundSRE(logId, toHexString(sharedId));
});
@Override
protected StatusRuntimeException notFound(FlightDescriptor descriptor, String logId) {
return newNotFoundSRE(logId, toHexString(idForDescriptor(descriptor, logId)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.server.session;

import com.google.protobuf.ByteString;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.PartitionedTable;
import io.deephaven.engine.table.Table;
Expand Down Expand Up @@ -148,11 +149,21 @@ default <T> void publish(
.submit(source::get);
}

/**
* Retrieve the ByteString Schema for a given FlightDescriptor.
*
* @param descriptor the flight descriptor to retrieve a schema for
* @param logId an end-user friendly identification of the descriptor should an error occur
* @return a FlightInfo describing this flight
*/
SessionState.ExportObject<ByteString> getSchema(@Nullable SessionState session, Flight.FlightDescriptor descriptor,
String logId);

/**
* Retrieve a FlightInfo for a given FlightDescriptor.
*
* @param descriptor the flight descriptor to retrieve a ticket for
* @param logId an end-user friendly identification of the ticket should an error occur
* @param descriptor the flight descriptor to retrieve flight info for
* @param logId an end-user friendly identification of the descriptor should an error occur
* @return a FlightInfo describing this flight
*/
SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(@Nullable SessionState session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@
//
package io.deephaven.server.session;

import com.google.protobuf.ByteString;
import io.deephaven.engine.table.Table;
import io.deephaven.server.auth.AuthorizationProvider;
import io.deephaven.server.session.SessionState.ExportObject;
import io.grpc.StatusRuntimeException;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.Flight.FlightDescriptor;
import org.apache.arrow.flight.impl.Flight.FlightInfo;
import org.jetbrains.annotations.Nullable;

public abstract class TicketResolverBase implements TicketResolver {

Expand All @@ -28,4 +36,35 @@ public byte ticketRoute() {
public String flightDescriptorRoute() {
return flightDescriptorRoute;
}

// These methods can be elevated to TicketResolver in the future if necessary.

protected abstract Flight.Ticket getTicket(FlightDescriptor descriptor, String logId);

protected abstract StatusRuntimeException notFound(FlightDescriptor descriptor, String logId);

private SessionState.ExportObject<Table> resolveTable(@Nullable SessionState session,
Flight.FlightDescriptor descriptor, String logId) {
return resolve(session, descriptor, logId).map(x -> asTable(x, descriptor, logId));
}

private Table asTable(Object o, FlightDescriptor descriptor, String logId) {
if (!(o instanceof Table)) {
throw notFound(descriptor, logId);
}
return (Table) o;
}

@Override
public final ExportObject<ByteString> getSchema(@Nullable SessionState session, FlightDescriptor descriptor,
String logId) {
return resolveTable(session, descriptor, logId).map(TicketRouter::getSchema);
}

@Override
public final ExportObject<FlightInfo> flightInfoFor(@Nullable SessionState session, FlightDescriptor descriptor,
String logId) {
return resolveTable(session, descriptor, logId)
.map(table -> TicketRouter.getFlightInfo(table, descriptor, getTicket(descriptor, logId)));
}
}
Loading

0 comments on commit e208308

Please sign in to comment.