Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

temp folder refactor #5000

Merged
merged 7 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions docker-compose-no-bind-volumes.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ services:
# - OTEL_RESOURCE_ATTRIBUTES=service.name=colossus-1,deployment.environment=production
entrypoint: ['yarn']
command: [
'start', '--worker=${COLOSSUS_1_WORKER_ID}', '--port=3333', '--uploads=/data',
'start', '--worker=${COLOSSUS_1_WORKER_ID}', '--port=3333', '--uploads=/data/uploads',
'--sync', '--syncInterval=1',
'--queryNodeEndpoint=${COLOSSUS_QUERY_NODE_URL}',
'--apiUrl=${JOYSTREAM_NODE_WS}',
'--logFilePath=/logs'
'--logFilePath=/logs',
'--tempFolder=/data/temp/'
]

distributor-1:
Expand Down Expand Up @@ -103,11 +104,12 @@ services:
- ACCOUNT_URI=${COLOSSUS_2_TRANSACTOR_URI}
entrypoint: ['yarn', 'storage-node']
command: [
'server', '--worker=${COLOSSUS_2_WORKER_ID}', '--port=3333', '--uploads=/data',
'server', '--worker=${COLOSSUS_2_WORKER_ID}', '--port=3333', '--uploads=/data/uploads',
'--sync', '--syncInterval=1',
'--queryNodeEndpoint=${COLOSSUS_QUERY_NODE_URL}',
'--apiUrl=${JOYSTREAM_NODE_WS}',
'--logFilePath=/logs'
'--logFilePath=/logs',
'--tempFolder=/data/temp/'
]

distributor-2:
Expand Down
10 changes: 6 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ services:
- OTEL_RESOURCE_ATTRIBUTES=service.name=colossus-1,deployment.environment=production
entrypoint: ['/joystream/entrypoints/storage.sh']
command: [
'server', '--worker=${COLOSSUS_1_WORKER_ID}', '--port=3333', '--uploads=/data',
'server', '--worker=${COLOSSUS_1_WORKER_ID}', '--port=3333', '--uploads=/data/uploads/',
'--sync', '--syncInterval=1',
'--queryNodeEndpoint=${COLOSSUS_QUERY_NODE_URL}',
'--apiUrl=${JOYSTREAM_NODE_WS}',
'--logFilePath=/logs'
'--logFilePath=/logs',
'--tempFolder=/data/temp/'
]

distributor-1:
Expand Down Expand Up @@ -106,11 +107,12 @@ services:
- ACCOUNT_URI=${COLOSSUS_2_TRANSACTOR_URI}
entrypoint: ['yarn', 'storage-node']
command: [
'server', '--worker=${COLOSSUS_2_WORKER_ID}', '--port=3333', '--uploads=/data',
'server', '--worker=${COLOSSUS_2_WORKER_ID}', '--port=3333', '--uploads=/data/uploads',
'--sync', '--syncInterval=1',
'--queryNodeEndpoint=${COLOSSUS_QUERY_NODE_URL}',
'--apiUrl=${JOYSTREAM_NODE_WS}',
'--logFilePath=/logs'
'--logFilePath=/logs',
'--tempFolder=/data/temp/'
]

distributor-2:
Expand Down
3 changes: 3 additions & 0 deletions storage-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@
},
"operator": {
"description": "Storage provider(operator) commands."
},
"util": {
"description": "Useful utility commands."
}
}
},
Expand Down
79 changes: 42 additions & 37 deletions storage-node/src/commands/server.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import { flags } from '@oclif/command'
import { ApiPromise } from '@polkadot/api'
import { KeyringPair } from '@polkadot/keyring/types'
import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup'
import fs from 'fs'
import sleep from 'sleep-promise'
import _ from 'lodash'
import path from 'path'
import rimraf from 'rimraf'
import sleep from 'sleep-promise'
import { promisify } from 'util'
import ApiCommandBase from '../command-base/ApiCommandBase'
import fs from 'fs'
import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup'
import { KeyringPair } from '@polkadot/keyring/types'
import { customFlags } from '../command-base/CustomFlags'
import { loadDataObjectIdCache } from '../services/caching/localDataObjects'
import logger, { DatePatternByFrequency, Frequency, initNewLogger } from '../services/logger'
Expand All @@ -23,6 +20,7 @@ import { getStorageBucketIdsByWorkerId } from '../services/sync/storageObligatio
import { PendingDirName, performSync, TempDirName } from '../services/sync/synchronizer'
import { createApp } from '../services/webApi/app'
import ExitCodes from './../command-base/ExitCodes'
import ApiCommandBase from '../command-base/ApiCommandBase'
import { v4 as uuidv4 } from 'uuid'
const fsPromises = fs.promises

