Implement memory and CPU hotplug
Signed-off-by: Paul Dagnelie <pcd@delphix.com>
pcd1193182 committed Nov 16, 2020
1 parent a724db0 commit 99b62a4
Showing 7 changed files with 186 additions and 18 deletions.
5 changes: 5 additions & 0 deletions include/os/linux/spl/sys/taskq.h
@@ -84,6 +84,8 @@ typedef struct taskq {
int tq_nthreads; /* # of existing threads */
int tq_nspawn; /* # of threads being spawned */
int tq_maxthreads; /* # of threads maximum */
/* original maxthreads arg to create */
int tq_orig_maxthreads;
int tq_pri; /* priority */
int tq_minalloc; /* min taskq_ent_t pool size */
int tq_maxalloc; /* max taskq_ent_t pool size */
@@ -99,6 +101,9 @@ typedef struct taskq {
spl_wait_queue_head_t tq_work_waitq; /* new work waitq */
spl_wait_queue_head_t tq_wait_waitq; /* wait waitq */
tq_lock_role_t tq_lock_class; /* class when taking tq_lock */
/* list node for the cpu hotplug callback */
struct hlist_node tq_hp_cb_node;
boolean_t tq_hp_support;
} taskq_t;

typedef struct taskq_ent {
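The two new fields make every taskq_t registrable with the kernel's multi-instance CPU-hotplug machinery: the hotplug core hands callbacks only a pointer to the embedded hlist_node, and the owning taskq_t is recovered with container_of() (list_entry() is the same macro). A minimal userspace sketch of that recovery pattern follows; the pared-down struct and fake_online_cb are illustrative stand-ins, not the kernel code.

#include <stddef.h>
#include <stdio.h>

/* Stand-in for the kernel's struct hlist_node. */
struct hlist_node { struct hlist_node *next, **pprev; };

/* Recover the enclosing structure from a pointer to an embedded member. */
#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

typedef struct taskq {
	int tq_maxthreads;
	struct hlist_node tq_hp_cb_node;	/* embedded callback node */
} taskq_t;

/* Same shape as a cpuhp multi-instance callback. */
static int fake_online_cb(unsigned int cpu, struct hlist_node *node)
{
	taskq_t *tq = container_of(node, taskq_t, tq_hp_cb_node);

	printf("cpu %u online: taskq allows %d threads\n",
	    cpu, tq->tq_maxthreads);
	return (0);
}

int main(void)
{
	taskq_t tq = { .tq_maxthreads = 4 };

	/* The hotplug core would pass only the embedded node. */
	return (fake_online_cb(1, &tq.tq_hp_cb_node));
}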
1 change: 1 addition & 0 deletions include/sys/arc_impl.h
@@ -926,6 +926,7 @@ extern int arc_memory_throttle(spa_t *spa, uint64_t reserve, uint64_t txg);
extern uint64_t arc_free_memory(void);
extern int64_t arc_available_memory(void);
extern void arc_tuning_update(boolean_t);
extern void arc_register_hotplug(void);

extern int param_set_arc_long(ZFS_MODULE_PARAM_ARGS);
extern int param_set_arc_int(ZFS_MODULE_PARAM_ARGS);
6 changes: 6 additions & 0 deletions module/os/freebsd/zfs/arc_os.c
@@ -243,3 +243,9 @@ arc_lowmem_fini(void)
if (arc_event_lowmem != NULL)
EVENTHANDLER_DEREGISTER(vm_lowmem, arc_event_lowmem);
}

void
arc_register_hotplug(void)
{
return;
}
115 changes: 109 additions & 6 deletions module/os/linux/spl/spl-taskq.c
@@ -28,6 +28,7 @@
#include <sys/kmem.h>
#include <sys/tsd.h>
#include <sys/trace_spl.h>
#include <linux/cpuhotplug.h>

int spl_taskq_thread_bind = 0;
module_param(spl_taskq_thread_bind, int, 0644);
@@ -59,6 +60,9 @@ EXPORT_SYMBOL(system_delay_taskq);
static taskq_t *dynamic_taskq;
static taskq_thread_t *taskq_thread_create(taskq_t *);

/* Multi-callback id for cpu hotplugging. */
static int spl_taskq_cpuhp_state;

/* List of all taskqs */
LIST_HEAD(tq_list);
struct rw_semaphore tq_list_sem;
@@ -1031,6 +1035,7 @@ taskq_create(const char *name, int nthreads, pri_t pri,
taskq_thread_t *tqt;
int count = 0, rc = 0, i;
unsigned long irqflags;
int proper_nthreads = nthreads;

ASSERT(name != NULL);
ASSERT(minalloc >= 0);
@@ -1041,14 +1046,25 @@ taskq_create(const char *name, int nthreads, pri_t pri,
if (flags & TASKQ_THREADS_CPU_PCT) {
ASSERT(nthreads <= 100);
ASSERT(nthreads >= 0);
nthreads = MIN(nthreads, 100);
nthreads = MAX(nthreads, 0);
nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
proper_nthreads = MIN(nthreads, 100);
proper_nthreads = MAX(proper_nthreads, 0);
proper_nthreads = MAX((num_online_cpus() * proper_nthreads) /
100, 1);
}

tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
if (tq == NULL)
return (NULL);
if (nthreads == boot_ncpus || flags & TASKQ_THREADS_CPU_PCT) {
tq->tq_hp_support = B_TRUE;
if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state,
&tq->tq_hp_cb_node) != 0) {
kmem_free(tq, sizeof (*tq));
return (NULL);
}
} else {
tq->tq_hp_support = B_FALSE;
}

spin_lock_init(&tq->tq_lock);
INIT_LIST_HEAD(&tq->tq_thread_list);
@@ -1057,7 +1073,8 @@ taskq_create(const char *name, int nthreads, pri_t pri,
tq->tq_nactive = 0;
tq->tq_nthreads = 0;
tq->tq_nspawn = 0;
tq->tq_maxthreads = nthreads;
tq->tq_maxthreads = proper_nthreads;
tq->tq_orig_maxthreads = nthreads;
tq->tq_pri = pri;
tq->tq_minalloc = minalloc;
tq->tq_maxalloc = maxalloc;
@@ -1088,7 +1105,7 @@ taskq_create(const char *name, int nthreads, pri_t pri,
if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
nthreads = 1;

for (i = 0; i < nthreads; i++) {
for (i = 0; i < proper_nthreads; i++) {
tqt = taskq_thread_create(tq);
if (tqt == NULL)
rc = 1;
@@ -1131,6 +1148,10 @@ taskq_destroy(taskq_t *tq)
tq->tq_flags &= ~TASKQ_ACTIVE;
spin_unlock_irqrestore(&tq->tq_lock, flags);

if (tq->tq_hp_support) {
VERIFY0(cpuhp_state_remove_instance_nocalls(
spl_taskq_cpuhp_state, &tq->tq_hp_cb_node));
}
/*
* When TASKQ_ACTIVE is clear new tasks may not be added nor may
* new worker threads be spawned for dynamic taskq.
@@ -1198,7 +1219,6 @@ taskq_destroy(taskq_t *tq)
}
EXPORT_SYMBOL(taskq_destroy);


static unsigned int spl_taskq_kick = 0;

/*
@@ -1255,12 +1275,92 @@ module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint,
MODULE_PARM_DESC(spl_taskq_kick,
"Write nonzero to kick stuck taskqs to spawn more threads");

static int
spl_taskq_expand(unsigned int cpu, struct hlist_node *node)
{
taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
unsigned long flags;
int err = 0;

ASSERT(tq);
spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);

if (!(tq->tq_flags & TASKQ_ACTIVE))
goto out;

if (tq->tq_flags & TASKQ_THREADS_CPU_PCT) {
int nthreads = MIN(tq->tq_orig_maxthreads, 100);
nthreads = MAX(nthreads, 0);
nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1);
tq->tq_maxthreads = nthreads;
} else {
tq->tq_maxthreads++;
}

if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)) {
taskq_thread_t *tqt = taskq_thread_create(tq);
if (tqt == NULL)
err = -1;
}

out:
spin_unlock_irqrestore(&tq->tq_lock, flags);
return (err);
}

/*
* While we don't support offlining CPUs, it is possible that CPUs will fail
* to online successfully. We do need to be able to handle this case
* gracefully.
*/
static int
spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node)
{
taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
unsigned long flags;
int err = 0;

ASSERT(tq);
spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);

if (!(tq->tq_flags & TASKQ_ACTIVE))
goto out;

if (tq->tq_flags & TASKQ_THREADS_CPU_PCT) {
int nthreads = MIN(tq->tq_orig_maxthreads, 100);
nthreads = MAX(nthreads, 0);
nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1);
tq->tq_maxthreads = nthreads;
} else {
tq->tq_maxthreads--;
}

if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)) {
taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next,
taskq_thread_t, tqt_thread_list);
struct task_struct *thread = tqt->tqt_thread;
spin_unlock_irqrestore(&tq->tq_lock, flags);

kthread_stop(thread);

spin_lock_irqsave_nested(&tq->tq_lock, flags,
tq->tq_lock_class);
}

out:
spin_unlock_irqrestore(&tq->tq_lock, flags);
return (err);
}

int
spl_taskq_init(void)
{
init_rwsem(&tq_list_sem);
tsd_create(&taskq_tsd, NULL);

spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN,
"fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down);

system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64),
maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
if (system_taskq == NULL)
@@ -1304,4 +1404,7 @@ spl_taskq_fini(void)
system_taskq = NULL;

tsd_destroy(&taskq_tsd);

cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
spl_taskq_cpuhp_state = 0;
}
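Pulling the new pieces together, the diff follows the standard cpuhp multi-state lifecycle: one dynamic state set up at module init, each qualifying taskq added as an instance with the _nocalls variant (so the callback is not replayed for already-online CPUs), instances removed in taskq_destroy(), and the state torn down at module exit. A condensed kernel-module sketch of that lifecycle, assuming the Linux >= 4.10 cpuhotplug API; struct my_obj, the callback bodies, and the "example:online" name are hypothetical:

#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/cpuhotplug.h>
#include <linux/cpumask.h>

struct my_obj {
	int cpu_pct;			/* percent of online CPUs to use */
	int nthreads;			/* derived capacity */
	struct hlist_node hp_node;	/* per-instance callback hook */
};

static int my_hp_state;
static struct my_obj obj = { .cpu_pct = 75, .nthreads = 1 };

/* Online callback: rescale capacity to the current online mask. */
static int my_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct my_obj *o = container_of(node, struct my_obj, hp_node);

	/* The diff above adds 1 in case the incoming CPU is not yet counted. */
	o->nthreads = max_t(int, num_online_cpus() * o->cpu_pct / 100, 1);
	return (0);
}

/* Teardown callback: runs when onlining a CPU fails and is rolled back. */
static int my_cpu_prep_down(unsigned int cpu, struct hlist_node *node)
{
	struct my_obj *o = container_of(node, struct my_obj, hp_node);

	o->nthreads = max_t(int, num_online_cpus() * o->cpu_pct / 100, 1);
	return (0);
}

static int __init my_init(void)
{
	/* One dynamically-numbered state shared by every instance. */
	my_hp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN,
	    "example:online", my_cpu_online, my_cpu_prep_down);
	if (my_hp_state < 0)
		return (my_hp_state);

	/* _nocalls: register without invoking the callback for current CPUs. */
	return (cpuhp_state_add_instance_nocalls(my_hp_state, &obj.hp_node));
}

static void __exit my_exit(void)
{
	cpuhp_state_remove_instance_nocalls(my_hp_state, &obj.hp_node);
	cpuhp_remove_multi_state(my_hp_state);
}

module_init(my_init);
module_exit(my_exit);
MODULE_LICENSE("GPL");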
67 changes: 56 additions & 11 deletions module/os/linux/zfs/arc_os.c
@@ -48,6 +48,8 @@
#include <sys/vmsystm.h>
#include <sys/zpl.h>
#include <linux/page_compat.h>
#include <linux/notifier.h>
#include <linux/memory.h>
#endif
#include <sys/callb.h>
#include <sys/kstat.h>
@@ -278,18 +280,9 @@ arc_memory_throttle(spa_t *spa, uint64_t reserve, uint64_t txg)
return (0);
}

void
arc_lowmem_init(void)
static void
arc_set_sys_free(uint64_t allmem)
{
uint64_t allmem = arc_all_memory();

/*
* Register a shrinker to support synchronous (direct) memory
* reclaim from the arc. This is done to prevent kswapd from
* swapping out pages when it is preferable to shrink the arc.
*/
spl_register_shrinker(&arc_shrinker);

/*
* The ARC tries to keep at least this much memory available for the
* system. This gives the ARC time to shrink in response to memory
@@ -342,6 +335,20 @@ arc_lowmem_init(void)
arc_sys_free = wmark * 3 + allmem / 32;
}

void
arc_lowmem_init(void)
{
uint64_t allmem = arc_all_memory();

/*
* Register a shrinker to support synchronous (direct) memory
* reclaim from the arc. This is done to prevent kswapd from
* swapping out pages when it is preferable to shrink the arc.
*/
spl_register_shrinker(&arc_shrinker);
arc_set_sys_free(allmem);
}

void
arc_lowmem_fini(void)
{
@@ -375,6 +382,39 @@ param_set_arc_int(const char *buf, zfs_kernel_param_t *kp)

return (0);
}

/* ARGSUSED */
static int
arc_hotplug_callback(struct notifier_block *self, unsigned long action,
void *arg)
{
uint64_t allmem = arc_all_memory();
if (action != MEM_ONLINE)
return (NOTIFY_OK);

arc_c_min = MAX(allmem / 32, 2ULL << SPA_MAXBLOCKSHIFT);
arc_c_max = arc_default_max(arc_c_min, allmem);

#ifdef __LP64__
if (zfs_dirty_data_max_max == 0)
zfs_dirty_data_max_max = MIN(4ULL * 1024 * 1024 * 1024,
allmem * zfs_dirty_data_max_max_percent / 100);
#else
if (zfs_dirty_data_max_max == 0)
zfs_dirty_data_max_max = MIN(1ULL * 1024 * 1024 * 1024,
allmem * zfs_dirty_data_max_max_percent / 100);
#endif

arc_set_sys_free(allmem);
return (NOTIFY_OK);
}

void
arc_register_hotplug(void)
{
/* There is no significance to the value 100 */
hotplug_memory_notifier(arc_hotplug_callback, 100);
}
#else /* _KERNEL */
int64_t
arc_available_memory(void)
@@ -405,6 +445,11 @@ arc_free_memory(void)
{
return (spa_get_random(arc_all_memory() * 20 / 100));
}
void
arc_register_hotplug(void)
{
return;
}
#endif /* _KERNEL */

/*
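To make the MEM_ONLINE recomputation above concrete, here is a userspace arithmetic sketch for a hypothetical hot-add that brings a machine to 32 GiB. SPA_MAXBLOCKSHIFT = 24 and a zfs_dirty_data_max_max_percent of 25 are assumed defaults, and the sketch ignores the callback's "only if still 0" guard on zfs_dirty_data_max_max:

#include <stdio.h>
#include <stdint.h>

#define SPA_MAXBLOCKSHIFT	24	/* assumed: 16 MiB max block size */
#define MAX(a, b)		((a) > (b) ? (a) : (b))
#define MIN(a, b)		((a) < (b) ? (a) : (b))

int main(void)
{
	uint64_t allmem = 32ULL << 30;	/* 32 GiB after the hot-add */
	uint64_t dirty_pct = 25;	/* assumed default percent */

	/* Same expressions as the LP64 branch of the callback. */
	uint64_t arc_c_min = MAX(allmem / 32, 2ULL << SPA_MAXBLOCKSHIFT);
	uint64_t dirty_max = MIN(4ULL * 1024 * 1024 * 1024,
	    allmem * dirty_pct / 100);

	/* Prints: arc_c_min = 1024 MiB, dirty max capped at 4 GiB. */
	printf("arc_c_min = %llu MiB\n",
	    (unsigned long long)(arc_c_min >> 20));
	printf("zfs_dirty_data_max_max = %llu GiB\n",
	    (unsigned long long)(dirty_max >> 30));
	return (0);
}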
2 changes: 2 additions & 0 deletions module/zfs/arc.c
@@ -7647,6 +7647,8 @@ arc_init(void)
if (arc_c < arc_c_min)
arc_c = arc_c_min;

arc_register_hotplug();

arc_state_init();

buf_init();
8 changes: 7 additions & 1 deletion module/zfs/dmu_objset.c
@@ -616,7 +616,13 @@ dmu_objset_open_impl(spa_t *spa, dsl_dataset_t *ds, blkptr_t *bp,
mutex_init(&os->os_userused_lock, NULL, MUTEX_DEFAULT, NULL);
mutex_init(&os->os_obj_lock, NULL, MUTEX_DEFAULT, NULL);
mutex_init(&os->os_user_ptr_lock, NULL, MUTEX_DEFAULT, NULL);
os->os_obj_next_percpu_len = boot_ncpus;
/*
* We allocate an array that is large enough to have a separate entry
* for each cpu the system might eventually have, up to 32 cpus. This
* allows us to provide good performance after cpus are hot-added
* without using boundless amounts of memory.
*/
os->os_obj_next_percpu_len = MAX(boot_ncpus, MIN(32, max_ncpus));
os->os_obj_next_percpu = kmem_zalloc(os->os_obj_next_percpu_len *
sizeof (os->os_obj_next_percpu[0]), KM_SLEEP);

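The new sizing rule is compact; a quick sketch with made-up boot_ncpus/max_ncpus pairs shows the array lengths it produces:

#include <stdio.h>

#define MAX(a, b) ((a) > (b) ? (a) : (b))
#define MIN(a, b) ((a) < (b) ? (a) : (b))

int main(void)
{
	/* Hypothetical configurations: {cpus at boot, possible cpus}. */
	struct { int boot, max; } cases[] = {
		{ 4, 8 },	/* small VM            -> 8 entries  */
		{ 4, 128 },	/* hotplug-heavy VM    -> 32 entries */
		{ 64, 128 },	/* booted large        -> 64 entries */
	};

	for (int i = 0; i < 3; i++)
		printf("boot_ncpus=%3d max_ncpus=%3d -> len=%d\n",
		    cases[i].boot, cases[i].max,
		    MAX(cases[i].boot, MIN(32, cases[i].max)));
	return (0);
}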
