-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreceiver.js
106 lines (90 loc) · 3.03 KB
/
receiver.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
const corelink = require('corelink-client');
const { spawn } = require('child_process');
const sharp = require('sharp');
const tf = require('@tensorflow/tfjs-node');
const fs = require('fs');
// Connection settings passed to corelink.connect().
const config = {
  ControlPort: 20012,
  ControlIP: 'corelink.hpc.nyu.edu',
  autoReconnect: false,
};

// NOTE(review): credentials are hard-coded in source; consider loading them
// from the environment or a secrets store before deploying.
const username = 'Testuser';
const password = 'Testpassword';

// Parameters used when creating the receiver stream.
const workspace = 'Fenton';
const protocol = 'ws';
const datatype = 'distance';

// Partially received frames, keyed by frame number; each value is an array of
// slice payloads with `null` marking slices that have not arrived yet.
const fileParts = {};

// Decoded frames accumulated until they are written out as one TIFF.
let allFrames = [];

// Declared explicitly: the previous bare `counter = 0` created an implicit
// global and throws a ReferenceError in strict mode / ES modules.
let counter = 0;
/**
 * Connects to Corelink, creates a receiver for the configured workspace and
 * data type, subscribes to every stream announced through the 'receiver'
 * event, and reassembles sliced frames delivered through the 'data' event.
 *
 * Each data message is read as: byte 0 = frame number, byte 1 = slice index,
 * byte 2 = total slice count, remaining bytes = slice payload. Once every
 * slice of a frame has arrived, the payloads are concatenated in slice order
 * and passed to processFrame().
 *
 * Connection and receiver-creation failures are logged and end the function
 * early; nothing is thrown for them.
 *
 * @returns {Promise<void>}
 */
const run = async () => {
  let connected;
  try {
    connected = await corelink.connect({ username, password }, config);
  } catch (err) {
    console.log(err);
    return;
  }
  if (!connected) {
    return;
  }

  try {
    await corelink.createReceiver({
      workspace,
      protocol,
      type: datatype,
      echo: true,
      alert: true,
    });
  } catch (err) {
    // Without a receiver there is nothing to listen for, so stop here.
    console.log(err);
    return;
  }

  corelink.on('receiver', async (data) => {
    // A rejection inside an async listener would otherwise go unhandled.
    try {
      await corelink.subscribe({ streamIDs: [data.streamID] });
      console.log('Receiver and sender connected, subscribing to data.');
    } catch (err) {
      console.log(err);
    }
  });

  corelink.on('data', (streamID, data) => {
    const frameNumber = data[0];
    const sliceIndex = data[1];
    const totalSlices = data[2];
    console.log(`Frame number ${frameNumber} and slice number ${sliceIndex}`);

    // Reject truncated headers and slice indexes outside the declared range;
    // writing past the end would leave holes that `every` silently skips.
    if (data.length < 3 || sliceIndex >= totalSlices) {
      console.error(`Dropping malformed slice for frame ${frameNumber}.`);
      return;
    }

    // Start a fresh slot when the frame is new, or when a leftover partial
    // frame under the same number was declared with a different slice count.
    let parts = fileParts[frameNumber];
    if (!parts || parts.length !== totalSlices) {
      parts = Array.from({ length: totalSlices }, () => null);
      fileParts[frameNumber] = parts;
    }
    parts[sliceIndex] = data.slice(3);

    if (parts.every((part) => part !== null)) {
      // Release the slot so memory does not grow without bound and a reused
      // frame number starts from an empty set of slices.
      delete fileParts[frameNumber];
      const fullFile = Buffer.concat(parts);
      console.log(`Frame ${frameNumber} reassembled.`);
      processFrame(fullFile);
    }
  });
};
/**
 * Decodes one reassembled frame to raw pixels with sharp, wraps it in a 3-D
 * tensor shaped [height, width, channels], and appends it to `allFrames`.
 *
 * When `finalize` is true, the frames accumulated so far are written to
 * 'output.tiff' and, once the write has completed, the FIOLA pipeline is
 * started on that file.
 *
 * TODO(review): the original code had a placeholder where the finalize
 * condition belongs (which also made the file unparseable). The real
 * condition is still unknown, so the caller must pass `finalize`; by default
 * frames are only accumulated.
 *
 * Decode and save errors are logged, not thrown.
 *
 * @param {Buffer} frameBuffer - Encoded image bytes for a single frame.
 * @param {boolean} [finalize=false] - Flush accumulated frames after this one.
 * @returns {Promise<void>}
 */
