Skip to content

Commit

Permalink
Update NNG version and remove deprecated API
Browse files Browse the repository at this point in the history
A new option class is added: ArbitraryOption
  • Loading branch information
Leonard Pollak committed Oct 21, 2021
1 parent b9f2495 commit fc6dda5
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 116 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ _nng_py.c
Release
_nng.c
*.pyd
__pycache__
**/__pycache__/
*.swp
.pytest_cache
dist
Expand Down
2 changes: 1 addition & 1 deletion build_pynng.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
# e.g.: python setup.py build_ext -I<inc_path> -L<lib_path> -l<lib>
#elif True:
# incdirs = None
# libraries = ['pthread' 'mbedtls' 'nng']
# libraries = ['pthread', 'mbedtls', 'nng']
# objects = None
else:
incdirs = ['nng/include']
Expand Down
115 changes: 26 additions & 89 deletions nng_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,16 @@ struct nng_sockaddr_inproc {
uint16_t sa_family;
char sa_name[(128)];
};
typedef struct nng_sockaddr_inproc nng_sockaddr_inproc;
struct nng_sockaddr_path {
uint16_t sa_family;
char sa_path[(128)];
};
typedef struct nng_sockaddr_path nng_sockaddr_path;
typedef struct nng_sockaddr_path nng_sockaddr_ipc;
struct nng_sockaddr_in6 {
uint16_t sa_family;
uint16_t sa_port;
uint8_t sa_addr[16];
uint32_t sa_scope;
};
typedef struct nng_sockaddr_in6 nng_sockaddr_in6;
typedef struct nng_sockaddr_in6 nng_sockaddr_udp6;
typedef struct nng_sockaddr_in6 nng_sockaddr_tcp6;
struct nng_sockaddr_in {
uint16_t sa_family;
uint16_t sa_port;
Expand All @@ -48,25 +43,41 @@ struct nng_sockaddr_zt {
uint64_t sa_nodeid;
uint32_t sa_port;
};
struct nng_sockaddr_abstract {
uint16_t sa_family;
uint16_t sa_len;
uint8_t sa_name[107];
};
struct nng_sockaddr_storage {
uint16_t sa_family;
uint64_t sa_pad[16];
};
typedef struct nng_sockaddr_inproc nng_sockaddr_inproc;
typedef struct nng_sockaddr_path nng_sockaddr_path;
typedef struct nng_sockaddr_path nng_sockaddr_ipc;
typedef struct nng_sockaddr_in nng_sockaddr_in;
typedef struct nng_sockaddr_in nng_sockaddr_udp;
typedef struct nng_sockaddr_in nng_sockaddr_tcp;
typedef struct nng_sockaddr_in6 nng_sockaddr_in6;
typedef struct nng_sockaddr_zt nng_sockaddr_zt;
typedef struct nng_sockaddr_abstract nng_sockaddr_abstract;
typedef struct nng_sockaddr_storage nng_sockaddr_storage;
typedef union nng_sockaddr {
uint16_t s_family;
nng_sockaddr_ipc s_ipc;
nng_sockaddr_inproc s_inproc;
nng_sockaddr_in6 s_in6;
nng_sockaddr_in s_in;
nng_sockaddr_zt s_zt;
nng_sockaddr_abstract s_abstract;
nng_sockaddr_storage s_storage;
} nng_sockaddr;
enum nng_sockaddr_family {
NNG_AF_UNSPEC = 0,
NNG_AF_INPROC = 1,
NNG_AF_IPC = 2,
NNG_AF_INET = 3,
NNG_AF_INET6 = 4,
NNG_AF_ZT = 5
NNG_AF_ZT = 5,
NNG_AF_ABSTRACT = 6
};
typedef struct nng_iov {
void * iov_buf;
Expand All @@ -75,23 +86,6 @@ typedef struct nng_iov {
extern void nng_fini(void);
extern int nng_close(nng_socket);
extern int nng_socket_id(nng_socket);
extern void nng_closeall(void);
extern int nng_setopt(nng_socket, const char *, const void *, size_t);
extern int nng_setopt_bool(nng_socket, const char *, bool);
extern int nng_setopt_int(nng_socket, const char *, int);
extern int nng_setopt_ms(nng_socket, const char *, nng_duration);
extern int nng_setopt_size(nng_socket, const char *, size_t);
extern int nng_setopt_uint64(nng_socket, const char *, uint64_t);
extern int nng_setopt_string(nng_socket, const char *, const char *);
extern int nng_setopt_ptr(nng_socket, const char *, void *);
extern int nng_getopt(nng_socket, const char *, void *, size_t *);
extern int nng_getopt_bool(nng_socket, const char *, bool *);
extern int nng_getopt_int(nng_socket, const char *, int *);
extern int nng_getopt_ms(nng_socket, const char *, nng_duration *);
extern int nng_getopt_size(nng_socket, const char *, size_t *);
extern int nng_getopt_uint64(nng_socket, const char *, uint64_t *);
extern int nng_getopt_ptr(nng_socket, const char *, void **);
extern int nng_getopt_string(nng_socket, const char *, char **);
extern int nng_socket_set(nng_socket, const char *, const void *, size_t);
extern int nng_socket_set_bool(nng_socket, const char *, bool);
extern int nng_socket_set_int(nng_socket, const char *, int);
Expand Down Expand Up @@ -129,24 +123,6 @@ extern int nng_dialer_close(nng_dialer);
extern int nng_listener_close(nng_listener);
extern int nng_dialer_id(nng_dialer);
extern int nng_listener_id(nng_listener);
extern int nng_dialer_setopt(nng_dialer, const char *, const void *, size_t);
extern int nng_dialer_setopt_bool(nng_dialer, const char *, bool);
extern int nng_dialer_setopt_int(nng_dialer, const char *, int);
extern int nng_dialer_setopt_ms(nng_dialer, const char *, nng_duration);
extern int nng_dialer_setopt_size(nng_dialer, const char *, size_t);
extern int nng_dialer_setopt_uint64(nng_dialer, const char *, uint64_t);
extern int nng_dialer_setopt_ptr(nng_dialer, const char *, void *);
extern int nng_dialer_setopt_string(nng_dialer, const char *, const char *);
extern int nng_dialer_getopt(nng_dialer, const char *, void *, size_t *);
extern int nng_dialer_getopt_bool(nng_dialer, const char *, bool *);
extern int nng_dialer_getopt_int(nng_dialer, const char *, int *);
extern int nng_dialer_getopt_ms(nng_dialer, const char *, nng_duration *);
extern int nng_dialer_getopt_size(nng_dialer, const char *, size_t *);
extern int nng_dialer_getopt_sockaddr(
nng_dialer, const char *, nng_sockaddr *);
extern int nng_dialer_getopt_uint64(nng_dialer, const char *, uint64_t *);
extern int nng_dialer_getopt_ptr(nng_dialer, const char *, void **);
extern int nng_dialer_getopt_string(nng_dialer, const char *, char **);
extern int nng_dialer_set(nng_dialer, const char *, const void *, size_t);
extern int nng_dialer_set_bool(nng_dialer, const char *, bool);
extern int nng_dialer_set_int(nng_dialer, const char *, int);
Expand All @@ -166,28 +142,6 @@ extern int nng_dialer_get_string(nng_dialer, const char *, char **);
extern int nng_dialer_get_ptr(nng_dialer, const char *, void **);
extern int nng_dialer_get_ms(nng_dialer, const char *, nng_duration *);
extern int nng_dialer_get_addr(nng_dialer, const char *, nng_sockaddr *);
extern int nng_listener_setopt(
nng_listener, const char *, const void *, size_t);
extern int nng_listener_setopt_bool(nng_listener, const char *, bool);
extern int nng_listener_setopt_int(nng_listener, const char *, int);
extern int nng_listener_setopt_ms(nng_listener, const char *, nng_duration);
extern int nng_listener_setopt_size(nng_listener, const char *, size_t);
extern int nng_listener_setopt_uint64(nng_listener, const char *, uint64_t);
extern int nng_listener_setopt_ptr(nng_listener, const char *, void *);
extern int nng_listener_setopt_string(
nng_listener, const char *, const char *);
extern int nng_listener_getopt(nng_listener, const char *, void *, size_t *);
extern int nng_listener_getopt_bool(nng_listener, const char *, bool *);
extern int nng_listener_getopt_int(nng_listener, const char *, int *);
extern int nng_listener_getopt_ms(
nng_listener, const char *, nng_duration *);
extern int nng_listener_getopt_size(nng_listener, const char *, size_t *);
extern int nng_listener_getopt_sockaddr(
nng_listener, const char *, nng_sockaddr *);
extern int nng_listener_getopt_uint64(
nng_listener, const char *, uint64_t *);
extern int nng_listener_getopt_ptr(nng_listener, const char *, void **);
extern int nng_listener_getopt_string(nng_listener, const char *, char **);
extern int nng_listener_set(
nng_listener, const char *, const void *, size_t);
extern int nng_listener_set_bool(nng_listener, const char *, bool);
Expand Down Expand Up @@ -220,16 +174,6 @@ extern int nng_ctx_close(nng_ctx);
extern int nng_ctx_id(nng_ctx);
extern void nng_ctx_recv(nng_ctx, nng_aio *);
extern void nng_ctx_send(nng_ctx, nng_aio *);
extern int nng_ctx_getopt(nng_ctx, const char *, void *, size_t *);
extern int nng_ctx_getopt_bool(nng_ctx, const char *, bool *);
extern int nng_ctx_getopt_int(nng_ctx, const char *, int *);
extern int nng_ctx_getopt_ms(nng_ctx, const char *, nng_duration *);
extern int nng_ctx_getopt_size(nng_ctx, const char *, size_t *);
extern int nng_ctx_setopt(nng_ctx, const char *, const void *, size_t);
extern int nng_ctx_setopt_bool(nng_ctx, const char *, bool);
extern int nng_ctx_setopt_int(nng_ctx, const char *, int);
extern int nng_ctx_setopt_ms(nng_ctx, const char *, nng_duration);
extern int nng_ctx_setopt_size(nng_ctx, const char *, size_t);
extern int nng_ctx_get(nng_ctx, const char *, void *, size_t *);
extern int nng_ctx_get_bool(nng_ctx, const char *, bool *);
extern int nng_ctx_get_int(nng_ctx, const char *, int *);
Expand All @@ -254,6 +198,7 @@ extern char *nng_strdup(const char *);
extern void nng_strfree(char *);
extern int nng_aio_alloc(nng_aio **, void (*)(void *), void *);
extern void nng_aio_free(nng_aio *);
extern void nng_aio_reap(nng_aio *);
extern void nng_aio_stop(nng_aio *);
extern int nng_aio_result(nng_aio *);
extern size_t nng_aio_count(nng_aio *);
Expand All @@ -276,6 +221,8 @@ extern void nng_sleep_aio(nng_duration, nng_aio *);
extern int nng_msg_alloc(nng_msg **, size_t);
extern void nng_msg_free(nng_msg *);
extern int nng_msg_realloc(nng_msg *, size_t);
extern int nng_msg_reserve(nng_msg *, size_t);
extern size_t nng_msg_capacity(nng_msg *);
extern void * nng_msg_header(nng_msg *);
extern size_t nng_msg_header_len(const nng_msg *);
extern void * nng_msg_body(nng_msg *);
Expand Down Expand Up @@ -317,16 +264,6 @@ extern void nng_msg_clear(nng_msg *);
extern void nng_msg_header_clear(nng_msg *);
extern void nng_msg_set_pipe(nng_msg *, nng_pipe);
extern nng_pipe nng_msg_get_pipe(const nng_msg *);
extern int nng_msg_getopt(nng_msg *, int, void *, size_t *);
extern int nng_pipe_getopt(nng_pipe, const char *, void *, size_t *);
extern int nng_pipe_getopt_bool(nng_pipe, const char *, bool *);
extern int nng_pipe_getopt_int(nng_pipe, const char *, int *);
extern int nng_pipe_getopt_ms(nng_pipe, const char *, nng_duration *);
extern int nng_pipe_getopt_size(nng_pipe, const char *, size_t *);
extern int nng_pipe_getopt_sockaddr(nng_pipe, const char *, nng_sockaddr *);
extern int nng_pipe_getopt_uint64(nng_pipe, const char *, uint64_t *);
extern int nng_pipe_getopt_ptr(nng_pipe, const char *, void **);
extern int nng_pipe_getopt_string(nng_pipe, const char *, char **);
extern int nng_pipe_get(nng_pipe, const char *, void *, size_t *);
extern int nng_pipe_get_bool(nng_pipe, const char *, bool *);
extern int nng_pipe_get_int(nng_pipe, const char *, int *);
Expand Down Expand Up @@ -369,6 +306,7 @@ enum nng_unit_enum {
NNG_UNIT_EVENTS = 4
};
extern uint64_t nng_stat_value(nng_stat *);
extern bool nng_stat_bool(nng_stat *);
extern const char *nng_stat_string(nng_stat *);
extern const char *nng_stat_desc(nng_stat *);
extern uint64_t nng_stat_timestamp(nng_stat *);
Expand Down Expand Up @@ -591,9 +529,8 @@ int nng_tls_config_version(
const char *nng_tls_engine_name(void);
const char *nng_tls_engine_description(void);
bool nng_tls_engine_fips_mode(void);
int nng_tls_register(void);
#define NNG_FLAG_ALLOC 1u // Recv to allocate receive buffer
#define NNG_FLAG_NONBLOCK 2u // Non-blocking operations
#define NNG_MAJOR_VERSION 1
#define NNG_MINOR_VERSION 4
#define NNG_PATCH_VERSION 0
#define NNG_MINOR_VERSION 5
#define NNG_PATCH_VERSION 2
4 changes: 2 additions & 2 deletions pynng/_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ async def wait_for_aio():
except curio.CancelledError:
if fut.cancelled():
lib.nng_aio_cancel(aio.aio)

err = lib.nng_aio_result(aio.aio)
if err == lib.NNG_ECANCELED:
raise curio.CancelledError()
check_err(err)

def callback():
if not fut.cancelled():
fut.set_result(True)
Expand Down
64 changes: 60 additions & 4 deletions pynng/nng.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ def __set__(self, instance, value):
self.__class__._setter(instance, self.option, value)


class ArbitraryOption(_NNGOption):
"""Descriptor for getting/setting arbitrary options"""
_getter = options._getopt_arbitrary
_setter = options._setopt_arbitrary


class IntOption(_NNGOption):
"""Descriptor for getting/setting integer options"""
_getter = options._getopt_int
Expand Down Expand Up @@ -279,7 +285,10 @@ class Socket:
tcp_nodelay = BooleanOption('tcp-nodelay')
tcp_keepalive = BooleanOption('tcp-keepalive')

tls_config = PointerOption('tls-config')
#tls_config = PointerOption('tls-config')
# Sockets are Transport agnostic.
# The tls-config is transport specific and has to be set on the listener/dialer
tls_config = None

def __init__(self, *,
dial=None,
Expand Down Expand Up @@ -386,11 +395,21 @@ def _dial(self, address, flags=0):
"""
dialer = ffi.new('nng_dialer *')
ret = lib.nng_dial(self.socket, to_char(address), dialer, flags)
if self.tls_config:
ret = lib.nng_dialer_create(dialer, self.socket, to_char(address))
else:
ret = lib.nng_dial(self.socket, to_char(address), dialer, flags)
check_err(ret)
# we can only get here if check_err doesn't raise
d_id = lib.nng_dialer_id(dialer[0])
py_dialer = Dialer(dialer, self)
if self.tls_config:
py_dialer.tls_config = self.tls_config
lib.nng_dialer_start(dialer[0], flags)
# FIXME: Set the tls_config to None here
# If one wants another dialer with the same tls_config it
# has to be set again which might be confusing
self.tls_config = None
self._dialers[d_id] = py_dialer
return py_dialer

Expand All @@ -401,11 +420,21 @@ def listen(self, address, flags=0):
"""
listener = ffi.new('nng_listener *')
ret = lib.nng_listen(self.socket, to_char(address), listener, flags)
if self.tls_config:
ret = lib.nng_listener_create(listener, self.socket, to_char(address))
else:
ret = lib.nng_listen(self.socket, to_char(address), listener, flags)
check_err(ret)
# we can only get here if check_err doesn't raise
l_id = lib.nng_listener_id(listener[0])
py_listener = Listener(listener, self)
if self.tls_config:
py_listener.tls_config = self.tls_config
lib.nng_listener_start(listener[0], flags)
# FIXME: Set the tls_config to None here
# If one wants another listener with the same tls_config it
# has to be set again which might be confusing
self.tls_config = None
self._listeners[l_id] = py_listener
return py_listener

Expand Down Expand Up @@ -873,11 +902,38 @@ def subscribe(self, topic):
desired behavior, just pass :class:`bytes` in as the topic.
"""
options._setopt_string(self, b'sub:subscribe', topic)
options._setopt_arbitrary(self, b'sub:subscribe', topic)

def unsubscribe(self, topic):
"""Unsubscribe to the specified topic.
.. Note::
If you pass a :class:`str` as the ``topic``, it will be
automatically encoded with :meth:`str.encode`. If this is not the
desired behavior, just pass :class:`bytes` in as the topic.
"""
options._setopt_arbitrary(self, b'sub:unsubscribe', topic)

def subscribe_string(self, topic):
"""Subscribe to the specified topic.
Topics are matched by looking at the first bytes of any received
message.
.. Note::
If you pass a :class:`str` as the ``topic``, it will be
automatically encoded with :meth:`str.encode`. If this is not the
desired behavior, just pass :class:`bytes` in as the topic.
"""
options._setopt_string(self, b'sub:subscribe', topic)

def unsubscribe_string(self, topic):
"""Unsubscribe to the specified topic.
.. Note::
If you pass a :class:`str` as the ``topic``, it will be
Expand Down
Loading

0 comments on commit fc6dda5

Please sign in to comment.