#include #include #include #include #include #include #include #include #include #include #include #include extern const char *__progname; #include "pidconn.h" typedef unsigned long long int ULLI; int no_pidconn = 0; static int pcfd; static int pcpipe; typedef enum { REPORT_NORMAL = 1, REPORT_DONE, } REPORTKIND; typedef struct pcc PCC; typedef struct report REPORT; struct pcc { PCC *flink; PCC *blink; unsigned int flags; #define PCCF_DEAD 0x00000001 int fd; AIO_OQ oq; ES ib; int pid; int bid; } ; struct report { REPORTKIND kind; union { struct { // if REPORT_NORMAL ULLI at; ULLI of; } normal; // nothing if REPORT_DONE } u; } ; #define REPORTSIZE (sizeof(REPORT)) static time_t last_report = 0; static int pcaid; static int reportid; static PCC *pccs = 0; static REPORT reportbuf; static int reportfill; static ULLI report_at; static ULLI report_of; #define Cisspace(x) isspace((unsigned char)(x)) static void set_nbio(int fd) { fcntl(fd,F_SETFL,fcntl(fd,F_GETFL,0)|O_NONBLOCK); } static int pc_destroy(void *cv) { PCC *c; c = cv; if (! (c->flags & PCCF_DEAD)) abort(); aio_remove_block(c->bid); c->bid = AIO_NOID; es_done(&c->ib); aio_oq_flush(&c->oq); close(c->fd); free(c); return(AIO_BLOCK_LOOP); } static void pc_kill(PCC *c) { if (c->flags & PCCF_DEAD) return; c->flags |= PCCF_DEAD; aio_remove_poll(c->pid); c->pid = AIO_NOID; c->bid = aio_add_block(&pc_destroy,c); if (c->flink) c->flink->blink = c->blink; if (c->blink) c->blink->flink = c->flink; else pccs = c->flink; } static int pc_drain(void *cv) { PCC *c; c = cv; if (aio_oq_nonempty(&c->oq)) return(AIO_BLOCK_NIL); aio_remove_block(c->bid); c->bid = AIO_NOID; pc_kill(c); return(AIO_BLOCK_LOOP); } static void pc_drain_kill(PCC *c) { c->bid = aio_add_block(&pc_drain,c); } static int wtest_pcc(void *cv) { return(aio_oq_nonempty(&((PCC *)cv)->oq)); } // XXX pass arg down to here to give more detailed help? static void do__help(PCC *c, int arglen, const char *cmd) { if (arglen) { aio_oq_queue_printf(&c->oq,"%s: takes no argument\n",cmd); return; } aio_oq_queue_point(&c->oq,"\ ?, help\n\ Print this help.\n\ exit, quit\n\ Close connection.\n\ stat\n\ Print status.\n\ ",AIO_STRLEN); } static void do__questionmark(PCC *c, const char *arg, int arglen) { (void)arg; do__help(c,arglen,"?"); } static void do__quit_exit_common(PCC *c, int arglen, const char *cmd) { if (arglen) { aio_oq_queue_printf(&c->oq,"%s: takes no argument\n",cmd); return; } aio_oq_queue_point(&c->oq,"Goodbye\n",AIO_STRLEN); pc_drain_kill(c); } static void do_exit(PCC *c, const char *arg, int arglen) { (void)arg; do__quit_exit_common(c,arglen,"exit"); } static void do_help(PCC *c, const char *arg, int arglen) { (void)arg; do__help(c,arglen,"help"); } static void do_quit(PCC *c, const char *arg, int arglen) { (void)arg; do__quit_exit_common(c,arglen,"quit"); } // XXX use arg for something? static void do_stat(PCC *c, const char *arg, int arglen) { (void)arg; if (arglen) { aio_oq_queue_point(&c->oq,"stat: takes no argument\n",AIO_STRLEN); return; } aio_oq_queue_printf(&c->oq,"%llu %llu\n",report_at,report_of); } static void pc_iline(PCC *c, const void *text, int len) { const char *b; int c0; int c1; int e; int a; b = text; e = len; while ((e > 0) && Cisspace(b[e-1])) e --; if (e < 1) return; for (c0=0;(c0= e) abort(); if (b[c0] == '?') { for (a=c0+1;(a { switch (c1-c0) { case 4: switch (b[c0]) { case 'e': if(! bcmp(b+c0,"exit",4)) { do_exit(c,b+a,e-a); break <"done">; } break; case 'h': if(! bcmp(b+c0,"help",4)) { do_help(c,b+a,e-a); break <"done">; } break; case 'q': if(! bcmp(b+c0,"quit",4)) { do_quit(c,b+a,e-a); break <"done">; } break; case 's': if(! bcmp(b+c0,"stat",4)) { do_stat(c,b+a,e-a); break <"done">; } break; } break; } aio_oq_queue_printf(&c->oq,"%.*s: unrecognized (`help' for help)\n",c1-c0,b+c0); } while (0); } } static void rd_pcc(void *cv) { PCC *c; int nr; char rbuf[256]; int o; char *nl; c = cv; nr = read(c->fd,&rbuf[0],sizeof(rbuf)); if (nr < 0) { switch (errno) { case EINTR: case EWOULDBLOCK: return; break; } pc_kill(c); return; } if (nr == 0) { pc_kill(c); return; } o = 0; while (o < nr) { nl = memchr(&rbuf[o],'\n',nr-o); if (nl) { if (es_len(&c->ib)) { es_append_n(&c->ib,&rbuf[o],nl-&rbuf[o]); pc_iline(c,es_buf(&c->ib),es_len(&c->ib)); es_clear(&c->ib); } else { pc_iline(c,&rbuf[o],nl-&rbuf[o]); } o = (nl + 1) - &rbuf[0]; } else { es_append_n(&c->ib,&rbuf[o],nr-o); break; } } if (es_len(&c->ib) > 65536) pc_kill(c); } static void wr_pcc(void *cv) { PCC *c; int nw; c = cv; nw = aio_oq_writev(&c->oq,c->fd,-1); switch (nw) { case AIO_WRITEV_ERROR: switch (errno) { case EINTR: case EWOULDBLOCK: return; break; } pc_kill(c); return; } if (nw < 0) { fprintf(stderr,"%s: aio_oq_writev returned impossible %d\n",__progname,nw); exit(1); } aio_oq_dropdata(&c->oq,nw); } static void pc_acc(void *arg __attribute__((__unused__))) { int new; PCC *c; new = pidconn(PIDCONN_ACCEPT,pcfd,0); if (new < 0) { switch (errno) { case EINTR: case EWOULDBLOCK: return; break; } fprintf(stderr,"%s: PIDCONN_ACCEPT: %s\n",__progname,strerror(errno)); exit(1); } set_nbio(new); c = malloc(sizeof(PCC)); c->flags = 0; c->fd = new; aio_oq_init(&c->oq); es_init(&c->ib); c->pid = aio_add_poll(new,&aio_rwtest_always,&wtest_pcc,&rd_pcc,&wr_pcc,c); c->bid = AIO_NOID; c->flink = pccs; c->blink = 0; if (pccs) pccs->blink = c; pccs = c; } static void got_report(void) { switch (reportbuf.kind) { case REPORT_NORMAL: report_at = reportbuf.u.normal.at; report_of = reportbuf.u.normal.of; break; case REPORT_DONE: exit(0); break; default: fprintf(stderr,"%s: impossible report type %d from worker\n",__progname,(int)reportbuf.kind); exit(1); break; } } static void report_rd(void *arg __attribute__((__unused__))) { int nr; char rbuf[256]; int o; int n; nr = read(pcpipe,&rbuf[0],sizeof(rbuf)); if (nr < 0) { switch (errno) { case EINTR: case EWOULDBLOCK: return; break; } fprintf(stderr,"%s: report read: %s\n",__progname,strerror(errno)); exit(1); } if (nr == 0) { fprintf(stderr,"%s: worker child died\n",__progname); exit(1); } o = 0; while (o < nr) { n = REPORTSIZE - reportfill; if (n > nr-o) n = nr - o; bcopy(&rbuf[o],reportfill+(char *)&reportbuf,n); reportfill += n; o += n; if (reportfill >= REPORTSIZE) { got_report(); reportfill = 0; } } } static void run_parent(void) { aio_poll_init(); set_nbio(pcfd); set_nbio(pcpipe); pcaid = aio_add_poll(pcfd,&aio_rwtest_always,&aio_rwtest_never,&pc_acc,0,0); reportid = aio_add_poll(pcpipe,&aio_rwtest_always,&aio_rwtest_never,&report_rd,0,0); reportfill = 0; aio_event_loop(); exit(1); } void maybe_report_status(ULLI at, ULLI of) { time_t t; REPORT r; if (no_pidconn) return; t = time(0); if (t == last_report) return; last_report = t; r.kind = REPORT_NORMAL; r.u.normal.at = at; r.u.normal.of = of; write(pcpipe,&r,sizeof(r)); } void maybe_report_done(void) { REPORT r; if (no_pidconn) return; r.kind = REPORT_DONE; write(pcpipe,&r,sizeof(r)); } void setup_pidconn(void) { pid_t kid; int p[2]; if (no_pidconn) return; pcfd = pidconn(PIDCONN_LISTEN,0,0); if (pcfd < 0) { if (errno == ENOSYS) { fprintf(stderr,"%s: no pidconn support in this kernel\n",__progname); return; } fprintf(stderr,"%s: PIDCONN_LISTEN: %s\n",__progname,strerror(errno)); exit(1); } if (socketpair(AF_LOCAL,SOCK_STREAM,0,&p[0]) < 0) { fprintf(stderr,"%s: AF_LOCAL socketpair: %s\n",__progname,strerror(errno)); exit(1); } fflush(0); kid = fork(); if (kid == 0) { close(pcfd); close(p[0]); pcpipe = p[1]; return; } else { close(p[1]); pcpipe = p[0]; run_parent(); _exit(1); } }