const processFrame = async (frameBuffer, finalize = false) => {
  let frame;
  try {
    const { data, info } = await sharp(frameBuffer)
      .raw()
      .toBuffer({ resolveWithObject: true });
    frame = tf.tensor3d(new Uint8Array(data), [info.height, info.width, info.channels]);
  } catch (err) {
    console.error('Error processing frame:', err);
    return;
  }
  allFrames.push(frame);

  if (!finalize) {
    return;
  }

  // Detach the batch before awaiting so frames that arrive while the TIFF is
  // being written are collected into the next batch.
  const batch = allFrames;
  allFrames = [];
  try {
    // Wait for the file to exist before handing its path to the pipeline.
    await saveAsTiff(batch, 'output.tiff');
  } catch (err) {
    console.error('Error saving TIFF:', err);
    return;
  } finally {
    // Tensors are not garbage collected; release them explicitly.
    batch.forEach((tensor) => tensor.dispose());
  }
  callFiolaPipeline('output.tiff');
};
/**
 * Stacks the given frame tensors along a new leading axis, encodes the result
 * as TIFF, and writes it to `outputPath`.
 *
 * NOTE(review): `tf.node.encodeTiff` does not appear in the documented
 * tfjs-node API (which lists JPEG/PNG encoders). The guard below turns a
 * missing encoder into an explicit error; confirm which TIFF encoder this
 * project is expected to use.
 *
 * @param {Array} frames - Frame tensors to stack; must not be empty.
 * @param {string} outputPath - Destination file path.
 * @returns {Promise<void>} Resolves after the file has been written.
 * @throws {Error} If `frames` is empty or no TIFF encoder is available.
 */
const saveAsTiff = async (frames, outputPath) => {
  if (!frames || frames.length === 0) {
    throw new Error('saveAsTiff requires at least one frame');
  }
  if (!tf.node || typeof tf.node.encodeTiff !== 'function') {
    throw new Error('tf.node.encodeTiff is not available in this tfjs-node build');
  }

  const imageTensor = tf.stack(frames);
  try {
    const buffer = await tf.node.encodeTiff(imageTensor);
    // Non-blocking write; the function is already asynchronous.
    await fs.promises.writeFile(outputPath, buffer);
  } finally {
    // The stacked tensor is a fresh allocation owned by this function.
    imageTensor.dispose();
  }
  console.log(`TIFF image saved at ${outputPath}`);
};
/**
 * Launches `python fiola_pipeline.py <tiffPath>` as a child process and
 * forwards its stdout, stderr, and exit code to the console.
 *
 * @param {string} tiffPath - Path of the TIFF file passed to the script.
 * @returns {import('child_process').ChildProcess} The spawned child process.
 */
const callFiolaPipeline = (tiffPath) => {
  const pythonProcess = spawn('python', ['fiola_pipeline.py', tiffPath]);

  // Without an 'error' listener a failed spawn (e.g. `python` not on PATH)
  // is raised as an unhandled 'error' event and terminates this process.
  pythonProcess.on('error', (err) => {
    console.error(`Failed to run Python script: ${err.message}`);
  });

  // The stdio streams can be missing when the process could not be spawned.
  if (pythonProcess.stdout) {
    pythonProcess.stdout.on('data', (data) => {
      console.log(`Python output: ${data}`);
    });
  }
  if (pythonProcess.stderr) {
    pythonProcess.stderr.on('data', (data) => {
      console.error(`Python error: ${data}`);
    });
  }

  pythonProcess.on('close', (code) => {
    console.log(`Python script finished with code ${code}`);
  });

  return pythonProcess;
};
// Entry point. Report anything run() did not handle itself instead of leaving
// the promise floating with an unhandled rejection.
run().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});