Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare Varnish-Cache for the age of AI #4209

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions bin/varnishd/cache/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <pthread.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/uio.h>

#include "vdef.h"
#include "vrt.h"
Expand Down Expand Up @@ -286,6 +287,10 @@ enum boc_state_e {
#include "tbl/boc_state.h"
};

// cache_obj.h vai notify
struct vai_qe;
VSLIST_HEAD(vai_q_head, vai_qe);

struct boc {
unsigned magic;
#define BOC_MAGIC 0x70c98476
Expand All @@ -298,6 +303,7 @@ struct boc {
uint64_t fetched_so_far;
uint64_t delivered_so_far;
uint64_t transit_buffer;
struct vai_q_head vai_q_head;
};

/* Object core structure ---------------------------------------------
Expand Down Expand Up @@ -770,6 +776,30 @@ int ObjGetDouble(struct worker *, struct objcore *, enum obj_attr, double *);
int ObjGetU64(struct worker *, struct objcore *, enum obj_attr, uint64_t *);
int ObjCheckFlag(struct worker *, struct objcore *, enum obj_flags of);

/*====================================================================
* ObjVAI...(): Asynchronous Iteration
*
* see comments in cache_obj.c for usage
*/

typedef void *vai_hdl;
typedef void vai_notify_cb(vai_hdl, void *priv);

struct viov {
unsigned magic;
#define VIOV_MAGIC 0x7a107a10
unsigned flags;
#define VIOV_F_END 1 // last VIOV
uint64_t lease;
struct iovec iov;
};

vai_hdl ObjVAIinit(struct worker *, struct objcore *, struct ws *,
vai_notify_cb *, void *);
int ObjVAIlease(struct worker *, vai_hdl, struct viov *, int);
void ObjVAIreturn(struct worker *, vai_hdl, uint64_t *, int);
void ObjVAIfini(struct worker *, vai_hdl *);

/* cache_req_body.c */
ssize_t VRB_Iterate(struct worker *, struct vsl_log *, struct req *,
objiterate_f *func, void *priv);
Expand Down
188 changes: 177 additions & 11 deletions bin/varnishd/cache/cache_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,100 @@ ObjIterate(struct worker *wrk, struct objcore *oc,
return (om->objiterator(wrk, oc, priv, func, final));
}

/*====================================================================
* ObjVAI...(): Asynchronous Iteration
*
*
* ObjVAIinit() returns an opaque handle, or NULL if not supported
*
* A VAI handle must not be used concurrently
*
* the vai_notify_cb(priv) will be called asynchronously by the storage
* engine when a -EAGAIN / -ENOBUFS condition is over and ObjVAIlease()
* can be called again.
*
* Note:
* - the callback gets executed by an arbitrary thread
* - WITH the boc mtx held
* so it should never block and only do minimal work
*
* ObjVAIlease() fills the viov array passed in with leases. returns:
*
* -EAGAIN: nothing available at the moment, storage will notify, no use to
* call again until notification
* -ENOBUFS: caller needs to return leases, storage will notify
* -EPIPE: BOS_FAILED for busy object
* -(errno): other problem, fatal
* 0: EOF
* n: number of viovs filled
*
* struct viov:
*
* the returned leases can be used by the caller until returned with
* ObjVAIreturn(). The storage guarantees that the lease member is a
* multiple of 8 (that is, the lower three bits are zero). These can be
* used by the caller between lease and return, but must be returned to
* zero before returning.
*
* ObjVAIreturn() returns leases
*
* it must be called with an array of lease values from viovs
* received when the caller can guarantee that they are no longer accessed
*
* ObjVAIfini() finalized iteration
*
* it must be called when iteration is done, irrespective of error status
*/

vai_hdl
ObjVAIinit(struct worker *wrk, struct objcore *oc, struct ws *ws,
vai_notify_cb *cb, void *cb_priv)
{
const struct obj_methods *om = obj_getmethods(oc);

CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);

if (om->vai_init == NULL)
return (NULL);
return (om->vai_init(wrk, oc, ws, cb, cb_priv));
}

int
ObjVAIlease(struct worker *wrk, vai_hdl vhdl, struct viov *viov, int n)
{
struct vai_hdl_preamble *vaip = vhdl;

AN(vaip);
assert(vaip->magic2 == VAI_HDL_PREAMBLE_MAGIC2);
AN(vaip->vai_lease);
return (vaip->vai_lease(wrk, vhdl, viov, n));
}

void
ObjVAIreturn(struct worker *wrk, vai_hdl vhdl, uint64_t *leases, int n)
{
struct vai_hdl_preamble *vaip = vhdl;

AN(vaip);
assert(vaip->magic2 == VAI_HDL_PREAMBLE_MAGIC2);
/* vai_return is optional */
if (vaip->vai_return == NULL)
return;
vaip->vai_return(wrk, vhdl, leases, n);
}

void
ObjVAIfini(struct worker *wrk, vai_hdl *vhdlp)
{
AN(vhdlp);
struct vai_hdl_preamble *vaip = *vhdlp;

AN(vaip);
assert(vaip->magic2 == VAI_HDL_PREAMBLE_MAGIC2);
AN(vaip->vai_lease);
return (vaip->vai_fini(wrk, vhdlp));
}

