diff --git a/README.md b/README.md index e514a23..1116d0b 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,6 @@ [![npm downloads](https://img.shields.io/npm/dm/flatend.svg?style=flat)](https://www.npmjs.com/package/flatend) [![Security Responsible Disclosure](https://img.shields.io/badge/Security-Responsible%20Disclosure-yellow.svg)](https://github.com/nodejs/security-wg/blob/master/processes/responsible_disclosure_template.md) - **flatend** is an experimental framework and protocol to make microservices more modular, simpler, safer, cheaper, and faster to build using [p2p networking](https://github.com/lithdew/monte). @@ -16,30 +15,29 @@ ## Features -* Fully agnostic and compatible with any type of language, database, tool, library, or framework. -* P2P-based service discovery, load balancing, routing, and PKI via [Kademlia](https://en.wikipedia.org/wiki/Kademlia). -* Fully-encrypted, end-to-end, bidirectional streaming RPC via [Monte](https://github.com/lithdew/monte). -* Automatic reconnect/retry upon crashes or connection loss. -* Zero-hassle serverless: every function is a microservice. -* Stream multiple gigabytes of data across microservices. +- Fully agnostic and compatible with any type of language, database, tool, library, or framework. +- P2P-based service discovery, load balancing, routing, and PKI via [Kademlia](https://en.wikipedia.org/wiki/Kademlia). +- Fully-encrypted, end-to-end, bidirectional streaming RPC via [Monte](https://github.com/lithdew/monte). +- Automatic reconnect/retry upon crashes or connection loss. +- Zero-hassle serverless: every function is a microservice. +- Stream multiple gigabytes of data across microservices. ## Gateways **flatend** additionally comes with scalable, high-performance, production-ready, easily-deployable API gateways that are bundled into a [small, single executable binary](https://github.com/lithdew/flatend/releases) to help you quickly deploy your microservices. -* Written in [Go](https://golang.org/). -* HTTP/1.1, HTTP/2 support. -* Automatic HTTPS via [LetsEncrypt](https://letsencrypt.org/). -* Expose/load-balance across microservices. -* Serve static files and directories. -* REPL for real-time management (*coming soon!*). -* Prometheus metrics (*coming soon!*). -* WebSocket support (*coming soon!*). -* gRPC support (*coming soon!*). +- Written in [Go](https://golang.org/). +- HTTP/1.1, HTTP/2 support. +- Automatic HTTPS via [LetsEncrypt](https://letsencrypt.org/). +- Expose/load-balance across microservices. +- Serve static files and directories. +- REPL for real-time management (_coming soon!_). +- Prometheus metrics (_coming soon!_). +- WebSocket support (_coming soon!_). +- gRPC support (_coming soon!_). All gateways have been extensively tested on [Rackspace](https://www.rackspace.com/), [Scaleway](https://www.scaleway.com/en/), [AWS](https://aws.amazon.com/), [Google Cloud](https://cloud.google.com/), and [DigitalOcean](https://www.digitalocean.com/). - ## Requirements Although **flatend** at its core is a protocol, and hence agnostic to whichever programming langauge you use, there are currently only two reference implementations in NodeJS and Go. @@ -50,7 +48,7 @@ Although **flatend** at its core is a protocol, and hence agnostic to whichever The rationale for starting with NodeJS and Go is so that, for any new product/service, you may: 1. Quickly prototype and deploy in NodeJS with SQLite using a 2USD/month bare-metal server. -2. Once you start scaling up, split up your microservice and rewrite the performance-critical parts in Go. +2. Once you start scaling up, split up your microservice and rewrite the performance-critical parts in Go. 3. Run a red/blue deployment easily to gradually deploy your new microservices and experience zero downtime. Support is planned for the following runtimes/languages: @@ -165,7 +163,7 @@ Hello world! Try restart your API gateway and watch your service re-discover it. ```shell -$ go run main.go +$ go run main.go 2020/06/18 04:11:06 Listening for Flatend nodes on '[::]:39313'. 2020/06/18 04:11:06 You are now connected to 127.0.0.1:9000. Services: [] 2020/06/18 04:11:06 Re-probed 127.0.0.1:9000. Services: [] @@ -207,34 +205,34 @@ success Saved X new dependencies. Write a function that describes how to handle requests for the service `hello_world` in `index.js`. ```js -const {Node, Context} = require("flatend"); +const { Node, Context } = require("flatend"); -const helloWorld = ctx => ctx.send("Hello world!"); +const helloWorld = (ctx) => ctx.send("Hello world!"); ``` Register the function as a handler for the service `hello_world`. Start the node and have it connect to Flatend's API gateway. ```js -const {Node, Context} = require("flatend"); +const { Node, Context } = require("flatend"); -const helloWorld = ctx => ctx.send("Hello world!"); +const helloWorld = (ctx) => ctx.send("Hello world!"); async function main() { - await Node.start({ - addrs: ["127.0.0.1:9000"], - services: { - 'hello_world': helloWorld, - }, - }); + await Node.start({ + addrs: ["127.0.0.1:9000"], + services: { + hello_world: helloWorld, + }, + }); } -main().catch(err => console.error(err)); +main().catch((err) => console.error(err)); ``` Run it. ```shell -$ DEBUG=* node index.js +$ DEBUG=* node index.js flatend You are now connected to 127.0.0.1:9000. Services: [] +0ms flatend Discovered 0 peer(s). +19ms ``` @@ -249,7 +247,7 @@ Hello world! Try restart your API gateway and watch your service re-discover it. ```shell -$ DEBUG=* node index.js +$ DEBUG=* node index.js flatend You are now connected to 127.0.0.1:9000. Services: [] +0ms flatend Discovered 0 peer(s). +19ms flatend Trying to reconnect to 127.0.0.1:9000. Sleeping for 500ms. +41s @@ -276,21 +274,37 @@ package flatend import "github.com/lithdew/kademlia" type Node struct { - // A reachable, public address which peers may reach you on. - // The format of the address must be [host]:[port]. - PublicAddr string + // A reachable, public address which peers may reach you on. + // The format of the address must be [host]:[port]. + PublicAddr string - // A 32-byte Ed25519 private key. A secret key must be provided - // to allow for peers to reach you. A secret key may be generated - // by calling `flatend.GenerateSecretKey()`. - SecretKey kademlia.PrivateKey + // A 32-byte Ed25519 private key. A secret key must be provided + // to allow for peers to reach you. A secret key may be generated + // by calling `flatend.GenerateSecretKey()`. + SecretKey kademlia.PrivateKey // A list of IPv4/IPv6 addresses and ports assembled as [host]:[port] which // your Flatend node will listen for other nodes from. - BindAddrs []string + BindAddrs []string + + // A mapping of service names to their respective handlers. + Services map[string]flatend.Handler + + // Total number of attempts to reconnect to a peer we reached that disconnected. + // Default is 8 attempts, set to a negative integer to not attempt to reconnect at all. + NumReconnectAttempts int - // A mapping of service names to their respective handlers. - Services map[string]Handler + // A factor proportionally representing how much larger each reconnection attempts + // delay should increase by upon each attempt. Default is 1.25. + ReconnectBackoffFactor float64 + + // The minimum amount of time to wait before each reconnection attempt. Default is 500 + // milliseconds. + ReconnectBackoffMinDuration time.Duration + + // The maximum amount of time to wait before each reconnection attempt. Default is 1 + // second. + ReconnectBackoffMaxDuration time.Duration // .... } @@ -307,9 +321,9 @@ func helloWorld(ctx *flatend.Context) { _ = ctx.ID // All headers must be written before writing any response body data. - + // Headers are used to send small amounts of metadata to a requester. - + // For example, the HTTP API gateway directly sets headers provided // as a response as the headers of a HTTP response to a HTTP request // which has been transcribed to a Flatend service request that is @@ -329,7 +343,7 @@ func helloWorld(ctx *flatend.Context) { // The body of a request may be accessed via `ctx.Body`. Request bodies // are unbounded in size, and represented as a `io.ReadCloser`. - + // It is advised to wrap the body under an `io.LimitReader` to limit // the size of the bodies of requests. @@ -344,18 +358,18 @@ func helloWorld(ctx *flatend.Context) { ### NodeJS SDK ```js -const {Node} = require("flatend"); +const { Node } = require("flatend"); export interface NodeOptions { // A reachable, public address which peers may reach you on. // The format of the address must be [host]:[port]. publicAddr?: string; - + // A list of [host]:[port] addresses which this node will bind a listener // against to accept new Flatend nodes. bindAddrs?: string[]; - // A list of addresses to nodes to initially reach out + // A list of addresses to nodes to initially reach out // for/bootstrap from first. addrs?: string[]; @@ -366,69 +380,78 @@ export interface NodeOptions { // A mapping of service names to their respective handlers. services?: { [key: string]: Handler }; + + // Total number of attempts to reconnect to a peer we reached that disconnected. + // Default is 8 attempts: set to 0 to not attempt to reconnect, or a negative number + // to always attempt to reconnect. + numReconnectAttempts?: number; + + // The amount of time to wait before each reconnection attempt. Default is 500 + // milliseconds. + reconnectBackoffDuration?: number; } -await Node.start(opts: NodeOpts); +await Node.start((opts: NodeOpts)); -const {Context} = require("flatend"); +const { Context } = require("flatend"); // Handlers may optionally be declared as async, and may optionally // return promises. -const helloWorld = async ctx => { - // 'ctx' is a NodeJS Duplex stream. Writing to it writes a response - // body, and reading from it reads a request body. +const helloWorld = async (ctx) => { + // 'ctx' is a NodeJS Duplex stream. Writing to it writes a response + // body, and reading from it reads a request body. - _ = ctx.id; // The ID of the requester. + _ = ctx.id; // The ID of the requester. - ctx.pipe(ctx); // This would pipe all request data as response data. + ctx.pipe(ctx); // This would pipe all request data as response data. - // Headers are used to send small amounts of metadata to a requester. - - // For example, the HTTP API gateway directly sets headers provided - // as a response as the headers of a HTTP response to a HTTP request - // which has been transcribed to a Flatend service request that is - // handled by some given node. + // Headers are used to send small amounts of metadata to a requester. - ctx.header("header key", "header val"); + // For example, the HTTP API gateway directly sets headers provided + // as a response as the headers of a HTTP response to a HTTP request + // which has been transcribed to a Flatend service request that is + // handled by some given node. - // All request headers may be accessed via 'ctx.headers'. Headers - // are represented as an object. + ctx.header("header key", "header val"); - // The line below closes the response with the body being a - // JSON-encoded version of the request headers provided. + // All request headers may be accessed via 'ctx.headers'. Headers + // are represented as an object. - ctx.json(ctx.headers); + // The line below closes the response with the body being a + // JSON-encoded version of the request headers provided. - // Arbitrary streams may be piped into 'ctx', like the contents of - // a file for example. + ctx.json(ctx.headers); - const fs = require("fs"); - fs.createFileStream("index.js").pipe(ctx); + // Arbitrary streams may be piped into 'ctx', like the contents of + // a file for example. - // Any errors thrown in a handler are caught and sent as a JSON - // response. + const fs = require("fs"); + fs.createFileStream("index.js").pipe(ctx); - throw new Error("This shouldn't happen!"); + // Any errors thrown in a handler are caught and sent as a JSON + // response. - // The 'ctx' stream must be closed, either manually via 'ctx.end()' or - // via a function. Not closing 'ctx' will cause the handler to deadlock. + throw new Error("This shouldn't happen!"); - // DO NOT DO THIS! - // ctx.write("hello world!"); + // The 'ctx' stream must be closed, either manually via 'ctx.end()' or + // via a function. Not closing 'ctx' will cause the handler to deadlock. - // DO THIS! - ctx.write("hello world!"); - ctx.end(); + // DO NOT DO THIS! + // ctx.write("hello world!"); - // OR THIS! - ctx.send("hello world!"); + // DO THIS! + ctx.write("hello world!"); + ctx.end(); - // The line below reads the request body into a buffer up to 65536 bytes. - // If the body exceeds 65536 bytes, an error will be thrown. + // OR THIS! + ctx.send("hello world!"); - const body = await ctx.read({limit: 65536}); - console.log("I got this message:", body.toString("utf8")); + // The line below reads the request body into a buffer up to 65536 bytes. + // If the body exceeds 65536 bytes, an error will be thrown. + + const body = await ctx.read({ limit: 65536 }); + console.log("I got this message:", body.toString("utf8")); }; ``` @@ -531,12 +554,12 @@ Got a question? Either: #### Is flatend production-ready? Who uses flatend today? -*flatend is still a heavy work-in-progress*. That being said, it is being field tested with a few enterprise projects related to energy and IoT right now. +_flatend is still a heavy work-in-progress_. That being said, it is being field tested with a few enterprise projects related to energy and IoT right now. Deployments of flatend have also been made with a few hundred thousand visitors. #### Will I be able to run flatend myself? - + It was built from the start to allow for self-hosting on the cloud, on bare-metal servers, in Docker containers, on Kubernetes, etc. The cloud is your limit (see the pun I did there?). #### I'm worried about vendor lock-in - what happens if flatend goes out of business? @@ -573,4 +596,4 @@ Reach out to us on Discord, maybe the system you are looking to support may be a ## License -**flatend**, and all of its source code is released under the [MIT License](LICENSE). \ No newline at end of file +**flatend**, and all of its source code is released under the [MIT License](LICENSE). diff --git a/node.go b/node.go index 31d5647..45915a2 100644 --- a/node.go +++ b/node.go @@ -18,6 +18,11 @@ import ( var _ monte.Handler = (*Node)(nil) var _ monte.ConnStateHandler = (*Node)(nil) +const DefaultNumReconnectAttempts = 8 +const DefaultReconnectBackoffFactor = 1.25 +const DefaultReconnectBackoffMinDuration = 500 * time.Millisecond +const DefaultReconnectBackoffMaxDuration = 1 * time.Second + type Node struct { // A reachable, public address which peers may reach you on. // The format of the address must be [host]:[port]. @@ -35,6 +40,22 @@ type Node struct { // A mapping of service names to their respective handlers. Services map[string]Handler + // Total number of attempts to reconnect to a peer we reached that disconnected. + // Default is 8 attempts, set to a negative integer to not attempt to reconnect at all. + NumReconnectAttempts int + + // A factor proportionally representing how much larger each reconnection attempts + // delay should increase by upon each attempt. Default is 1.25. + ReconnectBackoffFactor float64 + + // The minimum amount of time to wait before each reconnection attempt. Default is 500 + // milliseconds. + ReconnectBackoffMinDuration time.Duration + + // The maximum amount of time to wait before each reconnection attempt. Default is 1 + // second. + ReconnectBackoffMaxDuration time.Duration + start sync.Once stop sync.Once @@ -61,6 +82,46 @@ func GenerateSecretKey() kademlia.PrivateKey { return secret } +func (n *Node) getNumReconnectAttempts() int { + if n.NumReconnectAttempts == 0 { + return DefaultNumReconnectAttempts + } + return n.NumReconnectAttempts +} + +func (n *Node) getReconnectBackoffFactor() float64 { + if n.ReconnectBackoffFactor <= 0 { + return DefaultReconnectBackoffFactor + } + return n.ReconnectBackoffFactor +} + +func (n *Node) getReconnectBackoffDurationRange() (time.Duration, time.Duration) { + reconnectBackoffMinDuration := n.ReconnectBackoffMinDuration + if reconnectBackoffMinDuration <= 0 { + reconnectBackoffMinDuration = DefaultReconnectBackoffMinDuration + } + reconnectBackoffMaxDuration := n.ReconnectBackoffMaxDuration + if reconnectBackoffMaxDuration <= 0 { + reconnectBackoffMaxDuration = DefaultReconnectBackoffMaxDuration + } + if reconnectBackoffMaxDuration < reconnectBackoffMinDuration { + reconnectBackoffMinDuration = DefaultReconnectBackoffMinDuration + reconnectBackoffMaxDuration = DefaultReconnectBackoffMaxDuration + } + return reconnectBackoffMinDuration, reconnectBackoffMaxDuration +} + +func (n *Node) getReconnectBackoffPlan() *backoff.Backoff { + min, max := n.getReconnectBackoffDurationRange() + return &backoff.Backoff{ + Factor: n.getReconnectBackoffFactor(), + Jitter: true, + Min: min, + Max: max, + } +} + func (n *Node) Start(addrs ...string) error { var ( publicHost net.IP @@ -77,7 +138,7 @@ func (n *Node) Start(addrs ...string) error { publicHost = addr.IP if addr.Port <= 0 || addr.Port >= math.MaxUint16 { - return fmt.Errorf("'%d' is an invalid port", addr.Port) + return fmt.Errorf("%d is an invalid port", addr.Port) } publicPort = uint16(addr.Port) @@ -102,7 +163,7 @@ func (n *Node) Start(addrs ...string) error { start := false n.start.Do(func() { start = true }) if !start { - return errors.New("listener already started") + return errors.New("node already started") } if n.SecretKey != kademlia.ZeroPrivateKey { @@ -117,9 +178,10 @@ func (n *Node) Start(addrs ...string) error { n.table = kademlia.NewTable(kademlia.ZeroPublicKey) } - n.providers = NewProviders() n.clients = make(map[string]*monte.Client) + n.providers = NewProviders() + n.srv = &monte.Server{ Handler: n, ConnState: n, @@ -314,28 +376,25 @@ func (n *Node) HandleConnState(conn *monte.Conn, state monte.ConnState) { return } - go func() { - b := &backoff.Backoff{ - Factor: 1.25, - Jitter: true, - Min: 500 * time.Millisecond, - Max: 1 * time.Second, - } + if n.getNumReconnectAttempts() > 0 { + go func() { + plan := n.getReconnectBackoffPlan() - for i := 0; i < 8; i++ { // 8 attempts max - err := n.Probe(addr) - if err == nil { - return - } + for i := 0; i < n.getNumReconnectAttempts(); i++ { // 8 attempts max + err := n.Probe(addr) + if err == nil { + return + } - duration := b.Duration() + duration := plan.Duration() - log.Printf("Trying to reconnect to %s. Sleeping for %s.", addr, duration) - time.Sleep(duration) - } + log.Printf("Trying to reconnect to %s. Sleeping for %s.", addr, duration) + time.Sleep(duration) + } - log.Printf("Tried 8 times reconnecting to %s. Giving up.", addr) - }() + log.Printf("Tried 8 times reconnecting to %s. Giving up.", addr) + }() + } } func (n *Node) HandleMessage(ctx *monte.Context) error { @@ -573,11 +632,6 @@ func (n *Node) probe(conn *monte.Conn) error { } if packet.ID != nil { - //addr = Addr(packet.ID.Host, packet.ID.Port) - //if !packet.ID.Host.Equal(resolved.IP) || packet.ID.Port != uint16(resolved.Port) { - // return provider, fmt.Errorf("dialed '%s' which advertised '%s'", resolved, addr) - //} - // update the provider with id and services info provider, _ = n.providers.register(conn, packet.ID, packet.Services, true) if !exists { diff --git a/nodejs/README.md b/nodejs/README.md index 20fdd9d..1116d0b 100644 --- a/nodejs/README.md +++ b/nodejs/README.md @@ -274,25 +274,37 @@ package flatend import "github.com/lithdew/kademlia" type Node struct { - // A reachable, public address which peers may reach you on. - // The format of the address must be [host]:[port]. - PublicAddr string - - // A 32-byte Ed25519 private key. A secret key must be provided - // to allow for peers to reach you. A secret key may be generated - // by calling `flatend.GenerateSecretKey()`. - SecretKey kademlia.PrivateKey - - // A list of addresses and ports assembled using: - // 1. flatend.BindAny() (bind to all hosts and any available port) - // 2. flatend.BindTCP(string) (binds to a [host]:[port]) - // 3. flatend.BindTCPv4(string) (binds to an [IPv4 host]:[port]) - // 4. flatend.BindTCPv6(string) (binds to an [IPv6 host]:[port]) - // which your Flatend node will listen for other nodes from. - BindAddrs []BindFunc - - // A mapping of service names to their respective handlers. - Services map[string]Handler + // A reachable, public address which peers may reach you on. + // The format of the address must be [host]:[port]. + PublicAddr string + + // A 32-byte Ed25519 private key. A secret key must be provided + // to allow for peers to reach you. A secret key may be generated + // by calling `flatend.GenerateSecretKey()`. + SecretKey kademlia.PrivateKey + + // A list of IPv4/IPv6 addresses and ports assembled as [host]:[port] which + // your Flatend node will listen for other nodes from. + BindAddrs []string + + // A mapping of service names to their respective handlers. + Services map[string]flatend.Handler + + // Total number of attempts to reconnect to a peer we reached that disconnected. + // Default is 8 attempts, set to a negative integer to not attempt to reconnect at all. + NumReconnectAttempts int + + // A factor proportionally representing how much larger each reconnection attempts + // delay should increase by upon each attempt. Default is 1.25. + ReconnectBackoffFactor float64 + + // The minimum amount of time to wait before each reconnection attempt. Default is 500 + // milliseconds. + ReconnectBackoffMinDuration time.Duration + + // The maximum amount of time to wait before each reconnection attempt. Default is 1 + // second. + ReconnectBackoffMaxDuration time.Duration // .... } @@ -368,6 +380,15 @@ export interface NodeOptions { // A mapping of service names to their respective handlers. services?: { [key: string]: Handler }; + + // Total number of attempts to reconnect to a peer we reached that disconnected. + // Default is 8 attempts: set to 0 to not attempt to reconnect, or a negative number + // to always attempt to reconnect. + numReconnectAttempts?: number; + + // The amount of time to wait before each reconnection attempt. Default is 500 + // milliseconds. + reconnectBackoffDuration?: number; } await Node.start((opts: NodeOpts)); diff --git a/nodejs/src/node.ts b/nodejs/src/node.ts index b8752eb..88fb884 100644 --- a/nodejs/src/node.ts +++ b/nodejs/src/node.ts @@ -40,6 +40,15 @@ export interface NodeOptions { // A mapping of service names to their respective handlers. services?: { [key: string]: Handler }; + + // Total number of attempts to reconnect to a peer we reached that disconnected. + // Default is 8 attempts: set to 0 to not attempt to reconnect, or a negative number + // to always attempt to reconnect. + numReconnectAttempts?: number; + + // The amount of time to wait before each reconnection attempt. Default is 500 + // milliseconds. + reconnectBackoffDuration?: number; } export class Node { @@ -52,12 +61,19 @@ export class Node { id?: ID; keys?: nacl.SignKeyPair; handlers: { [key: string]: Handler } = {}; + _shutdown = false; + _numReconnectAttempts: number = 8; + _reconnectBackoffDuration: number = 500; public static async start(opts: NodeOptions): Promise { const node = new Node(); if (opts.services) node.handlers = opts.services; + if (opts.numReconnectAttempts) + node._numReconnectAttempts = opts.numReconnectAttempts; + if (opts.reconnectBackoffDuration) + node._reconnectBackoffDuration = opts.reconnectBackoffDuration; if (opts.secretKey) { node.keys = nacl.sign.keyPair.fromSecretKey(opts.secretKey); @@ -392,36 +408,42 @@ export class Node { } }); - setImmediate(async () => { - await events.once(provider!.sock, "end"); + if (this._numReconnectAttempts !== 0) { + setImmediate(async () => { + await events.once(provider!.sock, "end"); - if (this._shutdown) return; + if (this._shutdown) return; - let count = 8; + let count = this._numReconnectAttempts; - const reconnect = async () => { - if (this._shutdown) return; + const reconnect = async () => { + if (this._shutdown) return; + + if (this._numReconnectAttempts > 0 && count-- === 0) { + debug( + `Tried ${this._numReconnectAttempts} times reconnecting to ${ + provider!.addr + }. Giving up.` + ); + return; + } - if (count-- === 0) { debug( - `Tried 8 times reconnecting to ${provider!.addr}. Giving up.` + `Trying to reconnect to '${provider!.addr}'. Sleeping for ${ + this._reconnectBackoffDuration + }ms.` ); - return; - } - - debug( - `Trying to reconnect to '${provider!.addr}'. Sleeping for 500ms.` - ); - try { - await this.connect(opts); - } catch (err) { - setTimeout(reconnect, 500); - } - }; + try { + await this.connect(opts); + } catch (err) { + setTimeout(reconnect, this._reconnectBackoffDuration); + } + }; - setTimeout(reconnect, 500); - }); + setTimeout(reconnect, this._reconnectBackoffDuration); + }); + } } catch (err) { conn.end(); throw err;