diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index 9346f0980f..dddd2b6f88 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -42,6 +42,7 @@ #include #include #include +#include #include "vdef.h" #include "vrt.h" @@ -284,6 +285,10 @@ enum boc_state_e { #include "tbl/boc_state.h" }; +// cache_obj.h vai notify +struct vai_qe; +VSLIST_HEAD(vai_q_head, vai_qe); + struct boc { unsigned magic; #define BOC_MAGIC 0x70c98476 @@ -296,6 +301,7 @@ struct boc { uint64_t fetched_so_far; uint64_t delivered_so_far; uint64_t transit_buffer; + struct vai_q_head vai_q_head; }; /* Object core structure --------------------------------------------- @@ -769,6 +775,150 @@ int ObjGetDouble(struct worker *, struct objcore *, enum obj_attr, double *); int ObjGetU64(struct worker *, struct objcore *, enum obj_attr, uint64_t *); int ObjCheckFlag(struct worker *, struct objcore *, enum obj_flags of); +/*==================================================================== + * ObjVAI...(): Asynchronous Iteration + * + * see comments in cache_obj.c for usage + */ + +typedef void *vai_hdl; +typedef void vai_notify_cb(vai_hdl, void *priv); + + +/* + * VSCARAB: Varnish SCatter ARAy of Buffers: + * + * an array of viovs, elsewhere also called an siov or sarray + */ +struct viov { + uint64_t lease; + struct iovec iov; +}; + +struct vscarab { + unsigned magic; +#define VSCARAB_MAGIC 0x05ca7ab0 + unsigned flags; +#define VSCARAB_F_END 1 // last viov is last overall + unsigned capacity; + unsigned used; + struct viov s[] v_counted_by_(capacity); +}; + +// VFLA: starting generic container-with-flexible-array-member macros +// aka "struct hack" +// +// type : struct name +// name : a pointer to struct type +// mag : the magic value for this VFLA +// cptr : pointer to container struct (aka "head") +// fam : member name of the flexible array member +// cap : capacity +// +// common properties of all VFLAs: +// - are a miniobj (have magic as the first element) +// - capacity member is the fam capacity +// - used member is the number of fam elements used +// +// VFLA_SIZE ignores the cap == 0 case, we assert in _INIT +// offsetoff ref: https://gustedt.wordpress.com/2011/03/14/flexible-array-member/ +#define VFLA_SIZE(type, fam, cap) (offsetof(struct type, fam) + \ + (cap) * sizeof(((struct type *)0)->fam[0])) +#define VFLA_INIT_(type, cptr, mag, fam, cap, save) do { \ + unsigned save = cap; \ + AN(save); \ + memset((cptr), 0, VFLA_SIZE(type, fam, save)); \ + (cptr)->magic = (mag); \ + (cptr)->capacity = (save); \ +} while (0) +#define VFLA_INIT(type, cptr, mag, fam, cap) \ + VFLA_INIT_(type, cptr, mag, fam, cap, VUNIQ_NAME(save)) +// declare, allocate and initialize a local VFLA +// the additional VLA buf declaration avoids +// "Variable-sized object may not be initialized" +#define VFLA_LOCAL_(type, name, mag, fam, cap, bufname) \ + char bufname[VFLA_SIZE(type, fam, cap)]; \ + struct type *name = (void *)bufname; \ + VFLA_INIT(type, name, mag, fam, cap) +#define VFLA_LOCAL(type, name, mag, fam, cap) \ + VFLA_LOCAL_(type, name, mag, fam, cap, VUNIQ_NAME(buf)) +// malloc and initialize a VFLA +#define VFLA_ALLOC(type, name, mag, fam, cap) do { \ + (name) = malloc(VFLA_SIZE(type, fam, cap)); \ + if ((name) != NULL) \ + VFLA_INIT(type, name, mag, fam, cap); \ +} while(0) +#define VFLA_FOREACH(var, cptr, fam) \ + for (var = &(cptr)->fam[0]; var < &(cptr)->fam[(cptr)->used]; var++) +// continue iterating after a break of a _FOREACH +#define VFLA_FOREACH_RESUME(var, cptr, fam) \ + for (; var 
!= NULL && var < &(cptr)->fam[(cptr)->used]; var++) +#define VFLA_GET(cptr, fam) ((cptr)->used < (cptr)->capacity ? &(cptr)->fam[(cptr)->used++] : NULL) +// asserts sufficient capacity +#define VFLA_ADD(cptr, fam, val) do { \ + assert((cptr)->used < (cptr)->capacity); \ + (cptr)->fam[(cptr)->used++] = (val); \ +} while(0) + +#define VSCARAB_SIZE(cap) VFLA_SIZE(vscarab, s, cap) +#define VSCARAB_INIT(scarab, cap) VFLA_INIT(vscarab, scarab, VSCARAB_MAGIC, s, cap) +#define VSCARAB_LOCAL(scarab, cap) VFLA_LOCAL(vscarab, scarab, VSCARAB_MAGIC, s, cap) +#define VSCARAB_ALLOC(scarab, cap) VFLA_ALLOC(vscarab, scarab, VSCARAB_MAGIC, s, cap) +#define VSCARAB_FOREACH(var, scarab) VFLA_FOREACH(var, scarab, s) +#define VSCARAB_FOREACH_RESUME(var, scarab) VFLA_FOREACH_RESUME(var, scarab, s) +#define VSCARAB_GET(scarab) VFLA_GET(scarab, s) +#define VSCARAB_ADD(scarab, val) VFLA_ADD(scarab, s, val) +#define VSCARAB_LAST(scarab) (&(scarab)->s[(scarab)->used - 1]) + +#define VSCARAB_CHECK(scarab) do { \ + CHECK_OBJ(scarab, VSCARAB_MAGIC); \ + assert(scarab->used <= scarab->capacity); \ +} while(0) + +#define VSCARAB_CHECK_NOTNULL(scarab) do { \ + AN(scarab); \ + VSCARAB_CHECK(scarab); \ +} while(0) + +/* + * VSCARET: Varnish SCatter Array Return + * + * an array of leases obtained from a vscarab + */ + +struct vscaret { + unsigned magic; +#define VSCARET_MAGIC 0x9c1f3d7b + unsigned capacity; + unsigned used; + uint64_t lease[] v_counted_by_(capacity); +}; + +#define VSCARET_SIZE(cap) VFLA_SIZE(vscaret, lease, cap) +#define VSCARET_INIT(scaret, cap) VFLA_INIT(vscaret, scaret, VSCARET_MAGIC, lease, cap) +#define VSCARET_LOCAL(scaret, cap) VFLA_LOCAL(vscaret, scaret, VSCARET_MAGIC, lease, cap) +#define VSCARET_ALLOC(scaret, cap) VFLA_ALLOC(vscaret, scaret, VSCARET_MAGIC, lease, cap) +#define VSCARET_FOREACH(var, scaret) VFLA_FOREACH(var, scaret, lease) +#define VSCARET_GET(scaret) VFLA_GET(scaret, lease) +#define VSCARET_ADD(scaret, val) VFLA_ADD(scaret, lease, val) + +#define VSCARET_CHECK(scaret) do { \ + CHECK_OBJ(scaret, VSCARET_MAGIC); \ + assert(scaret->used <= scaret->capacity); \ +} while(0) + +#define VSCARET_CHECK_NOTNULL(scaret) do { \ + AN(scaret); \ + VSCARET_CHECK(scaret); \ +} while(0) + +vai_hdl ObjVAIinit(struct worker *, struct objcore *, struct ws *, + vai_notify_cb *, void *); +int ObjVAIlease(struct worker *, vai_hdl, struct vscarab *); +int ObjVAIbuffer(struct worker *, vai_hdl, struct vscarab *); +void ObjVAIreturn(struct worker *, vai_hdl, struct vscaret *); +void ObjVAIfini(struct worker *, vai_hdl *); + /* cache_req_body.c */ ssize_t VRB_Iterate(struct worker *, struct vsl_log *, struct req *, objiterate_f *func, void *priv); diff --git a/bin/varnishd/cache/cache_main.c b/bin/varnishd/cache/cache_main.c index c71a6a108c..cf8c1d0fd5 100644 --- a/bin/varnishd/cache/cache_main.c +++ b/bin/varnishd/cache/cache_main.c @@ -403,9 +403,55 @@ static struct cli_proto child_cmds[] = { { NULL } }; +#define CAP 17U + +static void +t_vscarab1(struct vscarab *scarab) +{ + struct viov *v; + uint64_t sum; + + VSCARAB_CHECK_NOTNULL(scarab); + AZ(scarab->used); + + v = VSCARAB_GET(scarab); + AN(v); + v->lease = 12; + + VSCARAB_ADD(scarab, (struct viov){.lease = 30}); + + sum = 0; + VSCARAB_FOREACH(v, scarab) + sum += v->lease; + + assert(sum == 42); +} + +static void +t_vscarab(void) +{ + char testbuf[VSCARAB_SIZE(CAP)]; + struct vscarab *frombuf = (void *)testbuf; + VSCARAB_INIT(frombuf, CAP); + t_vscarab1(frombuf); + + // --- + + VSCARAB_LOCAL(scarab, CAP); + t_vscarab1(scarab); + + // --- + + struct 
vscarab *heap; + VSCARAB_ALLOC(heap, CAP); + t_vscarab1(heap); + free(heap); +} + void child_main(int sigmagic, size_t altstksz) { + t_vscarab(); if (sigmagic) child_sigmagic(altstksz); diff --git a/bin/varnishd/cache/cache_obj.c b/bin/varnishd/cache/cache_obj.c index 9ffa373f3b..6e4b82bd95 100644 --- a/bin/varnishd/cache/cache_obj.c +++ b/bin/varnishd/cache/cache_obj.c @@ -183,6 +183,130 @@ ObjIterate(struct worker *wrk, struct objcore *oc, return (om->objiterator(wrk, oc, priv, func, final)); } +/*==================================================================== + * ObjVAI...(): Asynchronous Iteration + * + * + * ObjVAIinit() returns an opaque handle, or NULL if not supported + * + * A VAI handle must not be used concurrently + * + * the vai_notify_cb(priv) will be called asynchronously by the storage + * engine when a -EAGAIN / -ENOBUFS condition is over and ObjVAIlease() + * can be called again. + * + * Note: + * - the callback gets executed by an arbitrary thread + * - WITH the boc mtx held + * so it should never block and only do minimal work + * + * ObjVAIlease() fills the vscarab with leases. returns: + * + * -EAGAIN: nothing available at the moment, storage will notify, no use to + * call again until notification + * -ENOBUFS: caller needs to return leases, storage will notify + * -EPIPE: BOS_FAILED for busy object + * -(errno): other problem, fatal + * 0: EOF + * n: number of viovs filled (== scarab->used) + * + * struct vscarab: + * + * the leases can be used by the caller until returned with + * ObjVAIreturn(). The storage guarantees that the lease member is a + * multiple of 8 (that is, the lower three bits are zero). These can be + * used by the caller between lease and return, but must be cleared to + * zero before returning. + * + * ObjVAIbuffer() allocates temporary buffers, returns: + * + * -EAGAIN: allocation can not be fulfilled immediately, storage will notify, + * no use to call again until notification + * -EINVAL: size larger than UINT_MAX requested + * -(errno): other problem, fatal + * n: n > 0, number of viovs filled + * + * The struct vscarab is used on the way in and out: On the way in, the + * iov.iov_len members contain the sizes the caller requests, all other + * members of the struct viovs are expected to be zero initialized. + * + * The maximum size to be requested is UINT_MAX. + * + * ObjVAIbuffer() may return sizes larger than requested. The returned n + * might be smaller than requested. + * + * ObjVAIreturn() returns leases collected in a struct vscaret + * + * it must be called with a vscaret, which holds an array of lease values + * received via ObjVAIlease() or ObjVAIbuffer() when the caller can + * guarantee that they are no longer accessed. + * + * ObjVAIreturn() may retain leases in the vscaret if the implementation + * still requires them, iow, the vscaret might not be empty upon return. 
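+ *
+ * A minimal consumer sketch (illustrative only: my_deliver(), my_wait(),
+ * my_notify and my_priv are hypothetical stand-ins for a delivery function
+ * and a condvar-style notification such as sml_notify()/sml_notify_wait();
+ * wrk and oc are assumed to be in scope; the in-tree consumer is
+ * sml_iterator() in storage_simple.c). It assumes ObjVAIreturn() drains the
+ * vscaret; a real consumer must cope with retained leases:
+ *
+ *	VSCARAB_LOCAL(scarab, 16);
+ *	VSCARET_LOCAL(scaret, 16);
+ *	struct viov *v;
+ *	vai_hdl hdl;
+ *	int n;
+ *
+ *	hdl = ObjVAIinit(wrk, oc, NULL, my_notify, &my_priv);
+ *	AN(hdl);
+ *	do {
+ *		n = ObjVAIlease(wrk, hdl, scarab);
+ *		VSCARAB_FOREACH(v, scarab)
+ *			my_deliver(v->iov.iov_base, v->iov.iov_len);
+ *		VSCARAB_FOREACH(v, scarab)
+ *			VSCARET_ADD(scaret, v->lease);
+ *		if (scaret->used > 0)
+ *			ObjVAIreturn(wrk, hdl, scaret);
+ *		VSCARAB_INIT(scarab, scarab->capacity);
+ *		if (n == -EAGAIN || n == -ENOBUFS)
+ *			my_wait();
+ *	} while (n > 0 || n == -EAGAIN || n == -ENOBUFS);
+ *	ObjVAIfini(wrk, &hdl);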
+ * + * ObjVAIfini() finalized iteration + * + * it must be called when iteration is done, irrespective of error status + */ + +vai_hdl +ObjVAIinit(struct worker *wrk, struct objcore *oc, struct ws *ws, + vai_notify_cb *cb, void *cb_priv) +{ + const struct obj_methods *om = obj_getmethods(oc); + + CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); + + if (om->vai_init == NULL) + return (NULL); + return (om->vai_init(wrk, oc, ws, cb, cb_priv)); +} + +int +ObjVAIlease(struct worker *wrk, vai_hdl vhdl, struct vscarab *scarab) +{ + struct vai_hdl_preamble *vaip = vhdl; + + AN(vaip); + assert(vaip->magic2 == VAI_HDL_PREAMBLE_MAGIC2); + AN(vaip->vai_lease); + return (vaip->vai_lease(wrk, vhdl, scarab)); +} + +int +ObjVAIbuffer(struct worker *wrk, vai_hdl vhdl, struct vscarab *scarab) +{ + struct vai_hdl_preamble *vaip = vhdl; + + AN(vaip); + assert(vaip->magic2 == VAI_HDL_PREAMBLE_MAGIC2); + AN(vaip->vai_buffer); + return (vaip->vai_buffer(wrk, vhdl, scarab)); +} + +void +ObjVAIreturn(struct worker *wrk, vai_hdl vhdl, struct vscaret *scaret) +{ + struct vai_hdl_preamble *vaip = vhdl; + + AN(vaip); + assert(vaip->magic2 == VAI_HDL_PREAMBLE_MAGIC2); + AN(vaip->vai_return); + vaip->vai_return(wrk, vhdl, scaret); +} + +void +ObjVAIfini(struct worker *wrk, vai_hdl *vhdlp) +{ + AN(vhdlp); + struct vai_hdl_preamble *vaip = *vhdlp; + + AN(vaip); + assert(vaip->magic2 == VAI_HDL_PREAMBLE_MAGIC2); + AN(vaip->vai_lease); + return (vaip->vai_fini(wrk, vhdlp)); +} + /*==================================================================== * ObjGetSpace() * @@ -231,6 +355,29 @@ obj_extend_condwait(const struct objcore *oc) (void)Lck_CondWait(&oc->boc->cond, &oc->boc->mtx); } +// notify of an extension of the boc or state change +//lint -sem(obj_boc_notify_Unlock, thread_unlock) + +static void +obj_boc_notify_Unlock(struct boc *boc) +{ + struct vai_qe *qe, *next; + + PTOK(pthread_cond_broadcast(&boc->cond)); + qe = VSLIST_FIRST(&boc->vai_q_head); + VSLIST_FIRST(&boc->vai_q_head) = NULL; + while (qe != NULL) { + CHECK_OBJ(qe, VAI_Q_MAGIC); + AN(qe->flags & VAI_QF_INQUEUE); + qe->flags &= ~VAI_QF_INQUEUE; + next = VSLIST_NEXT(qe, list); + VSLIST_NEXT(qe, list) = NULL; + qe->cb(qe->hdl, qe->priv); + qe = next; + } + Lck_Unlock(&boc->mtx); +} + void ObjExtend(struct worker *wrk, struct objcore *oc, ssize_t l, int final) { @@ -241,14 +388,13 @@ ObjExtend(struct worker *wrk, struct objcore *oc, ssize_t l, int final) AN(om->objextend); assert(l >= 0); - Lck_Lock(&oc->boc->mtx); if (l > 0) { + Lck_Lock(&oc->boc->mtx); obj_extend_condwait(oc); om->objextend(wrk, oc, l); oc->boc->fetched_so_far += l; - PTOK(pthread_cond_broadcast(&oc->boc->cond)); + obj_boc_notify_Unlock(oc->boc); } - Lck_Unlock(&oc->boc->mtx); assert(oc->boc->state < BOS_FINISHED); if (final && om->objtrimstore != NULL) @@ -258,6 +404,17 @@ ObjExtend(struct worker *wrk, struct objcore *oc, ssize_t l, int final) /*==================================================================== */ +static inline void +objSignalFetchLocked(const struct objcore *oc, uint64_t l) +{ + if (oc->boc->transit_buffer > 0) { + assert(oc->flags & OC_F_TRANSIENT); + /* Signal the new client position */ + oc->boc->delivered_so_far = l; + PTOK(pthread_cond_signal(&oc->boc->cond)); + } +} + uint64_t ObjWaitExtend(const struct worker *wrk, const struct objcore *oc, uint64_t l, enum boc_state_e *statep) @@ -272,13 +429,8 @@ ObjWaitExtend(const struct worker *wrk, const struct objcore *oc, uint64_t l, while (1) { rv = oc->boc->fetched_so_far; assert(l <= rv || oc->boc->state == BOS_FAILED); - if 
(oc->boc->transit_buffer > 0) { - assert(oc->flags & OC_F_TRANSIENT); - /* Signal the new client position */ - oc->boc->delivered_so_far = l; - PTOK(pthread_cond_signal(&oc->boc->cond)); - } state = oc->boc->state; + objSignalFetchLocked(oc, l); if (rv > l || state >= BOS_FINISHED) break; (void)Lck_CondWait(&oc->boc->cond, &oc->boc->mtx); @@ -288,6 +440,51 @@ ObjWaitExtend(const struct worker *wrk, const struct objcore *oc, uint64_t l, *statep = state; return (rv); } + +// get a new extension _or_ register a notification +uint64_t +ObjVAIGetExtend(struct worker *wrk, const struct objcore *oc, uint64_t l, + enum boc_state_e *statep, struct vai_qe *qe) +{ + enum boc_state_e state; + uint64_t rv; + + (void) wrk; + CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC); + CHECK_OBJ_NOTNULL(oc->boc, BOC_MAGIC); + CHECK_OBJ_NOTNULL(qe, VAI_Q_MAGIC); + Lck_Lock(&oc->boc->mtx); + rv = oc->boc->fetched_so_far; + assert(l <= rv || oc->boc->state == BOS_FAILED); + state = oc->boc->state; + objSignalFetchLocked(oc, l); + if (l == rv && state < BOS_FINISHED && + (qe->flags & VAI_QF_INQUEUE) == 0) { + qe->flags |= VAI_QF_INQUEUE; + VSLIST_INSERT_HEAD(&oc->boc->vai_q_head, qe, list); + } + Lck_Unlock(&oc->boc->mtx); + if (statep != NULL) + *statep = state; + return (rv); +} + +void +ObjVAICancel(struct worker *wrk, struct boc *boc, struct vai_qe *qe) +{ + + (void) wrk; + CHECK_OBJ_NOTNULL(boc, BOC_MAGIC); + CHECK_OBJ_NOTNULL(qe, VAI_Q_MAGIC); + + Lck_Lock(&boc->mtx); + // inefficient, but should be rare + if ((qe->flags & VAI_QF_INQUEUE) != 0) + VSLIST_REMOVE(&boc->vai_q_head, qe, vai_qe, list); + qe->flags = 0; + Lck_Unlock(&boc->mtx); +} + /*==================================================================== */ @@ -313,8 +510,7 @@ ObjSetState(struct worker *wrk, const struct objcore *oc, Lck_Lock(&oc->boc->mtx); oc->boc->state = next; - PTOK(pthread_cond_broadcast(&oc->boc->cond)); - Lck_Unlock(&oc->boc->mtx); + obj_boc_notify_Unlock(oc->boc); } /*==================================================================== diff --git a/bin/varnishd/cache/cache_obj.h b/bin/varnishd/cache/cache_obj.h index 1f936a5348..da968d05a6 100644 --- a/bin/varnishd/cache/cache_obj.h +++ b/bin/varnishd/cache/cache_obj.h @@ -50,6 +50,124 @@ typedef void *objsetattr_f(struct worker *, struct objcore *, enum obj_attr attr, ssize_t len, const void *ptr); typedef void objtouch_f(struct worker *, struct objcore *, vtim_real now); +/* called by Obj/storage to notify that the lease function (vai_lease_f) or + * buffer function (vai_buffer_f) can be called again after return of + * -EAGAIN or -ENOBUFS + * NOTE: + * - the callback gets executed by an arbitrary thread + * - WITH the boc mtx held + * so it should never block and be efficient + */ + +/* notify entry added to struct boc::vai_q_head */ +struct vai_qe { + unsigned magic; +#define VAI_Q_MAGIC 0x573e27eb + unsigned flags; +#define VAI_QF_INQUEUE (1U<<0) + VSLIST_ENTRY(vai_qe) list; + vai_notify_cb *cb; + vai_hdl hdl; + void *priv; +}; + +#define VAI_ASSERT_LEASE(x) AZ((x) & 0x7) + +/* + * start an iteration. 
the ws can be used (reserved) by storage + * the void * will be passed as the second argument to vai_notify_cb + */ +typedef vai_hdl vai_init_f(struct worker *, struct objcore *, struct ws *, + vai_notify_cb *, void *); + +/* + * lease io vectors from storage + * + * vai_hdl is from vai_init_f + * the vscarab is provided by the caller to return leases + * + * return: + * -EAGAIN: nothing available at the moment, storage will notify, no use to + * call again until notification + * -ENOBUFS: caller needs to return leases, storage will notify + * -EPIPE: BOS_FAILED for busy object + * -(errno): other problem, fatal + * 0: EOF + * n: number of viovs filled + */ +typedef int vai_lease_f(struct worker *, vai_hdl, struct vscarab *); + +/* + * get io vectors with temporary buffers from storage + * + * vai_hdl is from vai_init_f + * the vscarab needs to be initialized with the number of requested elements + * and each iov.iov_len contains the requested size. all iov_base need to be + * zero. + * + * after return, the vscarab can be smaller than requested if only some + * allocation requests could be fulfilled + * + * return: + * -EAGAIN: allocation can not be fulfilled immediately, storage will notify, + * no use to call again until notification + * -(errno): other problem, fatal + * n: n > 0, number of viovs filled + */ +typedef int vai_buffer_f(struct worker *, vai_hdl, struct vscarab *); + +/* + * return leases from vai_lease_f or vai_buffer_f + */ +typedef void vai_return_f(struct worker *, vai_hdl, struct vscaret *); + +/* + * finish iteration, vai_return_f must have been called on all leases + */ +typedef void vai_fini_f(struct worker *, vai_hdl *); + +/* + * vai_hdl must start with this preamble such that when cast to it, cache_obj.c + * has access to the methods. 
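+ *
+ * Sketch of a storage-side handle embedding the preamble (struct my_hdl,
+ * MY_HDL_MAGIC and my_lease are hypothetical; the in-tree example is
+ * struct sml_hdl and sml_ai_init() in storage_simple.c):
+ *
+ *	struct my_hdl {
+ *		struct vai_hdl_preamble preamble;
+ *	#define MY_HDL_MAGIC 0x5a5a5a5a
+ *		struct objcore *oc;
+ *	};
+ *
+ *	struct my_hdl *hdl;
+ *
+ *	hdl = malloc(sizeof *hdl);
+ *	AN(hdl);
+ *	INIT_VAI_HDL(hdl, MY_HDL_MAGIC);
+ *	hdl->preamble.vai_lease = my_lease;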
+ * + * The first magic is owned by storage, the second magic is owned by cache_obj.c + * and must be initialized to VAI_HDL_PREAMBLE_MAGIC2 + * + */ + +struct vai_hdl_preamble { + unsigned magic; // owned by storage + unsigned magic2; +#define VAI_HDL_PREAMBLE_MAGIC2 0x7a15d162 + vai_lease_f *vai_lease; + vai_buffer_f *vai_buffer; + vai_return_f *vai_return; + uintptr_t reserve[4]; // abi fwd compat + vai_fini_f *vai_fini; +}; + +#define INIT_VAI_HDL(to, x) do { \ + (void)memset(to, 0, sizeof *(to)); \ + (to)->preamble.magic = (x); \ + (to)->preamble.magic2 = VAI_HDL_PREAMBLE_MAGIC2; \ +} while (0) + +#define CHECK_VAI_HDL(obj, x) do { \ + assert(obj->preamble.magic == (x)); \ + assert(obj->preamble.magic2 == VAI_HDL_PREAMBLE_MAGIC2);\ +} while (0) + +#define CHECK_VAI_HDL_NOTNULL(obj, x) do { \ + AN(obj); \ + CHECK_VAI_HDL(obj, x); \ +} while (0) + +#define CAST_VAI_HDL_NOTNULL(obj, ptr, x) do { \ + AN(ptr); \ + (obj) = (ptr); \ + CHECK_VAI_HDL(obj, x); \ +} while (0) + struct obj_methods { /* required */ objfree_f *objfree; @@ -64,5 +182,6 @@ struct obj_methods { objslim_f *objslim; objtouch_f *objtouch; objsetstate_f *objsetstate; + /* async iteration (VAI) */ + vai_init_f *vai_init; }; - diff --git a/bin/varnishd/cache/cache_varnishd.h b/bin/varnishd/cache/cache_varnishd.h index a0e4987fab..414aec28a1 100644 --- a/bin/varnishd/cache/cache_varnishd.h +++ b/bin/varnishd/cache/cache_varnishd.h @@ -348,6 +348,10 @@ void *ObjSetAttr(struct worker *, struct objcore *, enum obj_attr, int ObjCopyAttr(struct worker *, struct objcore *, struct objcore *, enum obj_attr attr); void ObjBocDone(struct worker *, struct objcore *, struct boc **); +// VAI +uint64_t ObjVAIGetExtend(struct worker *, const struct objcore *, uint64_t, + enum boc_state_e *, struct vai_qe *); +void ObjVAICancel(struct worker *, struct boc *, struct vai_qe *); int ObjSetDouble(struct worker *, struct objcore *, enum obj_attr, double); int ObjSetU64(struct worker *, struct objcore *, enum obj_attr, uint64_t); diff --git a/bin/varnishd/storage/storage_persistent.c b/bin/varnishd/storage/storage_persistent.c index df2fc9128e..9b37de2759 100644 --- a/bin/varnishd/storage/storage_persistent.c +++ b/bin/varnishd/storage/storage_persistent.c @@ -692,6 +692,7 @@ smp_init(void) smp_oc_realmethods.objsetattr = SML_methods.objsetattr; smp_oc_realmethods.objtouch = LRU_Touch; smp_oc_realmethods.objfree = smp_oc_objfree; + smp_oc_realmethods.vai_init = SML_methods.vai_init; } /*-------------------------------------------------------------------- diff --git a/bin/varnishd/storage/storage_simple.c b/bin/varnishd/storage/storage_simple.c index 1dea4dc394..d6b31215e5 100644 --- a/bin/varnishd/storage/storage_simple.c +++ b/bin/varnishd/storage/storage_simple.c @@ -31,6 +31,8 @@ #include "config.h" +#include + #include "cache/cache_varnishd.h" #include "cache/cache_obj.h" @@ -184,6 +186,7 @@ SML_AllocBuf(struct worker *wrk, const struct stevedore *stv, size_t size, if (st == NULL) return (NULL); assert(st->space >= size); + st->flags = STORAGE_F_BUFFER; st->len = size; *ppriv = (uintptr_t)st; return (st->ptr); @@ -198,6 +201,7 @@ SML_FreeBuf(struct worker *wrk, const struct stevedore *stv, uintptr_t priv) CHECK_OBJ_NOTNULL(stv, STEVEDORE_MAGIC); CAST_OBJ_NOTNULL(st, (void *)priv, STORAGE_MAGIC); + assert(st->flags == STORAGE_F_BUFFER); sml_stv_free(stv, st); } @@ -304,130 +308,554 @@ sml_objfree(struct worker *wrk, struct objcore *oc) wrk->stats->n_object--; } -static int v_matchproto_(objiterator_f) -sml_iterator(struct worker *wrk, struct 
objcore *oc, - void *priv, objiterate_f *func, int final) +// kept for reviewers - XXX remove later +#undef VAI_DBG + +struct sml_hdl { + struct vai_hdl_preamble preamble; +#define SML_HDL_MAGIC 0x37dfd996 + struct vai_qe qe; + struct pool_task task; // unfortunate + struct ws *ws; // NULL is malloc() + struct objcore *oc; + struct object *obj; + const struct stevedore *stv; + struct boc *boc; + + struct storage *st; // updated by _lease() + + // only for _lease_boc() + uint64_t st_off; // already returned fragment of current st + uint64_t avail, returned; + struct storage *last; // to resume, held back by _return() +}; + +static inline uint64_t +st2lease(const struct storage *st) +{ + uint64_t r = (uintptr_t)st; + + if (sizeof(void *) < 8) + r <<= 1; + + return (r); +} + +static inline struct storage * +lease2st(uint64_t l) +{ + + if (sizeof(void *) < 8) + l >>= 1; + + return ((void *)l); +} + +static inline void +sml_ai_viov_fill(struct viov *viov, struct storage *st) +{ + viov->iov.iov_base = st->ptr; + viov->iov.iov_len = st->len; + viov->lease = st2lease(st); + VAI_ASSERT_LEASE(viov->lease); +} + +// sml has no mechanism to notify "I got free space again now" +// (we could add that, but because storage.h is used in mgt, a first attempt +// looks at least like this would cause some include spill for vai_q_head or +// something similar) +// +// So anyway, to get ahead we just implement a pretty stupid "call the notify +// some time later" on a thread +static void +sml_ai_later_task(struct worker *wrk, void *priv) +{ + struct sml_hdl *hdl; + const vtim_dur dur = 0.0042; + + (void)wrk; + VTIM_sleep(dur); + CAST_VAI_HDL_NOTNULL(hdl, priv, SML_HDL_MAGIC); + memset(&hdl->task, 0, sizeof hdl->task); + hdl->qe.cb(hdl, hdl->qe.priv); +} +static void +sml_ai_later(struct worker *wrk, struct sml_hdl *hdl) +{ + AZ(hdl->task.func); + AZ(hdl->task.priv); + hdl->task.func = sml_ai_later_task; + hdl->task.priv = hdl; + AZ(Pool_Task(wrk->pool, &hdl->task, TASK_QUEUE_BG)); +} + + +static int +sml_ai_buffer(struct worker *wrk, vai_hdl vhdl, struct vscarab *scarab) { - struct boc *boc; - enum boc_state_e state; - struct object *obj; - struct storage *st; - struct storage *checkpoint = NULL; const struct stevedore *stv; - ssize_t checkpoint_len = 0; - ssize_t len = 0; - int ret = 0, ret2; - ssize_t ol; - ssize_t nl; - ssize_t sl; - void *p; - ssize_t l; - unsigned u; - - obj = sml_getobj(wrk, oc); - CHECK_OBJ_NOTNULL(obj, OBJECT_MAGIC); - stv = oc->stobj->stevedore; + struct sml_hdl *hdl; + struct storage *st; + struct viov *vio; + int r = 0; + + (void) wrk; + CAST_VAI_HDL_NOTNULL(hdl, vhdl, SML_HDL_MAGIC); + stv = hdl->stv; CHECK_OBJ_NOTNULL(stv, STEVEDORE_MAGIC); - boc = HSH_RefBoc(oc); + VSCARAB_FOREACH(vio, scarab) + if (vio->iov.iov_len > UINT_MAX) + return (-EINVAL); - if (boc == NULL) { - VTAILQ_FOREACH_REVERSE_SAFE( - st, &obj->list, storagehead, list, checkpoint) { + VSCARAB_FOREACH(vio, scarab) { + st = objallocwithnuke(wrk, stv, vio->iov.iov_len, 0); + if (st == NULL) + break; + assert(st->space >= vio->iov.iov_len); + st->flags = STORAGE_F_BUFFER; + st->len = st->space; - u = 0; - if (VTAILQ_PREV(st, storagehead, list) == NULL) - u |= OBJ_ITER_END; - if (final) - u |= OBJ_ITER_FLUSH; - if (ret == 0 && st->len > 0) - ret = func(priv, u, st->ptr, st->len); - if (final) { - VTAILQ_REMOVE(&obj->list, st, list); - sml_stv_free(stv, st); - } else if (ret) - break; - } - return (ret); + sml_ai_viov_fill(vio, st); + r++; } + if (r == 0) { + sml_ai_later(wrk, hdl); + r = -EAGAIN; + } + return (r); +} - 
p = NULL; - l = 0; - - u = 0; - if (boc->fetched_so_far == 0) { - ret = func(priv, OBJ_ITER_FLUSH, NULL, 0); - if (ret) - return (ret); +static int +sml_ai_lease_simple(struct worker *wrk, vai_hdl vhdl, struct vscarab *scarab) +{ + struct storage *st, *next; + struct sml_hdl *hdl; + struct viov *viov; + int r = 0; + + (void) wrk; + CAST_VAI_HDL_NOTNULL(hdl, vhdl, SML_HDL_MAGIC); + VSCARAB_CHECK_NOTNULL(scarab); + + AZ(hdl->st_off); + st = hdl->st; + while (st != NULL && (viov = VSCARAB_GET(scarab)) != NULL) { + next = VTAILQ_PREV(st, storagehead, list); + CHECK_OBJ_ORNULL(next, STORAGE_MAGIC); + sml_ai_viov_fill(viov, st); + scarab->flags = (next == NULL) ? VSCARAB_F_END : 0; + + r++; + st = next; } - while (1) { - ol = len; - nl = ObjWaitExtend(wrk, oc, ol, &state); + hdl->st = st; + return (r); +} + +/* + * on leases while streaming (with a boc): + * + * SML uses the lease return facility to implement the "free behind" for + * OC_F_TRANSIENT objects. When streaming, we also return leases on + * fragments of sts, but we must only "free behind" when we are done with the + * last fragment. + * + * So we use a magic lease to signal "this is only a fragment", which we ignore + * on returns + */ + +// not using 0 to be able to catch accidental null pointers +static const uint64_t sml_ai_lease_frag = 0x8; + +static int +sml_ai_lease_boc(struct worker *wrk, vai_hdl vhdl, struct vscarab *scarab) +{ + enum boc_state_e state; + struct storage *next; + struct sml_hdl *hdl; + struct viov *viov; + int r = 0; + + CAST_VAI_HDL_NOTNULL(hdl, vhdl, SML_HDL_MAGIC); + VSCARAB_CHECK_NOTNULL(scarab); + + if (hdl->avail == hdl->returned) { + hdl->avail = ObjVAIGetExtend(wrk, hdl->oc, hdl->returned, + &state, &hdl->qe); if (state == BOS_FAILED) { - ret = -1; - break; - } - if (nl == ol) { - assert(state == BOS_FINISHED); - break; + hdl->last = NULL; + return (-EPIPE); } - assert(nl > ol); - Lck_Lock(&boc->mtx); - AZ(VTAILQ_EMPTY(&obj->list)); - if (checkpoint == NULL) { - st = VTAILQ_LAST(&obj->list, storagehead); - sl = 0; - } else { - st = checkpoint; - sl = checkpoint_len; - ol -= checkpoint_len; + else if (state == BOS_FINISHED) + (void)0; + else if (hdl->avail == hdl->returned) { + // ObjVAIGetExtend() has scheduled a notification + if (hdl->boc->transit_buffer > 0) + return (-ENOBUFS); + else + return (-EAGAIN); } - while (st != NULL) { - if (st->len > ol) { - p = st->ptr + ol; - l = st->len - ol; - len += l; + else + assert(state < BOS_FINISHED); + } + Lck_Lock(&hdl->boc->mtx); + if (hdl->st == NULL && hdl->last != NULL) { + /* when the "last" st completed, we did not yet have a next, so + * resume from there. Because "last" might have been returned and + * deleted, we can not just use the pointer, but rather need to + * iterate the st list. + * if we can not find "last", it also has been returned and + * deleted, and the current write head (VTAILQ_LAST) is our next + * st, which can also be null if we are done. 
+ */ + VTAILQ_FOREACH_REVERSE(next, &hdl->obj->list, storagehead, list) { + if (next == hdl->last) { + hdl->st = VTAILQ_PREV(next, storagehead, list); break; } - ol -= st->len; - assert(ol >= 0); - nl -= st->len; - assert(nl > 0); - sl += st->len; - st = VTAILQ_PREV(st, storagehead, list); - if (final && checkpoint != NULL) { - if (checkpoint == boc->stevedore_priv) - boc->stevedore_priv = trim_once; - else - VTAILQ_REMOVE(&obj->list, checkpoint, list); - sml_stv_free(stv, checkpoint); - } - checkpoint = st; - checkpoint_len = sl; } - CHECK_OBJ_NOTNULL(obj, OBJECT_MAGIC); - CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC); - st = VTAILQ_PREV(st, storagehead, list); - if (st != NULL && st->len == 0) - st = NULL; - Lck_Unlock(&boc->mtx); - assert(l > 0 || state == BOS_FINISHED); - u = 0; - if (st == NULL || final) - u |= OBJ_ITER_FLUSH; - if (st == NULL && state == BOS_FINISHED) - u |= OBJ_ITER_END; - ret = func(priv, u, p, l); - if (ret) - break; } - HSH_DerefBoc(wrk, oc); + hdl->last = NULL; + if (hdl->st == NULL) { + assert(hdl->returned == 0 || hdl->avail == hdl->returned); + hdl->st = VTAILQ_LAST(&hdl->obj->list, storagehead); + } + if (hdl->st == NULL) + assert(hdl->avail == hdl->returned); + + while (hdl->avail > hdl->returned && (viov = VSCARAB_GET(scarab)) != NULL) { + CHECK_OBJ_NOTNULL(hdl->st, STORAGE_MAGIC); // ObjVAIGetExtend ensures + assert(hdl->st_off <= hdl->st->space); + size_t av = hdl->avail - hdl->returned; + size_t l = hdl->st->space - hdl->st_off; + AN(l); + if (l > av) + l = av; + viov->iov.iov_base = hdl->st->ptr + hdl->st_off; + viov->iov.iov_len = l; + if (hdl->st_off + l == hdl->st->space) { + next = VTAILQ_PREV(hdl->st, storagehead, list); + AZ(hdl->last); + if (next == NULL) + hdl->last = hdl->st; + else + CHECK_OBJ(next, STORAGE_MAGIC); +#ifdef VAI_DBG + VSLb(wrk->vsl, SLT_Debug, "off %zu + l %zu == space st %p next st %p stvprv %p", + hdl->st_off, l, hdl->st, next, hdl->boc->stevedore_priv); +#endif + viov->lease = st2lease(hdl->st); + hdl->st_off = 0; + hdl->st = next; + } + else { + viov->lease = sml_ai_lease_frag; + hdl->st_off += l; + } + hdl->returned += l; + VAI_ASSERT_LEASE(viov->lease); + r++; + } + + Lck_Unlock(&hdl->boc->mtx); + return (r); +} + +// return only buffers, used if object is not streaming +static void v_matchproto_(vai_return_f) +sml_ai_return_buffers(struct worker *wrk, vai_hdl vhdl, struct vscaret *scaret) +{ + struct storage *st; + struct sml_hdl *hdl; + uint64_t *p; + + (void) wrk; + CAST_VAI_HDL_NOTNULL(hdl, vhdl, SML_HDL_MAGIC); + + VSCARET_FOREACH(p, scaret) { + if (*p == sml_ai_lease_frag) + continue; + CAST_OBJ_NOTNULL(st, lease2st(*p), STORAGE_MAGIC); + if ((st->flags & STORAGE_F_BUFFER) == 0) + continue; + sml_stv_free(hdl->stv, st); + } + VSCARET_INIT(scaret, scaret->capacity); +} + +// generic return for buffers and object leases, used when streaming +static void v_matchproto_(vai_return_f) +sml_ai_return(struct worker *wrk, vai_hdl vhdl, struct vscaret *scaret) +{ + struct storage *st; + struct sml_hdl *hdl; + uint64_t *p, last = 0; + + (void) wrk; + CAST_VAI_HDL_NOTNULL(hdl, vhdl, SML_HDL_MAGIC); + VSCARET_CHECK_NOTNULL(scaret); + + // callback is only registered if needed + assert(hdl->boc != NULL && (hdl->oc->flags & OC_F_TRANSIENT) != 0); + + // filter and delay hdl->last + VSCARET_LOCAL(todo, scaret->used); + VSCARET_FOREACH(p, scaret) { + if (*p == sml_ai_lease_frag) + continue; + CAST_OBJ_NOTNULL(st, lease2st(*p), STORAGE_MAGIC); + if (st == hdl->last) { + AZ(last); + last = *p; + continue; + } + VSCARET_ADD(todo, *p); + } + 
VSCARET_INIT(scaret, scaret->capacity); + if (last != 0) + VSCARET_ADD(scaret, last); + + Lck_Lock(&hdl->boc->mtx); + VSCARET_FOREACH(p, todo) { + CAST_OBJ_NOTNULL(st, lease2st(*p), STORAGE_MAGIC); + if ((st->flags & STORAGE_F_BUFFER) != 0) + continue; + VTAILQ_REMOVE(&hdl->obj->list, st, list); + if (st == hdl->boc->stevedore_priv) + hdl->boc->stevedore_priv = trim_once; + } + Lck_Unlock(&hdl->boc->mtx); + + VSCARET_FOREACH(p, todo) { + CAST_OBJ_NOTNULL(st, lease2st(*p), STORAGE_MAGIC); + sml_stv_free(hdl->stv, st); + } +} + +static void v_matchproto_(vai_fini_f) +sml_ai_fini(struct worker *wrk, vai_hdl *vai_hdlp) +{ + struct sml_hdl *hdl; + + AN(vai_hdlp); + CAST_VAI_HDL_NOTNULL(hdl, *vai_hdlp, SML_HDL_MAGIC); + *vai_hdlp = NULL; + + if (hdl->boc != NULL) { + ObjVAICancel(wrk, hdl->boc, &hdl->qe); + HSH_DerefBoc(wrk, hdl->oc); + hdl->boc = NULL; + } + + if (hdl->ws != NULL) + WS_Release(hdl->ws, 0); + else + free(hdl); +} + +static vai_hdl v_matchproto_(vai_init_f) +sml_ai_init(struct worker *wrk, struct objcore *oc, struct ws *ws, + vai_notify_cb *notify, void *notify_priv) +{ + struct sml_hdl *hdl; + const size_t sz = sizeof *hdl; + + if (ws != NULL && WS_ReserveSize(ws, (unsigned)sz)) + hdl = WS_Reservation(ws); + else { + hdl = malloc(sz); + ws = NULL; + } + + AN(hdl); + INIT_VAI_HDL(hdl, SML_HDL_MAGIC); + hdl->preamble.vai_lease = sml_ai_lease_simple; + hdl->preamble.vai_buffer = sml_ai_buffer; + hdl->preamble.vai_return = sml_ai_return_buffers; + hdl->preamble.vai_fini = sml_ai_fini; + hdl->ws = ws; + + hdl->oc = oc; + hdl->obj = sml_getobj(wrk, oc); + CHECK_OBJ_NOTNULL(hdl->obj, OBJECT_MAGIC); + hdl->stv = oc->stobj->stevedore; + CHECK_OBJ_NOTNULL(hdl->stv, STEVEDORE_MAGIC); + + hdl->st = VTAILQ_LAST(&hdl->obj->list, storagehead); + CHECK_OBJ_ORNULL(hdl->st, STORAGE_MAGIC); + + hdl->qe.magic = VAI_Q_MAGIC; + hdl->qe.cb = notify; + hdl->qe.hdl = hdl; + hdl->qe.priv = notify_priv; + + hdl->boc = HSH_RefBoc(oc); + if (hdl->boc == NULL) + return (hdl); + /* we only initialize notifications if we have a boc, so + * any wrong attempt triggers magic checks. 
+ */ + hdl->preamble.vai_lease = sml_ai_lease_boc; + if ((hdl->oc->flags & OC_F_TRANSIENT) != 0) + hdl->preamble.vai_return = sml_ai_return; + return (hdl); +} + +/* + * trivial notification to allow the iterator to simply block + */ +struct sml_notify { + unsigned magic; +#define SML_NOTIFY_MAGIC 0x4589af31 + unsigned hasmore; + pthread_mutex_t mtx; + pthread_cond_t cond; +}; + +static void +sml_notify_init(struct sml_notify *sn) +{ + + INIT_OBJ(sn, SML_NOTIFY_MAGIC); + AZ(pthread_mutex_init(&sn->mtx, NULL)); + AZ(pthread_cond_init(&sn->cond, NULL)); +} + +static void +sml_notify_fini(struct sml_notify *sn) +{ + + CHECK_OBJ_NOTNULL(sn, SML_NOTIFY_MAGIC); + AZ(pthread_mutex_destroy(&sn->mtx)); + AZ(pthread_cond_destroy(&sn->cond)); +} + +static void v_matchproto_(vai_notify_cb) +sml_notify(vai_hdl hdl, void *priv) +{ + struct sml_notify *sn; + + (void) hdl; + CAST_OBJ_NOTNULL(sn, priv, SML_NOTIFY_MAGIC); + AZ(pthread_mutex_lock(&sn->mtx)); + sn->hasmore = 1; + AZ(pthread_cond_signal(&sn->cond)); + AZ(pthread_mutex_unlock(&sn->mtx)); + +} + +static void +sml_notify_wait(struct sml_notify *sn) +{ + + CHECK_OBJ_NOTNULL(sn, SML_NOTIFY_MAGIC); + AZ(pthread_mutex_lock(&sn->mtx)); + while (sn->hasmore == 0) + AZ(pthread_cond_wait(&sn->cond, &sn->mtx)); + AN(sn->hasmore); + sn->hasmore = 0; + AZ(pthread_mutex_unlock(&sn->mtx)); +} + +static int v_matchproto_(objiterator_f) +sml_iterator(struct worker *wrk, struct objcore *oc, + void *priv, objiterate_f *func, int final) +{ + struct sml_notify sn; + struct viov *vio, *last; + unsigned u, uu; + vai_hdl hdl; + int nn, r, r2, islast; + + VSCARAB_LOCAL(scarab, 16); + VSCARET_LOCAL(scaret, 16); + + (void) final; // phase out? + sml_notify_init(&sn); + hdl = ObjVAIinit(wrk, oc, NULL, sml_notify, &sn); + AN(hdl); + + r = u = 0; + + do { + do { + nn = ObjVAIlease(wrk, hdl, scarab); + if (nn <= 0) + break; + } while (scarab->used < scarab->capacity); + + /* + * nn is the wait/return action or 0 + * nn tells us if to flush + */ + vio = NULL; + uu = u; + last = VSCARAB_LAST(scarab); + VSCARAB_FOREACH(vio, scarab) { + islast = vio == last; + AZ(u & OBJ_ITER_END); + if (islast && scarab->flags & VSCARAB_F_END) + u |= OBJ_ITER_END; + + // flush if it is the scarab's last IOV and we will block next + // or if we need space in the return leases array + uu = u; + if ((islast && nn < 0) || scaret->used == scaret->capacity - 1) + uu |= OBJ_ITER_FLUSH; + r = func(priv, uu, vio->iov.iov_base, vio->iov.iov_len); + if (r != 0) + break; + + // sufficient space ensured by capacity check above + VSCARET_ADD(scaret, vio->lease); + + // whenever we have flushed, return leases + if ((uu & OBJ_ITER_FLUSH) && scaret->used > 0) + ObjVAIreturn(wrk, hdl, scaret); + } + + // return leases which we did not use if error (break) + VSCARAB_FOREACH_RESUME(vio, scarab) { + if (scaret->used == scaret->capacity) + ObjVAIreturn(wrk, hdl, scaret); + VSCARET_ADD(scaret, vio->lease); + } + + // we have now completed the scarab + VSCARAB_INIT(scarab, scarab->capacity); + + // flush before blocking if we did not already + if (r == 0 && (nn == -ENOBUFS || nn == -EAGAIN) && + (uu & OBJ_ITER_FLUSH) == 0) { + r2 = func(priv, OBJ_ITER_FLUSH, NULL, 0); + if (scaret->used > 0) + ObjVAIreturn(wrk, hdl, scaret); + if (r == 0) + r = r2; + } + + if (r == 0 && (nn == -ENOBUFS || nn == -EAGAIN)) { + assert(scaret->used <= 1); + sml_notify_wait(&sn); + } + else if (r == 0 && nn < 0) + r = -1; + } while (nn != 0 && r == 0); + if ((u & OBJ_ITER_END) == 0) { - ret2 = func(priv, OBJ_ITER_END, NULL, 0); - if (ret 
== 0) - ret = ret2; + r2 = func(priv, OBJ_ITER_END, NULL, 0); + if (r == 0) + r = r2; } - return (ret); + + if (scaret->used > 0) + ObjVAIreturn(wrk, hdl, scaret); + + ObjVAIfini(wrk, &hdl); + sml_notify_fini(&sn); + + return (r); } /*-------------------------------------------------------------------- @@ -734,6 +1162,7 @@ const struct obj_methods SML_methods = { .objgetattr = sml_getattr, .objsetattr = sml_setattr, .objtouch = LRU_Touch, + .vai_init = sml_ai_init }; static void diff --git a/bin/varnishd/storage/storage_simple.h b/bin/varnishd/storage/storage_simple.h index 881e0b8571..207a11505a 100644 --- a/bin/varnishd/storage/storage_simple.h +++ b/bin/varnishd/storage/storage_simple.h @@ -38,7 +38,8 @@ struct storage { unsigned magic; #define STORAGE_MAGIC 0x1a4e51c0 - + unsigned flags; +#define STORAGE_F_BUFFER 1 VTAILQ_ENTRY(storage) list; void *priv; diff --git a/bin/varnishtest/tests/c00111.vtc b/bin/varnishtest/tests/c00111.vtc index 706ee7041a..996d5d258a 100644 --- a/bin/varnishtest/tests/c00111.vtc +++ b/bin/varnishtest/tests/c00111.vtc @@ -15,7 +15,8 @@ client c1 { } -run varnish v1 -vsl_catchup -varnish v1 -expect fetch_failed == 1 +# with vai, this no longer fails systematically (which is good) +varnish v1 -expect fetch_failed <= 1 varnish v1 -cliok "param.set transit_buffer 4k" @@ -26,4 +27,4 @@ client c2 { varnish v1 -vsl_catchup varnish v1 -expect s_fetch == 2 -varnish v1 -expect fetch_failed == 1 +varnish v1 -expect fetch_failed <= 1