diff --git a/server/src/main/java/io/deephaven/server/appmode/ApplicationTicketResolver.java b/server/src/main/java/io/deephaven/server/appmode/ApplicationTicketResolver.java index a2084a87ebd..ac5845cd618 100644 --- a/server/src/main/java/io/deephaven/server/appmode/ApplicationTicketResolver.java +++ b/server/src/main/java/io/deephaven/server/appmode/ApplicationTicketResolver.java @@ -19,6 +19,7 @@ import io.deephaven.server.session.TicketRouter; import io.grpc.StatusRuntimeException; import org.apache.arrow.flight.impl.Flight; +import org.apache.arrow.flight.impl.Flight.FlightDescriptor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -92,34 +93,14 @@ private SessionState.ExportObject resolve(final AppFieldId id, final Stri } @Override - public SessionState.ExportObject flightInfoFor( - @Nullable final SessionState session, final Flight.FlightDescriptor descriptor, final String logId) { + protected Flight.Ticket getTicket(FlightDescriptor descriptor, String logId) { final AppFieldId id = appFieldIdFor(descriptor, logId); - if (id.app == null) { - throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, - "Could not resolve '" + logId + "': field does not belong to an application"); - } - - final Flight.FlightInfo info; - synchronized (id.app) { - Field field = id.app.getField(id.fieldName); - if (field == null) { - throw newNotFoundSRE(logId, id); - } - Object value = field.value(); - if (value instanceof Table) { - // may return null if the table is not authorized - value = authorization.transform(value); - } - - if (value instanceof Table) { - info = TicketRouter.getFlightInfo((Table) value, descriptor, flightTicketForName(id.app, id.fieldName)); - } else { - throw newNotFoundSRE(logId, id); - } - } + return flightTicketForName(id.app, id.fieldName); + } - return SessionState.wrapAsExport(info); + @Override + protected StatusRuntimeException notFound(FlightDescriptor descriptor, String logId) { + return newNotFoundSRE(logId, appFieldIdFor(descriptor, logId)); } @Override diff --git a/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java index ac3bedf066d..07edc145d36 100644 --- a/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java @@ -265,8 +265,8 @@ public void getSchema( description, session == null ? null : session.getSessionId(), QueryPerformanceNugget.DEFAULT_FACTORY); try (final SafeCloseable ignored = queryPerformanceRecorder.startQuery()) { - final SessionState.ExportObject export = - ticketRouter.flightInfoFor(session, request, "request"); + final SessionState.ExportObject export = + ticketRouter.getSchema(session, request, "request"); if (session != null) { session.nonExport() @@ -275,7 +275,7 @@ public void getSchema( .onError(responseObserver) .submit(() -> { responseObserver.onNext(Flight.SchemaResult.newBuilder() - .setSchema(export.get().getSchema()) + .setSchema(export.get()) .build()); responseObserver.onCompleted(); }); @@ -287,7 +287,7 @@ public void getSchema( try { if (export.getState() == ExportNotification.State.EXPORTED) { GrpcUtil.safelyOnNext(responseObserver, Flight.SchemaResult.newBuilder() - .setSchema(export.get().getSchema()) + .setSchema(export.get()) .build()); GrpcUtil.safelyComplete(responseObserver); } @@ -295,7 +295,7 @@ public void getSchema( export.dropReference(); } } else { - exception = Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find flight info"); + exception = Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find schema"); responseObserver.onError(exception); } diff --git a/server/src/main/java/io/deephaven/server/console/ScopeTicketResolver.java b/server/src/main/java/io/deephaven/server/console/ScopeTicketResolver.java index 2ee01446d39..9180b0fe2df 100644 --- a/server/src/main/java/io/deephaven/server/console/ScopeTicketResolver.java +++ b/server/src/main/java/io/deephaven/server/console/ScopeTicketResolver.java @@ -20,6 +20,7 @@ import io.deephaven.server.session.TicketRouter; import io.grpc.StatusRuntimeException; import org.apache.arrow.flight.impl.Flight; +import org.apache.arrow.flight.impl.Flight.FlightDescriptor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -48,28 +49,13 @@ public String getLogNameFor(final ByteBuffer ticket, final String logId) { } @Override - public SessionState.ExportObject flightInfoFor( - @Nullable final SessionState session, final Flight.FlightDescriptor descriptor, final String logId) { - // there is no mechanism to wait for a scope variable to resolve; require that the scope variable exists now - final String scopeName = nameForDescriptor(descriptor, logId); - - final QueryScope queryScope = ExecutionContext.getContext().getQueryScope(); - final Object scopeVar = queryScope.unwrapObject(queryScope.readParamValue(scopeName, null)); - if (scopeVar == null) { - throw newNotFoundSRE(logId, scopeName); - } - if (!(scopeVar instanceof Table)) { - throw newNotFoundSRE(logId, scopeName); - } - - final Table transformed = authorization.transform((Table) scopeVar); - if (transformed == null) { - throw newNotFoundSRE(logId, scopeName); - } - final Flight.FlightInfo flightInfo = - TicketRouter.getFlightInfo(transformed, descriptor, flightTicketForName(scopeName)); + protected Flight.Ticket getTicket(FlightDescriptor descriptor, String logId) { + return descriptorToTicket(descriptor, logId); + } - return SessionState.wrapAsExport(flightInfo); + @Override + protected StatusRuntimeException notFound(FlightDescriptor descriptor, String logId) { + return newNotFoundSRE(logId, nameForDescriptor(descriptor, logId)); } @Override diff --git a/server/src/main/java/io/deephaven/server/session/ExportTicketResolver.java b/server/src/main/java/io/deephaven/server/session/ExportTicketResolver.java index b72448b5166..52668a1d7d8 100644 --- a/server/src/main/java/io/deephaven/server/session/ExportTicketResolver.java +++ b/server/src/main/java/io/deephaven/server/session/ExportTicketResolver.java @@ -4,12 +4,14 @@ package io.deephaven.server.session; import com.google.rpc.Code; -import io.deephaven.engine.table.Table; import io.deephaven.proto.flight.util.FlightExportTicketHelper; import io.deephaven.proto.util.Exceptions; import io.deephaven.proto.util.ExportTicketHelper; import io.deephaven.server.auth.AuthorizationProvider; +import io.grpc.StatusRuntimeException; import org.apache.arrow.flight.impl.Flight; +import org.apache.arrow.flight.impl.Flight.FlightDescriptor; +import org.apache.arrow.flight.impl.Flight.Ticket; import org.jetbrains.annotations.Nullable; import javax.inject.Inject; @@ -38,25 +40,14 @@ public String getLogNameFor(final ByteBuffer ticket, final String logId) { } @Override - public SessionState.ExportObject flightInfoFor( - @Nullable final SessionState session, final Flight.FlightDescriptor descriptor, final String logId) { - if (session == null) { - throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, - "Could not resolve '" + logId + "': no exports can exist without a session to search"); - } - - final SessionState.ExportObject export = resolve(session, descriptor, logId); - return session.nonExport() - .require(export) - .submit(() -> { - if (export.get() instanceof Table) { - return TicketRouter.getFlightInfo((Table) export.get(), descriptor, - FlightExportTicketHelper.descriptorToFlightTicket(descriptor, logId)); - } + protected Ticket getTicket(FlightDescriptor descriptor, String logId) { + return FlightExportTicketHelper.descriptorToFlightTicket(descriptor, logId); + } - throw Exceptions.statusRuntimeException(Code.NOT_FOUND, - "Could not resolve '" + logId + "': flight '" + descriptor + "' not found"); - }); + @Override + protected StatusRuntimeException notFound(FlightDescriptor descriptor, String logId) { + throw Exceptions.statusRuntimeException(Code.NOT_FOUND, + "Could not resolve '" + logId + "': flight '" + descriptor + "' not found"); } @Override diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index 74b5aafc304..d9c603152b1 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -55,6 +55,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; +import java.util.function.Function; import static io.deephaven.base.log.LogOutput.MILLIS_FROM_EPOCH_FORMATTER; import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete; @@ -117,7 +118,7 @@ public static ExportObject wrapAsExport(final T export) { */ public static ExportObject wrapAsFailedExport(final Exception caughtException) { ExportObject exportObject = new ExportObject<>(null); - exportObject.caughtException = caughtException; + exportObject.caughtException = Objects.requireNonNull(caughtException); return exportObject; } @@ -633,6 +634,43 @@ public boolean isNonExport() { return exportId == NON_EXPORT_ID; } + /** + * Applies the function {@code f} to the results of {@code this} export. + * + *

+ * In the case where {@code this} export was built with a {@link SessionState}, the function {@code f} will be + * invoked inside {@link ExportBuilder#submit(Callable) submit} with the same session. + * + *

+ * In the case where {@code this} export was not built with a {@link SessionState}, the function {@code f} will + * be invoked on this thread. + * + *

+ * In both cases, the resulting export is a "non-export". + * + * @param f the function + * @return the new export after applying + * @param the new exported type + */ + public ExportObject map(Function f) { + if (session == null) { + final T localResult = result; + if (localResult == null) { + return wrapAsFailedExport(caughtException); + } + final R r; + try { + r = f.apply(localResult); + } catch (RuntimeException e) { + return wrapAsFailedExport(e); + } + return wrapAsExport(r); + } + return session.nonExport() + .require(this) + .submit(() -> f.apply(ExportObject.this.get())); + } + private synchronized void setQueryPerformanceRecorder( final QueryPerformanceRecorder queryPerformanceRecorder) { if (this.queryPerformanceRecorder != null) { diff --git a/server/src/main/java/io/deephaven/server/session/SharedTicketResolver.java b/server/src/main/java/io/deephaven/server/session/SharedTicketResolver.java index 02230b4b45e..d10fa91a0a0 100644 --- a/server/src/main/java/io/deephaven/server/session/SharedTicketResolver.java +++ b/server/src/main/java/io/deephaven/server/session/SharedTicketResolver.java @@ -7,9 +7,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.ByteStringAccess; import com.google.rpc.Code; -import io.deephaven.engine.table.Table; import io.deephaven.proto.backplane.grpc.ExportNotification; -import io.deephaven.proto.flight.util.FlightExportTicketHelper; import io.deephaven.proto.flight.util.TicketRouterHelper; import io.deephaven.proto.util.ByteHelper; import io.deephaven.proto.util.Exceptions; @@ -17,6 +15,8 @@ import io.deephaven.server.auth.AuthorizationProvider; import io.grpc.StatusRuntimeException; import org.apache.arrow.flight.impl.Flight; +import org.apache.arrow.flight.impl.Flight.FlightDescriptor; +import org.apache.arrow.flight.impl.Flight.Ticket; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -53,34 +53,13 @@ public String getLogNameFor(ByteBuffer ticket, String logId) { } @Override - public SessionState.ExportObject flightInfoFor( - @Nullable final SessionState session, final Flight.FlightDescriptor descriptor, final String logId) { - if (session == null) { - throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, String.format( - "Could not resolve '%s': no session to handoff to", logId)); - } - - final ByteString sharedId = idForDescriptor(descriptor, logId); - - SessionState.ExportObject export = sharedVariables.get(sharedId); - if (export == null) { - throw newNotFoundSRE(logId, toHexString(sharedId)); - } - - return session.nonExport() - .require(export) - .submit(() -> { - Object result = export.get(); - if (result instanceof Table) { - result = authorization.transform(result); - } - if (result instanceof Table) { - return TicketRouter.getFlightInfo((Table) result, descriptor, - FlightExportTicketHelper.descriptorToFlightTicket(descriptor, logId)); - } + protected Ticket getTicket(FlightDescriptor descriptor, String logId) { + return descriptorToTicket(descriptor, logId); + } - throw newNotFoundSRE(logId, toHexString(sharedId)); - }); + @Override + protected StatusRuntimeException notFound(FlightDescriptor descriptor, String logId) { + return newNotFoundSRE(logId, toHexString(idForDescriptor(descriptor, logId))); } @Override diff --git a/server/src/main/java/io/deephaven/server/session/TicketResolver.java b/server/src/main/java/io/deephaven/server/session/TicketResolver.java index cb74ffcc49f..78b1311d398 100644 --- a/server/src/main/java/io/deephaven/server/session/TicketResolver.java +++ b/server/src/main/java/io/deephaven/server/session/TicketResolver.java @@ -3,6 +3,7 @@ // package io.deephaven.server.session; +import com.google.protobuf.ByteString; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.PartitionedTable; import io.deephaven.engine.table.Table; @@ -148,11 +149,21 @@ default void publish( .submit(source::get); } + /** + * Retrieve the ByteString Schema for a given FlightDescriptor. + * + * @param descriptor the flight descriptor to retrieve a schema for + * @param logId an end-user friendly identification of the descriptor should an error occur + * @return a FlightInfo describing this flight + */ + SessionState.ExportObject getSchema(@Nullable SessionState session, Flight.FlightDescriptor descriptor, + String logId); + /** * Retrieve a FlightInfo for a given FlightDescriptor. * - * @param descriptor the flight descriptor to retrieve a ticket for - * @param logId an end-user friendly identification of the ticket should an error occur + * @param descriptor the flight descriptor to retrieve flight info for + * @param logId an end-user friendly identification of the descriptor should an error occur * @return a FlightInfo describing this flight */ SessionState.ExportObject flightInfoFor(@Nullable SessionState session, diff --git a/server/src/main/java/io/deephaven/server/session/TicketResolverBase.java b/server/src/main/java/io/deephaven/server/session/TicketResolverBase.java index b206000f334..1ad2f599282 100644 --- a/server/src/main/java/io/deephaven/server/session/TicketResolverBase.java +++ b/server/src/main/java/io/deephaven/server/session/TicketResolverBase.java @@ -3,7 +3,15 @@ // package io.deephaven.server.session; +import com.google.protobuf.ByteString; +import io.deephaven.engine.table.Table; import io.deephaven.server.auth.AuthorizationProvider; +import io.deephaven.server.session.SessionState.ExportObject; +import io.grpc.StatusRuntimeException; +import org.apache.arrow.flight.impl.Flight; +import org.apache.arrow.flight.impl.Flight.FlightDescriptor; +import org.apache.arrow.flight.impl.Flight.FlightInfo; +import org.jetbrains.annotations.Nullable; public abstract class TicketResolverBase implements TicketResolver { @@ -28,4 +36,35 @@ public byte ticketRoute() { public String flightDescriptorRoute() { return flightDescriptorRoute; } + + // These methods can be elevated to TicketResolver in the future if necessary. + + protected abstract Flight.Ticket getTicket(FlightDescriptor descriptor, String logId); + + protected abstract StatusRuntimeException notFound(FlightDescriptor descriptor, String logId); + + private SessionState.ExportObject resolveTable(@Nullable SessionState session, + Flight.FlightDescriptor descriptor, String logId) { + return resolve(session, descriptor, logId).map(x -> asTable(x, descriptor, logId)); + } + + private Table asTable(Object o, FlightDescriptor descriptor, String logId) { + if (!(o instanceof Table)) { + throw notFound(descriptor, logId); + } + return (Table) o; + } + + @Override + public final ExportObject getSchema(@Nullable SessionState session, FlightDescriptor descriptor, + String logId) { + return resolveTable(session, descriptor, logId).map(TicketRouter::getSchema); + } + + @Override + public final ExportObject flightInfoFor(@Nullable SessionState session, FlightDescriptor descriptor, + String logId) { + return resolveTable(session, descriptor, logId) + .map(table -> TicketRouter.getFlightInfo(table, descriptor, getTicket(descriptor, logId))); + } } diff --git a/server/src/main/java/io/deephaven/server/session/TicketRouter.java b/server/src/main/java/io/deephaven/server/session/TicketRouter.java index 400bb3cf4b8..15d4dd3bea0 100644 --- a/server/src/main/java/io/deephaven/server/session/TicketRouter.java +++ b/server/src/main/java/io/deephaven/server/session/TicketRouter.java @@ -3,6 +3,7 @@ // package io.deephaven.server.session; +import com.google.protobuf.ByteString; import com.google.rpc.Code; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; @@ -266,6 +267,16 @@ public SessionState.ExportObject flightInfoFor( } } + public SessionState.ExportObject getSchema( + @Nullable final SessionState session, + final Flight.FlightDescriptor descriptor, + final String logId) { + try (final SafeCloseable ignored = QueryPerformanceRecorder.getInstance().getNugget( + "getSchemaDescriptor:" + descriptor)) { + return getResolver(descriptor, logId).getSchema(session, descriptor, logId); + } + } + /** * Create a human readable string to identify this ticket. * @@ -309,6 +320,10 @@ public void visitFlightInfo(@Nullable final SessionState session, final Consumer byteResolverMap.iterator().forEachRemaining(resolver -> resolver.forAllFlightInfo(session, visitor)); } + public static ByteString getSchema(final Table table) { + return BarrageUtil.schemaBytesFromTable(table); + } + public static Flight.FlightInfo getFlightInfo(final Table table, final Flight.FlightDescriptor descriptor, final Flight.Ticket ticket) {