From 86e59e31fc2726e438bca63c784d2083199d06ce Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Wed, 28 Jun 2023 14:16:42 +0700 Subject: [PATCH] UBER-536: Fix test stability Signed-off-by: Andrey Sobolev --- dev/tool/src/index.ts | 7 +- models/all/src/version.json | 2 +- server/backup/src/index.ts | 313 ++++++++++++++++++------------------ tests/docker-compose.yaml | 3 - tests/prepare.sh | 2 + tests/setup-elastic.sh | 26 --- tests/wait-elastic.sh | 17 ++ 7 files changed, 183 insertions(+), 187 deletions(-) delete mode 100755 tests/setup-elastic.sh create mode 100755 tests/wait-elastic.sh diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 8c1f4419884..a76e9f8db2e 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -357,7 +357,12 @@ export function devTool ( .action(async (email: string, cmd) => { const { mongodbUri } = prepareTools() return await withDatabase(mongodbUri, async (db) => { - await confirmEmail(db, email) + const account = await getAccount(db, email) + if (account?.confirmed === true) { + console.log(`Already confirmed:${email}`) + } else { + await confirmEmail(db, email) + } }) }) diff --git a/models/all/src/version.json b/models/all/src/version.json index 253bbe8b6dc..a83e1615d2a 100644 --- a/models/all/src/version.json +++ b/models/all/src/version.json @@ -1 +1 @@ -{ "major": 0, "minor": 6, "patch": 108 } +{ "major": 0, "minor": 6, "patch": 109 } diff --git a/server/backup/src/index.ts b/server/backup/src/index.ts index d0179841e04..bb0f3f41ef0 100644 --- a/server/backup/src/index.ts +++ b/server/backup/src/index.ts @@ -625,193 +625,194 @@ export async function restore ( model: 'upgrade' })) as unknown as CoreClient & BackupClient - async function processDomain (c: Domain): Promise { - try { - const changeset = await loadDigest(storage, snapshots, c, date) - // We need to load full changeset from server - const serverChangeset = new Map, string>() - - let idx: number | undefined - let loaded = 0 - let last = 0 - let el = 0 - let chunks = 0 - while (true) { - const st = Date.now() - const it = await connection.loadChunk(c, idx) - chunks++ - - idx = it.idx - el += Date.now() - st - - for (const [_id, hash] of Object.entries(it.docs)) { - serverChangeset.set(_id as Ref, hash) - loaded++ - } + async function processDomain (c: Domain): Promise { + const changeset = await loadDigest(storage, snapshots, c, date) + // We need to load full changeset from server + const serverChangeset = new Map, string>() + + let idx: number | undefined + let loaded = 0 + let last = 0 + let el = 0 + let chunks = 0 + while (true) { + const st = Date.now() + const it = await connection.loadChunk(c, idx) + chunks++ + + idx = it.idx + el += Date.now() - st + + for (const [_id, hash] of Object.entries(it.docs)) { + serverChangeset.set(_id as Ref, hash) + loaded++ + } - const mr = Math.round(loaded / 10000) - if (mr !== last) { - last = mr - console.log(' loaded from server', loaded, el, chunks) - el = 0 - chunks = 0 - } - if (it.finished) { - break - } + const mr = Math.round(loaded / 10000) + if (mr !== last) { + last = mr + console.log(' loaded from server', loaded, el, chunks) + el = 0 + chunks = 0 } - console.log(' loaded', loaded) - console.log('\tcompare documents', changeset.size, serverChangeset.size) - - // Let's find difference - const docsToAdd = new Map( - Array.from(changeset.entries()).filter( - ([it]) => - !serverChangeset.has(it) || (serverChangeset.has(it) && serverChangeset.get(it) !== changeset.get(it)) - ) - ) - const docsToRemove = Array.from(serverChangeset.keys()).filter((it) => !changeset.has(it)) - - const docs: Doc[] = [] - const blobs = new Map() - let sendSize = 0 - let totalSend = 0 - async function sendChunk (doc: Doc | undefined, len: number): Promise { - if (doc !== undefined) { - docsToAdd.delete(doc._id) - docs.push(doc) - } - sendSize = sendSize + len - if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) { - console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize) - totalSend += docs.length - await connection.upload(c, docs) - docs.length = 0 - sendSize = 0 - } + if (it.finished) { + break } - let processed = 0 + } + console.log(' loaded', loaded) + console.log('\tcompare documents', changeset.size, serverChangeset.size) - for (const s of rsnapshots) { - const d = s.domains[c] - - if (d !== undefined && docsToAdd.size > 0) { - const sDigest = await loadDigest(storage, [s], c) - const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it))) - if (requiredDocs.size > 0) { - console.log('updating', c, requiredDocs.size) - // We have required documents here. - for (const sf of d.storage ?? []) { - if (docsToAdd.size === 0) { - break - } - console.log('processing', sf, processed) - - const readStream = await storage.load(sf) - const ex = extract() - - ex.on('entry', (headers, stream, next) => { - const name = headers.name ?? '' - processed++ - // We found blob data - if (requiredDocs.has(name as Ref)) { - const chunks: Buffer[] = [] - stream.on('data', (chunk) => { - chunks.push(chunk) - }) - stream.on('end', () => { - const bf = Buffer.concat(chunks) + // Let's find difference + const docsToAdd = new Map( + Array.from(changeset.entries()).filter( + ([it]) => !serverChangeset.has(it) || (serverChangeset.has(it) && serverChangeset.get(it) !== changeset.get(it)) + ) + ) + const docsToRemove = Array.from(serverChangeset.keys()).filter((it) => !changeset.has(it)) + + const docs: Doc[] = [] + const blobs = new Map() + let sendSize = 0 + let totalSend = 0 + async function sendChunk (doc: Doc | undefined, len: number): Promise { + if (doc !== undefined) { + docsToAdd.delete(doc._id) + docs.push(doc) + } + sendSize = sendSize + len + if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) { + console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize) + totalSend += docs.length + await connection.upload(c, docs) + docs.length = 0 + sendSize = 0 + } + } + let processed = 0 + + for (const s of rsnapshots) { + const d = s.domains[c] + + if (d !== undefined && docsToAdd.size > 0) { + const sDigest = await loadDigest(storage, [s], c) + const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it))) + if (requiredDocs.size > 0) { + console.log('updating', c, requiredDocs.size) + // We have required documents here. + for (const sf of d.storage ?? []) { + if (docsToAdd.size === 0) { + break + } + console.log('processing', sf, processed) + + const readStream = await storage.load(sf) + const ex = extract() + + ex.on('entry', (headers, stream, next) => { + const name = headers.name ?? '' + processed++ + // We found blob data + if (requiredDocs.has(name as Ref)) { + const chunks: Buffer[] = [] + stream.on('data', (chunk) => { + chunks.push(chunk) + }) + stream.on('end', () => { + const bf = Buffer.concat(chunks) + const d = blobs.get(name) + if (d === undefined) { + blobs.set(name, { doc: undefined, buffer: bf }) + next() + } else { const d = blobs.get(name) + blobs.delete(name) + const doc = d?.doc as BlobData + doc.base64Data = bf.toString('base64') ?? '' + sendChunk(doc, bf.length).finally(() => { + requiredDocs.delete(doc._id) + next() + }) + } + }) + } else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref)) { + const chunks: Buffer[] = [] + const bname = name.substring(0, name.length - 5) + stream.on('data', (chunk) => { + chunks.push(chunk) + }) + stream.on('end', () => { + const bf = Buffer.concat(chunks) + const doc = JSON.parse(bf.toString()) as Doc + if (doc._class === core.class.BlobData) { + const d = blobs.get(bname) if (d === undefined) { - blobs.set(name, { doc: undefined, buffer: bf }) + blobs.set(bname, { doc, buffer: undefined }) next() } else { - const d = blobs.get(name) - blobs.delete(name) - const doc = d?.doc as BlobData - doc.base64Data = bf.toString('base64') ?? '' - sendChunk(doc, bf.length).finally(() => { - requiredDocs.delete(doc._id) - next() - }) - } - }) - } else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref)) { - const chunks: Buffer[] = [] - const bname = name.substring(0, name.length - 5) - stream.on('data', (chunk) => { - chunks.push(chunk) - }) - stream.on('end', () => { - const bf = Buffer.concat(chunks) - const doc = JSON.parse(bf.toString()) as Doc - if (doc._class === core.class.BlobData) { const d = blobs.get(bname) - if (d === undefined) { - blobs.set(bname, { doc, buffer: undefined }) - next() - } else { - const d = blobs.get(bname) - blobs.delete(bname) - ;(doc as BlobData).base64Data = d?.buffer?.toString('base64') ?? '' - sendChunk(doc, bf.length).finally(() => { - requiredDocs.delete(doc._id) - next() - }) - } - } else { + blobs.delete(bname) + ;(doc as BlobData).base64Data = d?.buffer?.toString('base64') ?? '' sendChunk(doc, bf.length).finally(() => { requiredDocs.delete(doc._id) next() }) } - }) - } else { - next() - } - stream.resume() // just auto drain the stream - }) - - const endPromise = new Promise((resolve) => { - ex.on('finish', () => { - resolve(null) + } else { + sendChunk(doc, bf.length).finally(() => { + requiredDocs.delete(doc._id) + next() + }) + } }) + } else { + next() + } + stream.resume() // just auto drain the stream + }) + + const endPromise = new Promise((resolve) => { + ex.on('finish', () => { + resolve(null) }) - const unzip = createGunzip() - readStream.pipe(unzip) - unzip.pipe(ex) + }) + const unzip = createGunzip() + readStream.pipe(unzip) + unzip.pipe(ex) - await endPromise - } - } else { - console.log('domain had no changes', c) + await endPromise } + } else { + console.log('domain had no changes', c) } } + } - await sendChunk(undefined, 0) - if (docsToRemove.length > 0 && merge !== true) { - console.log('cleanup', docsToRemove.length) - while (docsToRemove.length > 0) { - const part = docsToRemove.splice(0, 10000) - await connection.clean(c, part) - } + await sendChunk(undefined, 0) + if (docsToRemove.length > 0 && merge !== true) { + console.log('cleanup', docsToRemove.length) + while (docsToRemove.length > 0) { + const part = docsToRemove.splice(0, 10000) + await connection.clean(c, part) } - return true - } catch (err: any) { - console.log('error', err) - return false } } try { for (const c of domains) { console.log('loading server changeset for', c) - let retry = 3 + let retry = 5 while (retry > 0) { retry-- - if (await processDomain(c)) { + try { + await processDomain(c) break + } catch (err: any) { + if (retry === 0) { + console.log('error', err) + } else { + console.log('Wait for few seconds for elastic') + await new Promise((resolve) => setTimeout(resolve, 1000)) + } } } } diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index b526432f39c..883cd8e8b50 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -18,9 +18,6 @@ services: - 9002:9000 elastic: image: 'elasticsearch:7.14.2' - command: | - /bin/sh -c "./bin/elasticsearch-plugin list | grep -q ingest-attachment || yes | ./bin/elasticsearch-plugin install --silent ingest-attachment; - /usr/local/bin/docker-entrypoint.sh eswrapper" expose: - 9200 ports: diff --git a/tests/prepare.sh b/tests/prepare.sh index 6d53bc50b34..d6b521bbbe8 100755 --- a/tests/prepare.sh +++ b/tests/prepare.sh @@ -4,6 +4,8 @@ docker-compose -p sanity kill docker-compose -p sanity down --volumes docker-compose -p sanity up -d --force-recreate --renew-anon-volumes +./wait-elastic.sh 9201 + # Creae workspace record in accounts ./tool.sh create-workspace sanity-ws -o SanityTest # Create user record in accounts diff --git a/tests/setup-elastic.sh b/tests/setup-elastic.sh deleted file mode 100755 index 1150e15fb8e..00000000000 --- a/tests/setup-elastic.sh +++ /dev/null @@ -1,26 +0,0 @@ -res='' -port=$1 -echo "Warning Elastic to up and running with attachment processor... ${port}" -while true -do - res=$(curl -s -XPUT "localhost:${port}/_ingest/pipeline/attachment?pretty" -H 'Content-Type: application/json' -d' - { - "description" : "Field for processing file attachments", - "processors" : [ - { - "attachment" : { - "field" : "data" - }, - "remove" : { - "field" : "data" - } - } - ] - } - ') - if [[ $res = *"acknowledged"* ]]; then - echo "Elastic processor is up and running..." - exit 0 - fi - sleep 1 -done \ No newline at end of file diff --git a/tests/wait-elastic.sh b/tests/wait-elastic.sh new file mode 100755 index 00000000000..b2ce1e26772 --- /dev/null +++ b/tests/wait-elastic.sh @@ -0,0 +1,17 @@ +res='' +port=$1 +echo "Warning Elastic to up and running with attachment processor... ${port}" +for i in `seq 1 30`; +do + res='qwe' # $(curl -s http://localhost:9200/_cluster/health ) + echo "$res" + if [[ $res = *"yellow"* ]]; then + echo "Elastic up and running..." + exit 0 + fi + if [[ $res = *"green"* ]]; then + echo "Elastic up and running..." + exit 0 + fi + sleep 1 +done \ No newline at end of file