Skip to content

Commit

Permalink
nanonext 1.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Jun 3, 2024
1 parent cfe434a commit 54c1637
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 51 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.0.0.9022
Version: 1.1.0
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
2 changes: 1 addition & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# nanonext 1.0.0.9022 (development)
# nanonext 1.1.0

#### New Features

Expand Down
2 changes: 1 addition & 1 deletion R/aio.R
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ call_aio_ <- function(aio) invisible(.Call(rnng_wait_thread_create, aio))
#' collect_aio(res)
#'
#' msg <- recv_aio(s2, timeout = 100)
#' collect_aio_(res)
#' collect_aio_(msg)
#'
#' close(s1)
#' close(s2)
Expand Down
2 changes: 1 addition & 1 deletion man/collect_aio.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 41 additions & 41 deletions vignettes/nanonext.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ One solution it provides is that of processing real-time data where computation
Create socket in Python using the NNG binding 'pynng':


```python
``` python
import numpy as np
import pynng
socket = pynng.Pair0(listen="ipc:///tmp/nanonext.socket")
Expand All @@ -46,7 +46,7 @@ socket = pynng.Pair0(listen="ipc:///tmp/nanonext.socket")
Create nano object in R using `nanonext`, then send a vector of 'doubles', specifying mode as 'raw':


```r
``` r
library(nanonext)
n <- nano("pair", dial = "ipc:///tmp/nanonext.socket")
n$send(c(1.1, 2.2, 3.3, 4.4, 5.5), mode = "raw")
Expand All @@ -56,7 +56,7 @@ n$send(c(1.1, 2.2, 3.3, 4.4, 5.5), mode = "raw")
Receive in Python as a NumPy array of 'floats', and send back to R:


```python
``` python
raw = socket.recv()
array = np.frombuffer(raw)
print(array)
Expand All @@ -69,7 +69,7 @@ socket.send(msg)
Receive in R, specifying the receive mode as 'double':


```r
``` r
n$recv(mode = "double")
#> [1] 1.1 2.2 3.3 4.4 5.5
```
Expand All @@ -81,7 +81,7 @@ n$recv(mode = "double")
`nanonext` implements true async send and receive, leveraging NNG as a massively-scaleable concurrency framework.


```r
``` r
s1 <- socket("pair", listen = "inproc://nano")
s2 <- socket("pair", dial = "inproc://nano")
```
Expand All @@ -91,7 +91,7 @@ s2 <- socket("pair", dial = "inproc://nano")
An 'Aio' object returns an unresolved value whilst its asynchronous operation is ongoing, automatically resolving to a final value once complete.


```r
``` r
# an async receive is requested, but no messages are waiting (yet to be sent)
msg <- recv_aio(s2)
msg
Expand All @@ -103,7 +103,7 @@ msg$data
For a 'sendAio' object, the result is stored at `$result`.


```r
``` r
res <- send_aio(s1, data.frame(a = 1, b = 2))
res
#> < sendAio | $result >
Expand All @@ -115,7 +115,7 @@ res$result
For a 'recvAio' object, the message is stored at `$data`.


```r
``` r
# now that a message has been sent, the 'recvAio' resolves automatically
msg$data
#> a b
Expand All @@ -125,7 +125,7 @@ msg$data
Auxiliary function `unresolved()` may be used in control flow statements to perform actions which depend on resolution of the Aio, both before and after. This means there is no need to actually wait (block) for an Aio to resolve, as the example below demonstrates.


```r
``` r
msg <- recv_aio(s2)

# unresolved() queries for resolution itself so no need to use it again within the while loop
Expand All @@ -144,14 +144,18 @@ msg$data
The values may also be called explicitly using `call_aio()`. This will wait for completion of the Aio (blocking).


```r
``` r
# will wait for completion then return the resolved Aio
call_aio(msg)

# to access the resolved value directly (waiting if required)
# to access the resolved value (waiting if required):
call_aio(msg)$data
#> [1] "resolved"

# or directly:
collect_aio(msg)
#> [1] "resolved"

