Skip to content

Commit

Permalink
fix(blob-propagation-jobs-cli): fix blob iteration in date range for …
Browse files Browse the repository at this point in the history
…`create` command (#642)

* feat(blob-propagation-cli): add support for creating jobs given a range of blocks

* fix(blob-propagation-jobs-cli): resolve an issue in create command

Use the lowest or highest block in the database to define the range for job creation when the provided range is significantly out of bounds.

* chore: add changeset

* test(blob-propagation-jobs-cli): remove `only` modifier

* refactor(blob-propagation-jobs-cli): refactor blob iteration on `create` command

* fix(blob-propagation-jobs-cli): properly await job creation promise

* style(blob-propagation-jobs-cli): improve log messages for `create` command

* chore: add changeset

* style(blob-propagation-jobs-cli): fix flag description typo

Co-authored-by: Pablo Castellano <pablo@anche.no>

---------

Co-authored-by: Pablo Castellano <pablo@anche.no>
  • Loading branch information
PJColombo and PabloCastellano authored Nov 27, 2024
1 parent 1efc55d commit 6e947ba
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 148 deletions.
5 changes: 5 additions & 0 deletions .changeset/fast-knives-relax.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@blobscan/blob-propagation-jobs-cli": minor
---

Added support for creating jobs based on a specified block number range
5 changes: 5 additions & 0 deletions .changeset/many-buckets-sparkle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@blobscan/blob-propagation-jobs-cli": patch
---

It fixes an issue where the `create` command wasn't properly iterating over blobs within a given date range
261 changes: 177 additions & 84 deletions clis/blob-propagation-jobs-cli/src/commands/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import type {
BlobPropagationJob,
BlobPropagationQueue,
} from "@blobscan/blob-propagator";
import dayjs from "@blobscan/dayjs";
import type { Prisma } from "@blobscan/db";
import { prisma } from "@blobscan/db";
import { env } from "@blobscan/env";

Expand All @@ -14,6 +16,7 @@ import { context } from "../context-instance";
import type { Command } from "../types";
import {
blobHashOptionDef,
blockRangeOptionDefs,
datePeriodOptionDefs,
helpOptionDef,
queuesOptionDef,
Expand All @@ -24,6 +27,7 @@ import {
const PRISMA_BATCH_OPERATIONS_MAX_SIZE = parseInt(
env.PRISMA_BATCH_OPERATIONS_MAX_SIZE.toString()
);

const createCommandOptDefs: commandLineArgs.OptionDefinition[] = [
helpOptionDef,
{
Expand All @@ -42,6 +46,15 @@ const createCommandOptDefs: commandLineArgs.OptionDefinition[] = [
...datePeriodOptionDefs.to,
description: "Date to which to retrieve blobs to create jobs for.",
},
{
...blockRangeOptionDefs.from,
description:
"Block number from which to retrieve blobs to create jobs for.",
},
{
...blockRangeOptionDefs.to,
description: "Block number to which to retrieve blobs for creating jobs.",
},
];

export const createCommandUsage = commandLineUsage([
Expand All @@ -65,36 +78,86 @@ function createBlobPropagationJobs(
createBlobStorageJob(q.name, hash)
);

q.addBulk(storageJobs as BlobPropagationJob[]);
return q.addBulk(storageJobs as BlobPropagationJob[]);
})
);
}

async function createBlobPropagationJobsInBatches(
storageQueues: BlobPropagationQueue[],
blobHashesBatchFetcher: (cursorId?: string) => Promise<{
blobHashes: string[];
nextCursorId?: string;
}>
) {
let cursorId: string | undefined;
const blockSelect = {
number: true,
} satisfies Prisma.BlockSelect;

do {
const { blobHashes, nextCursorId } = await blobHashesBatchFetcher(cursorId);
type BlockPayload = Prisma.BlockGetPayload<{
select: typeof blockSelect;
}>;

async function findNearestBlock({
slotOrDate,
limit,
}: {
slotOrDate?: string | number;
limit?: "lower" | "upper";
} = {}) {
console.log(
`Finding nearest ${limit} block${slotOrDate ? ` for ${slotOrDate}` : ""}…`
);
let block: BlockPayload | null = null;

const isLower = limit === "lower";
const operator = limit ? (isLower ? "gte" : "lte") : "equals";
const sort = limit ? (isLower ? "asc" : "desc") : "asc";

if (!isNaN(Number(slotOrDate))) {
block = await prisma.block.findFirst({
select: blockSelect,
where: {
slot: {
[operator]: Number(slotOrDate),
},
},
orderBy: {
slot: sort,
},
});
} else if (slotOrDate && dayjs(slotOrDate).utc().isValid()) {
const date = dayjs(slotOrDate).utc();

block = await prisma.block.findFirst({
select: blockSelect,
where: {
timestamp: {
[operator]: date.format(),
},
},
orderBy: {
timestamp: sort,
},
});
}

if (!block) {
const lowestUppestBlock = await prisma.block.findFirst({
select: blockSelect,
orderBy: {
number: sort,
},
});

await createBlobPropagationJobs(storageQueues, blobHashes);
block = lowestUppestBlock;
}

cursorId = nextCursorId;
} while (cursorId);
return block?.number;
}

export const create: Command = async function (argv) {
const {
blobHash: rawBlobHashes,
help,
queue: queueNames,
fromDate,
toDate,
fromDate: fromDateArg,
toDate: toDateArg,
fromBlock: fromBlockArg,
toBlock: toBlockArg,
} = commandLineArgs(createCommandOptDefs, {
argv,
}) as {
Expand All @@ -103,6 +166,8 @@ export const create: Command = async function (argv) {
queue?: QueueHumanName[];
fromDate?: string;
toDate?: string;
fromBlock?: number;
toBlock?: number;
};

if (help) {
Expand All @@ -114,6 +179,7 @@ export const create: Command = async function (argv) {
const storageQueues = queueNames
? context.getQueuesOrThrow(queueNames)
: context.getAllStorageQueues();
const storageQueueNames = storageQueues.map((q) => q.name).join(", ");
let blobHashes: string[];

if (rawBlobHashes?.length) {
Expand All @@ -139,75 +205,102 @@ export const create: Command = async function (argv) {
);
}

await createBlobPropagationJobs(storageQueues, blobHashes);
} else if (fromDate || toDate) {
await createBlobPropagationJobsInBatches(
storageQueues,
async (cursorId) => {
const dbBlocks = await prisma.block.findMany({
take: PRISMA_BATCH_OPERATIONS_MAX_SIZE,
skip: cursorId ? 1 : undefined,
cursor: cursorId
? {
hash: cursorId,
}
: undefined,
select: {
hash: true,
transactions: {
select: {
blobs: {
select: {
blobHash: true,
},
},
},
},
},
where: {
timestamp: {
gte: fromDate,
lte: toDate,
},
},
});

const blobHashes = [
...new Set(
dbBlocks
.map((b) =>
b.transactions.map((t) => t.blobs.map((bl) => bl.blobHash))
)
.flat(2)
),
];

return {
blobHashes,
nextCursorId: dbBlocks[dbBlocks.length - 1]?.hash,
};
}
);
console.log(`Creating jobs for storage queues: ${storageQueueNames}…`);

const jobs = await createBlobPropagationJobs(storageQueues, blobHashes);

console.log(`${jobs.length} jobs created`);

return;
}

let fromBlock: number;
let toBlock: number;

if (fromBlockArg) {
fromBlock = fromBlockArg;
} else {
await createBlobPropagationJobsInBatches(
const nearestBlock = await findNearestBlock({
slotOrDate: fromDateArg,
limit: "lower",
});

if (!nearestBlock) {
console.log("Skipping job creation as database is empty.");
return;
}

fromBlock = nearestBlock;
}

if (toBlockArg) {
toBlock = toBlockArg;
} else {
const nearestBlock = await findNearestBlock({
slotOrDate: toDateArg,
limit: "upper",
});

if (!nearestBlock) {
console.log("Skipping job creation as database is empty.");
return;
}

toBlock = nearestBlock;
}

console.log(
`Creating propagation jobs for blobs between blocks ${fromBlock} and ${toBlock} for storage queues: ${storageQueueNames}…`
);

let batchFromBlock = fromBlock,
batchToBlock = Math.min(
fromBlock + PRISMA_BATCH_OPERATIONS_MAX_SIZE,
toBlock
);
let totalJobsCreated = 0;
let blobVersionedHashes = [];

do {
const dbBlobs = await prisma.blobsOnTransactions.findMany({
select: {
blockNumber: true,
blobHash: true,
},
where: {
blockNumber: {
lte: batchToBlock,
gte: batchFromBlock,
},
},
orderBy: {
blockNumber: "asc",
},
});

console.log(
`Blocks ${batchFromBlock} - ${batchToBlock}: fetched ${dbBlobs.length} blobs`
);

blobVersionedHashes = [...new Set(dbBlobs.map((b) => b.blobHash))];

const jobs = await createBlobPropagationJobs(
storageQueues,
async (cursorId) => {
const dbBlobs = await prisma.blob.findMany({
take: PRISMA_BATCH_OPERATIONS_MAX_SIZE,
cursor: cursorId
? {
commitment: cursorId,
}
: undefined,
skip: cursorId ? 1 : undefined,
});
const blobHashes = dbBlobs.map((b) => b.versionedHash);

return {
blobHashes,
nextCursorId: dbBlobs[dbBlobs.length - 1]?.commitment,
};
}
blobVersionedHashes
);
}

console.log(
`Block ${batchFromBlock} - ${batchToBlock}: ${jobs.length} jobs created`
);

batchFromBlock = batchToBlock + 1;
batchToBlock = Math.min(
batchToBlock + PRISMA_BATCH_OPERATIONS_MAX_SIZE,
toBlock
);

totalJobsCreated += blobVersionedHashes.length;
} while (blobVersionedHashes.length && batchFromBlock <= toBlock);

console.log(`Total jobs created: ${totalJobsCreated}`);
};
Empty file modified clis/blob-propagation-jobs-cli/src/runner.ts
100644 → 100755
Empty file.
4 changes: 2 additions & 2 deletions clis/blob-propagation-jobs-cli/src/utils/options-defs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ const slotType = (input: string): number => {
};

const dateType = (input: string): string => {
const date = dayjs(input);
const date = dayjs.utc(input);
if (!date.isValid()) {
throw new Error(`Invalid date "${input}". Expected a ISO 8601 date.`);
}

return date.toISOString();
return date.format();
};

const queueType = (input: string): string => {
Expand Down
Empty file.
Loading

0 comments on commit 6e947ba

Please sign in to comment.