diff --git a/.vscode/settings.json b/.vscode/settings.json index 3fc2277..26af3fd 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -24,6 +24,7 @@ "INJECTEDAPTOS", "Irys", "knexfile", + "localstack", "MULTIAPTOS", "multistream", "NOWAIT", diff --git a/ecs/fulfillment-pipeline/src/app.ts b/ecs/fulfillment-pipeline/src/app.ts index 62f5a76..1782b2d 100644 --- a/ecs/fulfillment-pipeline/src/app.ts +++ b/ecs/fulfillment-pipeline/src/app.ts @@ -17,6 +17,7 @@ import { Message } from "@aws-sdk/client-sqs"; import { Consumer } from "sqs-consumer"; +import { ArweaveGateway } from "../../../src/arch/arweaveGateway"; import { PostgresDatabase } from "../../../src/arch/db/postgres"; import { TurboPaymentService } from "../../../src/arch/payment"; import { migrateOnStartup } from "../../../src/constants"; @@ -73,6 +74,7 @@ const uploadDatabase = new PostgresDatabase({ }); const objectStore = getS3ObjectStore(); const paymentService = new TurboPaymentService(); +const arweaveGateway = new ArweaveGateway(); // Set up queue handler configurations for jobs based on a planId export const queues: QueueHandlerConfig[] = [ @@ -115,6 +117,7 @@ const consumers: ConsumerQueue[] = queues.map((queue) => ({ database: uploadDatabase, objectStore, paymentService, + arweaveGateway, }), ...queue, })); @@ -306,6 +309,7 @@ if (process.env.PLAN_BUNDLE_ENABLED === "true") { planBundleJobScheduler = new PlanBundleJobScheduler({ intervalMs: +(process.env.PLAN_BUNDLE_INTERVAL_MS ?? 60_000), logger: globalLogger, + database: uploadDatabase, }); setUpAndStartJobScheduler(planBundleJobScheduler); } @@ -313,6 +317,7 @@ if (process.env.VERIFY_BUNDLE_ENABLED === "true") { verifyBundleJobScheduler = new VerifyBundleJobScheduler({ intervalMs: +(process.env.VERIFY_BUNDLE_INTERVAL_MS ?? 
60_000), logger: globalLogger, + database: uploadDatabase, }); setUpAndStartJobScheduler(verifyBundleJobScheduler); } diff --git a/ecs/fulfillment-pipeline/src/jobs/plan.ts b/ecs/fulfillment-pipeline/src/jobs/plan.ts index c7633a9..2cee43d 100644 --- a/ecs/fulfillment-pipeline/src/jobs/plan.ts +++ b/ecs/fulfillment-pipeline/src/jobs/plan.ts @@ -16,22 +16,28 @@ */ import winston from "winston"; +import { Database } from "../../../../src/arch/db/database"; import { planBundleHandler } from "../../../../src/jobs/plan"; import { JobScheduler } from "../utils/jobScheduler"; export class PlanBundleJobScheduler extends JobScheduler { + private database: Database; + constructor({ intervalMs = 60_000, logger, + database, }: { intervalMs: number; logger: winston.Logger; + database: Database; }) { super({ intervalMs, schedulerName: "plan-bundle", logger }); + this.database = database; } async processJob(): Promise { - await planBundleHandler(undefined, undefined, this.logger).catch( + await planBundleHandler(this.database, undefined, this.logger).catch( (error) => { this.logger.error("Error planning bundle", error); } diff --git a/ecs/fulfillment-pipeline/src/jobs/verify.ts b/ecs/fulfillment-pipeline/src/jobs/verify.ts index d6ed1ed..caabadc 100644 --- a/ecs/fulfillment-pipeline/src/jobs/verify.ts +++ b/ecs/fulfillment-pipeline/src/jobs/verify.ts @@ -16,26 +16,34 @@ */ import winston from "winston"; +import { Database } from "../../../../src/arch/db/database"; import { verifyBundleHandler } from "../../../../src/jobs/verify"; import { JobScheduler } from "../utils/jobScheduler"; export class VerifyBundleJobScheduler extends JobScheduler { + private database: Database; constructor({ intervalMs = 60_000, logger, + database, }: { intervalMs: number; logger: winston.Logger; + database: Database; }) { super({ intervalMs, schedulerName: "verify-bundle", logger, }); + this.database = database; } async processJob(): Promise { - await verifyBundleHandler({ logger: this.logger }).catch((error) => { + await verifyBundleHandler({ + database: this.database, + logger: this.logger, + }).catch((error) => { this.logger.error("Error verifying bundle", error); }); } diff --git a/ecs/fulfillment-pipeline/src/utils/planIdMessageHandler.ts b/ecs/fulfillment-pipeline/src/utils/planIdMessageHandler.ts index bfd060b..d984348 100644 --- a/ecs/fulfillment-pipeline/src/utils/planIdMessageHandler.ts +++ b/ecs/fulfillment-pipeline/src/utils/planIdMessageHandler.ts @@ -18,6 +18,7 @@ import { Message, SQSClient, SQSClientConfig } from "@aws-sdk/client-sqs"; import { Consumer } from "sqs-consumer"; import winston from "winston"; +import { ArweaveGateway } from "../../../../src/arch/arweaveGateway"; import { Database } from "../../../../src/arch/db/database"; import { ObjectStore } from "../../../../src/arch/objectStore"; import { PaymentService } from "../../../../src/arch/payment"; @@ -31,6 +32,7 @@ export const planIdMessageHandler = ({ database, objectStore, paymentService, + arweaveGateway, }: { message: Message; logger: winston.Logger; @@ -38,6 +40,7 @@ export const planIdMessageHandler = ({ database: Database; objectStore: ObjectStore; paymentService: PaymentService; + arweaveGateway: ArweaveGateway; }) => { const messageLogger = logger.child({ messageId: message.MessageId, @@ -68,6 +71,7 @@ export const planIdMessageHandler = ({ database, objectStore, paymentService, + arweaveGateway, }, // provide our message logger to the handler messageLogger.child({ planId }) @@ -80,12 +84,14 @@ export function 
createPlanIdHandlingSQSConsumer({ database, objectStore, paymentService, + arweaveGateway, }: { queue: QueueHandlerConfig; sqsOptions?: Partial; database: Database; objectStore: ObjectStore; paymentService: PaymentService; + arweaveGateway: ArweaveGateway; }) { const { queueUrl, consumerOptions, logger } = queue; return Consumer.create({ @@ -98,6 +104,7 @@ export function createPlanIdHandlingSQSConsumer({ database, objectStore, paymentService, + arweaveGateway, }), sqs: new SQSClient(sqsOptions), batchSize: 1, diff --git a/src/arch/arweaveGateway.ts b/src/arch/arweaveGateway.ts index e1a348d..ecbde9f 100644 --- a/src/arch/arweaveGateway.ts +++ b/src/arch/arweaveGateway.ts @@ -80,7 +80,7 @@ export class ArweaveGateway implements Gateway { endpoint = gatewayUrl, retryStrategy = new ExponentialBackoffRetryStrategy({}), axiosInstance = axios.create(), // defaults to throwing errors for status codes >400 - }: GatewayAPIConstParams) { + }: GatewayAPIConstParams = {}) { this.endpoint = endpoint; this.retryStrategy = retryStrategy; this.axiosInstance = axiosInstance; diff --git a/src/arch/db/database.ts b/src/arch/db/database.ts index 1fc3248..d9b1c42 100644 --- a/src/arch/db/database.ts +++ b/src/arch/db/database.ts @@ -15,6 +15,7 @@ * along with this program. If not, see . */ import { + DataItemFailedReason, FinishedMultiPartUpload, InFlightMultiPartUpload, InsertNewBundleParams, @@ -27,7 +28,12 @@ import { PostedNewDataItem, SeededBundle, } from "../../types/dbTypes"; -import { TransactionId, UploadId, Winston } from "../../types/types"; +import { + DataItemId, + TransactionId, + UploadId, + Winston, +} from "../../types/types"; // TODO: this could be an interface since no functions have a default implementation export interface Database { @@ -125,11 +131,12 @@ export interface Database { /** Gets latest status of a data item from the database */ getDataItemInfo(dataItemId: TransactionId): Promise< | { - status: "new" | "pending" | "permanent"; + status: "new" | "pending" | "permanent" | "failed"; assessedWinstonPrice: Winston; bundleId?: TransactionId; uploadedTimestamp: number; deadlineHeight?: number; + failedReason?: DataItemFailedReason; } | undefined >; @@ -176,8 +183,10 @@ export interface Database { uploadId: UploadId ): Promise; - /** TODO: create failed_data_item table instead, remove this */ - deletePlannedDataItem(dataItemId: string): Promise; + updatePlannedDataItemAsFailed(params: { + dataItemId: DataItemId; + failedReason: DataItemFailedReason; + }): Promise; } export type UpdateDataItemsToPermanentParams = { diff --git a/src/arch/db/dbConstants.ts b/src/arch/db/dbConstants.ts index fd91193..4268944 100644 --- a/src/arch/db/dbConstants.ts +++ b/src/arch/db/dbConstants.ts @@ -22,6 +22,7 @@ export const tableNames = { permanentBundle: "permanent_bundle", permanentDataItem: "permanent_data_item", plannedDataItem: "planned_data_item", + failedDataItem: "failed_data_item", postedBundle: "posted_bundle", seededBundle: "seeded_bundle", /** @deprecated */ diff --git a/src/arch/db/dbMaps.ts b/src/arch/db/dbMaps.ts index f00bf07..31a9015 100644 --- a/src/arch/db/dbMaps.ts +++ b/src/arch/db/dbMaps.ts @@ -16,8 +16,11 @@ */ import { defaultPremiumFeatureType } from "../../constants"; import { + DataItemFailedReason, FailedBundle, FailedBundleDBResult, + FailedDataItem, + FailedDataItemDBResult, NewBundle, NewBundleDBResult, NewDataItem, @@ -173,3 +176,41 @@ export function permanentDataItemDbResultToPermanentDataItemMap({ deadlineHeight: deadline_height ? 
+deadline_height : undefined, }; } + +export function failedDataItemDbResultToFailedDataItemMap({ + assessed_winston_price, + byte_count, + data_item_id, + owner_public_address, + uploaded_date, + data_start, + failed_bundles, + signature_type, + content_type, + premium_feature_type, + plan_id, + planned_date, + deadline_height, + failed_date, + failed_reason, + signature, +}: FailedDataItemDBResult): FailedDataItem { + return { + assessedWinstonPrice: W(assessed_winston_price), + dataItemId: data_item_id, + ownerPublicAddress: owner_public_address, + byteCount: +byte_count, + uploadedDate: uploaded_date, + premiumFeatureType: premium_feature_type ?? defaultPremiumFeatureType, + failedBundles: failed_bundles ? failed_bundles.split(",") : [], + signatureType: signature_type ?? undefined, + payloadDataStart: data_start ?? undefined, + payloadContentType: content_type ?? undefined, + signature: signature ?? undefined, + planId: plan_id, + plannedDate: planned_date, + deadlineHeight: deadline_height ? +deadline_height : undefined, + failedDate: failed_date, + failedReason: failed_reason as DataItemFailedReason, + }; +} diff --git a/src/arch/db/migrator.ts b/src/arch/db/migrator.ts index c3afa69..797ecaa 100644 --- a/src/arch/db/migrator.ts +++ b/src/arch/db/migrator.ts @@ -16,7 +16,11 @@ */ import { Knex } from "knex"; -import { defaultPremiumFeatureType, maxSignatureLength } from "../../constants"; +import { + defaultPremiumFeatureType, + failedBundleCSVColumnLength, + maxSignatureLength, +} from "../../constants"; import globalLogger from "../../logger"; import { columnNames, tableNames } from "./dbConstants"; @@ -459,3 +463,108 @@ export class DeadlineHeightMigrator extends Migrator { }); } } + +export class FailedDataItemMigrator extends Migrator { + constructor(private readonly knex: Knex) { + super(); + } + + public migrate() { + return this.operate({ + name: "migrate to failed data item", + operation: async () => { + await this.knex.schema.createTableLike( + tableNames.failedDataItem, + tableNames.plannedDataItem, + async (table) => { + table + .timestamp(columnNames.failedDate) + .notNullable() + .defaultTo(this.knex.fn.now()) + .index(); + table.string(columnNames.failedReason).notNullable().index(); + } + ); + }, + }); + } + + public rollback() { + return this.operate({ + name: "rollback from failed data item", + operation: async () => { + await this.knex.schema.dropTableIfExists(tableNames.failedDataItem); + }, + }); + } +} + +export class BumpFailedBundlesCharLimitMigrator extends Migrator { + constructor(private readonly knex: Knex) { + super(); + } + + public migrate() { + return this.operate({ + name: "bump failed bundles char limit", + operation: async () => { + await this.knex.schema.alterTable(tableNames.newDataItem, (table) => { + table + .string(columnNames.failedBundles, failedBundleCSVColumnLength) + .nullable() + .alter(); + }); + await this.knex.schema.alterTable( + tableNames.plannedDataItem, + (table) => { + table + .string(columnNames.failedBundles, failedBundleCSVColumnLength) + .nullable() + .alter(); + } + ); + await this.knex.schema.alterTable( + tableNames.failedDataItem, + (table) => { + table + .string(columnNames.failedBundles, failedBundleCSVColumnLength) + .nullable() + .alter(); + } + ); + await this.knex.schema.alterTable( + tableNames.permanentDataItem, + (table) => { + table + .string(columnNames.failedBundles, failedBundleCSVColumnLength) + .nullable() + .alter(); + } + ); + }, + }); + } + + public rollback() { + return this.operate({ + name: "rollback 
from bump failed bundles char limit", + operation: async () => { + await this.knex.schema.alterTable(tableNames.newDataItem, (table) => { + table.string(columnNames.failedBundles).nullable().alter(); + }); + await this.knex.schema.alterTable( + tableNames.plannedDataItem, + (table) => { + table.string(columnNames.failedBundles).nullable().alter(); + } + ); + await this.knex.schema.alterTable( + tableNames.permanentDataItem, + (table) => { + table.string(columnNames.failedBundles).nullable().alter(); + } + ); + }, + }); + } +} diff --git a/src/arch/db/postgres.ts b/src/arch/db/postgres.ts index 2e4bd2b..0653eaa 100644 --- a/src/arch/db/postgres.ts +++ b/src/arch/db/postgres.ts @@ -15,7 +15,6 @@ * along with this program. If not, see . */ import knex, { Knex } from "knex"; -import pLimit from "p-limit"; import path from "path"; import winston from "winston"; @@ -23,12 +22,16 @@ import { batchingSize, failedReasons, maxDataItemsPerBundle, + retryLimitForFailedDataItems, } from "../../constants"; import logger from "../../logger"; import { BundlePlanDBResult, DataItemDbResults, + DataItemFailedReason, FailedBundleDbInsert, + FailedDataItemDBInsert, + FailedDataItemDBResult, FinishedMultiPartUpload, FinishedMultiPartUploadDBInsert, FinishedMultiPartUploadDBResult, @@ -59,7 +62,13 @@ import { SeededBundle, SeededBundleDBResult, } from "../../types/dbTypes"; -import { TransactionId, UploadId, W, Winston } from "../../types/types"; +import { + DataItemId, + TransactionId, + UploadId, + W, + Winston, +} from "../../types/types"; import { generateArrayChunks } from "../../utils/common"; import { BundlePlanExistsInAnotherStateWarning, @@ -116,7 +125,10 @@ export class PostgresDatabase implements Database { dataItem: restOfNewDataItem, }); - if (await this.dataItemExists(newDataItem.dataItemId)) { + if ( + (await this.getExistingDataItemsDbResultsById([newDataItem.dataItemId])) + .length > 0 + ) { throw new DataItemExistsWarning(newDataItem.dataItemId); } @@ -145,24 +157,48 @@ export class PostgresDatabase implements Database { tableNames.newDataItem, tableNames.plannedDataItem, tableNames.permanentDataItem, - // TODO: tableNames.failedDataItem, + tableNames.failedDataItem, ] as const; - private async getDataItemsDbResultsById( + private async getExistingDataItemsDbResultsById( dataItemIds: TransactionId[] ): Promise { - return this.reader.transaction(async (knexTransaction) => { - const dataItemResults = await Promise.all( - this.dataItemTables.map((tableName) => - knexTransaction(tableName).whereIn( - columnNames.dataItemId, - dataItemIds + const results: DataItemDbResults[] = await this.reader.transaction( + async (knexTransaction) => { + const dataItemResults = await Promise.all( + this.dataItemTables.map((tableName) => + knexTransaction(tableName).whereIn( + columnNames.dataItemId, + dataItemIds + ) ) - ) + ); + + return dataItemResults.flat(); + } + ); + + // Delete any failed data items, as this read is checking before a re-insert + const failedDataItems = results.filter( + (result) => "failed_reason" in result + ); + if (failedDataItems.length) { + this.log.warn( + "Data items already exist in database as failed! 
Removing from database to retry...", + { dataItemIds } ); + await this.writer(tableNames.failedDataItem) + .whereIn( + columnNames.dataItemId, + failedDataItems.map((r) => r.data_item_id) + ) + .del(); + failedDataItems.forEach((r) => { + results.splice(results.indexOf(r), 1); + }); + } - return dataItemResults.flat(); - }); + return results; } public async insertNewDataItemBatch( @@ -173,9 +209,10 @@ export class PostgresDatabase implements Database { }); // Check if any data items already exist in the database - const existingDataItemDbResults = await this.getDataItemsDbResultsById( - dataItemBatch.map((newDataItem) => newDataItem.dataItemId) - ); + const existingDataItemDbResults = + await this.getExistingDataItemsDbResultsById( + dataItemBatch.map((newDataItem) => newDataItem.dataItemId) + ); if (existingDataItemDbResults.length > 0) { const existingDataItemIds = new Set( existingDataItemDbResults.map((r) => r.data_item_id) @@ -203,33 +240,6 @@ export class PostgresDatabase implements Database { ); } - private async dataItemExists(data_item_id: TransactionId): Promise { - return this.reader.transaction(async (knexTransaction) => { - const dataItemResults = await Promise.all([ - knexTransaction(tableNames.newDataItem).where({ - data_item_id, - }), - knexTransaction( - tableNames.plannedDataItem - ).where({ - data_item_id, - }), - knexTransaction( - tableNames.permanentDataItem - ).where({ - data_item_id, - }), - ]); - - for (const result of dataItemResults) { - if (result.length > 0) { - return true; - } - } - return false; - }); - } - private newDataItemToDbInsert({ assessedWinstonPrice, byteCount, @@ -704,18 +714,34 @@ export class PostgresDatabase implements Database { .del() .returning("*"); - const dbInserts: RePackDataItemDbInsert[] = deletedDataItems.map( - ({ plan_id: _pi, planned_date: _pd, ...restOfDataItem }) => ({ - ...restOfDataItem, - failed_bundles: [ - ...(restOfDataItem.failed_bundles - ? restOfDataItem.failed_bundles.split(",") - : []), - failedBundleId, - ].join(","), - }) - ); - + // For any data items over the retry limit, we will move them to failed data items + const dbInserts: RePackDataItemDbInsert[] = []; + for (const { + failed_bundles, + plan_id, + planned_date, + ...restOfDataItem + } of deletedDataItems) { + const failedBundles = failed_bundles ? failed_bundles.split(",") : []; + failedBundles.push(failedBundleId); + if (failedBundles.length >= retryLimitForFailedDataItems) { + const failedDbInsert: FailedDataItemDBInsert = { + ...restOfDataItem, + failed_reason: "too_many_failures", + plan_id, + planned_date, + failed_bundles: failedBundles.join(","), + }; + await knexTransaction(tableNames.failedDataItem).insert( + failedDbInsert + ); + } else { + dbInserts.push({ + ...restOfDataItem, + failed_bundles: failedBundles.join(","), + }); + } + } await knexTransaction.batchInsert< RePackDataItemDbInsert, NewDataItemDBResult @@ -776,59 +802,34 @@ export class PostgresDatabase implements Database { planId: PlanId, failedBundleId: TransactionId ): Promise { + logger.debug("Repacking data items for plan...", { + planId, + failedBundleId, + }); const plannedDataItems = await this.reader( tableNames.plannedDataItem ).where({ plan_id: planId }); - const newDataItemInserts = plannedDataItems.map( - ({ plan_id: _pi, planned_date: _pd, failed_bundles, ...rest }) => { - const failedBundlesArray = failed_bundles - ? 
failed_bundles.split(",") - : []; - failedBundlesArray.push(failedBundleId); - - return { - ...rest, - failed_bundles: failedBundlesArray.join(","), - }; - } - ); - const rePackDataItemInsertBatches = [ - ...generateArrayChunks( - newDataItemInserts, + ...generateArrayChunks( + plannedDataItems.map((pdi) => pdi.data_item_id), batchingSize ), ]; - const parallelLimit = pLimit(1); - const transactionPromises = rePackDataItemInsertBatches.map((batch) => - parallelLimit(() => - // Each batch will insert and delete in its own atomic transaction - this.writer.transaction(async (dbTx) => { - await dbTx.batchInsert( - tableNames.newDataItem, - batch - ); - await dbTx(tableNames.plannedDataItem) - .whereIn( - columnNames.dataItemId, - batch.map((b) => b.data_item_id) - ) - .del(); - }) - ) - ); - await Promise.all(transactionPromises); + for (const batch of rePackDataItemInsertBatches) { + await this.updateDataItemsToBeRePacked(batch, failedBundleId); + } } public async getDataItemInfo(dataItemId: string): Promise< | { - status: "new" | "pending" | "permanent"; + status: "new" | "pending" | "permanent" | "failed"; assessedWinstonPrice: Winston; bundleId?: string | undefined; uploadedTimestamp: number; deadlineHeight?: number; + failedReason?: DataItemFailedReason; } | undefined > { @@ -912,6 +913,26 @@ export class PostgresDatabase implements Database { }; } + // Check for a failed data item + const failedDataItemDbResult = await this.reader( + tableNames.failedDataItem + ).where({ data_item_id: dataItemId }); + if (failedDataItemDbResult.length > 0) { + return { + status: "failed", + assessedWinstonPrice: W( + failedDataItemDbResult[0].assessed_winston_price + ), + uploadedTimestamp: new Date( + failedDataItemDbResult[0].uploaded_date + ).getTime(), + deadlineHeight: failedDataItemDbResult[0].deadline_height + ? 
+failedDataItemDbResult[0].deadline_height + : undefined, + failedReason: failedDataItemDbResult[0].failed_reason, + }; + } + // Data item not found return undefined; } @@ -1189,20 +1210,35 @@ export class PostgresDatabase implements Database { .forUpdate(); } - /** DEBUG tool for deleting data items that have had a catastrophic failure (e.g: deleted from S3) */ - public async deletePlannedDataItem(dataItemId: string): Promise { - this.log.debug("Deleting planned data item...", { + public async updatePlannedDataItemAsFailed({ + dataItemId, + failedReason, + }: { + dataItemId: DataItemId; + failedReason: DataItemFailedReason; + }): Promise { + this.log.warn("Updating planned data item as failed...", { dataItemId, + failedReason, }); - const dataItem = await this.writer( - tableNames.plannedDataItem - ) - .where({ data_item_id: dataItemId }) - .del() - .returning("*"); + await this.writer.transaction(async (knexTransaction) => { + const plannedDataItem = await knexTransaction( + tableNames.plannedDataItem + ) + .where({ data_item_id: dataItemId }) + .del() + .returning("*"); - logger.info("Deleted planned data item database info", { dataItem }); + const dbInsert: FailedDataItemDBInsert = { + ...plannedDataItem[0], + failed_reason: failedReason, + }; + + await knexTransaction( + tableNames.failedDataItem + ).insert(dbInsert); + }); } } diff --git a/src/constants.ts b/src/constants.ts index 1a5c6af..71dc25b 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -104,6 +104,10 @@ export const txConfirmationThreshold = 1; export const dropBundleTxThresholdNumberOfBlocks = 50; export const rePostDataItemThresholdNumberOfBlocks = 125; +export const retryLimitForFailedDataItems = 10; + +const txIdLength = 43; +export const failedBundleCSVColumnLength = (txIdLength + 1) * 20; // Allow up to 20 failed bundles in the schema export const defaultMaxConcurrentChunks = 32; diff --git a/src/jobs/post.ts b/src/jobs/post.ts index 14e672a..22168bd 100644 --- a/src/jobs/post.ts +++ b/src/jobs/post.ts @@ -32,7 +32,7 @@ import { getBundleTx, getS3ObjectStore } from "../utils/objectStoreUtils"; interface PostBundleJobInjectableArch { database?: Database; objectStore?: ObjectStore; - gateway?: Gateway; + arweaveGateway?: Gateway; paymentService?: PaymentService; } @@ -41,7 +41,7 @@ export async function postBundleHandler( { database = new PostgresDatabase(), objectStore = getS3ObjectStore(), - gateway = new ArweaveGateway({ + arweaveGateway = new ArweaveGateway({ endpoint: gatewayUrl, }), paymentService = new TurboPaymentService(), @@ -65,7 +65,9 @@ export async function postBundleHandler( try { // post bundle, throw error on failure - const transactionPostResponseData = await gateway.postBundleTx(bundleTx); + const transactionPostResponseData = await arweaveGateway.postBundleTx( + bundleTx + ); // fetch AR rate - but don't throw on failure const usdToArRate = await paymentService @@ -93,7 +95,7 @@ export async function postBundleHandler( error: message, }); - const balance = await gateway.getBalanceForWallet( + const balance = await arweaveGateway.getBalanceForWallet( ownerToNormalizedB64Address(bundleTx.owner) ); diff --git a/src/jobs/prepare.ts b/src/jobs/prepare.ts index 19bd844..9be8b8c 100644 --- a/src/jobs/prepare.ts +++ b/src/jobs/prepare.ts @@ -68,7 +68,7 @@ interface PrepareBundleJobInjectableArch { objectStore?: ObjectStore; jwk?: JWKInterface; pricing?: PricingService; - gateway?: Gateway; + arweaveGateway?: Gateway; arweave?: ArweaveInterface; } @@ -90,10 +90,10 @@ export async function 
prepareBundleHandler( database = new PostgresDatabase(), objectStore = getS3ObjectStore(), jwk, - gateway = new ArweaveGateway({ + arweaveGateway = new ArweaveGateway({ endpoint: gatewayUrl, }), - pricing = new PricingService(gateway), + pricing = new PricingService(arweaveGateway), arweave = new ArweaveInterface(), }: PrepareBundleJobInjectableArch, logger = defaultLogger.child({ job: "prepare-bundle-job", planId }) @@ -180,7 +180,10 @@ export async function prepareBundleHandler( } catch (error) { if (isNoSuchKeyS3Error(error)) { const dataItemId = error.Key.split("/")[1]; - await database.deletePlannedDataItem(dataItemId); + await database.updatePlannedDataItemAsFailed({ + dataItemId, + failedReason: "missing_from_object_store", + }); // TODO: This is a hack -- recurse to retry the job without the deleted data item return prepareBundleHandler(planId, { @@ -188,7 +191,7 @@ export async function prepareBundleHandler( objectStore, jwk, pricing, - gateway, + arweaveGateway, arweave, }); } diff --git a/src/jobs/verify.ts b/src/jobs/verify.ts index 5e000a7..644c3dc 100644 --- a/src/jobs/verify.ts +++ b/src/jobs/verify.ts @@ -41,14 +41,14 @@ import { getBundleTx, getS3ObjectStore } from "../utils/objectStoreUtils"; interface VerifyBundleJobArch { database?: Database; objectStore?: ObjectStore; - gateway?: Gateway; + arweaveGateway?: Gateway; logger?: winston.Logger; batchSize?: number; } async function hasBundleBeenPostedLongerThanTheDroppedThreshold( objectStore: ObjectStore, - gateway: Gateway, + arweaveGateway: Gateway, bundleId: TransactionId, transactionByteCount?: ByteCount ): Promise { @@ -58,11 +58,11 @@ async function hasBundleBeenPostedLongerThanTheDroppedThreshold( transactionByteCount ); const txAnchor = bundleTx.last_tx; - const blockHeightOfTxAnchor = await gateway.getBlockHeightForTxAnchor( + const blockHeightOfTxAnchor = await arweaveGateway.getBlockHeightForTxAnchor( txAnchor ); - const currentBlockHeight = await gateway.getCurrentBlockHeight(); + const currentBlockHeight = await arweaveGateway.getCurrentBlockHeight(); return ( currentBlockHeight - blockHeightOfTxAnchor > @@ -73,7 +73,7 @@ async function hasBundleBeenPostedLongerThanTheDroppedThreshold( export async function verifyBundleHandler({ database = new PostgresDatabase(), objectStore = getS3ObjectStore(), - gateway = new ArweaveGateway({ endpoint: gatewayUrl }), + arweaveGateway = new ArweaveGateway({ endpoint: gatewayUrl }), logger = defaultLogger.child({ job: "verify-bundle-job" }), batchSize = batchingSize, }: VerifyBundleJobArch): Promise { @@ -91,13 +91,15 @@ export async function verifyBundleHandler({ const { planId, bundleId, transactionByteCount, payloadByteCount } = bundle; try { - const transactionStatus = await gateway.getTransactionStatus(bundleId); + const transactionStatus = await arweaveGateway.getTransactionStatus( + bundleId + ); if (transactionStatus.status !== "found") { if ( await hasBundleBeenPostedLongerThanTheDroppedThreshold( objectStore, - gateway, + arweaveGateway, bundleId, transactionByteCount ) @@ -109,7 +111,7 @@ export async function verifyBundleHandler({ await database.updateSeededBundleToDropped(planId, bundleId); } } else { - // We found the bundle transaction from the gateway + // We found the bundle transaction from the arweaveGateway const { number_of_confirmations, block_height } = transactionStatus.transactionStatus; @@ -130,7 +132,7 @@ export async function verifyBundleHandler({ parallelLimit(() => checkGQLForBlocksThenUpdateDataItemBatch( batch, - gateway, + arweaveGateway, 
database, bundleId, block_height, @@ -208,7 +210,7 @@ export async function handler(eventPayload?: unknown) { async function checkGQLForBlocksThenUpdateDataItemBatch( dataItemBatch: PlannedDataItem[], - gateway: Gateway, + arweaveGateway: Gateway, database: Database, bundleId: TransactionId, block_height: number, @@ -217,7 +219,7 @@ async function checkGQLForBlocksThenUpdateDataItemBatch( logger: winston.Logger, planId: string ) { - const dataItemGQLResults = await gateway.getDataItemsFromGQL( + const dataItemGQLResults = await arweaveGateway.getDataItemsFromGQL( dataItemBatch.map(({ dataItemId }) => dataItemId) ); diff --git a/src/migrations/20240508184826_failed_data_items.ts b/src/migrations/20240508184826_failed_data_items.ts new file mode 100644 index 0000000..76acbbf --- /dev/null +++ b/src/migrations/20240508184826_failed_data_items.ts @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2022-2024 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +import { Knex } from "knex"; + +import { FailedDataItemMigrator } from "../arch/db/migrator"; + +export async function up(knex: Knex): Promise { + return new FailedDataItemMigrator(knex).migrate(); +} + +export async function down(knex: Knex): Promise { + return new FailedDataItemMigrator(knex).rollback(); +} diff --git a/src/migrations/20240513192935_inc_failed_bundle_limit.ts b/src/migrations/20240513192935_inc_failed_bundle_limit.ts new file mode 100644 index 0000000..2a3808a --- /dev/null +++ b/src/migrations/20240513192935_inc_failed_bundle_limit.ts @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2022-2024 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +import { Knex } from "knex"; + +import { BumpFailedBundlesCharLimitMigrator } from "../arch/db/migrator"; + +export async function up(knex: Knex): Promise { + return new BumpFailedBundlesCharLimitMigrator(knex).migrate(); +} + +export async function down(knex: Knex): Promise { + return new BumpFailedBundlesCharLimitMigrator(knex).rollback(); +} diff --git a/src/routes/status.ts b/src/routes/status.ts index ba17891..3faf991 100644 --- a/src/routes/status.ts +++ b/src/routes/status.ts @@ -29,10 +29,16 @@ export async function statusHandler(ctx: KoaContext, next: Next) { } ctx.body = { - status: info.status === "permanent" ? "FINALIZED" : "CONFIRMED", + status: + info.status === "permanent" + ? "FINALIZED" + : info.status === "failed" + ? "FAILED" + : "CONFIRMED", bundleId: info.bundleId, info: info.status, winc: info.assessedWinstonPrice, + reason: info.failedReason, }; } catch (error) { logger.error(`Error getting data item status: ${error}`); diff --git a/src/types/dbTypes.ts b/src/types/dbTypes.ts index 863b417..bef9cd4 100644 --- a/src/types/dbTypes.ts +++ b/src/types/dbTypes.ts @@ -33,7 +33,10 @@ export type Signature = Buffer; export type DataStart = number; export type BlockHeight = number; -export type FailedReason = "not_found" | "failed_to_post"; +export type BundleFailedReason = "not_found" | "failed_to_post"; +export type DataItemFailedReason = + | "missing_from_object_store" + | "too_many_failures"; export type ContentType = string; @@ -65,6 +68,11 @@ export interface PermanentDataItem extends Omit { blockHeight: BlockHeight; } +export interface FailedDataItem extends PlannedDataItem { + failedDate: Timestamp; + failedReason: DataItemFailedReason; +} + export interface BundlePlan { planId: PlanId; plannedDate: Timestamp; @@ -99,7 +107,7 @@ export interface SeededBundle extends PostedBundle { export interface FailedBundle extends SeededBundle { failedDate: Timestamp; - failedReason?: FailedReason; + failedReason?: BundleFailedReason; } export interface PermanentBundle extends SeededBundle { @@ -195,6 +203,14 @@ export interface PermanentDataItemDBResult extends PermanentDataItemDBInsert { permanent_date: string; } +export interface FailedDataItemDBInsert extends PlannedDataItemDBResult { + failed_reason: DataItemFailedReason; +} + +export interface FailedDataItemDBResult extends FailedDataItemDBInsert { + failed_date: string; +} + export interface BundlePlanDBInsert { plan_id: string; } diff --git a/src/utils/objectStoreUtils.ts b/src/utils/objectStoreUtils.ts index 12e2c6e..e44b319 100644 --- a/src/utils/objectStoreUtils.ts +++ b/src/utils/objectStoreUtils.ts @@ -35,17 +35,22 @@ const multiPartPrefix = "multipart-uploads"; const bundlePayloadPrefix = "bundle-payload"; const bundleTxPrefix = "bundle"; +let s3ObjectStore: S3ObjectStore | undefined; + export function getS3ObjectStore(): ObjectStore { const useMultiRegionAccessPoint = process.env.NODE_ENV === "dev" && !!process.env.DATA_ITEM_MULTI_REGION_ENDPOINT; - return new S3ObjectStore({ - bucketName: useMultiRegionAccessPoint - ? `${process.env.DATA_ITEM_MULTI_REGION_ENDPOINT}` - : // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - process.env.DATA_ITEM_BUCKET!, // Blow up if we can't fall back to this - backupBucketName: process.env.BACKUP_DATA_ITEM_BUCKET, - }); + if (!s3ObjectStore) { + s3ObjectStore = new S3ObjectStore({ + bucketName: useMultiRegionAccessPoint + ? 
`${process.env.DATA_ITEM_MULTI_REGION_ENDPOINT}` + : // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + process.env.DATA_ITEM_BUCKET!, // Blow up if we can't fall back to this + backupBucketName: process.env.BACKUP_DATA_ITEM_BUCKET, + }); + } + return s3ObjectStore; } export function putDataItemRaw( diff --git a/src/utils/opticalUtils.test.ts b/src/utils/opticalUtils.test.ts index 54d1850..1f31d86 100644 --- a/src/utils/opticalUtils.test.ts +++ b/src/utils/opticalUtils.test.ts @@ -142,7 +142,8 @@ describe("The filterForNestedBundles function", () => { describe("The getNestedDataItemHeaders function", () => { const objectStore = new FileSystemObjectStore(); - before(() => { + + it("returns correct nested data item headers for a real bundled data item", async () => { stub(objectStore, "getObject").resolves({ readable: createReadStream("tests/stubFiles/bdiDataItem"), etag: "stubEtag", @@ -151,9 +152,7 @@ describe("The getNestedDataItemHeaders function", () => { payloadContentType: "application/octet-stream", payloadDataStart: 1100, }); - }); - it("returns correct nested data item headers for a real bundled data item", async () => { expect( await getNestedDataItemHeaders({ objectStore, @@ -227,4 +226,37 @@ describe("The getNestedDataItemHeaders function", () => { }, ]); }); + + it("returns an empty array when passed a BDI header that is not a bundle", async () => { + stub(objectStore, "getObject").resolves({ + readable: createReadStream("tests/stubFiles/stub1115ByteDataItem"), + etag: "stubEtag", + }); + stub(objectStore, "getObjectPayloadInfo").resolves({ + payloadContentType: "application/octet-stream", + payloadDataStart: 1100, + }); + + expect( + await getNestedDataItemHeaders({ + objectStore, + logger: logger, + potentialBDIHeaders: [ + { + id: "cTbz16hHhGW4HF-uMJ5u8RoCg9atYmyMFWGd-kzhF_Q", + owner: "owner", + owner_address: "owner_address", + signature: "signature", + target: "target", + content_type: "content_type", + data_size: 1234, + tags: [ + { name: "QnVuZGxlLUZvcm1hdA", value: "YmluYXJ5" }, + { name: "QnVuZGxlLVZlcnNpb24", value: "Mi4wLjA" }, + ], + }, + ], + }) + ).to.deep.equalInAnyOrder([]); + }); }); diff --git a/src/utils/opticalUtils.ts b/src/utils/opticalUtils.ts index 99fab8d..6364f1f 100644 --- a/src/utils/opticalUtils.ts +++ b/src/utils/opticalUtils.ts @@ -118,10 +118,19 @@ export async function getNestedDataItemHeaders({ bdiDataItemId, }); - // Process it as a bundle and get all the data item info - const parsedDataItemHeaders = (await processStream( - dataItemReadable - )) as ParsedDataItemHeader[]; + let parsedDataItemHeaders: ParsedDataItemHeader[] = []; + try { + // Process it as a bundle and get all the data item info + parsedDataItemHeaders = (await processStream( + dataItemReadable + )) as ParsedDataItemHeader[]; + } catch (error) { + logger.error("Error processing BDI stream.", { + bdiDataItemId, + error, + }); + return []; + } logger.debug("Finished processing BDI stream.", { bdiDataItemId, diff --git a/tests/helpers/dbTestHelpers.ts b/tests/helpers/dbTestHelpers.ts index 6a8c277..ba88bff 100644 --- a/tests/helpers/dbTestHelpers.ts +++ b/tests/helpers/dbTestHelpers.ts @@ -20,6 +20,8 @@ import { Knex } from "knex"; import { columnNames, tableNames } from "../../src/arch/db/dbConstants"; import { PostgresDatabase } from "../../src/arch/db/postgres"; import { + DataItemFailedReason, + FailedDataItemDBInsert, KnexRawResult, NewBundleDBInsert, NewDataItemDBInsert, @@ -38,6 +40,7 @@ import { stubDataItemBufferSignature, stubDates, 
stubOwnerAddress, + stubPlanId, stubWinstonPrice, } from "../stubs"; @@ -83,12 +86,26 @@ export function stubPlannedDataItemInsert({ } { return { ...stubNewDataItemInsert({ dataItemId, signature, failedBundles }), - plan_id: planId, + plan_id: planId ?? stubPlanId, uploaded_date: stubDates.earliestDate, planned_date: plannedDate, }; } +function stubFailedDataItemInsert({ + failedDate = stubDates.earliestDate, + failedReason = "too_many_failures", + ...params +}: InsertStubFailedDataItemParams): FailedDataItemDBInsert & { + failed_date: string | undefined; +} { + return { + ...stubPlannedDataItemInsert(params), + failed_date: failedDate, + failed_reason: failedReason, + }; +} + function stubPermanentDataItemInsert({ dataItemId, planId, @@ -106,7 +123,7 @@ function stubPermanentDataItemInsert({ failed_bundles: "", content_type: "text/plain", premium_feature_type: "test", - plan_id: planId, + plan_id: planId ?? stubPlanId, planned_date: stubDates.earliestDate, bundle_id: bundleId, block_height: stubBlockHeight.toString(), @@ -190,6 +207,14 @@ export class DbTestHelper { ); } + public async insertStubFailedDataItem( + insertParams: InsertStubFailedDataItemParams + ): Promise { + return this.knex(tableNames.failedDataItem).insert( + stubFailedDataItemInsert(insertParams) + ); + } + public async insertStubPermanentDataItem( insertParams: InsertStubPermanentDataItemParams ): Promise { @@ -399,10 +424,16 @@ interface InsertStubNewDataItemParams { interface InsertStubPlannedDataItemParams extends Omit { - planId: PlanId; + planId?: PlanId; plannedDate?: string; } +interface InsertStubFailedDataItemParams + extends Omit { + failedDate?: string; + failedReason?: DataItemFailedReason; +} + interface InsertStubPermanentDataItemParams extends Omit { bundleId: TransactionId; diff --git a/tests/jobs.int.test.ts b/tests/jobs.int.test.ts index 76b64c3..e343829 100644 --- a/tests/jobs.int.test.ts +++ b/tests/jobs.int.test.ts @@ -198,7 +198,7 @@ describe("Post bundle job handler function integrated with PostgresDatabase clas await postBundleHandler(planId, { objectStore, database: db, - gateway, + arweaveGateway: gateway, paymentService, }); @@ -238,7 +238,7 @@ describe("Post bundle job handler function integrated with PostgresDatabase clas await postBundleHandler(planId, { objectStore, database: db, - gateway, + arweaveGateway: gateway, paymentService, }); @@ -276,7 +276,7 @@ describe("Post bundle job handler function integrated with PostgresDatabase clas await postBundleHandler(planId, { objectStore, database: db, - gateway: gateway, + arweaveGateway: gateway, paymentService, }); @@ -317,7 +317,7 @@ describe("Post bundle job handler function integrated with PostgresDatabase clas promiseToError: postBundleHandler(planId, { objectStore, database: db, - gateway: gateway, + arweaveGateway: gateway, paymentService, }), errorMessage: diff --git a/tests/postgres.test.ts b/tests/postgres.test.ts index 47fc15c..93d3f1d 100644 --- a/tests/postgres.test.ts +++ b/tests/postgres.test.ts @@ -23,9 +23,11 @@ import { plannedDataItemDbResultToPlannedDataItemMap, } from "../src/arch/db/dbMaps"; import { PostgresDatabase } from "../src/arch/db/postgres"; +import { retryLimitForFailedDataItems } from "../src/constants"; import { BundlePlanDBResult, FailedBundleDBResult, + FailedDataItemDBResult, NewBundleDBResult, NewDataItemDBResult, PermanentBundleDBResult, @@ -60,7 +62,6 @@ import { stubTxId7, stubTxId8, stubTxId9, - stubTxId13, stubTxId14, stubTxId15, stubTxId16, @@ -71,45 +72,87 @@ import { describe("PostgresDatabase 
class", () => { const db = new PostgresDatabase(); const dbTestHelper = new DbTestHelper(db); + describe("insertNewDataItem method", () => { + const uniqueDataItemId = "Unique data ID for the new data item tests."; + + it("adds a new_data_item to the database", async () => { + await db.insertNewDataItem({ + dataItemId: uniqueDataItemId, + ownerPublicAddress: stubOwnerAddress, + byteCount: stubByteCount, + assessedWinstonPrice: stubWinstonPrice, + payloadDataStart: 1500, + failedBundles: [], + signatureType: 1, + uploadedDate: stubDates.earliestDate, + payloadContentType: "application/json", + premiumFeatureType: "default", + signature: stubDataItemBufferSignature, + deadlineHeight: 200, + }); + + const newDataItems = await db["writer"]( + "new_data_item" + ) + .where({ data_item_id: uniqueDataItemId }) + .del() + .returning("*"); + expect(newDataItems.length).to.equal(1); - it("insertNewDataItem method adds a new_data_item to the database", async () => { - await db.insertNewDataItem({ - dataItemId: stubTxId13, - ownerPublicAddress: stubOwnerAddress, - byteCount: stubByteCount, - assessedWinstonPrice: stubWinstonPrice, - payloadDataStart: 1500, - failedBundles: [], - signatureType: 1, - uploadedDate: stubDates.earliestDate, - payloadContentType: "application/json", - premiumFeatureType: "default", - signature: stubDataItemBufferSignature, - deadlineHeight: 200, + const { + assessed_winston_price, + owner_public_address, + byte_count, + data_item_id, + uploaded_date, + content_type, + } = newDataItems[0]; + + expect(assessed_winston_price).to.equal(stubWinstonPrice.toString()); + expect(owner_public_address).to.equal(stubOwnerAddress); + expect(byte_count).to.equal(stubByteCount.toString()); + expect(data_item_id).to.equal(uniqueDataItemId); + expect(uploaded_date).to.exist; + expect(content_type).to.equal("application/json"); }); - const newDataItems = await db["writer"]( - "new_data_item" - ).where({ data_item_id: stubTxId13 }); - expect(newDataItems.length).to.equal(1); - - const { - assessed_winston_price, - owner_public_address, - byte_count, - data_item_id, - uploaded_date, - content_type, - } = newDataItems[0]; - - expect(assessed_winston_price).to.equal(stubWinstonPrice.toString()); - expect(owner_public_address).to.equal(stubOwnerAddress); - expect(byte_count).to.equal(stubByteCount.toString()); - expect(data_item_id).to.equal(stubTxId13); - expect(uploaded_date).to.exist; - expect(content_type).to.equal("application/json"); - - await dbTestHelper.cleanUpEntityInDb(tableNames.newDataItem, stubTxId13); + it("deletes an existing failed_data_item if it exists in the database", async () => { + await dbTestHelper.insertStubFailedDataItem({ + dataItemId: uniqueDataItemId, + failedReason: "too_many_failures", + }); + + await db.insertNewDataItem({ + dataItemId: uniqueDataItemId, + ownerPublicAddress: stubOwnerAddress, + byteCount: stubByteCount, + assessedWinstonPrice: stubWinstonPrice, + payloadDataStart: 1500, + failedBundles: [], + signatureType: 1, + uploadedDate: stubDates.earliestDate, + payloadContentType: "application/json", + premiumFeatureType: "default", + signature: stubDataItemBufferSignature, + deadlineHeight: 200, + }); + + const newDataItems = await db["writer"]( + tableNames.newDataItem + ) + .where({ data_item_id: uniqueDataItemId }) + .del() + .returning("*"); + expect(newDataItems.length).to.equal(1); + + const failedDataItems = await db["writer"]( + tableNames.failedDataItem + ) + .where({ data_item_id: uniqueDataItemId }) + .del() + .returning("*"); + 
expect(failedDataItems.length).to.equal(0); + }); }); it("getNewDataItems method gets all new_data_item in the database sorted by uploaded_date", async () => { @@ -203,7 +246,7 @@ describe("PostgresDatabase class", () => { }); it("insertNewBundle method deletes existing bundle_plan and inserts new_bundle as expected", async () => { - const bundleId = stubTxId13; + const bundleId = "unique bundle ID insertNewBundle"; const planId = stubPlanId; await dbTestHelper.insertStubBundlePlan({ @@ -236,7 +279,7 @@ describe("PostgresDatabase class", () => { }); it("insertPostedBundle method deletes existing new_bundle and inserts posted_bundle and seed_result as expected", async () => { - const bundleId = stubTxId13; + const bundleId = "Unique insertPostedBundle Bundle ID"; await dbTestHelper.insertStubNewBundle({ planId: stubPlanId, @@ -268,7 +311,7 @@ describe("PostgresDatabase class", () => { }); it("insertSeededBundle method deletes existing posted_bundle and inserts seeded_bundle", async () => { - const bundleId = stubTxId13; + const bundleId = "Unique insertSeededBundle Bundle ID"; const planId = stubPlanId; const usdToArRate = stubUsdToArRate; @@ -379,76 +422,122 @@ describe("PostgresDatabase class", () => { ); }); - it("updateSeededBundleToDropped method deletes existing seeded_bundle, inserts failed_bundle, deletes each planned_data_item, and inserts them as new_data_items", async () => { - const bundleId = "Stub bundle ID updateSeededBundleToDropped"; - const planId = "Stub plan ID updateSeededBundleToDropped"; - const dataItemIds = [stubTxId5, stubTxId13, stubTxId4]; - const usdToArRate = stubUsdToArRate; + describe("updateSeededBundleToDropped method", () => { + it("deletes existing seeded_bundle, inserts failed_bundle, deletes each planned_data_item, and inserts them as new_data_items", async () => { + const bundleId = "Stub bundle ID updateSeededBundleToDropped"; + const planId = "Stub plan ID updateSeededBundleToDropped"; + const dataItemIds = [ + "testOne updateSeededBundleToDropped", + "testTwo updateSeededBundleToDropped", + "testThree updateSeededBundleToDropped", + ]; + const usdToArRate = stubUsdToArRate; - await dbTestHelper.insertStubSeededBundle({ - planId, - bundleId, - dataItemIds, - usdToArRate, - failedBundles: ["testOne", "testTwo"], + await dbTestHelper.insertStubSeededBundle({ + planId, + bundleId, + dataItemIds, + usdToArRate, + failedBundles: ["testOne", "testTwo"], + }); + await db.updateSeededBundleToDropped(planId, bundleId); + + // New data items are inserted as expected + const results = await dbTestHelper.getAndDeleteNewDataItemDbResultsByIds( + dataItemIds + ); + expect(results.length).to.equal(3); + results.forEach((result) => { + expect(result.failed_bundles).to.equal(`testOne,testTwo,${bundleId}`); + }); + + // Seeded bundle is removed + expect( + ( + await db["writer"](tableNames.seededBundle).where({ + bundle_id: bundleId, + }) + ).length + ).to.equal(0); + + // Failed bundle exists as expected + const failedBundleDbResult = await db["writer"]( + tableNames.failedBundle + ).where({ bundle_id: bundleId }); + expect(failedBundleDbResult.length).to.equal(1); + failedBundleExpectations( + failedBundleDbResultToFailedBundleMap(failedBundleDbResult[0]), + { + expectedBundleId: bundleId, + expectedPlanId: planId, + } + ); + + // Planned data items are removed + await Promise.all([ + dataItemIds.map(async (data_item_id) => { + expect( + ( + await db["writer"](tableNames.plannedDataItem).where({ + data_item_id, + }) + ).length + ).to.equal(0); + }), + ]); + + await 
dbTestHelper.cleanUpSeededBundleInDb({ + bundleId, + dataItemIds, + bundleTable: "failed_bundle", + }); }); - await db.updateSeededBundleToDropped(planId, bundleId); - // Seeded bundle is removed - expect( - ( - await db["writer"](tableNames.seededBundle).where({ - bundle_id: bundleId, - }) - ).length - ).to.equal(0); + it("moves planned_data_item to failed_data_item table if they contain more than the retry limit of failed bundles", async () => { + const dataItemId = "updateSeededBundleToDropped f ailed test"; + const planId = "updateSeededBundleToDrop ailed test"; + const bundleId = "updateSeededBundleToD TxID failed test"; - // Failed bundle exists as expected - const failedBundleDbResult = await db["writer"]( - tableNames.failedBundle - ).where({ bundle_id: bundleId }); - expect(failedBundleDbResult.length).to.equal(1); - failedBundleExpectations( - failedBundleDbResultToFailedBundleMap(failedBundleDbResult[0]), - { - expectedBundleId: bundleId, - expectedPlanId: planId, - } - ); + const failedBundles = Array.from( + { length: retryLimitForFailedDataItems + 2 }, + (_, i) => `failed ${i}` + ); - // Planned data items are removed - await Promise.all([ - dataItemIds.map(async (data_item_id) => { - expect( - ( - await db["writer"](tableNames.plannedDataItem).where({ - data_item_id, - }) - ).length - ).to.equal(0); - }), - ]); + await dbTestHelper.insertStubSeededBundle({ + planId, + bundleId, + dataItemIds: [dataItemId], + failedBundles, + usdToArRate: stubUsdToArRate, + }); - // New data items are inserted as expected - await Promise.all( - dataItemIds.map(async (data_item_id) => { - const dbResult = await db["writer"]( - tableNames.newDataItem - ).where({ - data_item_id, - }); - expect(dbResult.length).to.equal(1); - expect(dbResult[0].data_item_id).to.equal(data_item_id); - expect(dbResult[0].failed_bundles).to.equal( - `testOne,testTwo,${bundleId}` - ); - }) - ); + await db.updateSeededBundleToDropped(planId, bundleId); - await dbTestHelper.cleanUpSeededBundleInDb({ - bundleId, - dataItemIds, - bundleTable: "failed_bundle", + // No new data items are inserted as expected + expect( + ( + await db["writer"](tableNames.newDataItem).where({ + data_item_id: dataItemId, + }) + ).length + ).to.equal(0); + + // Failed data item exists as expected + const failedDataItemDbResult = await db["writer"]( + tableNames.failedDataItem + ) + .where({ data_item_id: dataItemId }) + .del() + .returning("*"); + + expect(failedDataItemDbResult.length).to.equal(1); + expect(failedDataItemDbResult[0].data_item_id).to.equal(dataItemId); + expect(failedDataItemDbResult[0].failed_reason).to.equal( + "too_many_failures" + ); + expect(failedDataItemDbResult[0].failed_bundles).to.equal( + [...failedBundles, bundleId].join(",") + ); }); }); @@ -552,7 +641,7 @@ describe("PostgresDatabase class", () => { it("permanentDataItem", async () => { const dataItemId = stubTxId7; const planId = stubPlanId2; - const bundleId = stubTxId13; + const bundleId = "Unique bundle ID permanentDataItem"; await dbTestHelper.insertStubPermanentDataItem({ dataItemId, @@ -592,7 +681,7 @@ describe("PostgresDatabase class", () => { "permanent data item test 3", ]; const blockHeight = stubBlockHeight; - const bundleId = stubTxId13; + const bundleId = "unique bundle ID permanent data item"; await Promise.all( dataItemIds.map((dataItemId) => @@ -639,7 +728,7 @@ describe("PostgresDatabase class", () => { "re pack data item test 2", "re pack data item test 3", ]; - const bundleId = stubTxId13; + const bundleId = "re pack data item test bundle id"; 
const previouslyFailedBundle = "already has a failed bundle"; await Promise.all( @@ -661,10 +750,8 @@ describe("PostgresDatabase class", () => { expect(plannedDbResult.length).to.equal(0); // New data items are inserted as expected - const newDbResult = await db["writer"](tableNames.newDataItem).whereIn( - "data_item_id", - dataItemIds - ); + const newDbResult = + await dbTestHelper.getAndDeleteNewDataItemDbResultsByIds(dataItemIds); expect(newDbResult.length).to.equal(3); expect(newDbResult[0].failed_bundles).to.equal( previouslyFailedBundle + "," + bundleId @@ -676,6 +763,47 @@ describe("PostgresDatabase class", () => { ) ); }); + + it("moves data items to failed_data_item if they are already tried beyond the limit", async () => { + const dataItemIds = [ + "re pack data item test 1 unique failed", + "re pack data item test 2 unique failed", + "re pack data item test 3 unique failed", + ]; + const bundleId = "re pack data item test bundle id to failed"; + + const failedBundles = Array.from( + { length: retryLimitForFailedDataItems + 1 }, + (_, i) => i.toString() + ); + await Promise.all( + dataItemIds.map((dataItemId) => + dbTestHelper.insertStubPlannedDataItem({ + dataItemId, + planId: "A great Unique plan ID", + failedBundles, + }) + ) + ); + + await db.updateDataItemsToBeRePacked(dataItemIds, bundleId); + + // Planned data items are removed + const plannedDbResult = await db["writer"]( + tableNames.plannedDataItem + ).whereIn("data_item_id", dataItemIds); + expect(plannedDbResult.length).to.equal(0); + + // Failed data items are inserted as expected + const failedDbResult = await db["writer"](tableNames.failedDataItem) + .whereIn("data_item_id", dataItemIds) + .del() + .returning("*"); + expect(failedDbResult.length).to.equal(3); + expect(failedDbResult[0].failed_bundles).to.equal( + failedBundles.join(",") + "," + bundleId + ); + }); }); describe("insertNewDataItemBatch method", () => { @@ -721,5 +849,59 @@ describe("PostgresDatabase class", () => { expect(newDataItems.length).to.equal(1); expect(newDataItems[0].data_item_id).to.equal(testIds[1]); }); + + it("deletes failed data items if they exist in the database", async () => { + const testIds = ["unique failed data item id one"]; + const dataItemBatch = testIds.map((dataItemId) => + stubNewDataItem(dataItemId) + ); + + // insert the first data item into the failed data item table + await dbTestHelper.insertStubFailedDataItem({ + dataItemId: testIds[0], + failedReason: "missing_from_object_store", + }); + + // Run batch insert with the data item + await db.insertNewDataItemBatch(dataItemBatch); + + const newDataItems = + await dbTestHelper.getAndDeleteNewDataItemDbResultsByIds(testIds); + + // Expect only the second data item to have been inserted to new data item table + expect(newDataItems.length).to.equal(1); + expect(newDataItems[0].data_item_id).to.equal(testIds[0]); + + // Expect the failed data item to have been removed + const failedDataItems = await db["writer"]( + tableNames.failedDataItem + ).where({ + data_item_id: testIds[0], + }); + expect(failedDataItems.length).to.equal(0); + }); + }); + + describe("updatePlannedDataItemAsFailed method", () => { + it("updates the expected data item", async () => { + const dataItemId = "updatePlannedDataItemAsFailed test"; + + await dbTestHelper.insertStubPlannedDataItem({ + dataItemId, + planId: "Unique plan ID", + }); + + await db.updatePlannedDataItemAsFailed({ + dataItemId, + failedReason: "missing_from_object_store", + }); + + const failedDataItemDbResult = await db["writer"]( + 
tableNames.failedDataItem + ).where({ data_item_id: dataItemId }); + expect(failedDataItemDbResult.length).to.equal(1); + expect(failedDataItemDbResult[0].plan_id).to.equal("Unique plan ID"); + expect(failedDataItemDbResult[0].failed_date).to.exist; + }); }); }); diff --git a/tests/verify.int.test.ts b/tests/verify.int.test.ts index a37fb60..f0938a5 100644 --- a/tests/verify.int.test.ts +++ b/tests/verify.int.test.ts @@ -105,7 +105,7 @@ describe("Verify bundle job handler function integrated with PostgresDatabase cl await verifyBundleHandler({ database: db, - gateway: gateway, + arweaveGateway: gateway, objectStore, }); @@ -161,7 +161,7 @@ describe("Verify bundle job handler function integrated with PostgresDatabase cl await verifyBundleHandler({ database: db, - gateway: gateway, + arweaveGateway: gateway, objectStore, // Test 100 batches of 5 data items each batchSize: 5, @@ -199,7 +199,7 @@ describe("Verify bundle job handler function integrated with PostgresDatabase cl await verifyBundleHandler({ database: db, - gateway: gateway, + arweaveGateway: gateway, objectStore, }); @@ -238,7 +238,7 @@ describe("Verify bundle job handler function integrated with PostgresDatabase cl await verifyBundleHandler({ database: db, - gateway: gateway, + arweaveGateway: gateway, objectStore, }); @@ -263,7 +263,7 @@ describe("Verify bundle job handler function integrated with PostgresDatabase cl await verifyBundleHandler({ database: db, - gateway: gateway, + arweaveGateway: gateway, objectStore, }); @@ -295,7 +295,7 @@ describe("Verify bundle job handler function integrated with PostgresDatabase cl await verifyBundleHandler({ database: db, - gateway: gateway, + arweaveGateway: gateway, objectStore, }); @@ -335,7 +335,7 @@ describe("Verify bundle job handler function integrated with PostgresDatabase cl await verifyBundleHandler({ database: db, - gateway: gateway, + arweaveGateway: gateway, objectStore, }); @@ -390,7 +390,7 @@ describe("Verify bundle job handler function integrated with PostgresDatabase cl const input = { database: db, - gateway: gateway, + arweaveGateway: gateway, objectStore, };
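
Note on the new constants in src/constants.ts: an Arweave transaction id is 43 characters, so each entry in the comma-separated failed_bundles value costs at most 44 characters including its delimiter, and failedBundleCSVColumnLength = (43 + 1) * 20 = 880 characters leaves room for 20 bundle ids. With retryLimitForFailedDataItems set to 10, a planned data item is moved to failed_data_item once its failed-bundle count reaches 10, so the widened column carries roughly 2x headroom before the CSV value could approach the column limit.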
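
The two new migration files wrap FailedDataItemMigrator and BumpFailedBundlesCharLimitMigrator in the standard Knex up/down exports, so they run through whatever migration entry point the service already uses (migrateOnStartup is imported in ecs/fulfillment-pipeline/src/app.ts). For local verification, a minimal sketch using Knex's programmatic migration API follows; the knexfile import path and environment key are assumptions, not taken from this diff.

import knex from "knex";

// Assumed location/shape of the project's knexfile -- adjust as needed.
import knexConfig from "../knexfile";

async function applyFailedDataItemMigrations(): Promise<void> {
  const db = knex(knexConfig["development"]);
  try {
    // Applies pending migrations, including the failed_data_item table and
    // the failed_bundles column widening added in this change set.
    const [batchNo, applied] = await db.migrate.latest();
    console.log(`Applied migration batch ${batchNo}:`, applied);
    // db.migrate.rollback() would undo the latest batch (dropping
    // failed_data_item and restoring the previous column length).
  } finally {
    await db.destroy();
  }
}

applyFailedDataItemMigrations().catch((error) => {
  console.error("Migration failed", error);
  process.exit(1);
});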
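
On the API surface, the change to src/routes/status.ts maps the new failed database status to a top-level FAILED value and passes failedReason through as reason, so clients can distinguish a data item that will never be bundled from one that is still pending. A minimal client-side sketch is below; the base URL and route path are illustrative assumptions, while the response shape mirrors the updated handler.

// Response shape taken from the updated statusHandler; field names only.
type DataItemStatusResponse = {
  status: "FINALIZED" | "CONFIRMED" | "FAILED";
  bundleId?: string;
  info: "new" | "pending" | "permanent" | "failed";
  winc: string; // assessed winston price; string serialization assumed
  reason?: "missing_from_object_store" | "too_many_failures";
};

// Hypothetical route -- substitute the service's real status endpoint.
async function shouldReUpload(
  baseUrl: string,
  dataItemId: string
): Promise<boolean> {
  const response = await fetch(`${baseUrl}/tx/${dataItemId}/status`);
  if (response.status === 404) {
    return true; // unknown data item: safe to (re)upload
  }
  const body = (await response.json()) as DataItemStatusResponse;
  // Failed data items are retryable: insertNewDataItem(Batch) now deletes a
  // matching failed_data_item row before accepting the same id again.
  return body.status === "FAILED";
}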