Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared CPU NFs (#150) #4

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
[openNetVM][onvm]
[openNetVM][onvm] - with Shared CPUs
==

WARNING
--
This is an **EXPERIMENTAL** version of OpenNetVM. It allows multiple NFs to run on a shared core. In "normal" OpenNetVM, each NF will poll its RX queue for packets, monopolizing the CPU even if it has a low load. This branch adds a semaphore-based communication system so that NFs will block when there are no packets available. The NF Manager will then signal the semaphore once one or more packets arrive.

This code allows you to evaluate resource management techniques for NFs that share cores, however it has not been fully tested and should be considered unstable and unsupported.

For a description of how the code works, see the paper _Flurries: Countless Fine-Grained NFs for Flexible Per-Flow Customization_ by Wei Zhang, Jinho Hwang, Shriram Rajagopalan, K. K. Ramakrishnan, and Timothy Wood, published at _Co-NEXT 16_. Note that this code does not contain the full Flurries system, only the basic support for shared-CPU NFs.

Usage / Known Limitations:
- All code for sharing CPUs is within `#ifdef INTERRUPT_SEM` blocks. This macro is defined in `onvm/onvm_nflib/onvm_common.h`
- When enabled, you can run multiple NFs on the same CPU core with much less interference than if they are polling for packets
- Note that the manager threads all still use polling
- This code does not provide any particular intelligence for how NFs are scheduled or when they wakeup/sleep
- Currently ONVM only supports a max of 16 NFs. This can be adjusted by changing macros in `onvm/onvm_nflib/onvm_common.h`
- Current code has a bug where if multiple NFs start at the exact same time the manager will not correctly assign IDs. You may need to stagger NF startup to avoid this.
- Killing the manager will not correctly kill all NFs (since they are blocked on semaphore and don't get the shutdown message). You must kill NFs manually with `ctrl-c`.

WARNING
--

About
--
openNetVM is a high performance NFV platform based on [Intel DPDK][dpdk] and [Docker][docker] containers. openNetVM is SDN-enabled, allowing the network controller to provide rules that dictate what network functions need to process each packet flow.
Expand Down
141 changes: 141 additions & 0 deletions onvm/onvm_mgr/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
#include "onvm_pkt.h"
#include "onvm_nf.h"

#ifdef INTERRUPT_SEM
/* Per-wakeup-thread bookkeeping array; allocated in main() once the number
 * of wakeup lcores is known, one entry per wakeup thread. */
struct wakeup_info *wakeup_infos;
#endif //INTERRUPT_SEM

/****************************Internal Declarations****************************/

Expand Down Expand Up @@ -223,7 +226,22 @@ handle_signal(int sig) {


/*******************************Main function*********************************/
//TODO: Move to appropriate header or a different file for onvm_nf_wakeup_mgr/hdlr.c
#ifdef INTERRUPT_SEM
#include <signal.h>

/* Per-NF wakeup threshold: the wakeup thread posts an NF's semaphore once its
 * RX ring holds at least this many packets.  Defaults to 1 for every instance
 * (GNU range-designator initializer extension). */
unsigned nfs_wakethr[MAX_CLIENTS] = {[0 ... MAX_CLIENTS-1] = 1};

/* Forward declarations for the shared-CPU wakeup helpers defined below. */
static void
register_signal_handler(void);
static inline int
whether_wakeup_client(int instance_id);
static inline void
wakeup_client(int instance_id, struct wakeup_info *wakeup_info);
static int
wakeup_nfs(void *arg);

#endif

int
main(int argc, char *argv[]) {
Expand All @@ -232,6 +250,10 @@ main(int argc, char *argv[]) {
unsigned i;

/* initialise the system */
#ifdef INTERRUPT_SEM
unsigned wakeup_lcores;
register_signal_handler();
#endif

/* Reserve ID 0 for internal manager things */
next_instance_id = 1;
Expand All @@ -245,14 +267,24 @@ main(int argc, char *argv[]) {
/* Reserve n cores for: 1 Stats, 1 final Tx out, and ONVM_NUM_RX_THREADS for Rx */
cur_lcore = rte_lcore_id();
rx_lcores = ONVM_NUM_RX_THREADS;

#ifdef INTERRUPT_SEM
wakeup_lcores = ONVM_NUM_WAKEUP_THREADS;
tx_lcores = rte_lcore_count() - rx_lcores - wakeup_lcores - 1;
#else
tx_lcores = rte_lcore_count() - rx_lcores - 1;
#endif


/* Offset cur_lcore to start assigning TX cores */
cur_lcore += (rx_lcores-1);

RTE_LOG(INFO, APP, "%d cores available in total\n", rte_lcore_count());
RTE_LOG(INFO, APP, "%d cores available for handling manager RX queues\n", rx_lcores);
RTE_LOG(INFO, APP, "%d cores available for handling TX queues\n", tx_lcores);
#ifdef INTERRUPT_SEM
RTE_LOG(INFO, APP, "%d cores available for handling wakeup\n", wakeup_lcores);
#endif
RTE_LOG(INFO, APP, "%d cores available for handling stats\n", 1);

/* Evenly assign NFs to TX threads */
Expand Down Expand Up @@ -308,7 +340,116 @@ main(int argc, char *argv[]) {
}
}

#ifdef INTERRUPT_SEM
int clients_per_wakethread = ceil((unsigned)MAX_CLIENTS / wakeup_lcores);
wakeup_infos = (struct wakeup_info *)calloc(wakeup_lcores, sizeof(struct wakeup_info));
if (wakeup_infos == NULL) {
printf("can not alloc space for wakeup_info\n");
exit(1);
}
for (i = 0; i < wakeup_lcores; i++) {
wakeup_infos[i].first_client = RTE_MIN(i * clients_per_wakethread + 1, (unsigned)MAX_CLIENTS);
wakeup_infos[i].last_client = RTE_MIN((i+1) * clients_per_wakethread + 1, (unsigned)MAX_CLIENTS);
cur_lcore = rte_get_next_lcore(cur_lcore, 1, 1);
rte_eal_remote_launch(wakeup_nfs, (void*)&wakeup_infos[i], cur_lcore);
printf("wakeup lcore_id=%d, first_client=%d, last_client=%d\n", cur_lcore, wakeup_infos[i].first_client, wakeup_infos[i].last_client);
}

/* this change is Not needed anymore
cur_lcore = rte_get_next_lcore(cur_lcore, 1, 1);
printf("monitor_lcore=%u\n", cur_lcore);
rte_eal_remote_launch(monitor, NULL, cur_lcore);
*/
#endif

/* Master thread handles statistics and NF management */
master_thread_main();

return 0;
}

/*******************************Helper functions********************************/
#ifdef INTERRUPT_SEM
#define WAKEUP_THRESHOLD 1
static inline int
whether_wakeup_client(int instance_id)
{
        /* An NF is worth waking only when its RX ring exists and already
         * holds at least its configured wakeup threshold of packets. */
        struct rte_ring *rx_ring = clients[instance_id].rx_q;

        if (rx_ring == NULL)
                return 0;

        return (rte_ring_count(rx_ring) >= nfs_wakethr[instance_id]) ? 1 : 0;
}

static inline void
wakeup_client(int instance_id, struct wakeup_info *wakeup_info)
{
        /* Skip NFs whose RX ring has not reached the wakeup threshold. */
        if (!whether_wakeup_client(instance_id))
                return;

        /* Only post the semaphore when the NF is actually blocked
         * (shm_server flag == 1); clear the flag before waking it so the
         * NF goes back to polling mode. */
        if (rte_atomic16_read(clients[instance_id].shm_server) != 1)
                return;

        wakeup_info->num_wakeups += 1;
        rte_atomic16_set(clients[instance_id].shm_server, 0);
        sem_post(clients[instance_id].mutex);
}

/* Wakeup-thread entry point (lcore_function_t).
 *
 * @param arg  pointer to this thread's struct wakeup_info, giving the
 *             half-open range [first_client, last_client) of NF instance
 *             IDs this thread is responsible for.
 *
 * Busy-polls forever: repeatedly scans its slice of clients and posts the
 * semaphore of any NF with enough pending packets.  The loop never exits;
 * the thread runs until the process terminates.
 *
 * Fix: removed the commented-out ONVM_SPECIAL_NF adjustment (dead code). */
static int
wakeup_nfs(void *arg)
{
        struct wakeup_info *wakeup_info = (struct wakeup_info *)arg;
        unsigned i;

        while (true) {
                for (i = wakeup_info->first_client; i < wakeup_info->last_client; i++) {
                        wakeup_client(i, wakeup_info);
                }
        }

        return 0;  /* unreachable; satisfies the lcore_function_t signature */
}

/* Cleanup handler installed for (nearly) all signals: on a low-numbered
 * terminating signal, close and unlink every client semaphore so stale
 * names do not linger, then exit with status 1.
 * NOTE(review): sem_close() and exit() are not async-signal-safe; calling
 * them from a handler is best-effort cleanup, not strictly correct —
 * confirm this is acceptable for this experimental branch. */
static void signal_handler(int sig, siginfo_t *info, void *secret) {
        int i;
        (void)info;
        (void)secret;

        /* 2 means terminal interrupt, 3 means terminal quit, 9 means kill and
         * 15 means termination.  The <= 15 test also sweeps in every other
         * low-numbered signal (e.g. SIGSEGV, SIGUSR1). */
        if (sig <= 15) {
                // handle for all clients or check for onvm_nf_is_valid(). Not needed!
                for (i = 1; i < MAX_CLIENTS; i++) {
                        sem_close(clients[i].mutex);
                        sem_unlink(clients[i].sem_name);
                }
#ifdef MONITOR
                // rte_free(port_stats);
                // rte_free(port_prev_stats);
#endif
        }

        exit(1);
}
/* Install signal_handler for every catchable signal number in [1, 30].
 *
 * Fixes:
 *  - With SA_SIGINFO set, the kernel invokes the three-argument handler, so
 *    it must be assigned through act.sa_sigaction; the old code cast it into
 *    act.sa_handler, which relied on the two union members aliasing.
 *  - SIGKILL and SIGSTOP can never be caught, so they are skipped instead of
 *    letting sigaction() fail silently for them. */
static void
register_signal_handler(void) {
        unsigned i;
        struct sigaction act;

        memset(&act, 0, sizeof(act));
        sigemptyset(&act.sa_mask);
        act.sa_flags = SA_SIGINFO;
        act.sa_sigaction = signal_handler;

        for (i = 1; i < 31; i++) {
                if (i == SIGKILL || i == SIGSTOP)
                        continue;
                sigaction(i, &act, 0);
        }
}
#endif

72 changes: 54 additions & 18 deletions onvm/onvm_mgr/onvm_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ int
init(int argc, char *argv[]) {
int retval;
const struct rte_memzone *mz;
const struct rte_memzone *mz_scp;
const struct rte_memzone *mz_scp;
uint8_t i, total_ports;

/* init EAL, parsing EAL args */
Expand All @@ -109,7 +109,7 @@ init(int argc, char *argv[]) {
memset(mz->addr, 0, sizeof(*clients_stats));
clients_stats = mz->addr;

/* set up ports info */
/* set up ports info */
ports = rte_malloc(MZ_PORT_INFO, sizeof(*ports), 0);
if (ports == NULL)
rte_exit(EXIT_FAILURE, "Cannot allocate memory for ports details\n");
Expand All @@ -136,7 +136,7 @@ init(int argc, char *argv[]) {
rte_exit(EXIT_FAILURE, "Cannot create nf message pool: %s\n", rte_strerror(rte_errno));
}

/* now initialise the ports we will use */
/* now initialise the ports we will use */
for (i = 0; i < ports->num_ports; i++) {
retval = init_port(ports->id[i]);
if (retval != 0)
Expand All @@ -152,28 +152,28 @@ init(int argc, char *argv[]) {
/* initialise a queue for newly created NFs */
init_info_queue();

/*initialize a default service chain*/
default_chain = onvm_sc_create();
retval = onvm_sc_append_entry(default_chain, ONVM_NF_ACTION_TONF, 1);
/*initialize a default service chain*/
default_chain = onvm_sc_create();
retval = onvm_sc_append_entry(default_chain, ONVM_NF_ACTION_TONF, 1);
if (retval == ENOSPC) {
printf("chain length can not be larger than the maximum chain length\n");
exit(1);
}
printf("Default service chain: send to sdn NF\n");
printf("Default service chain: send to sdn NF\n");

/* set up service chain pointer shared to NFs*/
mz_scp = rte_memzone_reserve(MZ_SCP_INFO, sizeof(struct onvm_service_chain *),
rte_socket_id(), NO_FLAGS);
if (mz_scp == NULL)
rte_exit(EXIT_FAILURE, "Canot reserve memory zone for service chain pointer\n");
memset(mz_scp->addr, 0, sizeof(struct onvm_service_chain *));
default_sc_p = mz_scp->addr;
*default_sc_p = default_chain;
onvm_sc_print(default_chain);
/* set up service chain pointer shared to NFs*/
mz_scp = rte_memzone_reserve(MZ_SCP_INFO, sizeof(struct onvm_service_chain *),
rte_socket_id(), NO_FLAGS);
if (mz_scp == NULL)
rte_exit(EXIT_FAILURE, "Cannot reserve memory zone for service chain pointer\n");
memset(mz_scp->addr, 0, sizeof(struct onvm_service_chain *));
default_sc_p = mz_scp->addr;
*default_sc_p = default_chain;
onvm_sc_print(default_chain);

onvm_flow_dir_init();
onvm_flow_dir_init();

return 0;
return 0;
}


Expand Down Expand Up @@ -312,6 +312,15 @@ init_shm_rings(void) {
const unsigned ringsize = CLIENT_QUEUE_RINGSIZE;
const unsigned msgringsize = CLIENT_MSG_QUEUE_SIZE;

// mutex and semaphores for N
#ifdef INTERRUPT_SEM
const char * sem_name;
sem_t *mutex;
key_t key;
int shmid;
char *shm;
#endif

// use calloc since we allocate for all possible clients
// ensure that all fields are init to 0 to avoid reading garbage
// TODO plopreiato, move to creation when a NF starts
Expand Down Expand Up @@ -356,6 +365,33 @@ init_shm_rings(void) {

if (clients[i].msg_q == NULL)
rte_exit(EXIT_FAILURE, "Cannot create msg queue for client %u\n", i);

#ifdef INTERRUPT_SEM
sem_name = get_sem_name(i);
clients[i].sem_name = sem_name;

fprintf(stderr, "sem_name=%s for client %d\n", sem_name, i);
mutex = sem_open(sem_name, O_CREAT, 06666, 0);
if(mutex == SEM_FAILED) {
fprintf(stderr, "can not create semaphore for client %d\n", i);
sem_unlink(sem_name);
exit(1);
}
clients[i].mutex = mutex;

key = get_rx_shmkey(i);
if ((shmid = shmget(key, SHMSZ, IPC_CREAT | 0666)) < 0) {
fprintf(stderr, "can not create the shared memory segment for client %d\n", i);
exit(1);
}

if ((shm = shmat(shmid, NULL, 0)) == (char *) -1) {
fprintf(stderr, "can not attach the shared segment to the server space for client %d\n", i);
exit(1);
}

clients[i].shm_server = (rte_atomic16_t *)shm;
#endif
}
return 0;
}
Expand Down
22 changes: 21 additions & 1 deletion onvm/onvm_mgr/onvm_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@
#ifndef _ONVM_INIT_H_
#define _ONVM_INIT_H_

/***************************Standard C library********************************/

//#ifdef INTERRUPT_SEM //move macro to makefile; otherwise uncomment, or these must be included after common.h
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <semaphore.h>
#include <fcntl.h>
//#endif //INTERRUPT_SEM

/********************************DPDK library*********************************/

Expand Down Expand Up @@ -129,10 +138,21 @@ struct client {
volatile uint64_t act_drop;
volatile uint64_t act_next;
volatile uint64_t act_buffer;
#ifdef INTERRUPT_SEM
volatile uint64_t prev_rx;
volatile uint64_t prev_rx_drop;
#endif
} stats;

/* mutex and semaphore name for NFs to wait on */
#ifdef INTERRUPT_SEM
const char *sem_name;
sem_t *mutex;
key_t shm_key;
rte_atomic16_t *shm_server;
#endif
};


/*
* Shared port info, including statistics information for display by server.
* Structure will be put in a memzone.
Expand Down
13 changes: 13 additions & 0 deletions onvm/onvm_mgr/onvm_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,17 @@ struct thread_info {
struct packet_buf *port_tx_buf;
};


#ifdef INTERRUPT_SEM
/** Per-wakeup-thread state: the half-open range [first_client, last_client)
 *  of NF instance IDs this thread polls, plus wakeup counters for stats.
 */
struct wakeup_info {
        unsigned first_client;          /* first NF instance ID handled (inclusive) */
        unsigned last_client;           /* one past the last NF instance ID handled (exclusive) */
        uint64_t num_wakeups;           /* total semaphore wakeups issued by this thread */
        uint64_t prev_num_wakeups;      /* prior snapshot of num_wakeups — presumably for stats deltas; not read in this chunk, confirm */
};
#endif //INTERRUPT_SEM


#endif // _ONVM_MGR_H_
Loading