Skip to content

Commit

Permalink
storage-node: temp folder refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mnaamani committed Dec 13, 2023
1 parent 5131e5c commit 06b200b
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 70 deletions.
3 changes: 3 additions & 0 deletions storage-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@
},
"operator": {
"description": "Storage provider(operator) commands."
},
"util": {
"description": "Useful utility commands."
}
}
},
Expand Down
51 changes: 29 additions & 22 deletions storage-node/src/commands/server.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import { flags } from '@oclif/command'
import { ApiPromise } from '@polkadot/api'
import { KeyringPair } from '@polkadot/keyring/types'
import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup'
import fs from 'fs'
import sleep from 'sleep-promise'
import _ from 'lodash'
import path from 'path'
import rimraf from 'rimraf'
import sleep from 'sleep-promise'
import { promisify } from 'util'
import ApiCommandBase from '../command-base/ApiCommandBase'
import fs from 'fs'
import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup'
import { KeyringPair } from '@polkadot/keyring/types'
import { customFlags } from '../command-base/CustomFlags'
import { loadDataObjectIdCache } from '../services/caching/localDataObjects'
import logger, { DatePatternByFrequency, Frequency, initNewLogger } from '../services/logger'
Expand All @@ -23,6 +20,8 @@ import { getStorageBucketIdsByWorkerId } from '../services/sync/storageObligatio
import { PendingDirName, performSync, TempDirName } from '../services/sync/synchronizer'
import { createApp } from '../services/webApi/app'
import ExitCodes from './../command-base/ExitCodes'
import ApiCommandBase from '../command-base/ApiCommandBase'

const fsPromises = fs.promises

