v0.13.0
Migration guide from v0.12.x to v0.13.x is here.
Breaking changes
Streaming support
Built-in streaming support has been implemented. Node.js streams can be transferred as request params or as a response. You can use it to transfer an uploaded file from a gateway, or to encode/decode or compress/decompress streams.
Why is it a breaking change?
Because the protocol has been extended with a new field, which is a breaking change for schema-based serializers (ProtoBuf, Avro). Therefore, if you use ProtoBuf or Avro, you won't be able to communicate with previous (<=0.12) brokers. If you use the JSON or MsgPack serializer, there is nothing extra to do.
Examples
Send a file to a service as a stream
const stream = fs.createReadStream(fileName);
broker.call("storage.save", stream, { meta: { filename: "avatar-123.jpg" }});
Please note, the params must be the stream itself; you cannot add any other variables to the request. Use the meta property to transfer additional data.
Receiving a stream in a service
module.exports = {
name: "storage",
actions: {
save(ctx) {
const s = fs.createWriteStream(`/tmp/${ctx.meta.filename}`);
ctx.params.pipe(s);
}
}
};
Return a stream as response in a service
module.exports = {
name: "storage",
actions: {
get: {
params: {
filename: "string"
},
handler(ctx) {
return fs.createReadStream(`/tmp/${ctx.params.filename}`);
}
}
}
};
Process received stream on the caller side
const filename = "avatar-123.jpg";
broker.call("storage.get", { filename })
.then(stream => {
const s = fs.createWriteStream(`./${filename}`);
stream.pipe(s);
s.on("close", () => broker.logger.info("File has been received"));
})
AES encode/decode example service
const crypto = require("crypto");
const password = "moleculer";
module.exports = {
name: "aes",
actions: {
encrypt(ctx) {
const encrypt = crypto.createCipher("aes-256-ctr", password);
return ctx.params.pipe(encrypt);
},
decrypt(ctx) {
const decrypt = crypto.createDecipher("aes-256-ctr", password);
return ctx.params.pipe(decrypt);
}
}
};
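The same pattern covers the compress/decompress use case mentioned earlier. The following is only a minimal sketch built on Node's zlib module; the service name zip and its action names are hypothetical and are not shipped with Moleculer.

```javascript
const zlib = require("zlib");

// Hypothetical service schema (illustration only, not part of Moleculer)
const ZipService = {
    name: "zip",
    actions: {
        // ctx.params is the incoming stream; the returned stream is the response
        compress(ctx) {
            return ctx.params.pipe(zlib.createGzip());
        },
        decompress(ctx) {
            return ctx.params.pipe(zlib.createGunzip());
        }
    }
};

module.exports = ZipService;
```

As with the AES service, any extra data (e.g. a file name) would have to travel in meta, because params is occupied by the stream.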
Better Service & Broker lifecycle handling
The ServiceBroker & Service lifecycle handler logic has been improved. The reason for the change was a problem that occurred when loading multiple services locally: they could call each other's actions before the started handler had executed. This generally causes errors if the database connection is established in the started event handler.
This problem has been fixed, with a probable side effect: it causes errors (mostly in unit tests) if you call local services without calling broker.start() first.
It works in the previous version
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker();
broker.loadService("./math.service.js");
broker.call("math.add", { a: 5, b: 3 }).then(res => console.log(res));
// Prints: 8
From v0.13 it throws a ServiceNotFoundError exception, because the service is only loaded but not started yet.
Correct logic
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker();
broker.loadService("./math.service.js");
broker.start().then(() => {
broker.call("math.add", { a: 5, b: 3 }).then(res => console.log(res));
// Prints: 8
});
or with await
broker.loadService("./math.service.js");
await broker.start();
const res = await broker.call("math.add", { a: 5, b: 3 });
console.log(res);
// Prints: 8
A similar issue has been fixed at broker shutdown. Previously, when you stopped a broker, it still accepted incoming requests from remote nodes while it was already stopping its local services.
The shutdown logic has also been changed. When you call broker.stop, the broker first publishes an empty service list to remote nodes, so they route requests to other instances.
Default console logger
You no longer need to set logger: console in broker options, because ServiceBroker uses console as the default logger.
const broker = new ServiceBroker();
// It will print log messages to the console
Disable logging (e.g. in tests)
const broker = new ServiceBroker({ logger: false });
Changes in internal event sending logic
The $ prefixed internal events will be transferred if they are called by emit or broadcast. If you don't want to transfer them, use the broadcastLocal method.
From v0.13, the $ prefixed events mean built-in core events instead of internal "only-local" events.
Improved Circuit Breaker
A threshold-based circuit-breaker solution has been implemented. It uses a time window to check the failed request rate. Once the threshold value is reached, it trips the circuit breaker.
const broker = new ServiceBroker({
nodeID: "node-1",
circuitBreaker: {
enabled: true,
threshold: 0.5,
minRequestCount: 20,
windowTime: 60, // in seconds
halfOpenTime: 5 * 1000,
check: err => err && err.code >= 500
}
});
Instead of the failureOnTimeout and failureOnReject properties, there is a new check() function property in the options. The circuit breaker uses it to decide which errors count as failed requests.
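To make the interplay of threshold, minRequestCount and check() more concrete, here is a minimal sketch of the trip decision inside one time window. The createFailureWindow helper is hypothetical and is not Moleculer's internal code; it assumes the breaker trips once the failure rate reaches the threshold and at least minRequestCount requests were seen.

```javascript
// Hypothetical helper that mimics the decision; not Moleculer's implementation
function createFailureWindow({ threshold, minRequestCount, check }) {
    let count = 0;
    let failures = 0;
    return {
        // Record one finished request; `err` is undefined on success
        record(err) {
            count++;
            if (check(err)) failures++;
        },
        // Trip only after enough requests were seen and the failure rate reached the threshold
        shouldTrip() {
            return count >= minRequestCount && failures / count >= threshold;
        },
        // Would be called when the time window (windowTime) elapses
        reset() {
            count = 0;
            failures = 0;
        }
    };
}

const win = createFailureWindow({
    threshold: 0.5,
    minRequestCount: 4,
    check: err => err && err.code >= 500
});

win.record({ code: 500 });
win.record({ code: 503 });
console.log(win.shouldTrip()); // false: only 2 requests so far
win.record();
win.record({ code: 404 });
console.log(win.shouldTrip()); // true: 2 failures out of 4 requests
```

Note that the 404 error is not counted as a failure, because the check() function only accepts errors with a code of 500 or above.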
You can override these global options in action definition, as well.
module.exports = {
name: "users",
actions: {
create: {
circuitBreaker: {
// All CB options can be overwritten from broker options.
threshold: 0.3,
windowTime: 30
},
handler(ctx) {}
}
}
};
CB metrics events removed
The metrics circuit breaker events have been removed due to internal event logic changes.
Use the $circuit-breaker.* events instead of metrics.circuit-breaker.* events.
Improved Retry feature (with exponential backoff)
The old retry feature has been improved. It now uses exponential backoff for retries. The old solution retried the request immediately on failure.
The retry options have also been changed in the broker options. Every option is now under the retryPolicy property.
const broker = new ServiceBroker({
nodeID: "node-1",
retryPolicy: {
enabled: true,
retries: 5,
delay: 100,
maxDelay: 2000,
factor: 2,
check: err => err && !!err.retryable
}
});
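To illustrate what these options mean for timing, the sketch below computes the wait before each retry. It assumes the delay grows as delay * factor^(attempt - 1) and is capped at maxDelay; the retryDelays helper is hypothetical and only illustrates the arithmetic.

```javascript
// Assumed formula: delay * factor^(attempt - 1), capped at maxDelay
function retryDelays({ retries, delay, maxDelay, factor }) {
    const delays = [];
    for (let attempt = 1; attempt <= retries; attempt++) {
        delays.push(Math.min(delay * Math.pow(factor, attempt - 1), maxDelay));
    }
    return delays;
}

console.log(retryDelays({ retries: 5, delay: 100, maxDelay: 2000, factor: 2 }));
// [ 100, 200, 400, 800, 1600 ]
```

Under this assumption, maxDelay only takes effect from the sixth retry onwards with the values above.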
Overwrite the retries value in calling option
The retryCount calling option has been renamed to retries.
broker.call("posts.find", {}, { retries: 3 });
There is a new check() function property in the options. The Retry middleware uses it to decide which errors count as failed requests that need a retry. The default function checks the retryable property of errors.
These global options can be overridden in action definition, as well.
module.exports = {
name: "users",
actions: {
find: {
retryPolicy: {
// All Retry policy options can be overwritten from broker options.
retries: 3,
delay: 500
},
handler(ctx) {}
},
create: {
retryPolicy: {
// Disable retries for this action
enabled: false
},
handler(ctx) {}
}
}
};
Changes in context tracker
There are also some changes in context tracker configuration.
const broker = new ServiceBroker({
nodeID: "node-1",
tracking: {
enabled: true,
shutdownTimeout: 5000
}
});
Disable tracking in calling options
broker.call("posts.find", {}, { tracking: false });
The shutdown timeout can be overwritten by the $shutdownTimeout property in service settings.
Removed internal statistics module
The internal statistics module ($node.stats) has been removed. If you still need it, download it from here, load it as a service and call stat.snapshot to receive the collected statistics.
Renamed errors
Some errors have been renamed in order to follow name conventions.
ServiceNotAvailable -> ServiceNotAvailableError
RequestRejected -> RequestRejectedError
QueueIsFull -> QueueIsFullError
InvalidPacketData -> InvalidPacketDataError
Context nodeID changes
The ctx.callerNodeID has been removed. The ctx.nodeID contains the target or caller nodeID. If you need the current nodeID, use ctx.broker.nodeID.
Enhanced ping method
It returns a Promise with the results of the ping responses. Moreover, the method has been renamed to broker.ping.
Ping a node with 1sec timeout
broker.ping("node-123", 1000).then(res => broker.logger.info(res));
Output:
{
nodeID: 'node-123',
elapsedTime: 16,
timeDiff: -3
}
Ping all known nodes
broker.ping().then(res => broker.logger.info(res));
Output:
{
"node-100": {
nodeID: 'node-100',
elapsedTime: 10,
timeDiff: -2
},
"node-101": {
nodeID: 'node-101',
elapsedTime: 18,
timeDiff: 32
},
"node-102": {
nodeID: 'node-102',
elapsedTime: 250,
timeDiff: 850
}
}
Amended cacher key generation logic
Previously, when you didn't define keys for caching, the cacher hashed the whole ctx.params and used the hash as the key to store the content. This method was too slow and difficult to implement on other platforms, so we have changed it. The new method is simpler: the key generator concatenates all property names & values from ctx.params.
However, the problem with this new logic is that the key can be very long. Very long keys can cause performance issues when getting or saving cache entries. To avoid this, there is a maxParamsLength option to limit the key length. If the key is longer than the configured limit, the cacher calculates a hash (SHA256) from the full key and adds it to the end of the key.
The minimum of maxParamsLength is 44 (SHA 256 hash length in Base64). To disable this feature, set it to 0 or null.
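The key generation described above can be sketched roughly as follows. This is not the cacher's actual code: the sketchCacheKey function is hypothetical, it handles flat params only, and it assumes the hash replaces the tail of the key so that the params part is exactly maxParamsLength characters long.

```javascript
const crypto = require("crypto");

// Hypothetical re-implementation for illustration (flat params only)
function sketchCacheKey(actionName, params, maxParamsLength) {
    // Concatenate all property names & values
    const full = Object.keys(params).map(key => `${key}|${params[key]}`).join("|");

    // No limit configured (0 or null) or the key is short enough
    if (!maxParamsLength || full.length <= maxParamsLength)
        return `${actionName}:${full}`;

    // SHA256 in Base64 is always 44 characters long
    const hash = crypto.createHash("sha256").update(full).digest("base64");
    return `${actionName}:${full.substring(0, maxParamsLength - hash.length)}${hash}`;
}

const shortKey = sketchCacheKey("posts.find", { id: 2, title: "New post" }, 60);
console.log(shortKey); // posts.find:id|2|title|New post
```

The fixed 44-character hash length is the reason why maxParamsLength cannot be smaller than 44.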
Generate a full key from the whole params
cacher.getCacheKey("posts.find", { id: 2, title: "New post", content: "It can be very very looooooooooooooooooong content. So this key will also be too long" });
// Key: 'posts.find:id|2|title|New post|content|It can be very very looooooooooooooooooong content. So this key will also be too long'
Generate a limited key with hash
const broker = new ServiceBroker({
logger: console,
cacher: {
type: "Memory",
options: {
maxParamsLength: 60
}
}
});
cacher.getCacheKey("posts.find", { id: 2, title: "New post", content: "It can be very very looooooooooooooooooong content. So this key will also be too long" });
// Key: 'posts.find:id|2|title|New pL4ozUU24FATnNpDt1B0t1T5KP/T5/Y+JTIznKDspjT0='
Of course, you can still use your custom solution with the keygen cacher option, as before.
Cacher matcher changed
The cacher matcher code has also changed in the cacher.clean method. The previous (wrong) matcher couldn't handle dots (.) properly in patterns. E.g. the posts.* pattern cleaned the posts.find.something keys, too. Now it has been fixed, but it means that you should use the posts.** pattern, because the params and meta values can contain dots.
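The difference between the two wildcards can be illustrated with a small sketch. The matchPattern function below is hypothetical and is not the matcher used by Moleculer; it only assumes that * stays within one dot-separated segment while ** also crosses dots.

```javascript
// Hypothetical matcher for illustration only
function matchPattern(text, pattern) {
    // Escape regex metacharacters except `*`
    const escaped = pattern.replace(/[.+?^${}()|[\]\\]/g, "\\$&");
    const source = escaped
        .replace(/\*\*/g, "\u0000")   // protect `**` with a placeholder
        .replace(/\*/g, "[^.]*")      // `*` must not cross a dot
        .replace(/\u0000/g, ".*");    // `**` matches anything
    return new RegExp(`^${source}$`).test(text);
}

console.log(matchPattern("posts.find", "posts.*"));            // true
console.log(matchPattern("posts.find.something", "posts.*"));  // false
console.log(matchPattern("posts.find.something", "posts.**")); // true
```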
Changed Moleculer errors signature
The constructor arguments of the following Moleculer Error classes have been changed to constructor(data):
ServiceNotFoundError
ServiceNotAvailableError
RequestTimeoutError
RequestSkippedError
RequestRejectedError
QueueIsFullError
MaxCallLevelError
ProtocolVersionMismatchError
InvalidPacketDataError
Before
throw new ServiceNotFoundError("posts.find", "node-123");
Now
throw new ServiceNotFoundError({ action: "posts.find", nodeID: "node-123" });
New
New state-of-the-art middlewares
We have improved the current middleware handler and enriched it with a lot of useful features. As a result, you can hook into more of the internal flow logic with custom middlewares (e.g. event sending, service creating, service starting, etc.)
The new middleware is an Object with hooks instead of a simple Function. However, the new solution is backward compatible, so you don't need to migrate your old middlewares.
A new middleware with all available hooks
const MyCustomMiddleware = {
// Wrap local action handlers (legacy middleware handler)
localAction(next, action) {
},
// Wrap remote action handlers
remoteAction(next, action) {
},
// Wrap local event handlers
localEvent(next, event) {
},
// Wrap broker.createService method
createService(next) {
},
// Wrap broker.destroyService method
destroyService(next) {
},
// Wrap broker.call method
call(next) {
},
// Wrap broker.mcall method
mcall(next) {
},
// Wrap broker.emit method
emit(next) {
},
// Wrap broker.broadcast method
broadcast(next) {
},
// Wrap broker.broadcastLocal method
broadcastLocal(next) {
},
// After a new local service created (sync)
serviceCreated(service) {
},
// Before a local service started (async)
serviceStarting(service) {
},
// After a local service started (async)
serviceStarted(service) {
},
// Before a local service stopping (async)
serviceStopping(service) {
},
// After a local service stopped (async)
serviceStopped(service) {
},
// After broker is created (async)
created(broker) {
},
// Before broker starting (async)
starting(broker) {
},
// After broker started (async)
started(broker) {
},
// Before broker stopping (async)
stopping(broker) {
},
// After broker stopped (async)
stopped(broker) {
}
}
Use it in broker options
const broker = new ServiceBroker({
middlewares: [
MyCustomMiddleware
]
});
Wrapping handlers
Some hooks are wrappers. It means you need to wrap the original handler and return a new Function.
Wrapper hooks are the ones whose first parameter is next.
Wrap local action handler
const MyDoSomethingMiddleware = {
localAction(next, action) {
if (action.myFunc) {
// Wrap the handler
return function(ctx) {
doSomethingBeforeHandler(ctx);
return next(ctx)
.then(res => {
doSomethingAfterHandler(res);
// Return the original result
return res;
})
.catch(err => {
doSomethingAfterHandlerIfFailed(err);
// Throw further the error
throw err;
});
}
}
// If the feature is disabled we don't wrap it, return the original handler
// So it won't cut down the performance for actions where the feature is disabled.
return next;
}
};
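The wrapping contract can be tried out without a broker. The sketch below applies a wrapper hook by hand to a plain handler function; the TraceMiddleware object and the trace action property are hypothetical names used only for illustration.

```javascript
// Hypothetical middleware: logs calls of actions that set `trace: true`
const TraceMiddleware = {
    localAction(next, action) {
        if (action.trace) {
            // Wrap the handler
            return function(ctx) {
                console.log(`Calling '${action.name}'`);
                return next(ctx);
            };
        }
        // Feature disabled: hand back the original handler untouched
        return next;
    }
};

// A plain handler standing in for an action handler
const handler = ctx => Promise.resolve(ctx.params.a + ctx.params.b);

const traced = TraceMiddleware.localAction(handler, { name: "math.add", trace: true });
const plain = TraceMiddleware.localAction(handler, { name: "math.sub" });

console.log(traced !== handler); // true
console.log(plain === handler);  // true
```

Returning the original handler when the feature is off means such actions carry no extra function call at all.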
Decorate broker (to extend functions)
Other hooks help you add new features to ServiceBroker & services.
Decorate broker with a new allCall method
const broker = new ServiceBroker({
middlewares: [
{
// After broker is created
created(broker) {
// Call action on all available nodes
broker.allCall = function(action, params, opts = {}) {
const nodeIDs = this.registry.getNodeList({ onlyAvailable: true })
.map(node => node.id);
// Make direct call to the given Node ID
return Promise.all(nodeIDs.map(nodeID => broker.call(action, params, Object.assign({ nodeID }, opts))));
}
}
}
]
});
await broker.start();
// Call `$node.health` on every node & collect results
const res = await broker.allCall("$node.health");
Decorate services with a new method
const broker = new ServiceBroker({
middlewares: [
{
// After a new local service created
serviceCreated(service) {
// Add a new custom function to the service instance
service.customFunc = function() {
// Do something
}.bind(service);
}
}
]
});
In service schema:
module.exports = {
name: "users",
actions: {
find(ctx) {
// Call the new custom function
this.customFunc();
}
}
};
The mixins can do similar things, so we prefer mixins to this decorating.
Many internal features are exposed to internal middlewares
Thanks to the new advanced middlewares, we could move many integrated features into middlewares. They are available under the require("moleculer").Middlewares property, but they are loaded automatically.
New internal middlewares:
- Action hook handling
- Validator
- Bulkhead
- Cacher
- Context tracker
- Circuit Breaker
- Timeout
- Retry
- Fallback
- Error handling
- Metrics
Turn off the automatic loading with the internalMiddlewares: false broker option. In this case you have to add them to the middlewares: [] broker option.
The broker.use method is deprecated. Use middlewares: [] in the broker options instead.
Action hooks
Define action hooks to wrap certain actions coming from mixins.
There are before, after and error hooks. Assign them to a specified action or to all actions (*) in a service.
The hook can be a Function or a String. The latter must be a local service method name.
Before hooks
const DbService = require("moleculer-db");
module.exports = {
name: "posts",
mixins: [DbService],
hooks: {
before: {
// Define a global hook for all actions
// The hook will call the `resolveLoggedUser` method.
"*": "resolveLoggedUser",
// Define multiple hooks
remove: [
function isAuthenticated(ctx) {
if (!ctx.user)
throw new Error("Forbidden");
},
function isOwner(ctx) {
if (!this.checkOwner(ctx.params.id, ctx.user.id))
throw new Error("Only owner can remove it.");
}
]
}
},
methods: {
async resolveLoggedUser(ctx) {
if (ctx.meta.user)
ctx.user = await ctx.call("users.get", { id: ctx.meta.user.id });
}
}
}
After & Error hooks
const DbService = require("moleculer-db");
module.exports = {
name: "users",
mixins: [DbService],
hooks: {
after: {
// Define a global hook for all actions to remove sensitive data
"*": function(ctx, res) {
// Remove password
delete res.password;
// Please note, must return result (either the original or a new)
return res;
},
get: [
// Add a new virtual field to the entity
async function (ctx, res) {
res.friends = await ctx.call("friends.count", { query: { follower: res._id }});
return res;
},
// Populate the `referrer` field
async function (ctx, res) {
if (res.referrer)
res.referrer = await ctx.call("users.get", { id: res.referrer });
return res;
}
]
},
error: {
// Global error handler
"*": function(ctx, err) {
this.logger.error(`Error occurred when '${ctx.action.name}' action was called`, err);
// Throw further the error
throw err;
}
}
}
};
The recommended use case is to create mixins that fill up the service with methods, and to reference those method names in hooks.
Mixin
module.exports = {
methods: {
checkIsAuthenticated(ctx) {
if (!ctx.meta.user)
throw new Error("Unauthenticated");
},
checkUserRole(ctx) {
if (ctx.action.role && ctx.meta.user.role != ctx.action.role)
throw new Error("Forbidden");
},
checkOwner(ctx) {
// Check the owner of entity
}
}
}
Use mixin methods in hooks
const MyAuthMixin = require("./my.mixin");
module.exports = {
name: "posts",
mixins: [MyAuthMixin],
hooks: {
before: {
"*": ["checkIsAuthenticated"],
create: ["checkUserRole"],
update: ["checkUserRole", "checkOwner"],
remove: ["checkUserRole", "checkOwner"]
}
},
actions: {
find: {
// No required role
handler(ctx) {}
},
create: {
role: "admin",
handler(ctx) {}
},
update: {
role: "user",
handler(ctx) {}
}
}
};
New Bulkhead fault-tolerance feature
Bulkhead feature is an internal middleware in Moleculer. Use it to control the concurrent request handling of actions.
Global settings in the broker options. Applied to all registered local actions.
const broker = new ServiceBroker({
bulkhead: {
enabled: true,
concurrency: 3,
maxQueueSize: 10,
}
});
The concurrency value restricts the number of concurrent request executions. If maxQueueSize is bigger than 0, the broker queues additional requests when all slots are taken. If the queue size reaches the maxQueueSize limit, or if maxQueueSize is 0, the broker throws a QueueIsFullError for every additional request.
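The queueing behaviour described above can be sketched in a few lines. The withBulkhead wrapper and the local QueueIsFullError class below are hypothetical stand-ins and not the built-in middleware; they only mirror the rules for concurrency and maxQueueSize.

```javascript
// Hypothetical stand-in for the Moleculer error class
class QueueIsFullError extends Error {}

// Hypothetical wrapper mirroring the concurrency & maxQueueSize rules
function withBulkhead(handler, { concurrency, maxQueueSize }) {
    let inFlight = 0;
    const queue = [];

    const runNext = () => {
        if (queue.length === 0 || inFlight >= concurrency) return;
        const { ctx, resolve, reject } = queue.shift();
        inFlight++;
        Promise.resolve()
            .then(() => handler(ctx))
            .then(resolve, reject)
            .finally(() => {
                // Free the slot and start the next queued request
                inFlight--;
                runNext();
            });
    };

    const wrapped = ctx => {
        // All slots are taken and the queue is full (or queueing is disabled)
        if (inFlight >= concurrency && queue.length >= maxQueueSize)
            return Promise.reject(new QueueIsFullError("Queue is full"));
        return new Promise((resolve, reject) => {
            queue.push({ ctx, resolve, reject });
            runNext();
        });
    };
    wrapped.stats = () => ({ inFlight, queued: queue.length });
    return wrapped;
}

// A handler that never finishes, to keep the slot occupied
const slow = withBulkhead(() => new Promise(() => {}), { concurrency: 1, maxQueueSize: 1 });
slow({});                  // takes the only slot
slow({});                  // waits in the queue
slow({}).catch(err => console.log(err instanceof QueueIsFullError)); // true
console.log(slow.stats()); // { inFlight: 1, queued: 1 }
```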
These global options can be overridden in action definition, as well.
module.exports = {
name: "users",
actions: {
find: {
bulkhead: {
enabled: false
},
handler(ctx) {}
},
create: {
bulkhead: {
// Increment the concurrency value
// for this action
concurrency: 10
},
handler(ctx) {}
}
}
};
Fallback in action definition
Due to the exposed Fallback middleware, fallback response can be set in the action definition, too.
Please note, this fallback response will only be used if the error occurs within the action handler. If the request is called from a remote node and the request times out on the remote node, the fallback response is not used. In this case, use the fallbackResponse in calling option.
Fallback as function
module.exports = {
name: "recommends",
actions: {
add: {
fallback: (ctx, err) => "Some cached result",
//fallback: "fakeResult",
handler(ctx) {
// Do something
}
}
}
};
Fallback as method name string
module.exports = {
name: "recommends",
actions: {
add: {
// Call the 'getCachedResult' method when error occurred
fallback: "getCachedResult",
handler(ctx) {
// Do something
}
}
},
methods: {
getCachedResult(ctx, err) {
return "Some cached result";
}
}
};
Action visibility
The action has a new visibility property to control the visibility & callability of service actions.
Available values:
- published or null: public action. It can be called locally, remotely and can be published via API Gateway
- public: public action, can be called locally & remotely but not published via API GW
- protected: can be called only locally (from local services)
- private: can be called only internally (via this.actions.xy() within service)
module.exports = {
name: "posts",
actions: {
// It's published by default
find(ctx) {},
clean: {
// Callable only via `this.actions.clean`
visibility: "private",
handler(ctx) {}
}
},
methods: {
cleanEntities() {
// Call the action directly
return this.actions.clean();
}
}
}
The default value is null (means published) due to backward compatibility.
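The four visibility levels can be read as a simple lookup table. The sketch below is not broker code; the caller kinds (gateway, remote, local, internal) and the canCall helper are hypothetical names chosen for illustration.

```javascript
// Hypothetical lookup table; caller kinds are illustrative names
const ALLOWED_CALLERS = {
    published: ["gateway", "remote", "local", "internal"],
    public: ["remote", "local", "internal"],
    protected: ["local", "internal"],
    private: ["internal"]
};

function canCall(visibility, caller) {
    // null (or undefined) falls back to "published"
    const level = visibility == null ? "published" : visibility;
    return ALLOWED_CALLERS[level].includes(caller);
}

console.log(canCall(null, "gateway"));       // true
console.log(canCall("public", "gateway"));   // false
console.log(canCall("protected", "remote")); // false
console.log(canCall("private", "internal")); // true
```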
New Thrift serializer
There is a new built-in Thrift serializer.
const broker = new ServiceBroker({
serializer: "Thrift"
});
To use this serializer install the thrift module with the npm install thrift --save command.
Enhanced log level configuration
A new module-based log level configuration was added. The log level can be set for every Moleculer module. Use of wildcard is allowed.
const broker = new ServiceBroker({
logger: console,
logLevel: {
"MY.**": false, // Disable logs
"TRANS": "warn", // Only 'warn' and 'error' log entries
"*.GREETER": "debug", // All log entries
"**": "debug", // All other modules use this level
}
});
Please note, it works only with the default console logger. In case of external loggers (Pino, Winston, Bunyan, etc.), you must apply these log levels in the external logger yourself.
These settings are evaluated from top to bottom, so the ** level must be the last property.
Internal modules: BROKER, TRANS, TX as transporter, CACHER, REGISTRY.
For services, the name comes from the service name. E.g. POSTS.
A version is used as a prefix. E.g. V2.POSTS
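The top-to-bottom evaluation can be sketched as a first-match lookup. The resolveLogLevel and toRegExp functions below are hypothetical and not the logger's real code; they assume that * matches within one dot-separated segment and ** matches anything.

```javascript
// Hypothetical matcher: `*` stays within one segment, `**` matches anything
function toRegExp(pattern) {
    const source = pattern
        .split("**")
        .map(part => part
            .split("*")
            .map(s => s.replace(/[.+?^${}()|[\]\\]/g, "\\$&"))
            .join("[^.]*"))
        .join(".*");
    return new RegExp(`^${source}$`);
}

// The first matching pattern wins, in the order the properties were defined
function resolveLogLevel(config, moduleName) {
    for (const pattern of Object.keys(config)) {
        if (toRegExp(pattern).test(moduleName)) return config[pattern];
    }
    return undefined;
}

const logLevel = {
    "MY.**": false,
    "TRANS": "warn",
    "*.GREETER": "debug",
    "**": "debug"
};

console.log(resolveLogLevel(logLevel, "MY.SERVICE")); // false
console.log(resolveLogLevel(logLevel, "TRANS"));      // warn
console.log(resolveLogLevel(logLevel, "BROKER"));     // debug
```

If the ** entry came first, it would match every module name and the more specific entries would never be reached.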
The old global log level setting works, as well.
const broker = new ServiceBroker({
logger: console,
logLevel: "warn"
});
New short log formatter
A new short log formatter was also added. It is similar to the default, but doesn't print the date and nodeID.
const broker = new ServiceBroker({
logFormatter: "short"
});
Output
[19:42:49.055Z] INFO MATH: Service started.
Load services also with glob patterns
Moleculer Runner loads services also from glob patterns. It is useful when loading all services except certain ones.
$ moleculer-runner services !services/others/**/*.service.js services/others/mandatory/main.service.js
Explanations:
- services - legacy mode. Load all services from the services folder with **/*.service.js file mask
- !services/others/**/*.service.js - skip all services in the services/others folder and sub-folders.
- services/others/mandatory/main.service.js - load the exact service
Glob patterns work in the SERVICES environment variable, as well.
MemoryCacher cloning
There is a new clone property in the MemoryCacher options. If it's true, the cacher clones the cached data before returning it.
If you modify the received value, enable this option. Note: it cuts down the performance.
Enable cloning
const broker = new ServiceBroker({
cacher: {
type: "Memory",
options: {
clone: true
}
}
});
This feature uses the lodash _.cloneDeep method. To change the cloning method, set a Function to the clone option instead of a Boolean.
Custom clone function with JSON parse & stringify:
const broker = new ServiceBroker({
cacher: {
type: "Memory",
options: {
clone: data => JSON.parse(JSON.stringify(data))
}
}
});
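Why cloning matters can be shown with a tiny in-memory cache. The TinyCache class below is a hypothetical stand-in for MemoryCacher; for clone: true it uses JSON parse & stringify instead of lodash, only to keep the sketch free of dependencies.

```javascript
// Hypothetical stand-in for MemoryCacher, for illustration only
class TinyCache {
    constructor({ clone = false } = {}) {
        this.store = new Map();
        // `true` selects a default clone function, a Function is used as-is
        this.clone = clone === true ? data => JSON.parse(JSON.stringify(data)) : clone;
    }
    set(key, value) {
        this.store.set(key, value);
    }
    get(key) {
        const value = this.store.get(key);
        return this.clone ? this.clone(value) : value;
    }
}

const shared = new TinyCache();
shared.set("user", { name: "John" });
shared.get("user").name = "changed";
console.log(shared.get("user").name); // changed (the cached object was modified)

const safe = new TinyCache({ clone: true });
safe.set("user", { name: "John" });
safe.get("user").name = "changed";
console.log(safe.get("user").name); // John (only the copy was modified)
```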
Changes
- service instances have a new property named fullName containing service version & service name.
- the Action has a rawName property containing action name without service name.
- new $node.options internal action to get the current broker options.
- Context.create & new Context signature changed.
- removed Context metrics methods. All metrics feature moved to the Metrics middleware.
- ctx.timeout moved to ctx.options.timeout.
- removed ctx.callerNodeID.
- ctx.endpoint is a new property pointing to target Endpoint. For example you can check with ctx.endpoint.local flag whether the request is remote or local.
- lazily generated ctx.id, i.e. only generated at access. ctx.generateID() was removed.
- renamed service lifecycle methods in service instances (not in service schema!)
- extended transit.stat.packets with byte-based statistics.
- utils.deprecate method was created for deprecation.
- Transporter supports mqtt+ssl://, rediss:// & amqps:// protocols in connection URIs.
- fixed circular objects handling in service schema (e.g.: Joi validator problem)
Deprecations
- broker.use() has been deprecated. Use middlewares: [...] in broker options instead.