
Commit

UBER-536: Fix test stability
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
haiodo committed Jun 28, 2023
1 parent db53abf commit 86e59e3
Showing 7 changed files with 183 additions and 187 deletions.
7 changes: 6 additions & 1 deletion dev/tool/src/index.ts
@@ -357,7 +357,12 @@ export function devTool (
     .action(async (email: string, cmd) => {
       const { mongodbUri } = prepareTools()
       return await withDatabase(mongodbUri, async (db) => {
-        await confirmEmail(db, email)
+        const account = await getAccount(db, email)
+        if (account?.confirmed === true) {
+          console.log(`Already confirmed:${email}`)
+        } else {
+          await confirmEmail(db, email)
+        }
       })
     })
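
For context, here is a minimal, self-contained sketch of the guard this hunk introduces: confirming an email becomes idempotent, so re-running the tool against an already-confirmed account only logs a message. The Db, Account, getAccount, and confirmEmail stubs below are assumptions standing in for the platform's real account API, not its actual signatures.

interface Account { email: string, confirmed?: boolean }
type Db = Map<string, Account>

// Stubs for illustration only; the real getAccount/confirmEmail live in the accounts service.
async function getAccount (db: Db, email: string): Promise<Account | undefined> {
  return db.get(email)
}

async function confirmEmail (db: Db, email: string): Promise<void> {
  const account = db.get(email)
  if (account !== undefined) {
    account.confirmed = true
  }
}

async function confirmIfNeeded (db: Db, email: string): Promise<void> {
  const account = await getAccount(db, email)
  if (account?.confirmed === true) {
    console.log(`Already confirmed:${email}`)
  } else {
    await confirmEmail(db, email)
  }
}

// Running twice confirms once; the second call only reports it.
const db: Db = new Map()
db.set('user1@example.com', { email: 'user1@example.com' })
void confirmIfNeeded(db, 'user1@example.com').then(async () => {
  await confirmIfNeeded(db, 'user1@example.com')
})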

2 changes: 1 addition & 1 deletion models/all/src/version.json
@@ -1 +1 @@
-{ "major": 0, "minor": 6, "patch": 108 }
+{ "major": 0, "minor": 6, "patch": 109 }
313 changes: 157 additions & 156 deletions server/backup/src/index.ts
@@ -625,193 +625,194 @@ export async function restore (
     model: 'upgrade'
   })) as unknown as CoreClient & BackupClient
 
-  async function processDomain (c: Domain): Promise<boolean> {
-    try {
+  async function processDomain (c: Domain): Promise<void> {
     const changeset = await loadDigest(storage, snapshots, c, date)
     // We need to load full changeset from server
     const serverChangeset = new Map<Ref<Doc>, string>()
 
     let idx: number | undefined
     let loaded = 0
     let last = 0
     let el = 0
     let chunks = 0
     while (true) {
       const st = Date.now()
       const it = await connection.loadChunk(c, idx)
       chunks++
 
       idx = it.idx
       el += Date.now() - st
 
       for (const [_id, hash] of Object.entries(it.docs)) {
         serverChangeset.set(_id as Ref<Doc>, hash)
         loaded++
       }
 
       const mr = Math.round(loaded / 10000)
       if (mr !== last) {
         last = mr
         console.log(' loaded from server', loaded, el, chunks)
         el = 0
         chunks = 0
       }
       if (it.finished) {
         break
       }
     }
     console.log(' loaded', loaded)
     console.log('\tcompare documents', changeset.size, serverChangeset.size)
 
     // Let's find difference
     const docsToAdd = new Map(
       Array.from(changeset.entries()).filter(
         ([it]) => !serverChangeset.has(it) || (serverChangeset.has(it) && serverChangeset.get(it) !== changeset.get(it))
       )
     )
     const docsToRemove = Array.from(serverChangeset.keys()).filter((it) => !changeset.has(it))
 
     const docs: Doc[] = []
     const blobs = new Map<string, { doc: Doc | undefined, buffer: Buffer | undefined }>()
     let sendSize = 0
     let totalSend = 0
     async function sendChunk (doc: Doc | undefined, len: number): Promise<void> {
       if (doc !== undefined) {
         docsToAdd.delete(doc._id)
         docs.push(doc)
       }
       sendSize = sendSize + len
       if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) {
         console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize)
         totalSend += docs.length
         await connection.upload(c, docs)
         docs.length = 0
         sendSize = 0
       }
     }
     let processed = 0
 
     for (const s of rsnapshots) {
       const d = s.domains[c]
 
       if (d !== undefined && docsToAdd.size > 0) {
         const sDigest = await loadDigest(storage, [s], c)
         const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it)))
         if (requiredDocs.size > 0) {
           console.log('updating', c, requiredDocs.size)
           // We have required documents here.
           for (const sf of d.storage ?? []) {
             if (docsToAdd.size === 0) {
               break
             }
             console.log('processing', sf, processed)
 
             const readStream = await storage.load(sf)
             const ex = extract()
 
             ex.on('entry', (headers, stream, next) => {
               const name = headers.name ?? ''
               processed++
               // We found blob data
               if (requiredDocs.has(name as Ref<Doc>)) {
                 const chunks: Buffer[] = []
                 stream.on('data', (chunk) => {
                   chunks.push(chunk)
                 })
                 stream.on('end', () => {
                   const bf = Buffer.concat(chunks)
                   const d = blobs.get(name)
                   if (d === undefined) {
                     blobs.set(name, { doc: undefined, buffer: bf })
                     next()
                   } else {
                     const d = blobs.get(name)
                     blobs.delete(name)
                     const doc = d?.doc as BlobData
                     doc.base64Data = bf.toString('base64') ?? ''
                     sendChunk(doc, bf.length).finally(() => {
                       requiredDocs.delete(doc._id)
                       next()
                     })
                   }
                 })
               } else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref<Doc>)) {
                 const chunks: Buffer[] = []
                 const bname = name.substring(0, name.length - 5)
                 stream.on('data', (chunk) => {
                   chunks.push(chunk)
                 })
                 stream.on('end', () => {
                   const bf = Buffer.concat(chunks)
                   const doc = JSON.parse(bf.toString()) as Doc
                   if (doc._class === core.class.BlobData) {
                     const d = blobs.get(bname)
                     if (d === undefined) {
                       blobs.set(bname, { doc, buffer: undefined })
                       next()
                     } else {
                       const d = blobs.get(bname)
                       blobs.delete(bname)
                       ;(doc as BlobData).base64Data = d?.buffer?.toString('base64') ?? ''
                       sendChunk(doc, bf.length).finally(() => {
                         requiredDocs.delete(doc._id)
                         next()
                       })
                     }
                   } else {
                     sendChunk(doc, bf.length).finally(() => {
                       requiredDocs.delete(doc._id)
                       next()
                     })
                   }
                 })
               } else {
                 next()
               }
               stream.resume() // just auto drain the stream
             })
 
             const endPromise = new Promise((resolve) => {
               ex.on('finish', () => {
                 resolve(null)
               })
             })
             const unzip = createGunzip()
             readStream.pipe(unzip)
             unzip.pipe(ex)
 
             await endPromise
           }
         } else {
           console.log('domain had no changes', c)
         }
       }
     }
 
     await sendChunk(undefined, 0)
     if (docsToRemove.length > 0 && merge !== true) {
       console.log('cleanup', docsToRemove.length)
       while (docsToRemove.length > 0) {
         const part = docsToRemove.splice(0, 10000)
         await connection.clean(c, part)
       }
     }
-      return true
-    } catch (err: any) {
-      console.log('error', err)
-      return false
-    }
   }
 
   try {
     for (const c of domains) {
       console.log('loading server changeset for', c)
-      let retry = 3
+      let retry = 5
       while (retry > 0) {
         retry--
-        if (await processDomain(c)) {
+        try {
+          await processDomain(c)
           break
-        }
+        } catch (err: any) {
+          if (retry === 0) {
+            console.log('error', err)
+          } else {
+            console.log('Wait for few seconds for elastic')
+            await new Promise((resolve) => setTimeout(resolve, 1000))
+          }
+        }
       }
     }
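
The heart of the stability fix is the retry loop above: instead of processDomain catching its own errors and returning a boolean, the caller now retries each domain up to five times and pauses between attempts so a briefly unavailable Elasticsearch can catch up. Below is a minimal, self-contained sketch of that pattern; the withRetries helper, its parameters, and the failing step are illustrative assumptions, not platform code.

async function withRetries (step: () => Promise<void>, attempts: number = 5, delayMs: number = 1000): Promise<void> {
  let retry = attempts
  while (retry > 0) {
    retry--
    try {
      await step()
      break
    } catch (err: any) {
      if (retry === 0) {
        // Out of attempts: report the error and move on, as the commit does per domain.
        console.log('error', err)
      } else {
        // Give the backing service (e.g. elastic) a moment before the next attempt.
        await new Promise((resolve) => setTimeout(resolve, delayMs))
      }
    }
  }
}

// Usage sketch: a step that fails twice before succeeding.
let failures = 2
void withRetries(async () => {
  if (failures > 0) {
    failures--
    throw new Error('elastic not ready yet')
  }
  console.log('step succeeded')
})
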
3 changes: 0 additions & 3 deletions tests/docker-compose.yaml
@@ -18,9 +18,6 @@ services:
       - 9002:9000
   elastic:
     image: 'elasticsearch:7.14.2'
-    command: |
-      /bin/sh -c "./bin/elasticsearch-plugin list | grep -q ingest-attachment || yes | ./bin/elasticsearch-plugin install --silent ingest-attachment;
-      /usr/local/bin/docker-entrypoint.sh eswrapper"
     expose:
       - 9200
     ports:

2 changes: 2 additions & 0 deletions tests/prepare.sh
@@ -4,6 +4,8 @@ docker-compose -p sanity kill
 docker-compose -p sanity down --volumes
 docker-compose -p sanity up -d --force-recreate --renew-anon-volumes
 
+./wait-elastic.sh 9201
+
 # Creae workspace record in accounts
 ./tool.sh create-workspace sanity-ws -o SanityTest
 # Create user record in accounts
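
wait-elastic.sh itself is not part of this diff, so its contents are not shown; conceptually it blocks the sanity run until Elasticsearch on the given port answers health checks. A rough TypeScript sketch of that kind of readiness poll follows; the endpoint, port, timeout, and function name are assumptions, and it relies on a runtime with a global fetch (Node 18+).

async function waitForElastic (port: number = 9201, timeoutMs: number = 120000): Promise<void> {
  const deadline = Date.now() + timeoutMs
  while (Date.now() < deadline) {
    try {
      // The cluster health endpoint responds once the node is up; yellow is enough for a single node.
      const res = await fetch(`http://localhost:${port}/_cluster/health?wait_for_status=yellow&timeout=5s`)
      if (res.ok) {
        console.log('elastic is ready')
        return
      }
    } catch {
      // Not accepting connections yet; fall through to the delay below.
    }
    await new Promise((resolve) => setTimeout(resolve, 1000))
  }
  throw new Error(`elastic did not become ready on port ${port} within ${timeoutMs} ms`)
}

void waitForElastic()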

0 comments on commit 86e59e3