/**
Expand Down Expand Up @@ -52,6 +51,10 @@ export default class Server extends ApiCommandBase {
required: true,
description: 'Data uploading directory (absolute path).',
}),
tempFolder: flags.string({
description:
'Directory to store tempory files during sync and upload (absolute path).\n,Temporary directory (absolute path). If not specified a subfolder under the uploads directory will be used.',
}),
port: flags.integer({
char: 'o',
required: true,
Expand Down Expand Up @@ -219,10 +222,21 @@ Supported values: warn, error, debug, info. Default:debug`,
const enableUploadingAuth = false
const operatorRoleKey = undefined

await recreateTempDirectory(flags.uploads, TempDirName)
const tempFolder = flags.tempFolder || path.join(flags.uploads, TempDirName)

// Prevent tempFolder and uploadsFolder being at the same location. This is a simple check
// and doesn't deal with possibility that different path can point to the same location. eg. symlinks or
// a volume being mounted on multiple paths
if (tempFolder === flags.uploads) {
this.error('Please use unique paths for temp and uploads folder paths.')
}

// TODO: Check that uploads and temp folders are writeable

await createTempDirectory(tempFolder)

if (fs.existsSync(flags.uploads)) {
await loadDataObjectIdCache(flags.uploads, TempDirName, PendingDirName)
await loadDataObjectIdCache(flags.uploads)
}

if (flags.dev) {
Expand Down Expand Up @@ -256,7 +270,7 @@ Supported values: warn, error, debug, info. Default:debug`,
selectedBuckets,
qnApi,
flags.uploads,
TempDirName,
tempFolder,
flags.syncWorkersNumber,
flags.syncWorkersTimeout,
flags.syncInterval,
Expand Down Expand Up @@ -292,7 +306,6 @@ Supported values: warn, error, debug, info. Default:debug`,
try {
const port = flags.port
const maxFileSize = await api.consts.storage.maxDataObjectSize.toNumber()
const tempFileUploadingDir = path.join(flags.uploads, TempDirName)
logger.debug(`Max file size runtime parameter: ${maxFileSize}`)

const app = await createApp({
Expand All @@ -303,7 +316,7 @@ Supported values: warn, error, debug, info. Default:debug`,
workerId,
maxFileSize,
uploadsDir: flags.uploads,
tempFileUploadingDir,
tempFileUploadingDir: tempFolder,
pendingDataObjectsDir,
acceptPendingObjectsService,
process: this.config,
Expand Down Expand Up @@ -418,23 +431,17 @@ async function runCleanupWithInterval(
}

/**
* Removes and recreates the temporary directory from the uploading directory.
* All files in the temp directory are deleted.
* Creates the temporary directory.
* If folder exists, all files with extension `.temp` are deleted.
*
* @param uploadsDirectory - data uploading directory
* @param tempDirName - temporary directory name within the uploading directory
* @returns void promise.
*/
async function recreateTempDirectory(uploadsDirectory: string, tempDirName: string): Promise<void> {
async function createTempDirectory(tempDirName: string): Promise<void> {
try {
const tempFileUploadingDir = path.join(uploadsDirectory, tempDirName)

logger.info(`Removing temp directory ...`)
const rimrafAsync = promisify(rimraf)
await rimrafAsync(tempFileUploadingDir)

logger.info(`Creating temp directory ...`)
await fsPromises.mkdir(tempFileUploadingDir)
await fsPromises.mkdir(tempDirName)
} catch (err) {
logger.error(`Temp directory IO error: ${err}`)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ import { performCleanup } from '../../services/sync/cleanupService'
/**
* CLI command:
* Prunes outdated data objects: removes all the local stored data objects that the operator is no longer obliged to store.
* storage.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:cleanup"
* Shell command: "util:cleanup"
*/
export default class DevCleanup extends Command {
export default class Cleanup extends Command {
static description = `Runs the data objects cleanup/pruning workflow. It removes all the local stored data objects that the operator is no longer obliged to store`

static flags = {
Expand Down Expand Up @@ -48,7 +46,7 @@ export default class DevCleanup extends Command {
}

async run(): Promise<void> {
const { flags } = this.parse(DevCleanup)
const { flags } = this.parse(Cleanup)
const bucketId = flags.bucketId.toString()
const qnApi = new QueryNodeApi(flags.queryNodeEndpoint)
logger.info('Cleanup...')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import { Command, flags } from '@oclif/command'
import stringify from 'fast-safe-stringify'
import logger from '../../services/logger'
import { QueryNodeApi } from '../../services/queryNode/api'
import { performSync } from '../../services/sync/synchronizer'
import { QueryNodeApi } from '../..//services/queryNode/api'
import logger from '../../services/logger'
import stringify from 'fast-safe-stringify'
import path from 'path'

/**
* CLI command:
* Synchronizes data: fixes the difference between node obligations and local
* storage.
* Fetch all data objects from a bucket into local store.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:sync"
* Should not be executed while server is running.
* Shell command: "util:fetch-bucket"
*/
export default class DevSync extends Command {
static description =
'Synchronizes the data - it fixes the differences between local data folder and worker ID obligations from the runtime.'
export default class FetchBucket extends Command {
static description = 'Downloads all data objects of specified bucket, that matches worker ID obligations.'

static flags = {
help: flags.help({ char: 'h' }),
Expand All @@ -27,10 +26,10 @@ export default class DevSync extends Command {
bucketId: flags.integer({
char: 'b',
required: true,
description: 'The buckerId to sync',
description: 'The buckerId to fetch',
}),
syncWorkersNumber: flags.integer({
char: 'p',
char: 'n',
required: false,
description: 'Sync workers number (max async operations in progress).',
default: 20,
Expand All @@ -44,27 +43,30 @@ export default class DevSync extends Command {
queryNodeEndpoint: flags.string({
char: 'q',
required: false,
default: 'http://localhost:8081/graphql',
description: 'Query node endpoint (e.g.: http://some.com:8081/graphql)',
default: 'https://query.joystream.org/graphql',
description: 'Query node endpoint (e.g.: https://query.joystream.org/graphql)',
}),
dataSourceOperatorUrl: flags.string({
char: 'o',
required: false,
description: 'Storage node url base (e.g.: http://some.com:3333) to get data from.',
default: 'http://localhost:3333',
}),
uploads: flags.string({
char: 'd',
required: true,
description: 'Data uploading directory (absolute path).',
}),
tempFolder: flags.string({
description:
'Directory to store tempory files during sync and upload (absolute path).\n,Temporary directory (absolute path). If not specified a subfolder under the uploads directory will be used.',
}),
}

async run(): Promise<void> {
const { flags } = this.parse(DevSync)
const { flags } = this.parse(FetchBucket)
const bucketId = flags.bucketId.toString()
const qnApi = new QueryNodeApi(flags.queryNodeEndpoint)
logger.info('Syncing...')
logger.info('Fetching bucket...')

try {
await performSync(
Expand All @@ -75,6 +77,7 @@ export default class DevSync extends Command {
flags.syncWorkersTimeout,
qnApi,
flags.uploads,
flags.tempFolder ? flags.tempFolder : path.join(flags.uploads, 'temp'),
flags.dataSourceOperatorUrl
)
} catch (err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import { print } from '../../services/helpers/stdout'
* format.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:multihash"
* Shell command: "util:multihash"
*/
export default class DevMultihash extends Command {
export default class Multihash extends Command {
static description = 'Creates a multihash (blake3) for a file.'

static flags = {
Expand All @@ -25,7 +24,7 @@ export default class DevMultihash extends Command {
}

async run(): Promise<void> {
const { flags } = this.parse(DevMultihash)
const { flags } = this.parse(Multihash)

logger.info(`Hashing ${flags.file} ....`)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import { customFlags } from '../../command-base/CustomFlags'
* Verifies supported bag ID types in the string format.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:verify-bag-id"
*/
export default class DevVerifyBagId extends Command {
export default class VerifyBagId extends Command {
static description = 'The command verifies bag id supported by the storage node. Requires chain connection.'

static flags = {
Expand All @@ -21,7 +20,7 @@ export default class DevVerifyBagId extends Command {
}

async run(): Promise<void> {
const { flags } = this.parse(DevVerifyBagId)
const { flags } = this.parse(VerifyBagId)

logger.info(`Parsed: ${flags.bagId}`)
}
Expand Down
24 changes: 6 additions & 18 deletions storage-node/src/services/caching/localDataObjects.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import AwaitLock from 'await-lock'
import fs from 'fs'
import path from 'path'
import logger from '../logger'
const fsPromises = fs.promises

Expand Down Expand Up @@ -32,23 +31,11 @@ export async function getDataObjectIDs(): Promise<string[]> {
* @returns empty promise.
*
* @param uploadDir - uploading directory
* @param tempDirName - temp directory name
*/
export async function loadDataObjectIdCache(
uploadDir: string,
tempDirName: string,
pendingDirName: string
): Promise<void> {
export async function loadDataObjectIdCache(uploadDir: string): Promise<void> {
await lock.acquireAsync()

const localIds = await getLocalFileNames(uploadDir)
// Filter temporary & pending directory name.
const tempDirectoryName = path.parse(tempDirName).name
const pendingDirectoryName = path.parse(pendingDirName).name
const ids = localIds.filter(
(dataObjectId) => dataObjectId !== tempDirectoryName && dataObjectId !== pendingDirectoryName
)

const ids = await getLocalFileNames(uploadDir)
ids.forEach((id) => idCache.set(id, 0))
logger.debug(`Local ID cache loaded.`)

Expand Down Expand Up @@ -132,10 +119,11 @@ export async function getDataObjectIdFromCache(
}

/**
* Returns file names from the local directory.
* Returns file names from the local directory, ignoring subfolders.
*
* @param directory - local directory to get file names from
*/
function getLocalFileNames(directory: string): Promise<string[]> {
return fsPromises.readdir(directory)
async function getLocalFileNames(directory: string): Promise<string[]> {
const result = await fsPromises.readdir(directory, { withFileTypes: true })
return result.filter((entry) => entry.isFile()).map((entry) => entry.name)
}
2 changes: 1 addition & 1 deletion storage-node/src/services/sync/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export class DownloadFileTask implements SyncTask {
const filepath = path.join(this.uploadsDirectory, this.dataObjectId)
// We create tempfile first to mitigate partial downloads on app (or remote node) crash.
// This partial downloads will be cleaned up during the next sync iteration.
const tempFilePath = path.join(this.uploadsDirectory, this.tempDirectory, uuidv4())
const tempFilePath = path.join(this.tempDirectory, uuidv4())
try {
const timeoutMs = this.downloadTimeout * 60 * 1000
// Casting because of:
Expand Down

0 comments on commit 06b200b

Please sign in to comment.