close(s1)
close(s2)
```
Expand All @@ -167,7 +171,7 @@ Can be used to perform computationally-expensive calculations or I/O-bound opera
[S] Server process: `reply()` will wait for a message and apply a function, in this case `rnorm()`, before sending back the result. This is started in a background 'mirai' process.


```r
``` r
m <- mirai::mirai({
library(nanonext)
rep <- socket("rep", listen = "tcp://127.0.0.1:6556")
Expand All @@ -178,7 +182,7 @@ m <- mirai::mirai({
[C] Client process: `request()` performs an async send and receive request and returns immediately with a `recvAio` object.


```r
``` r
library(nanonext)
req <- socket("req", dial = "tcp://127.0.0.1:6556")
aio <- request(context(req), data = 1e8, recv_mode = "double")
Expand All @@ -187,7 +191,7 @@ aio <- request(context(req), data = 1e8, recv_mode = "double")
At this point, the client can run additional code concurrent with the server processing the request.


```r
``` r
# do more...
```

Expand All @@ -196,13 +200,9 @@ When the result of the server calculation is required, the `recvAio` may be call
The return value from the server request is then retrieved and stored in the Aio as `$data`.


```r
call_aio(aio)

aio
#> < recvAio | $data >
aio$data |> str()
#> num [1:100000000] 1.365 -0.842 -0.816 1.367 -0.813 ...
``` r
call_aio(aio)$data |> str()
#> num [1:100000000] 0.257 -0.413 0.946 0.545 0.071 ...
```

As `call_aio()` is blocking and will wait for completion, an alternative is to query `aio$data` directly. This will return an 'unresolved' logical NA value if the calculation is yet to complete.
Expand Down Expand Up @@ -232,7 +232,7 @@ The following shows how condition variables and signalling work in practice.
Example 1: set up a socket, and wait for the other side to connect:


```r
``` r
sock <- socket("pair", listen = "inproc://nanopipe")

cv <- cv() # create new condition variable
Expand Down Expand Up @@ -265,7 +265,7 @@ close(sock)
Example 2: wait until a message is received or connection is dropped:


```r
``` r
sock <- socket("pair", listen = "inproc://nanosignal")
sock2 <- socket("pair", dial = "inproc://nanosignal")

Expand Down Expand Up @@ -317,11 +317,11 @@ A client configuration requires a PEM-encoded CA certificate (chain) used to ver
Additionally, the convenience function `write_cert()` can automatically generate a 4096 bit RSA key pair and self-signed X.509 certificate in the format required by `tls_config()`. The 'cn' argument must be provided and match exactly the hostname / IP address of the URL that is being used, e.g. in the example below '127.0.0.1' must be used throughout, or alternatively 'localhost', but not a mixture of the two.


```r
``` r
cert <- write_cert(cn = "127.0.0.1")
str(cert)
#> List of 2
#> $ server: chr [1:2] "-----BEGIN CERTIFICATE-----\nMIIFOTCCAyGgAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQDDAkxMjcu\nMC4wLjExETAPBgNV"| __truncated__ "-----BEGIN RSA PRIVATE KEY-----\nMIIJKAIBAAKCAgEAuMCkX3Rdm9ssjzAfpLbDndtuwvwceenNXQNO9R2/v99teHdn\nTsjeYb+gNNpP"| __truncated__
#> $ server: chr [1:2] "-----BEGIN CERTIFICATE-----\nMIIFOTCCAyGgAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQDDAkxMjcu\nMC4wLjExETAPBgNV"| __truncated__ "-----BEGIN RSA PRIVATE KEY-----\nMIIJKgIBAAKCAgEA3CPAXY45HOTzvo4z+U15qFP3jvrcATlNio/qO4HU4L0E82k+\nQ2P1aDuWUg7h"| __truncated__
#> $ client: chr [1:2] "-----BEGIN CERTIFICATE-----\nMIIFOTCCAyGgAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQDDAkxMjcu\nMC4wLjExETAPBgNV"| __truncated__ ""

ser <- tls_config(server = cert$server)
Expand All @@ -348,7 +348,7 @@ close(s)
`nanonext` fully implements NNG's pub/sub protocol as per the below example. A subscriber can subscribe to one or multiple topics broadcast by a publisher.


```r
``` r
pub <- socket("pub", listen = "inproc://nanobroadcast")
sub <- socket("sub", dial = "inproc://nanobroadcast")

Expand Down Expand Up @@ -392,7 +392,7 @@ sub |> recv(mode = "character")
The subscribed topic can be of any atomic type (not just character), allowing integer, double, logical, complex and raw vectors to be sent and received.


```r
``` r
sub |> subscribe(topic = 1)
pub |> send(c(1, 10, 10, 20), mode = "raw")
#> [1] 0
Expand All @@ -416,7 +416,7 @@ This type of pattern is useful for applications such as service discovery.
A surveyor sends a survey, which is broadcast to all peer respondents. Respondents are then able to reply, but are not obliged to. The survey itself is a timed event, and responses received after the timeout are discarded.


```r
``` r
sur <- socket("surveyor", listen = "inproc://nanoservice")
res1 <- socket("respondent", dial = "inproc://nanoservice")
res2 <- socket("respondent", dial = "inproc://nanoservice")
Expand Down Expand Up @@ -470,7 +470,7 @@ It can be seen that the final value resolves into a timeout, which is an integer
For normal use, it takes just the URL. It can follow redirects.


```r
``` r
ncurl("https://postman-echo.com/get")
#> $status
#> [1] 200
Expand All @@ -479,13 +479,13 @@ ncurl("https://postman-echo.com/get")
#> NULL
#>
#> $data
#> [1] "{\n \"args\": {},\n \"headers\": {\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"host\": \"postman-echo.com\",\n \"x-amzn-trace-id\": \"Root=1-6634af61-2a7256825245680a005b73e7\"\n },\n \"url\": \"https://postman-echo.com/get\"\n}"
#> [1] "{\n \"args\": {},\n \"headers\": {\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"host\": \"postman-echo.com\",\n \"x-amzn-trace-id\": \"Root=1-665dccb1-6374c8162c8a5951767e475b\"\n },\n \"url\": \"https://postman-echo.com/get\"\n}"
```

