/* This file is in the public domain. */ #include #include #include #include #ifdef DEBUG_BUILD #include #include #endif #include #include "impl.h" #define INFTIM (-1) // XXX Linux botch #define IDBASE 1000 typedef struct call_once_priv CALL_ONCE_PRIV; typedef struct bfstate BFSTATE; struct bfstate { int again; int tmo; } ; struct call_once_priv { int id; void (*fn)(void *); void *arg; } ; struct iofd { int fd; int (*rtest)(void *); int (*wtest)(void *); void (*rd)(void *); void (*wr)(void *); void *arg; unsigned int flags; #define IOFF_WANT_R 0x00000001 #define IOFF_WANT_W 0x00000002 #define IOFF_DEAD 0x00000004 int px; } ; static AIO_LOOP gloop; static char *order_used = &aio__order_used; #define BIMP (order_used ? &aio__bimpl_avl : &aio__bimpl_simple) #ifdef DEBUG_BUILD #define TRACESIZE 1048576 static char *tracering = 0; static int *tracehead; void aio__trace_append(const void *data, int len) { const unsigned char *dp; int h; if (! tracering) return; h = *tracehead; dp = data; if (len > TRACESIZE) { dp += len - TRACESIZE; len = TRACESIZE; } if (len <= TRACESIZE-h) { bcopy(dp,tracering+h,len); h += len; if (h >= TRACESIZE) h -= TRACESIZE; } else { bcopy(dp,tracering+h,TRACESIZE-h); bcopy(dp+(TRACESIZE-h),tracering,len-(TRACESIZE-h)); h = TRACESIZE - h; } *tracehead = h; } int aio__trace_setup(int fd) { void *mmrv; ftruncate(fd,TRACESIZE+sizeof(int)); mmrv = mmap(0,TRACESIZE+sizeof(int),PROT_READ|PROT_WRITE,MAP_FILE|MAP_SHARED,fd,0); if (mmrv == MAP_FAILED) return(-1); tracehead = mmrv; tracering = (char *)(tracehead+1); *tracehead = 0; bzero(tracering,TRACESIZE); return(0); } void aio__trace_s(const char *s) { aio__trace_append(s,strlen(s)); } int aio__tracing(void) { return(!!tracering); } #endif static void aio_poll_init_(AIO_LOOP *l) { l->iofds = 0; l->niofds = 0; l->bfns = (*BIMP->init)(); l->pfds = 0; l->pfdn = 0; l->pfdmax = 0; l->justloop = 0; l->addedblock = 0; } static void aio_cleanup(AIO_LOOP *l) { free(l->iofds); (*BIMP->cleanup)(l->bfns); free(l->pfds); aio_poll_init_(l); } static int externalize_poll_id(int id) { return(IDBASE+id+id); } static int externalize_block_id(int id) { return(IDBASE+id+id+1); } static int internalize_poll_id(AIO_LOOP *l, int id) { if (id < IDBASE) { panic("invalid id"); return(-1); } id -= IDBASE; if (id & 1) { panic("invalid id"); return(-1); } id >>= 1; if ((id < 0) || (id >= l->niofds) || !l->iofds[id]) { panic("id not found"); return(-1); } return(id); } int aio__internalize_block_id(AIO_LOOP *l, int id) { if (id < IDBASE) { panic("invalid id"); return(-1); } id -= IDBASE; if (! (id & 1)) { panic("invalid id"); return(-1); } id >>= 1; if (! (*BIMP->validid)(l->bfns,id)) { panic("id not found"); return(-1); } return(id); } static int call_once_call(void *pv) { CALL_ONCE_PRIV *p; void (*fn)(void *); void *arg; p = pv; aio_remove_block(p->id); fn = p->fn; arg = p->arg; free(p); (*fn)(arg); return(AIO_BLOCK_LOOP); } AIO_LOOP *aio_poll_init_of(void) { AIO_LOOP *l; l = malloc(sizeof(AIO_LOOP)); if (l) aio_poll_init_(l); return(l); } void aio_poll_done_of(AIO_LOOP *l) { if (l) { if (l == &gloop) panic("freeing global loop"); aio_cleanup(l); free(l); } } void aio_poll_init(void) { aio_poll_init_(&gloop); } void aio_poll_reinit(void) { aio_cleanup(&gloop); } int aio_add_poll_of(AIO_LOOP *l, int fd, int (*rtest)(void *), int (*wtest)(void *), void (*rd)(void *), void (*wr)(void *), void *arg) { IOFD *io; int i; IOFD **t; io = malloc(sizeof(IOFD)); if (! io) return(AIO_ERR); do <"gotspace"> { for (i=l->niofds-1;i>=0;i--) if (l->iofds[i] == 0) break <"gotspace">; i = l->niofds; t = realloc(l->iofds,sizeof(IOFD *)*++l->niofds); if (t == 0) { free(io); return(AIO_ERR); } l->iofds = t; } while (0); aio__trace_s("p+"); aio__trace_append(&fd,sizeof(fd)); aio__trace_append(&rtest,sizeof(rtest)); aio__trace_append(&wtest,sizeof(wtest)); aio__trace_append(&rd,sizeof(rd)); aio__trace_append(&wr,sizeof(wr)); aio__trace_append(&arg,sizeof(arg)); l->iofds[i] = io; io->fd = fd; io->rtest = rtest; io->wtest = wtest; io->rd = rd; io->wr = wr; io->arg = arg; io->flags = 0; i = externalize_poll_id(i); aio__trace_append(&io,sizeof(io)); aio__trace_append(&i,sizeof(i)); return(i); } int aio_add_poll(int fd, int (*rtest)(void *), int (*wtest)(void *), void (*rd)(void *), void (*wr)(void *), void *arg) { return(aio_add_poll_of(&gloop,fd,rtest,wtest,rd,wr,arg)); } int aio_change_poll_rtest_of(AIO_LOOP *l, int id, int (*rtest)(void *)) { id = internalize_poll_id(l,id); if (id < 0) return(-1); l->iofds[id]->rtest = rtest; return(0); } int aio_change_poll_rtest(int id, int (*rtest)(void *)) { return(aio_change_poll_rtest_of(&gloop,id,rtest)); } int aio_change_poll_wtest_of(AIO_LOOP *l, int id, int (*wtest)(void *)) { id = internalize_poll_id(l,id); if (id < 0) return(-1); l->iofds[id]->wtest = wtest; return(0); } int aio_change_poll_wtest(int id, int (*wtest)(void *)) { return(aio_change_poll_wtest_of(&gloop,id,wtest)); } int aio_change_poll_rfn_of(AIO_LOOP *l, int id, void (*rfn)(void *)) { id = internalize_poll_id(l,id); if (id < 0) return(-1); l->iofds[id]->rd = rfn; return(0); } int aio_change_poll_rfn(int id, void (*rfn)(void *)) { return(aio_change_poll_rfn_of(&gloop,id,rfn)); } int aio_change_poll_wfn_of(AIO_LOOP *l, int id, void (*wfn)(void *)) { id = internalize_poll_id(l,id); if (id < 0) return(-1); l->iofds[id]->wr = wfn; return(0); } int aio_change_poll_wfn(int id, void (*wfn)(void *)) { return(aio_change_poll_wfn_of(&gloop,id,wfn)); } int aio_change_poll_arg_of(AIO_LOOP *l, int id, void *arg) { id = internalize_poll_id(l,id); if (id < 0) return(-1); l->iofds[id]->arg = arg; return(0); } int aio_change_poll_arg(int id, void *arg) { return(aio_change_poll_arg_of(&gloop,id,arg)); } int aio_add_block_of(AIO_LOOP *l, int (*fn)(void *), void *arg) { BLOCKFN *bf; int i; bf = malloc(sizeof(BLOCKFN)); if (! bf) return(AIO_ERR); bf->fn = fn; bf->arg = arg; bf->flags = 0; i = (*BIMP->new)(l->bfns,bf); aio__trace_s("b+"); l->addedblock = 1; i = externalize_block_id(i); aio__trace_append(&fn,sizeof(fn)); aio__trace_append(&arg,sizeof(arg)); aio__trace_append(&bf,sizeof(bf)); aio__trace_append(&i,sizeof(int)); return(i); } int aio_add_block(int (*fn)(void *), void *arg) { return(aio_add_block_of(&gloop,fn,arg)); } int aio_call_once_of(AIO_LOOP *l, void (*fn)(void *), void *arg) { CALL_ONCE_PRIV *p; p = malloc(sizeof(CALL_ONCE_PRIV)); if (! p) return(AIO_ERR); p->id = aio_add_block_of(l,&call_once_call,p); if (p->id == AIO_ERR) { free(p); return(AIO_ERR); } p->fn = fn; p->arg = arg; return(AIO_OK); } int aio_call_once(void (*fn)(void *), void *arg) { return(aio_call_once_of(&gloop,fn,arg)); } int aio_remove_poll_of(AIO_LOOP *l, int id) { aio__trace_s("p-"); aio__trace_append(&id,sizeof(id)); id = internalize_poll_id(l,id); if (id < 0) return(-1); l->iofds[id]->flags |= IOFF_DEAD; return(0); } int aio_remove_poll(int id) { return(aio_remove_poll_of(&gloop,id)); } int aio_remove_block_of(AIO_LOOP *l, int id) { aio__trace_s("b-"); aio__trace_append(&id,sizeof(int)); id = aio__internalize_block_id(l,id); if (id < 0) return(-1); (*BIMP->remove)(l->bfns,id); return(0); } int aio_remove_block(int id) { return(aio_remove_block_of(&gloop,id)); } int aio_pre_poll_v_of(int n, AIO_LOOP **v) { int i; IOFD *io; int j; int pfdn; int pfdmax; struct pollfd *pfds; AIO_LOOP *l; int hb; struct pollfd *t; pfdn = 0; pfdmax = v[0]->pfdmax; pfds = v[0]->pfds; hb = 0; for (j=n-1;j>=0;j--) { l = v[j]; if ((*BIMP->anyblock)(l->bfns)) hb = 1; for (i=l->niofds-1;i>=0;i--) { io = l->iofds[i]; if (! io) continue; if (io->flags & IOFF_DEAD) { free(io); l->iofds[i] = 0; continue; } io->flags = (io->flags & ~(IOFF_WANT_R|IOFF_WANT_W)) | ((*io->rtest)(io->arg) ? IOFF_WANT_R : 0) | ((*io->wtest)(io->arg) ? IOFF_WANT_W : 0); if (io->flags & (IOFF_WANT_R|IOFF_WANT_W)) { struct pollfd *pfd; if (pfdn >= pfdmax) { t = realloc(pfds,(pfdmax=pfdn+8)*sizeof(*pfds)); if (! t) return(-1); pfds = t; } io->px = pfdn++; pfd = &pfds[io->px]; pfd->fd = io->fd; pfd->events = ((io->flags & IOFF_WANT_R) ? POLLIN|POLLRDNORM : 0) | ((io->flags & IOFF_WANT_W) ? POLLOUT|POLLWRNORM : 0); } else { io->px = -1; } } } aio__trace_s(hb?"1+":"1-"); v[0]->pfds = pfds; v[0]->pfdmax = pfdmax; v[0]->pfdn = pfdn; v[0]->haveblock = hb; return(0); } int aio_pre_poll_l_of(int n, ...) { va_list ap; AIO_LOOP **v; int i; v = malloc(n*sizeof(AIO_LOOP *)); if (! v) return(-1); va_start(ap,n); for (i=0;ifn,sizeof(bf->fn)); aio__trace_append(&bf->arg,sizeof(bf->arg)); usec = (*bf->fn)(bf->arg); aio__trace_s("cb)"); aio__trace_append(&usec,sizeof(int)); if (usec < 0) { switch (usec) { case AIO_BLOCK_NIL: break; case AIO_BLOCK_LOOP: s->again = 1; break; default: return(1); break; } } else { if ((s->tmo == INFTIM) || (usec < s->tmo)) s->tmo = usec; } return(0); } int aio_do_poll_v_of(int n, AIO_LOOP **v) { int prv; BFSTATE s; int i; s.again = 0; s.tmo = INFTIM; aio__trace_s(v[0]->haveblock?"2+":"2-"); if (v[0]->haveblock) { aio__trace_append(&v[0]->pfdn,sizeof(int)); aio__trace_append(v[0]->pfds,v[0]->pfdn*sizeof(struct pollfd)); prv = poll(v[0]->pfds,v[0]->pfdn,0); aio__trace_s("z"); aio__trace_append(&prv,sizeof(int)); if (prv > 0) { aio__trace_append(&v[0]->pfdn,sizeof(int)); aio__trace_append(v[0]->pfds,v[0]->pfdn*sizeof(struct pollfd)); } if (prv) return(prv); for (i=n-1;i>=0;i--) v[i]->addedblock = 0; (*BIMP->iter)(v,n,&callblock,&s); if (s.again) { aio__trace_s("a"); v[0]->justloop = 1; return(0); } for (i=n-1;i>=0;i--) { if (v[i]->addedblock) { aio__trace_s("j"); v[0]->justloop = 1; return(0); } } } aio__trace_s("t"); aio__trace_append(&s.tmo,sizeof(s.tmo)); prv = poll(v[0]->pfds,v[0]->pfdn,s.tmo); aio__trace_s("Z"); aio__trace_append(&prv,sizeof(prv)); if (prv > 0) { aio__trace_append(&v[0]->pfdn,sizeof(int)); aio__trace_append(v[0]->pfds,v[0]->pfdn*sizeof(struct pollfd)); } return(prv); } int aio_do_poll_l_of(int n, ...) { va_list ap; AIO_LOOP **v; int i; v = malloc(n*sizeof(AIO_LOOP *)); if (! v) { errno = ENOMEM; return(-1); } va_start(ap,n); for (i=0;ijustloop?"3+":"3-"); if (v[0]->justloop) { v[0]->justloop = 0; return; } pfds = v[0]->pfds; for (j=n-1;j>=0;j--) { l = v[j]; for (i=l->niofds-1;i>=0;i--) { io = l->iofds[i]; if (! io) continue; if (io->flags & IOFF_DEAD) continue; if ( (io->flags & IOFF_WANT_R) && (pfds[io->px].revents & (POLLIN|POLLRDNORM|POLLERR|POLLHUP|POLLNVAL)) ) { aio__trace_s("cr("); aio__trace_append(&io,sizeof(io)); aio__trace_append(&io->rd,sizeof(io->rd)); aio__trace_append(&io->arg,sizeof(io->arg)); (*io->rd)(io->arg); aio__trace_s("cr)"); } if ( (io->flags & IOFF_WANT_W) && (pfds[io->px].revents & (POLLOUT|POLLWRNORM|POLLERR|POLLHUP|POLLNVAL)) ) { aio__trace_s("cw("); aio__trace_append(&io,sizeof(io)); aio__trace_append(&io->wr,sizeof(io->wr)); aio__trace_append(&io->arg,sizeof(io->arg)); (*io->wr)(io->arg); aio__trace_s("cw)"); } } } } int aio_post_poll_l_of(int n, ...) { va_list ap; AIO_LOOP **v; int i; v = malloc(n*sizeof(AIO_LOOP *)); if (! v) return(-1); va_start(ap,n); for (i=0;igetorder)(l->bfns,id)); } int aio_get_block_order(int id) { return(aio_get_block_order_of(&gloop,id)); }