/* This file is in the public domain. */ #include #include #include #include #include #include #include #include "impl.h" typedef enum { ST_WRITE = 1, ST_DROP, ST_CB, } SPEC_TYPE; #undef aio_oq_init #define OQ AIO_OQ #define OQE AIO_OQE struct aio_oqe { OQE *link; int special; union { struct { const char *data; int left; void (*done)(void *); void *donearg; } norm; struct { SPEC_TYPE type; void (*cb)(AIO_SPECIAL_OP, void *, int); AIO_SPECIAL_TYPE (*typecb)(void *, int); void *vv; int iv; } spec; } ; } ; #define head aio_v1.aio_head #define tail aio_v1.aio_tail #define len aio_v1.aio_len static void queue_common(OQ *q, OQE *e) { e->link = 0; *q->tail = e; q->tail = &e->link; } static void queue_norm(OQ *q, OQE *e, int l) { e->special = 0; e->norm.left = l; q->len += l; queue_common(q,e); } static void do_special(OQE *e, AIO_SPECIAL_OP op) { void (*cb)(AIO_SPECIAL_OP, void *, int); void *cbav; int cbai; cb = e->spec.cb; cbav = e->spec.vv; cbai = e->spec.iv; free(e); (*cb)(op,cbav,cbai); } static void donefree(OQE *e) { void (*done)(void *); void *donearg; done = e->norm.done; donearg = e->norm.donearg; free(e); if (done) (*done)(donearg); } void aio_oq_init(OQ *q, int ver) { switch (ver) { default: panic("aio_oq_init: impossible version"); break; case 1: break; } q->aio_version = ver; q->len = 0; q->head = 0; q->tail = &q->head; } unsigned int aio_oq_qlen(OQ *q) { return(q->len); } int aio_oq_headlen(OQ *q) { if (q->head == 0) return(AIO_OQ_HL_EMPTY); if (q->head->special) return(AIO_OQ_HL_SPECIAL); return(q->head->norm.left); } int aio_oq_empty(OQ *q) { return(q->head == 0); } int aio_oq_nonempty(OQ *q) { return(q->head != 0); } int aio_oq_queue_point(OQ *q, const void *data, int l) { OQE *e; if (l == AIO_STRLEN) l = strlen(data); if (l == 0) return(0); if (l < 1) panic("bad length"); e = malloc(sizeof(OQE)); if (! e) return(-1); e->norm.data = data; e->norm.done = 0; queue_norm(q,e,l); return(0); } int aio_oq_queue_copy(OQ *q, const void *data, int l) { OQE *e; if (l == AIO_STRLEN) l = strlen(data); if (l == 0) return(0); if (l < 1) panic("bad length"); e = malloc(sizeof(OQE)+l); if (! e) return(-1); e->norm.data = (void *) (e+1); e->norm.done = 0; bcopy(data,e+1,l); queue_norm(q,e,l); return(0); } int aio_oq_queue_free(OQ *q, void *data, int l) { OQE *e; if (l == AIO_STRLEN) l = strlen(data); if (l == 0) { free(data); return(0); } if (l < 1) panic("bad length"); e = malloc(sizeof(OQE)); if (! e) return(-1); e->norm.data = data; e->norm.done = &free; e->norm.donearg = data; queue_norm(q,e,l); return(0); } int aio_oq_queue_cb(OQ *q, const void *data, int l, void (*cb)(void *), void *cbarg) { OQE *e; if (l == AIO_STRLEN) l = strlen(data); if (l == 0) { (*cb)(cbarg); return(0); } if (l < 1) panic("bad length"); e = malloc(sizeof(OQE)); if (! e) return(-1); e->norm.data = data; e->norm.done = cb; e->norm.donearg = cbarg; queue_norm(q,e,l); return(0); } int aio_oq_queue_printf(OQ *q, const char *fmt, ...) { va_list ap; char *s; int l; OQE *e; va_start(ap,fmt); l = vasprintf(&s,fmt,ap); va_end(ap); if (l < 0) return(-1); if (l < 1) { free(s); return(0); } e = malloc(sizeof(OQE)); if (! e) { free(s); return(-1); } e->norm.data = s; e->norm.done = &free; e->norm.donearg = s; queue_norm(q,e,l); return(0); } int aio_oq_queue_special(OQ *q, AIO_SPECIAL_TYPE t, void (*cb)(AIO_SPECIAL_OP, void *, int), void *vv, int iv) { OQE *e; SPEC_TYPE ti; switch (t) { case AIO_SPECIAL_WRITE: ti = ST_WRITE; break; case AIO_SPECIAL_DROP: ti = ST_DROP; break; default: panic("bad type to aio_oq_queue_special"); break; } e = malloc(sizeof(OQE)); if (! e) return(-1); e->special = 1; e->spec.type = ti; e->spec.cb = cb; e->spec.vv = vv; e->spec.iv = iv; queue_common(q,e); return(0); } void aio_oq_set_special_typecb(OQ *q, AIO_SPECIAL_TYPE (*cb)(void *, int)) { OQE *e; e = q->head; if (!e || !e->special || !cb) return; e->spec.type = ST_CB; e->spec.typecb = cb; } int aio_oq_writev(OQ *q, int fd, int maxb) { static int maxiov = -1; static struct iovec *iov; static int iovn; int iovl; OQE *e; int nb; int totb; if (maxiov < 0) { maxiov = 64; iov = 0; iovn = 0; } iovl = 0; totb = 0; for <"iov"> (e=q->head;e;e=e->link) { if ((maxb >= 0) && (totb >= maxb)) break; if (iovl >= maxiov) break; if (e->special) { switch <"type"> (e->spec.type) { case ST_CB: switch ((*e->spec.typecb)(e->spec.vv,e->spec.iv)) { case AIO_SPECIAL_WRITE: case <"type"> ST_WRITE: if (e != q->head) break <"iov">; if (! (q->head = e->link)) q->tail = &q->head; do_special(e,AIO_SPECIAL_NORMAL); return(AIO_WRITEV_SPECIAL); break; case AIO_SPECIAL_DROP: case <"type"> ST_DROP: default: break; } break; default: panic("impossible special type"); break; } } else { if (iovl >= iovn) { struct iovec *t; t = realloc(iov,(iovn=iovl+4)*sizeof(*iov)); if (! t) { errno = ENOMEM; return(AIO_WRITEV_ERROR); } iov = t; } nb = e->norm.left; if ((maxb >= 0) && (nb > maxb-totb)) nb = maxb - totb; iov[iovl++] = (struct iovec){ .iov_base=dequal(e->norm.data), .iov_len=nb }; totb += nb; } } if (iovl < 1) return(0); nb = writev(fd,iov,iovl); if (nb < 0) return(AIO_WRITEV_ERROR); return(nb); } int aio_oq_dropdata_cb(OQ *q, int n, void (*cb)(const void *, int, void *), void *cbarg) { OQE *e; while <"drop"> ((e = q->head)) { if (e->special) { switch <"type"> (e->spec.type) { case ST_WRITE: if (n > 0) panic("dropping write special"); break <"drop">; case ST_DROP: case ST_CB: if (! (q->head = e->link)) q->tail = &q->head; do_special(e,AIO_SPECIAL_NORMAL); break; default: panic("impossible special type"); break; } } else { if (!n || (n < e->norm.left)) break <"drop">; if (! (q->head = e->link)) q->tail = &q->head; if (cb) (*cb)(e->norm.data,e->norm.left,cbarg); q->len -= e->norm.left; n -= e->norm.left; donefree(e); } } if (e) { if (n) { if (cb) (*cb)(e->norm.data,n,cbarg); e->norm.data += n; e->norm.left -= n; q->len -= n; } return(0); } else { if (n) panic("dropping more than is in queue"); if (q->len) panic("empty queue with nonzero length"); return(1); } } int aio_oq_dropdata(OQ *q, int n) { return(aio_oq_dropdata_cb(q,n,0,0)); } void aio_oq_flush(OQ *q) { OQE *e; while (q->head) { e = q->head; if (! (q->head = e->link)) q->tail = &q->head; if (e->special) { do_special(e,AIO_SPECIAL_FLUSH); } else { q->len -= e->norm.left; donefree(e); } } if (q->len) panic("queue has nonzero length after flushing"); if (q->tail != &q->head) panic("queue tail pointer wrong after flushing"); }