-
Notifications
You must be signed in to change notification settings - Fork 14
/
import.js
93 lines (85 loc) · 3 KB
/
import.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import repositoryMiddleware from '../services/repositoryMiddleware';
import {
startImport,
getLoader,
getStreamFromUrl,
getCustomLoader,
getStreamFromText,
} from '../services/import';
import { clearChunks, mergeChunks } from '../services/fsHelpers';
import { saveParsedStream } from '../services/saveParsedStream';
import publishDocuments from '../services/publishDocuments';
import publishFacets from '../controller/api/publishFacets';
import saveStream from '../services/saveStream';
import { CancelWorkerError, cleanWaitingJobsOfType } from '.';
export const IMPORT = 'import';
const listeners = [];
function getEzsMessageError(string) {
if (string.includes('Line')) {
return string.substring(0, string.indexOf('Line'));
} else if (string.includes('<SyntaxError')) {
return string.substring(0, string.indexOf('<SyntaxError'));
} else {
return string;
}
}
export const processImport = (job, done) => {
cleanWaitingJobsOfType(job.data.tenant, IMPORT);
startJobImport(job)
.then(async () => {
job.progress(100);
const isFailed = await job.isFailed();
notifyListeners(`${job.data.tenant}-import`, {
isImporting: false,
success: !isFailed,
});
done();
})
.catch((err) => {
handleImportError(job, err);
done(err);
});
};
const startJobImport = async (job) => {
notifyListeners(`${job.data.tenant}-import`, {
isImporting: true,
success: false,
});
const ctx = await prepareContext({ job });
await startImport(ctx);
};
const handleImportError = async (job, err) => {
const ctx = await prepareContext({ job });
if (err instanceof CancelWorkerError) {
await ctx.dataset.drop();
}
// very useful for identifying the origin of production errors.
console.warn('handleImportError', err);
notifyListeners(`${job.data.tenant}-import`, {
isImporting: false,
success: false,
message:
err instanceof CancelWorkerError
? 'cancelled_import'
: getEzsMessageError(err.message), // Ezs return all stack trace, we only want the message part. So we split on 'Line'
});
};
const prepareContext = async (ctx) => {
ctx.tenant = ctx.job.data.tenant;
await repositoryMiddleware(ctx, () => Promise.resolve());
ctx.getLoader = getLoader;
ctx.getCustomLoader = getCustomLoader;
ctx.getStreamFromUrl = getStreamFromUrl;
ctx.getStreamFromText = getStreamFromText;
ctx.mergeChunks = mergeChunks;
ctx.clearChunks = clearChunks;
ctx.saveStream = saveStream;
ctx.saveParsedStream = saveParsedStream;
ctx.publishDocuments = publishDocuments;
ctx.publishFacets = publishFacets;
return ctx;
};
export const addImportListener = (listener) => listeners.push(listener);
export const notifyListeners = (room, payload) => {
listeners.forEach((listener) => listener({ room, data: payload }));
};