Expand Down Expand Up @@ -53,6 +51,10 @@ export default class Server extends ApiCommandBase {
required: true,
description: 'Data uploading directory (absolute path).',
}),
tempFolder: flags.string({
description:
      'Directory to store temporary files during sync and upload (absolute path).\nIf not specified a subfolder under the uploads directory will be used.',
}),
port: flags.integer({
char: 'o',
required: true,
Expand Down Expand Up @@ -158,11 +160,19 @@ Supported values: warn, error, debug, info. Default:debug`,
async run(): Promise<void> {
const { flags } = this.parse(Server)

const logSource = `StorageProvider_${flags.worker}`
const api = await this.getApi()

if (flags.dev) {
await this.ensureDevelopmentChain()
}

if (flags.logFilePath && path.relative(flags.logFilePath, flags.uploads) === '') {
this.error('Paths for logs and uploads must be unique.')
}
Comment on lines +169 to +171
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (flags.logFilePath && path.relative(flags.logFilePath, flags.uploads) === '') {
this.error('Paths for logs and uploads must be unique.')
}
if (flags.logFilePath && path.relative(flags.logFilePath, flags.uploads) === '..') {
this.error('Paths for logs and uploads must be unique.')
}

Shouldn't it be path.relative(flags.logFilePath, flags.uploads) === '..', I tested the path.relative function as path.relative('/uploads/temp', '/uploads') and I got .. as return value

Copy link
Member Author

@mnaamani mnaamani Dec 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I was only trying to protect against the two paths being the same location. I was not concerned with temp or logs being subfolders of the main uploads directory. That is why I was comparing with '' null string.

Originally I was doing if (flags.logFilePath === flags.uploads) { } but I decided to use path.relative() to take into account a trailing / path separator.

Given that I added proper filtering of filenames when they are loaded from uploads directory, and if we assume the file names of logs and temp files will not clash with the object id name, perhaps this constraint is not really necessary at all?


if (!_.isEmpty(flags.elasticSearchEndpoint) || !_.isEmpty(flags.logFilePath)) {
initNewLogger({
elasticSearchlogSource: logSource,
elasticSearchlogSource: `StorageProvider_${flags.worker}`,
elasticSearchEndpoint: flags.elasticSearchEndpoint,
elasticSearchIndexPrefix: flags.elasticSearchIndexPrefix,
elasticSearchUser: flags.elasticSearchUser,
Expand All @@ -176,8 +186,6 @@ Supported values: warn, error, debug, info. Default:debug`,

logger.info(`Query node endpoint set: ${flags.queryNodeEndpoint}`)

const api = await this.getApi()

const workerId = flags.worker

if (!(await verifyWorkerId(api, workerId))) {
Expand Down Expand Up @@ -220,16 +228,26 @@ Supported values: warn, error, debug, info. Default:debug`,
const enableUploadingAuth = false
const operatorRoleKey = undefined

await recreateTempDirectory(flags.uploads, TempDirName)

if (fs.existsSync(flags.uploads)) {
await loadDataObjectIdCache(flags.uploads, TempDirName, PendingDirName)
if (!flags.tempFolder) {
logger.warn(
'You did not specify a path to the temporary directory. ' +
        'A temp folder under the uploads folder will be used. ' +
'In a future release passing an absolute path to a temporary directory with the ' +
'"tempFolder" argument will be required.'
)
}

if (flags.dev) {
await this.ensureDevelopmentChain()
const tempFolder = flags.tempFolder || path.join(flags.uploads, TempDirName)

if (path.relative(tempFolder, flags.uploads) === '') {
this.error('Paths for temporary and uploads folders must be unique.')
Comment on lines +242 to +243
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (path.relative(tempFolder, flags.uploads) === '') {
this.error('Paths for temporary and uploads folders must be unique.')
if (path.relative(tempFolder, flags.uploads) === '..') {
this.error('Paths for temporary and uploads folders must be unique.')

Please see the other comment

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commented here #5000 (comment)

}

await createDirectory(flags.uploads)
await loadDataObjectIdCache(flags.uploads)

await createDirectory(tempFolder)

const X_HOST_ID = uuidv4()

const pendingDataObjectsDir = path.join(flags.uploads, PendingDirName)
Expand Down Expand Up @@ -259,7 +277,7 @@ Supported values: warn, error, debug, info. Default:debug`,
selectedBuckets,
qnApi,
flags.uploads,
TempDirName,
tempFolder,
flags.syncWorkersNumber,
flags.syncWorkersTimeout,
flags.syncInterval,
Expand Down Expand Up @@ -297,7 +315,6 @@ Supported values: warn, error, debug, info. Default:debug`,
try {
const port = flags.port
const maxFileSize = await api.consts.storage.maxDataObjectSize.toNumber()
const tempFileUploadingDir = path.join(flags.uploads, TempDirName)
logger.debug(`Max file size runtime parameter: ${maxFileSize}`)

const app = await createApp({
Expand All @@ -308,7 +325,7 @@ Supported values: warn, error, debug, info. Default:debug`,
workerId,
maxFileSize,
uploadsDir: flags.uploads,
tempFileUploadingDir,
tempFileUploadingDir: tempFolder,
pendingDataObjectsDir,
acceptPendingObjectsService,
process: this.config,
Expand Down Expand Up @@ -426,26 +443,14 @@ async function runCleanupWithInterval(
}

/**
* Removes and recreates the temporary directory from the uploading directory.
* All files in the temp directory are deleted.
 * Creates a directory recursively. Like `mkdir -p`
*
* @param uploadsDirectory - data uploading directory
* @param tempDirName - temporary directory name within the uploading directory
* @param tempDirName - full path to temporary directory
* @returns void promise.
*/
async function recreateTempDirectory(uploadsDirectory: string, tempDirName: string): Promise<void> {
try {
const tempFileUploadingDir = path.join(uploadsDirectory, tempDirName)

logger.info(`Removing temp directory ...`)
const rimrafAsync = promisify(rimraf)
await rimrafAsync(tempFileUploadingDir)

logger.info(`Creating temp directory ...`)
await fsPromises.mkdir(tempFileUploadingDir)
} catch (err) {
logger.error(`Temp directory IO error: ${err}`)
}
async function createDirectory(dirName: string): Promise<void> {
logger.info(`Creating directory ${dirName}`)
await fsPromises.mkdir(dirName, { recursive: true })
}

async function verifyWorkerId(api: ApiPromise, workerId: number): Promise<boolean> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ import { performCleanup } from '../../services/sync/cleanupService'
/**
* CLI command:
* Prunes outdated data objects: removes all the local stored data objects that the operator is no longer obliged to store.
* storage.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:cleanup"
* Shell command: "util:cleanup"
*/
export default class DevCleanup extends Command {
export default class Cleanup extends Command {
static description = `Runs the data objects cleanup/pruning workflow. It removes all the local stored data objects that the operator is no longer obliged to store`

static flags = {
Expand Down Expand Up @@ -48,7 +46,7 @@ export default class DevCleanup extends Command {
}

async run(): Promise<void> {
const { flags } = this.parse(DevCleanup)
const { flags } = this.parse(Cleanup)
const bucketId = flags.bucketId.toString()
const qnApi = new QueryNodeApi(flags.queryNodeEndpoint)
logger.info('Cleanup...')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import { Command, flags } from '@oclif/command'
import stringify from 'fast-safe-stringify'
import logger from '../../services/logger'
import { QueryNodeApi } from '../../services/queryNode/api'
import { performSync } from '../../services/sync/synchronizer'
import { QueryNodeApi } from '../..//services/queryNode/api'
import logger from '../../services/logger'
import stringify from 'fast-safe-stringify'
import path from 'path'

/**
* CLI command:
* Synchronizes data: fixes the difference between node obligations and local
* storage.
* Fetch all data objects from a bucket into local store.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:sync"
* Should not be executed while server is running.
* Shell command: "util:fetch-bucket"
*/
export default class DevSync extends Command {
static description =
'Synchronizes the data - it fixes the differences between local data folder and worker ID obligations from the runtime.'
export default class FetchBucket extends Command {
static description = 'Downloads all data objects of specified bucket, that matches worker ID obligations.'

static flags = {
help: flags.help({ char: 'h' }),
Expand All @@ -27,10 +26,10 @@ export default class DevSync extends Command {
bucketId: flags.integer({
char: 'b',
required: true,
description: 'The buckerId to sync',
description: 'The bucketId to fetch',
}),
syncWorkersNumber: flags.integer({
char: 'p',
char: 'n',
required: false,
description: 'Sync workers number (max async operations in progress).',
default: 20,
Expand All @@ -44,27 +43,30 @@ export default class DevSync extends Command {
queryNodeEndpoint: flags.string({
char: 'q',
required: false,
default: 'http://localhost:8081/graphql',
description: 'Query node endpoint (e.g.: http://some.com:8081/graphql)',
default: 'https://query.joystream.org/graphql',
description: 'Query node endpoint (e.g.: https://query.joystream.org/graphql)',
}),
dataSourceOperatorUrl: flags.string({
char: 'o',
required: false,
description: 'Storage node url base (e.g.: http://some.com:3333) to get data from.',
default: 'http://localhost:3333',
}),
uploads: flags.string({
char: 'd',
required: true,
description: 'Data uploading directory (absolute path).',
}),
tempFolder: flags.string({
description:
      'Directory to store temporary files during sync and upload (absolute path).\nIf not specified a subfolder under the uploads directory will be used.',
}),
}

async run(): Promise<void> {
const { flags } = this.parse(DevSync)
const { flags } = this.parse(FetchBucket)
const bucketId = flags.bucketId.toString()
const qnApi = new QueryNodeApi(flags.queryNodeEndpoint)
logger.info('Syncing...')
logger.info('Fetching bucket...')

try {
await performSync(
Expand All @@ -74,6 +76,7 @@ export default class DevSync extends Command {
flags.syncWorkersTimeout,
qnApi,
flags.uploads,
flags.tempFolder ? flags.tempFolder : path.join(flags.uploads, 'temp'),
'',
flags.dataSourceOperatorUrl
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import { print } from '../../services/helpers/stdout'
* format.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:multihash"
* Shell command: "util:multihash"
*/
export default class DevMultihash extends Command {
export default class Multihash extends Command {
static description = 'Creates a multihash (blake3) for a file.'

static flags = {
Expand All @@ -25,7 +24,7 @@ export default class DevMultihash extends Command {
}

async run(): Promise<void> {
const { flags } = this.parse(DevMultihash)
const { flags } = this.parse(Multihash)

logger.info(`Hashing ${flags.file} ....`)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import { customFlags } from '../../command-base/CustomFlags'
* Verifies supported bag ID types in the string format.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:verify-bag-id"
*/
export default class DevVerifyBagId extends Command {
export default class VerifyBagId extends Command {
static description = 'The command verifies bag id supported by the storage node. Requires chain connection.'

static flags = {
Expand All @@ -21,7 +20,7 @@ export default class DevVerifyBagId extends Command {
}

async run(): Promise<void> {
const { flags } = this.parse(DevVerifyBagId)
const { flags } = this.parse(VerifyBagId)

logger.info(`Parsed: ${flags.bagId}`)
}
Expand Down
Loading
Loading