/*====================================================================
* ObjGetSpace()
*
Expand Down Expand Up @@ -231,6 +325,29 @@ obj_extend_condwait(const struct objcore *oc)
(void)Lck_CondWait(&oc->boc->cond, &oc->boc->mtx);
}

// notify of an extension of the boc or state change
//lint -sem(obj_boc_notify_Unlock, thread_unlock)

static void
obj_boc_notify_Unlock(struct boc *boc)
{
struct vai_qe *qe, *next;

PTOK(pthread_cond_broadcast(&boc->cond));
qe = VSLIST_FIRST(&boc->vai_q_head);
VSLIST_FIRST(&boc->vai_q_head) = NULL;
while (qe != NULL) {
CHECK_OBJ(qe, VAI_Q_MAGIC);
AN(qe->flags & VAI_QF_INQUEUE);
qe->flags &= ~VAI_QF_INQUEUE;
next = VSLIST_NEXT(qe, list);
VSLIST_NEXT(qe, list) = NULL;
qe->cb(qe->hdl, qe->priv);
qe = next;
}
Lck_Unlock(&boc->mtx);
}

void
ObjExtend(struct worker *wrk, struct objcore *oc, ssize_t l, int final)
{
Expand All @@ -241,14 +358,13 @@ ObjExtend(struct worker *wrk, struct objcore *oc, ssize_t l, int final)
AN(om->objextend);
assert(l >= 0);

Lck_Lock(&oc->boc->mtx);
if (l > 0) {
Lck_Lock(&oc->boc->mtx);
obj_extend_condwait(oc);
om->objextend(wrk, oc, l);
oc->boc->fetched_so_far += l;
PTOK(pthread_cond_broadcast(&oc->boc->cond));
obj_boc_notify_Unlock(oc->boc);
}
Lck_Unlock(&oc->boc->mtx);

assert(oc->boc->state < BOS_FINISHED);
if (final && om->objtrimstore != NULL)
Expand All @@ -258,6 +374,17 @@ ObjExtend(struct worker *wrk, struct objcore *oc, ssize_t l, int final)
/*====================================================================
*/

static inline void
objSignalFetchLocked(const struct objcore *oc, uint64_t l)
{
if (oc->boc->transit_buffer > 0) {
assert(oc->flags & OC_F_TRANSIENT);
/* Signal the new client position */
oc->boc->delivered_so_far = l;
PTOK(pthread_cond_signal(&oc->boc->cond));
}
}

uint64_t
ObjWaitExtend(const struct worker *wrk, const struct objcore *oc, uint64_t l,
enum boc_state_e *statep)
Expand All @@ -272,13 +399,8 @@ ObjWaitExtend(const struct worker *wrk, const struct objcore *oc, uint64_t l,
while (1) {
rv = oc->boc->fetched_so_far;
assert(l <= rv || oc->boc->state == BOS_FAILED);
if (oc->boc->transit_buffer > 0) {
assert(oc->flags & OC_F_TRANSIENT);
/* Signal the new client position */
oc->boc->delivered_so_far = l;
PTOK(pthread_cond_signal(&oc->boc->cond));
}
state = oc->boc->state;
objSignalFetchLocked(oc, l);
if (rv > l || state >= BOS_FINISHED)
break;
(void)Lck_CondWait(&oc->boc->cond, &oc->boc->mtx);
Expand All @@ -288,6 +410,51 @@ ObjWaitExtend(const struct worker *wrk, const struct objcore *oc, uint64_t l,
*statep = state;
return (rv);
}

// get a new extension _or_ register a notification
uint64_t
ObjVAIGetExtend(struct worker *wrk, const struct objcore *oc, uint64_t l,
enum boc_state_e *statep, struct vai_qe *qe)
{
enum boc_state_e state;
uint64_t rv;

(void) wrk;
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
CHECK_OBJ_NOTNULL(oc->boc, BOC_MAGIC);
CHECK_OBJ_NOTNULL(qe, VAI_Q_MAGIC);
Lck_Lock(&oc->boc->mtx);
rv = oc->boc->fetched_so_far;
assert(l <= rv || oc->boc->state == BOS_FAILED);
state = oc->boc->state;
objSignalFetchLocked(oc, l);
if (l == rv && state < BOS_FINISHED &&
(qe->flags & VAI_QF_INQUEUE) == 0) {
qe->flags |= VAI_QF_INQUEUE;
VSLIST_INSERT_HEAD(&oc->boc->vai_q_head, qe, list);
}
Lck_Unlock(&oc->boc->mtx);
if (statep != NULL)
*statep = state;
return (rv);
}

void
ObjVAICancel(struct worker *wrk, struct boc *boc, struct vai_qe *qe)
{

(void) wrk;
CHECK_OBJ_NOTNULL(boc, BOC_MAGIC);
CHECK_OBJ_NOTNULL(qe, VAI_Q_MAGIC);

Lck_Lock(&boc->mtx);
// inefficient, but should be rare
if ((qe->flags & VAI_QF_INQUEUE) != 0)
VSLIST_REMOVE(&boc->vai_q_head, qe, vai_qe, list);
qe->flags = 0;
Lck_Unlock(&boc->mtx);
}

/*====================================================================
*/

Expand All @@ -313,8 +480,7 @@ ObjSetState(struct worker *wrk, const struct objcore *oc,

Lck_Lock(&oc->boc->mtx);
oc->boc->state = next;
PTOK(pthread_cond_broadcast(&oc->boc->cond));
Lck_Unlock(&oc->boc->mtx);
obj_boc_notify_Unlock(oc->boc);
}

/*====================================================================
Expand Down
Loading