Skip to content

Commit

Permalink
perf(datasource/graphene): group together segment id requests within …
Browse files Browse the repository at this point in the history
…the same bounding box (leaves_many)
  • Loading branch information
chrisj committed Apr 25, 2024
1 parent 2d7862a commit 6895ccf
Showing 1 changed file with 93 additions and 44 deletions.
137 changes: 93 additions & 44 deletions src/datasource/graphene/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,33 @@

import { debounce } from "lodash-es";
import {
WithParameters,
withChunkManager,
Chunk,
ChunkSource,
WithParameters,
withChunkManager,
} from "#src/chunk_manager/backend.js";
import { ChunkPriorityTier, ChunkState } from "#src/chunk_manager/base.js";
import type { CredentialsProvider } from "#src/credentials_provider/index.js";
import { WithSharedCredentialsProviderCounterpart } from "#src/credentials_provider/shared_counterpart.js";
import type { ChunkedGraphChunkSpecification } from "#src/datasource/graphene/base.js";
import {
getGrapheneFragmentKey,
GRAPHENE_MESH_NEW_SEGMENT_RPC_ID,
responseIdentity,
ChunkedGraphSourceParameters,
MeshSourceParameters,
CHUNKED_GRAPH_LAYER_RPC_ID,
CHUNKED_GRAPH_RENDER_LAYER_UPDATE_SOURCES_RPC_ID,
ChunkedGraphSourceParameters,
GRAPHENE_MESH_NEW_SEGMENT_RPC_ID,
MeshSourceParameters,
RENDER_RATIO_LIMIT,
getGrapheneFragmentKey,
isBaseSegmentId,
} from "#src/datasource/graphene/base.js";
import { decodeManifestChunk } from "#src/datasource/precomputed/backend.js";
import type { FragmentChunk, ManifestChunk } from "#src/mesh/backend.js";
import { assignMeshFragmentData, MeshSource } from "#src/mesh/backend.js";
import { MeshSource, assignMeshFragmentData } from "#src/mesh/backend.js";
import { decodeDraco } from "#src/mesh/draco/index.js";
import type { DisplayDimensionRenderInfo } from "#src/navigation_state.js";
import type {
RenderedViewBackend,
RenderLayerBackendAttachment,
RenderedViewBackend,
} from "#src/render_layer_backend.js";
import { RenderLayerBackend } from "#src/render_layer_backend.js";
import { withSegmentationLayerBackendState } from "#src/segmentation_display_state/backend.js";
Expand All @@ -51,8 +51,8 @@ import type { SharedWatchableValue } from "#src/shared_watchable_value.js";
import type { SliceViewChunkSourceBackend } from "#src/sliceview/backend.js";
import { deserializeTransformedSources } from "#src/sliceview/backend.js";
import type {
TransformedSource,
SliceViewProjectionParameters,
TransformedSource,
} from "#src/sliceview/base.js";
import {
forEachPlaneIntersectingVolumetricChunk,
Expand All @@ -61,9 +61,13 @@ import {
import { computeChunkBounds } from "#src/sliceview/volume/backend.js";
import { Uint64Set } from "#src/uint64_set.js";
import { fetchSpecialHttpByteRange } from "#src/util/byte_range_http_requests.js";
import type { CancellationToken } from "#src/util/cancellation.js";
import {
CancellationTokenSource,
type CancellationToken,
} from "#src/util/cancellation.js";
import { vec3, vec3Key } from "#src/util/geom.js";
import { responseArrayBuffer, responseJson } from "#src/util/http_request.js";
import { Signal } from "#src/util/signal.js";
import type {
SpecialProtocolCredentials,
SpecialProtocolCredentialsProvider,
Expand All @@ -76,7 +80,7 @@ import {
withSharedVisibility,
} from "#src/visibility_priority/backend.js";
import type { RPC } from "#src/worker_rpc.js";
import { registerSharedObject, registerRPC } from "#src/worker_rpc.js";
import { registerRPC, registerSharedObject } from "#src/worker_rpc.js";

function getVerifiedFragmentPromise(
credentialsProvider: SpecialProtocolCredentialsProvider,
Expand Down Expand Up @@ -260,6 +264,74 @@ function decodeChunkedGraphChunk(leaves: string[]) {
return final;
}

class LeavesManyProxy {
pendingRequests = new Map<
string,
[Signal<(response: any) => void>, Uint64Set, CancellationTokenSource]
>();

constructor(
private parameters: ChunkedGraphSourceParameters,
private credentialsProvider?: CredentialsProvider<any>,
) {}

async request(
segment: Uint64,
bounds: string,
cancellationToken: CancellationToken,
): Promise<any> {
const { pendingRequests } = this;
let pendingRequest = pendingRequests.get(bounds);
if (!pendingRequest) {
const { parameters, credentialsProvider } = this;
const signal = new Signal<(request: any) => void>();
(signal as any).start = performance.now();
const requestCancellationToken = new CancellationTokenSource();
const segments = new Uint64Set();
pendingRequest = [signal, segments, requestCancellationToken];
pendingRequests.set(bounds, pendingRequest);
setTimeout(async () => {
pendingRequests.delete(bounds);
try {
const response = await cancellableFetchSpecialOk(
credentialsProvider,
`${parameters.url}/leaves_many?int64_as_str=1&bounds=${bounds}`,
{
method: "POST",
body: JSON.stringify({
node_ids: [...segments],
}),
},
responseJson,
requestCancellationToken,
);
signal.dispatch(response);
} catch (e) {
signal.dispatch(e);
}
}, 0);
}
const [request, segments, requestCancellationToken] = pendingRequest;
segments.add(segment);
cancellationToken.add(() => {
segments.delete(segment);
if (segments.size === 0) {
requestCancellationToken.cancel();
}
});
return new Promise((f, r) => {
const unregister = request.add((response) => {
unregister();
if (response instanceof Error) {
r(response);
} else {
f(response[segment.toJSON()]);
}
});
});
}
}

@registerSharedObject()
export class GrapheneChunkedGraphChunkSource extends WithParameters(
WithSharedCredentialsProviderCounterpart<SpecialProtocolCredentials>()(
Expand All @@ -271,43 +343,37 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
chunks: Map<string, ChunkedGraphChunk>;
tempChunkDataSize: Uint32Array;
tempChunkPosition: Float32Array;
leavesManyProxy: LeavesManyProxy;

constructor(rpc: RPC, options: any) {
super(rpc, options);
this.spec = options.spec;
const rank = this.spec.rank;
this.tempChunkDataSize = new Uint32Array(rank);
this.tempChunkPosition = new Float32Array(rank);
this.leavesManyProxy = new LeavesManyProxy(
this.parameters,
this.credentialsProvider,
);
}

async download(
chunk: ChunkedGraphChunk,
cancellationToken: CancellationToken,
): Promise<void> {
const { parameters } = this;
const chunkPosition = this.computeChunkBounds(chunk);
const chunkDataSize = chunk.chunkDataSize!;
const bounds =
`${chunkPosition[0]}-${chunkPosition[0] + chunkDataSize[0]}_` +
`${chunkPosition[1]}-${chunkPosition[1] + chunkDataSize[1]}_` +
`${chunkPosition[2]}-${chunkPosition[2] + chunkDataSize[2]}`;

const request = cancellableFetchSpecialOk(
this.credentialsProvider,
`${parameters.url}/${chunk.segment}/leaves?int64_as_str=1&bounds=${bounds}`,
{},
responseIdentity,
const request = await this.leavesManyProxy.request(
chunk.segment,
bounds,
cancellationToken,
);
await this.withErrorMessage(
request,
`Fetching leaves of segment ${chunk.segment} in region ${bounds}: `,
)
.then((res) => res.json())
.then((res) => {
chunk.leaves = decodeChunkedGraphChunk(res.leaf_ids);
})
.catch((err) => console.error(err));
chunk.leaves = decodeChunkedGraphChunk(request);
}

getChunk(chunkGridPosition: Float32Array, segment: Uint64) {
Expand All @@ -325,23 +391,6 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
computeChunkBounds(chunk: ChunkedGraphChunk) {
return computeChunkBounds(this, chunk);
}

async withErrorMessage(
promise: Promise<Response>,
errorPrefix: string,
): Promise<Response> {
const response = await promise;
if (response.ok) {
return response;
}
let msg: string;
try {
msg = (await response.json()).message;
} catch {
msg = await response.text();
}
throw new Error(`[${response.status}] ${errorPrefix}${msg}`);
}
}

interface ChunkedGraphRenderLayerAttachmentState {
Expand Down

0 comments on commit 6895ccf

Please sign in to comment.