/* This file is in the public domain. */ #include #include #include #include #include #include #include #include #include "impl.h" #undef aio_pq_init #define PQ AIO_PQ #define PP AIO_PP #define PQE AIO_PQE #define PQPP AIO_PQPP typedef struct writev_state WRITEV_STATE; struct writev_state { struct iovec *iov; int *pv; int pa; PQE **heads; PP **parts; int vl; } ; struct aio_pqe { PQE *link; PP *data; } ; struct aio_pp { PP *link; const char *data; int left; void (*done)(void *); void *donearg; } ; struct aio_pqpp { PQE *head; PQE **tail; unsigned int plen; PP *partial; PP **ptail; } ; #define len aio_v1.aio_len #define npri aio_v1.aio_npri #define perpri aio_v1.aio_perpri #define inprogress aio_v1.aio_inprogress static int maxiov = -1; void aio_pq_init(PQ *q, int ver, int np) { int i; PQPP *pp; switch (ver) { default: panic("aio_pq_init: impossible version"); break; case 1: break; } if (np < 1) return; #ifdef DETAIL_TRACE printf("aio_pq_init %p npri=%d\n",(void *)q,np); #endif q->aio_version = ver; q->len = 0; q->npri = np; q->perpri = malloc(np*sizeof(*q->perpri)); for (i=np-1;i>=0;i--) { pp = &q->perpri[i]; pp->head = 0; pp->tail = &pp->head; pp->plen = 0; pp->partial = 0; pp->ptail = &pp->partial; } q->inprogress = -1; #ifdef DETAIL_TRACE printf("aio_pq_init perpri vec ["); for (i=0;iperpri[i]); printf("\n"); #endif } void aio_pq_deinit(PQ *q) { #ifdef DETAIL_TRACE printf("aio_pq_deinit %p\n",(void *)q); #endif aio_pq_flush(q,AIO_ALL_PRIO); free(q->perpri); } static void queue_pp(PQPP *p, PP *e, int l) { #ifdef DETAIL_TRACE printf("queue_pp: perpri %p, pp %p, data %p, len %d\n",(void *)p,(void *)e,(const void *)e->data,l); #endif e->left = l; p->plen += l; e->link = 0; *p->ptail = e; p->ptail = &e->link; #ifdef DETAIL_TRACE printf("queue_pp: plen now %d\n",p->plen); #endif } void aio_pq_queue_point(PQ *q, int p, const void *data, int dlen) { PP *e; #ifdef DETAIL_TRACE printf("aio_pq_queue_point: q %p pri %d data %p len ",(void *)q,p,data); if (dlen == AIO_STRLEN) printf("STRLEN"); else printf("%d",dlen); printf("\n"); #endif if (dlen == AIO_STRLEN) { dlen = strlen(data); #ifdef DETAIL_TRACE printf("aio_pq_queue_point: STRLEN -> %d\n",dlen); #endif } if (dlen == 0) { #ifdef DETAIL_TRACE printf("aio_pq_queue_point: zero length\n"); #endif return; } if (dlen < 1) panic("bad length"); if ((p < 0) || (p >= q->npri)) panic("bad prio"); e = malloc(sizeof(PP)); e->data = data; e->done = 0; queue_pp(&q->perpri[p],e,dlen); #ifdef DETAIL_TRACE printf("aio_pq_queue_point: done, queued %p\n",(void *)e); #endif } void aio_pq_queue_copy(PQ *q, int p, const void *data, int dlen) { PP *e; #ifdef DETAIL_TRACE printf("aio_pq_queue_copy: q %p pri %d data %p len ",(void *)q,p,data); if (dlen == AIO_STRLEN) printf("STRLEN"); else printf("%d",dlen); printf("\n"); #endif if (dlen == AIO_STRLEN) { dlen = strlen(data); #ifdef DETAIL_TRACE printf("aio_pq_queue_copy: STRLEN -> %d\n",dlen); #endif } if (dlen == 0) { #ifdef DETAIL_TRACE printf("aio_pq_queue_copy: zero length\n"); #endif return; } if (dlen < 1) panic("bad length"); if ((p < 0) || (p >= q->npri)) panic("bad prio"); e = malloc(sizeof(PP)+dlen); e->data = (void *) (e+1); e->done = 0; bcopy(data,e+1,dlen); queue_pp(&q->perpri[p],e,dlen); #ifdef DETAIL_TRACE printf("aio_pq_queue_copy: done, queued %p\n",(void *)e); #endif } void aio_pq_queue_free(PQ *q, int p, void *data, int dlen) { PP *e; #ifdef DETAIL_TRACE printf("aio_pq_queue_free: q %p pri %d data %p len ",(void *)q,p,data); if (dlen == AIO_STRLEN) printf("STRLEN"); else printf("%d",dlen); printf("\n"); #endif if (dlen == AIO_STRLEN) { dlen = strlen(data); #ifdef DETAIL_TRACE printf("aio_pq_queue_free: STRLEN -> %d\n",dlen); #endif } if (dlen == 0) { #ifdef DETAIL_TRACE printf("aio_pq_queue_free: zero length\n"); #endif free(data); return; } if (dlen < 1) panic("bad length"); if ((p < 0) || (p >= q->npri)) panic("bad prio"); e = malloc(sizeof(PP)); e->data = data; e->done = &free; e->donearg = data; queue_pp(&q->perpri[p],e,dlen); #ifdef DETAIL_TRACE printf("aio_pq_queue_free: done, queued %p\n",(void *)e); #endif } void aio_pq_queue_cb(PQ *q, int p, void *data, int dlen, void (*cb)(void *), void *cbarg) { PP *e; #ifdef DETAIL_TRACE printf("aio_pq_queue_cb: q %p pri %d data %p len ",(void *)q,p,data); if (dlen == AIO_STRLEN) printf("STRLEN"); else printf("%d",dlen); printf("\n"); #endif if (dlen == AIO_STRLEN) { dlen = strlen(data); #ifdef DETAIL_TRACE printf("aio_pq_queue_cb: STRLEN -> %d\n",dlen); #endif } if (dlen == 0) { #ifdef DETAIL_TRACE printf("aio_pq_queue_cb: zero length\n"); #endif (*cb)(cbarg); return; } if (dlen < 1) panic("bad length"); if ((p < 0) || (p >= q->npri)) panic("bad prio"); e = malloc(sizeof(PP)); e->data = data; e->done = cb; e->donearg = cbarg; queue_pp(&q->perpri[p],e,dlen); #ifdef DETAIL_TRACE printf("aio_pq_queue_cb: done, queued %p\n",(void *)e); #endif } void aio_pq_queue_printf(PQ *q, int p, const char *fmt, ...) { va_list ap; char *s; int l; PP *e; #ifdef DETAIL_TRACE printf("aio_pq_queue_printf: q %p pri %d fmt %p\n",(void *)q,p,(const void *)fmt); #endif if ((p < 0) || (p >= q->npri)) panic("bad prio"); va_start(ap,fmt); l = vasprintf(&s,fmt,ap); va_end(ap); if (l < 1) { #ifdef DETAIL_TRACE printf("aio_pq_queue_printf: zero length\n"); #endif free(s); return; } e = malloc(sizeof(PP)); e->data = s; e->done = &free; e->donearg = s; queue_pp(&q->perpri[p],e,l); #ifdef DETAIL_TRACE printf("aio_pq_queue_printf: done, queued %p\n",(void *)e); #endif } void aio_pq_boundary(PQ *q, int p) { PQPP *pp; PQE *e; #ifdef DETAIL_TRACE printf("aio_pq_boundary: q %p pri %d\n",(void *)q,p); #endif if ((p < 0) || (p >= q->npri)) panic("bad prio"); pp = &q->perpri[p]; if (! pp->partial) { #ifdef DETAIL_TRACE printf("aio_pq_boundary: nothing to move\n"); #endif return; } e = malloc(sizeof(PQE)); e->data = pp->partial; e->link = 0; *pp->tail = e; pp->tail = &e->link; pp->partial = 0; pp->ptail = &pp->partial; #ifdef DETAIL_TRACE printf("aio_pq_boundary: moved partial len %d, entry %p\n",pp->plen,(void *)e); #endif q->len += pp->plen; pp->plen = 0; } int aio_pq_writev_drop(AIO_PQ *q, int fd, int maxbytes) { static WRITEV_STATE ws; int totb; int ip; PQPP *pp; PQE *e; PP *part; int i; int nb; int rv; int p; if (maxiov < 0) { int mib[2]; size_t oldlen; mib[0] = CTL_KERN; mib[1] = KERN_IOV_MAX; oldlen = sizeof(maxiov); if (sysctl(&mib[0],2,&maxiov,&oldlen,0,0) < 0) { panic("can't get kern.iov_max: %s",strerror(errno)); } if (maxiov > 64) maxiov = 64; if (maxiov < 1) maxiov = 1; #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: maxiov %d\n",maxiov); #endif ws.iov = malloc(maxiov*sizeof(*ws.iov)); ws.pv = malloc(maxiov*sizeof(*ws.pv)); ws.pa = 0; ws.heads = 0; ws.parts = 0; } #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: %p, fd %d, maxb %d, ip %d\n",(void *)q,fd,maxbytes,q->inprogress); #endif ws.vl = 0; if (q->npri > ws.pa) { free(ws.heads); free(ws.parts); ws.pa = q->npri; ws.heads = malloc(ws.pa*sizeof(*ws.heads)); ws.parts = malloc(ws.pa*sizeof(*ws.parts)); } ip = q->inprogress; for (i=q->npri-1;i>=0;i--) { ws.heads[i] = q->perpri[i].head; ws.parts[i] = ws.heads[i] ? ws.heads[i]->data : 0; } totb = 0; while (1) { #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: loop top, totb %d vl %d\n",totb,ws.vl); #endif if ((maxbytes >= 0) && (totb >= maxbytes)) break; if (ws.vl >= maxiov) break; if (ip < 0) { #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: ip<0:"); for (i=0;inpri;i++) printf(" %p(%p)",(void *)ws.heads[i],(void *)ws.parts[i]); printf("\n"); #endif for (ip=q->npri-1;ip>=0;ip--) if (ws.parts[ip]) break; if (ip < 0) { #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: nothing left in queue\n"); #endif break; } } #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: prio %d part %p\n",ip,(void *)ws.parts[ip]); #endif ws.iov[ws.vl] = (struct iovec){ .iov_base=dequal(ws.parts[ip]->data), .iov_len=ws.parts[ip]->left }; ws.pv[ws.vl] = ip; ws.vl ++; ws.parts[ip] = ws.parts[ip]->link; #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: parts[%d] now %p\n",ip,(void *)ws.parts[ip]); #endif if (! ws.parts[ip]) { ws.heads[ip] = ws.heads[ip]->link; ws.parts[ip] = ws.heads[ip] ? ws.heads[ip]->data : 0; #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: heads[%d] now %p, parts[%d] now %p\n",ip,(void *)ws.heads[ip],ip,(void *)ws.parts[ip]); #endif ip = -1; } } if (ws.vl < 1) { #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: empty iovec\n"); #endif return(0); } #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: iovec:"); for (i=0;i 0) { if (i >= ws.vl) panic("writev overran data written"); p = ws.pv[i]; if ((p < 0) || (p >= q->npri)) panic("aio_pq_writev_drop: impossible priority %d",p); pp = &q->perpri[p]; e = pp->head; part = e->data; if (nb < part->left) { #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: dropping %d from %p (data %p)\n",nb,(void *)part,(const void *)part->data); #endif part->left -= nb; part->data += nb; q->len -= nb; ip = p; break; } q->len -= part->left; nb -= part->left; #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: dropping entire %p (data %p), nb now %d\n",(void *)part,(const void *)part->data,nb); #endif if (part->done) (*part->done)(part->donearg); e->data = part->link; free(part); if (! e->data) { pp->head = e->link; if (! pp->head) pp->tail = &pp->head; #ifdef DETAIL_TRACE printf("aio_pq_writev_drop: packet end\n"); #endif free(e); ip = -1; } else { ip = p; } i ++; } q->inprogress = ip; return(rv); } unsigned int aio_pq_qlen(AIO_PQ *q) { return(q->len); } int aio_pq_empty(AIO_PQ *q) { return(q->len==0); } int aio_pq_nonempty(AIO_PQ *q) { return(q->len!=0); } static int free_pp_list(PP *list) { int n; PP *e; n = 0; while (list) { e = list; list = e->link; n += e->left; if (e->done) (*e->done)(e->donearg); free(e); } return(n); } void aio_pq_flush(AIO_PQ *q, int pri) { PQPP *pp; PQE *e; #ifdef DETAIL_TRACE printf("aio_pq_flush: queue %p, pri ",(void *)q); if (pri == AIO_ALL_PRIO) printf("ALL"); else printf("%d",pri); printf("\n"); #endif if (pri == AIO_ALL_PRIO) { for (pri=q->npri-1;pri>=0;pri--) aio_pq_flush(q,pri); return; } if ((pri < 0) || (pri >= q->npri)) return; pp = &q->perpri[pri]; free_pp_list(pp->partial); pp->plen = 0; pp->partial = 0; pp->ptail = &pp->partial; while ((e = pp->head)) { q->len -= free_pp_list(e->data); pp->head =e->link; free(e); } pp->tail = &pp->head; #ifdef DETAIL_TRACE printf("aio_pq_flush: overall len now %d\n",q->len); #endif }