Skip to content
This repository was archived by the owner on Jan 9, 2023. It is now read-only.

Commit 61af831

Browse files
Merge branch 'master' into release/v6.2.5
2 parents 428009d + 382b637 commit 61af831

File tree

8 files changed

+91
-30
lines changed

8 files changed

+91
-30
lines changed

lib/cache/cache_base.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const { promisify } = require('util');
1010
const loki = require('lokijs');
1111
const ReliabilityManager = require('./reliability_manager');
1212
const _ = require('lodash');
13+
const { Writable } = require('stream');
1314

1415
const kDbName = 'cache.db';
1516

@@ -176,7 +177,7 @@ class CacheBase extends EventEmitter {
176177
* @returns {Promise<any>}
177178
*/
178179
createPutTransaction(guid, hash) {
179-
return Promise.reject(new Error("Not implemented"));
180+
return Promise.resolve(new PutTransaction(guid, hash));
180181
}
181182

182183
/**
@@ -300,7 +301,9 @@ class PutTransaction extends EventEmitter {
300301
* @returns {Promise<any>}
301302
*/
302303
getWriteStream(type, size) {
303-
return Promise.reject(new Error("Not implemented"));
304+
return Promise.resolve(new Writable({
305+
write(chunk, encoding, cb){ setImmediate(cb); }
306+
}));
304307
}
305308

306309
async invalidate() {
@@ -313,7 +316,7 @@ class PutTransaction extends EventEmitter {
313316
* @returns {Promise<any>}
314317
*/
315318
async writeFilesToPath(targetPath) {
316-
return Promise.reject(new Error("Not implemented"));
319+
return Promise.resolve();
317320
}
318321
}
319322

lib/server/command_processor.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ class CommandProcessor extends Duplex {
206206
if(this._sendFileQueueReadDuration > 0) {
207207
const totalTime = this._sendFileQueueReadDuration / 1000;
208208
const throughput = (this._sendFileQueueReadBytes / totalTime).toFixed(2);
209-
helpers.log(consts.LOG_INFO, `Sent ${this._sendFileQueueSentCount} of ${this._sendFileQueueCount} requested files (${this._sendFileQueueChunkReads} chunks) totaling ${filesize(this._sendFileQueueReadBytes)} in ${totalTime} seconds (${filesize(throughput)}/sec)`);
209+
helpers.log(consts.LOG_INFO, `Sent ${this._sendFileQueueSentCount} of ${this._sendFileQueueCount} requested files (${this._sendFileQueueChunkReads} chunks) totaling ${filesize(this._sendFileQueueReadBytes)} in ${totalTime} seconds (${filesize(throughput)}/sec) to ${this[kSource].clientAddress}`);
210210
}
211211
}
212212

lib/server/server.js

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ class CacheServer {
5757
return this._server;
5858
}
5959

60+
/**
61+
*
62+
* @returns {Array|*}
63+
*/
64+
get mirrors() {
65+
return this._mirrors;
66+
}
67+
6068
/**
6169
*
6270
* @param {Function} cb
@@ -82,10 +90,7 @@ class CacheServer {
8290
const mirrors = this._mirrors;
8391
if(mirrors.length > 0) {
8492
cmdProc.on('onTransactionEnd', (trx) => {
85-
mirrors.forEach(m => {
86-
if(m.address !== socket.remoteAddress)
87-
m.queueTransaction(trx);
88-
});
93+
mirrors.forEach(m => m.queueTransaction(trx));
8994
});
9095
}
9196

@@ -103,9 +108,7 @@ class CacheServer {
103108
.pipe(socket); // Connect back to socket to send files
104109

105110
socket['commandProcessor'] = cmdProc;
106-
});
107-
108-
this._server.on('error', err => {
111+
}).on('error', err => {
109112
if (err.code === 'EADDRINUSE') {
110113
helpers.log(consts.LOG_ERR, `Port ${this.port} is already in use...`);
111114
}

lib/server/transaction_mirror.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ class TransactionMirror {
3737
return this._connectOptions.host;
3838
}
3939

40+
get port() {
41+
return this._connectOptions.port;
42+
}
43+
4044
_connect() {
4145
helpers.log(consts.LOG_INFO, `[TransactionMirror] Connecting to ${this._connectOptions.host}:${this._connectOptions.port}`);
4246
return this._client.connect();

test/cache_base.js

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const path = require('path');
88
const randomBuffer = require('./test_utils').randomBuffer;
99
const consts = require('../lib/constants');
1010
const sinon = require('sinon');
11+
const { Writable } = require('stream');
1112

1213
describe("Cache: Base Class", () => {
1314
let cache;
@@ -124,9 +125,9 @@ describe("Cache: Base Class", () => {
124125
});
125126

126127
describe("createPutTransaction", () => {
127-
it("should require override implementation in subclasses by returning an error", () => {
128-
return cache.createPutTransaction()
129-
.then(() => { throw new Error("Expected error!"); }, () => {});
128+
it("should return an instance of a PutTransaction", async () => {
129+
const t = await cache.createPutTransaction();
130+
assert.ok(t instanceof PutTransaction);
130131
});
131132
});
132133

@@ -264,16 +265,16 @@ describe("PutTransaction: Base Class", () => {
264265
});
265266

266267
describe("getWriteStream", () => {
267-
it("should require override implementation in subclasses by returning an error", () => {
268-
return trx.getWriteStream(consts.FILE_TYPE.INFO, 0)
269-
.then(() => { throw new Error("Expected error!"); }, () => {});
268+
it("should return a Writable stream", async() => {
269+
const s = await trx.getWriteStream(consts.FILE_TYPE.INFO, 0);
270+
assert.ok(s instanceof Writable);
270271
});
271272
});
272273

273274
describe("writeFilesToPath", () => {
274-
it("should require override implementation in subclasses by returning an error", () => {
275-
return trx.writeFilesToPath()
276-
.then(() => { throw new Error("Expected error!"); }, () => {});
275+
it("should return a promise", () => {
276+
const p = trx.writeFilesToPath();
277+
assert.ok(p instanceof Promise);
277278
});
278279
});
279280
});

test/protocol.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ describe("Protocol", () => {
323323
});
324324

325325
const tests = [
326+
{cmd: cmd.getAsset, blob: self.data.bin, type: 'bin', packetSize: 1},
326327
{cmd: cmd.getAsset, blob: self.data.bin, type: 'bin', packetSize: SMALL_PACKET_SIZE},
327328
{cmd: cmd.getInfo, blob: self.data.info, type: 'info', packetSize: MED_PACKET_SIZE},
328329
{cmd: cmd.getResource, blob: self.data.resource, type: 'resource', packetSize: LARGE_PACKET_SIZE}

test/server.js

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,61 @@ const os = require('os');
66
const helpers = require('../lib/helpers');
77
const consts = require('../lib/constants');
88
const CacheServer = require('../lib/server/server');
9-
const Cache = require('../lib/cache/cache_base').CacheBase;
10-
const sleep = require('./test_utils').sleep;
11-
const cmd = require('./test_utils').cmd;
9+
const CacheBase = require('../lib/cache/cache_base').CacheBase;
10+
const TransactionMirror = require('../lib/server/transaction_mirror');
11+
const { generateCommandData, encodeCommand, clientWrite, sleep, cmd } = require('./test_utils');
12+
const sinon = require('sinon');
1213

13-
const cache = new Cache();
14-
const server = new CacheServer(cache, {port: 0});
14+
const cache = new CacheBase();
1515
let client;
1616

17+
describe("Server constructor", function() {
18+
it("should use the default port if no port is specified in options", () => {
19+
const s = new CacheServer(cache, { mirror:[] });
20+
assert.strictEqual(s.port, consts.DEFAULT_PORT);
21+
});
22+
});
23+
24+
describe("Server mirroring", function() {
25+
const server = new CacheServer(cache, {
26+
port: 0,
27+
mirror: [{host: "127.0.0.1", port: 8126}, {host: "1.2.3.4", port: 8126}, {host: "4.3.2.1", port: 8126}]
28+
});
29+
30+
before(function () {
31+
return server.start(err => { return Promise.reject(err); });
32+
});
33+
34+
after(function() {
35+
server.stop();
36+
});
37+
38+
beforeEach(function (done) {
39+
client = net.connect({port: server.port}, done);
40+
});
41+
42+
afterEach(() => client.end());
43+
44+
it("should mirror transactions to the configured list of mirrors", async () => {
45+
const spies = server.mirrors.map(m => {
46+
return sinon.spy(m, "queueTransaction");
47+
});
48+
49+
const testData = generateCommandData();
50+
51+
const buf = Buffer.from(helpers.encodeInt32(consts.PROTOCOL_VERSION) +
52+
encodeCommand(cmd.transactionStart, testData.guid, testData.hash) +
53+
encodeCommand(cmd.putAsset, null, null, testData.bin) +
54+
encodeCommand(cmd.transactionEnd), 'ascii');
55+
56+
await clientWrite(client, buf);
57+
58+
spies.forEach(s => assert(s.calledOnce));
59+
});
60+
});
61+
1762
describe("Server common", function() {
63+
const server = new CacheServer(cache, {port: 0});
1864

1965
before(function () {
2066
this._defaultErrCallback = err => assert(!err, `Cache Server reported error! ${err}`);
@@ -71,7 +117,7 @@ describe("Server common", function() {
71117
describe("Ipv6", function() {
72118
const ipv6Server = new CacheServer(cache, {port: 0, allowIpv6: true});
73119

74-
before(function () {
120+
before(function () {
75121
const interfaces = os.networkInterfaces();
76122
let ipv6Available = false;
77123
Object.keys(interfaces).forEach(function (interfaceName){
@@ -85,15 +131,15 @@ describe("Server common", function() {
85131
if(!ipv6Available){
86132
console.log("Skipping IPv6 tests because IPv6 is not available on this machine");
87133
this.skip();
88-
}
134+
}
89135

90136
return ipv6Server.start(err => assert(!err, `Cache Server reported error! ${err}`));
91137
});
92-
138+
93139
after(function() {
94140
ipv6Server.stop();
95141
});
96-
142+
97143
it("should bind to ipv6 when allowed", function(done) {
98144
const serverAddress = ipv6Server._server.address();
99145
assert.strictEqual(serverAddress.family, "IPv6");
@@ -108,7 +154,7 @@ describe("Server common", function() {
108154
before(function () {
109155
return ipv4Server.start(err => assert(!err, `Cache Server reported error! ${err}`));
110156
});
111-
157+
112158
after(function() {
113159
ipv4Server.stop();
114160
});

test/unity_cache_server.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ describe("Unity Cache Server bootstrap", () => {
208208
cachePath: tmpPath
209209
}
210210
}
211+
},
212+
Server: {
213+
port: 0
211214
}
212215
});
213216

0 commit comments

Comments
 (0)