For advanced use, supports additional HTTP methods such as POST or PUT.


```r
``` r
res <- ncurl_aio("https://postman-echo.com/post",
method = "POST",
headers = c(`Content-Type` = "application/json", Authorization = "Bearer APIKEY"),
Expand All @@ -496,10 +496,10 @@ res

call_aio(res)$headers
#> $date
#> [1] "Fri, 03 May 2024 09:33:21 GMT"
#> [1] "Mon, 03 Jun 2024 14:01:21 GMT"

res$data
#> [1] "{\n \"args\": {},\n \"data\": {\n \"key\": \"value\"\n },\n \"files\": {},\n \"form\": {},\n \"headers\": {\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"host\": \"postman-echo.com\",\n \"x-amzn-trace-id\": \"Root=1-6634af61-7f19837d47b179ea131ef4e9\",\n \"content-length\": \"16\",\n \"content-type\": \"application/json\",\n \"authorization\": \"Bearer APIKEY\"\n },\n \"json\": {\n \"key\": \"value\"\n },\n \"url\": \"https://postman-echo.com/post\"\n}"
#> [1] "{\n \"args\": {},\n \"data\": {\n \"key\": \"value\"\n },\n \"files\": {},\n \"form\": {},\n \"headers\": {\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"host\": \"postman-echo.com\",\n \"x-amzn-trace-id\": \"Root=1-665dccb1-74cd2d7d4fe4282876bfb176\",\n \"content-length\": \"16\",\n \"content-type\": \"application/json\",\n \"authorization\": \"Bearer APIKEY\"\n },\n \"json\": {\n \"key\": \"value\"\n },\n \"url\": \"https://postman-echo.com/post\"\n}"
```

In this respect, it may be used as a performant and lightweight method for making REST API requests.
Expand All @@ -509,7 +509,7 @@ In this respect, it may be used as a performant and lightweight method for makin
By specifying `convert = FALSE`, the received binary data is made available as a raw vector. This may be fed into 'json' parsers which can operate directly on such data etc.


```r
``` r
sess <- ncurl_session("https://postman-echo.com/get",
convert = FALSE,
headers = c(`Content-Type` = "application/json", Authorization = "Bearer APIKEY"),
Expand All @@ -523,7 +523,7 @@ transact(sess)
#>
#> $headers
#> $headers$Date
#> [1] "Fri, 03 May 2024 09:33:22 GMT"
#> [1] "Mon, 03 Jun 2024 14:01:22 GMT"
#>
#> $headers$`Content-Type`
#> [1] "application/json; charset=utf-8"
Expand All @@ -534,7 +534,7 @@ transact(sess)
#> [40] 6f 72 77 61 72 64 65 64 2d 70 72 6f 74 6f 22 3a 20 22 68 74 74 70 73 22 2c 0a 20 20 20 20 22 78 2d 66 6f 72 77 61 72
#> [79] 64 65 64 2d 70 6f 72 74 22 3a 20 22 34 34 33 22 2c 0a 20 20 20 20 22 68 6f 73 74 22 3a 20 22 70 6f 73 74 6d 61 6e 2d
#> [118] 65 63 68 6f 2e 63 6f 6d 22 2c 0a 20 20 20 20 22 78 2d 61 6d 7a 6e 2d 74 72 61 63 65 2d 69 64 22 3a 20 22 52 6f 6f 74
#> [157] 3d 31 2d 36 36 33 34 61 66 36 32 2d 37 38 31 63 34 34 38 66 35 30 37 65 61 62 64 65 33 31 35 61 66 35 37 35 22 2c 0a
#> [157] 3d 31 2d 36 36 35 64 63 63 62 32 2d 31 39 34 62 62 66 39 35 30 64 30 32 64 65 36 33 36 30 32 38 66 32 34 31 22 2c 0a
#> [196] 20 20 20 20 22 63 6f 6e 74 65 6e 74 2d 74 79 70 65 22 3a 20 22 61 70 70 6c 69 63 61 74 69 6f 6e 2f 6a 73 6f 6e 22 2c
#> [235] 0a 20 20 20 20 22 61 75 74 68 6f 72 69 7a 61 74 69 6f 6e 22 3a 20 22 42 65 61 72 65 72 20 41 50 49 4b 45 59 22 0a 20
#> [274] 20 7d 2c 0a 20 20 22 75 72 6c 22 3a 20 22 68 74 74 70 73 3a 2f 2f 70 6f 73 74 6d 61 6e 2d 65 63 68 6f 2e 63 6f 6d 2f
Expand All @@ -544,7 +544,7 @@ transact(sess)
Optimised functions for base64 encoding and decoding from the 'Mbed TLS' library are also exposed as convenience utilities:


```r
``` r
base64enc("hello world!")
#> [1] "aGVsbG8gd29ybGQh"

Expand All @@ -561,7 +561,7 @@ base64dec(base64enc("hello world!"))
The stream interface can be used to communicate with (secure) websocket servers. The argument `textframes = TRUE` can be specified where the websocket server uses text rather than binary frames.


```r
``` r
# connecting to an echo service
s <- stream(dial = "wss://echo.websocket.events/", textframes = TRUE)
s
Expand All @@ -574,7 +574,7 @@ s
`send()` and `recv()`, as well as their asynchronous counterparts `send_aio()` and `recv_aio()` can be used on Streams in the same way as Sockets. This affords a great deal of flexibility in ingesting and processing streaming data.


```r
``` r
s |> recv()
#> [1] "echo.websocket.events sponsored by Lob.com"

Expand Down Expand Up @@ -612,7 +612,7 @@ See the function documentation page for a list of common options.
Once a dialer or listener has started, it is not generally possible to change its configuration. In this case, the dialer or listener should be created specifying 'autostart = FALSE'.


```r
``` r
s <- socket(listen = "inproc://options", autostart = FALSE)

# no maximum message size
Expand All @@ -635,7 +635,7 @@ This can be used on a Socket, Listener or Dialer to query useful statistics such
See the function documentation page for available statistics.


```r
``` r
s <- socket(listen = "inproc://stat")

# no active connections (pipes)
Expand Down
11 changes: 5 additions & 6 deletions vignettes/nanonext.Rmd.orig
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,12 @@ The values may also be called explicitly using `call_aio()`. This will wait for
# will wait for completion then return the resolved Aio
call_aio(msg)

# to access the resolved value directly (waiting if required)
# to access the resolved value (waiting if required):
call_aio(msg)$data

# or directly:
collect_aio(msg)

close(s1)
close(s2)

Expand Down Expand Up @@ -184,11 +187,7 @@ When the result of the server calculation is required, the `recvAio` may be call
The return value from the server request is then retrieved and stored in the Aio as `$data`.

```{r rpcclient3}
call_aio(aio)

aio
aio$data |> str()

call_aio(aio)$data |> str()
```

As `call_aio()` is blocking and will wait for completion, an alternative is to query `aio$data` directly. This will return an 'unresolved' logical NA value if the calculation is yet to complete.
Expand Down

0 comments on commit 54c1637

Please sign in to comment.