#include #include #include #include #include #include "aio.h" #include "tester.h" extern const char *__progname; static AIO_OQ oq; static TIME base; static unsigned int credit; static unsigned char rbuf[65536]; static int rtest_s(void *arg __attribute__((__unused__))) { TIME t; unsigned int c; t = now(); t -= base; c = (t * BPS) / 1000000; if (c) { credit += c; if (credit > 32768) credit = 32768; base += (c * (TIME)1000000) / BPS; } return(credit>0); } static void rd_s(void *arg __attribute__((__unused__))) { int n; int r; n = sizeof(rbuf); if (n > credit) n = credit; r = read(scpipe[0],&rbuf[0],n); if (r < 0) { switch (errno) { case EINTR: case EWOULDBLOCK: return; } fprintf(stderr,"%s: copier read: %s\n",__progname,strerror(errno)); exit(1); } if (r == 0) exit(0); aio_oq_queue_copy(&oq,&rbuf[0],r); credit -= r; } static int delay_1ms(void *arg __attribute__((__unused__))) { return(1); } static int wtest_r(void *arg __attribute__((__unused__))) { return(aio_oq_nonempty(&oq)); } static void wr_r(void *arg __attribute__((__unused__))) { int nw; nw = aio_oq_writev(&oq,crpipe[1],-1); if (nw < 0) { switch (errno) { case EINTR: case EWOULDBLOCK: return; } fprintf(stderr,"%s: copier write: %s\n",__progname,strerror(errno)); exit(1); } aio_oq_dropdata(&oq,nw); } void copier_main(void) { close(crpipe[0]); close(scpipe[1]); set_nbio(scpipe[0]); set_nbio(crpipe[1]); aio_oq_init(&oq); credit = 0; base = now(); aio_poll_init(); aio_add_poll(scpipe[0],&rtest_s,&aio_rwtest_never,&rd_s,0,0); aio_add_poll(crpipe[1],&aio_rwtest_never,&wtest_r,0,&wr_r,0); aio_add_block(&delay_1ms,0); aio_event_loop(); fprintf(stderr,"%s: poll: %s\n",__progname,strerror(errno)); exit(1); }