#include #include #include #include #include #include #include #include #include extern const char *__progname; #include "nf.h" #include "oq.h" #include "pollloop.h" #include "scm-rights.h" #include "node.h" typedef struct adj ADJ; struct adj { ADJ *flink; ADJ *blink; int fd; int ioid; int blid; unsigned int id; OQ oq; unsigned char *rbuf; int rbmax; int rbfill; int rbptr; } ; static int cfd; static int dfd; static char *name; static unsigned char *msg; static int msglen; static unsigned int myid; static ADJ *adjs; static void msgspace(int nb) { if (nb > msglen) { free(msg); msglen = nb; msg = malloc(msglen); } } static void set_nonblock(int fd) { fcntl(fd,F_SETFL,fcntl(fd,F_GETFL,0)|O_NONBLOCK); } static void gen_bufsize(int n) { unsigned char msg[1+sizeof(int)]; msg[0] = NODEMSG_BUFSIZE; bcopy(&n,&msg[1],sizeof(int)); nf_send(cfd,&msg[0],1+sizeof(int),0); } static void outmsg(const char *, ...) __attribute__((__format__(__printf__,1,2))); static void outmsg(const char *fmt, ...) { va_list ap; char *s; int len; unsigned char hdr; struct iovec iov[2]; struct msghdr mh; va_start(ap,fmt); len = vasprintf(&s,fmt,ap); va_end(ap); gen_bufsize(1+len); hdr = NODEMSG_OUTPUT; iov[0].iov_base = &hdr; iov[0].iov_len = 1; iov[1].iov_base = s; iov[1].iov_len = len; mh.msg_name = 0; mh.msg_namelen = 0; mh.msg_iov = &iov[0]; mh.msg_iovlen = 2; mh.msg_control = 0; mh.msg_controllen = 0; mh.msg_flags = 0; nf_sendmsg(cfd,&mh,0); free(s); } static void rbroom(ADJ *a, int room) { if (a->rbmax < a->rbfill+room) { a->rbmax = a->rbfill + room; if (a->rbfill > 0) { a->rbuf = realloc(a->rbuf,a->rbmax); } else { free(a->rbuf); a->rbuf = malloc(a->rbmax); } } } static void kill_adjacency(ADJ *a) { close(a->fd); remove_poll_id(a->ioid); remove_block_id(a->blid); oq_flush(&a->oq); free(a->rbuf); free(a); } static int process_message(ADJ *a) { int n; if (a->rbptr >= a->rbfill) return(0); n = a->rbuf[a->rbptr]; if (a->rbptr+n > a->rbfill) return(0); a->rbptr += n; return(1); } static int wtest_adj(void *av) { return(oq_nonempty(&((ADJ *)av)->oq)); } static void rd_adj(void *av) { ADJ *a; int r; a = av; rbroom(a,8192); r = read(a->fd,&a->rbuf[a->rbfill],a->rbmax-a->rbfill); if (r < 0) { switch (errno) { case EINTR: case EWOULDBLOCK: return; break; } fprintf(stderr,"%s: node %s: adjacency to %d read error: %s\n",__progname,name,a->id,strerror(errno)); kill_adjacency(a); return; } if (r == 0) { fprintf(stderr,"%s: node %s: adjacency to %d read EOF\n",__progname,name,a->id); kill_adjacency(a); return; } a->rbfill += r; a->rbptr = 0; while (process_message(a)) ; if (a->rbptr > 0) { if (a->rbptr < a->rbfill) { bcopy(a->rbuf+a->rbptr,a->rbuf,a->rbfill-a->rbptr); a->rbfill -= a->rbptr; } else { a->rbfill = 0; a->rbptr = 0; } } } static void wr_adj(void *av) { ADJ *a; int w; a = av; w = oq_writev(&a->oq,a->fd); if (w < 0) { switch (errno) { case EINTR: case EWOULDBLOCK: return; break; } fprintf(stderr,"%s: node %s: adjacency to %d write error: %s\n",__progname,name,a->id,strerror(errno)); kill_adjacency(a); return; } if (w == 0) { fprintf(stderr,"%s: node %s: adjacency to %d zero write\n",__progname,name,a->id); kill_adjacency(a); return; } oq_dropdata(&a->oq,w); } static int block_adj(void *av) { ADJ *a; a = av; return(BLOCK_NIL); } static void adjacency_up(unsigned int id, int fd) { ADJ *a; a = malloc(sizeof(ADJ)); a->fd = fd; a->id = id; oq_init(&a->oq); a->ioid = add_poll_fd(fd,&rwtest_always,&wtest_adj,&rd_adj,&wr_adj,a); a->blid = add_block_fn(&block_adj,a); a->rbuf = 0; a->rbmax = 0; a->rbfill = 0; a->flink = adjs; a->blink = 0; set_nonblock(fd); adjs = a; if (a->flink) a->flink->blink = a; outmsg("adjacency_up %s (%d) to %d, fd %d\n",name,myid,id,fd); } static void rd_node_ctl(void *arg __attribute__((__unused__))) { unsigned char ctlbuf[CMSPACE(sizeof(int))]; struct msghdr mh; struct cmsghdr cmh; struct iovec iov; int rv; int fd; int val; iov.iov_base = msg; iov.iov_len = msglen; mh.msg_name = 0; mh.msg_namelen = 0; mh.msg_iov = &iov; mh.msg_iovlen = 1; mh.msg_control = &ctlbuf[0]; mh.msg_controllen = sizeof(ctlbuf); mh.msg_flags = 0; rv = recvmsg(cfd,&mh,0); if (rv < 0) { fprintf(stderr,"%s: node %s recvmsg: %s\n",__progname,name,strerror(errno)); exit(0); } if (rv == 0) { fprintf(stderr,"%s: node %s empty recvmsg\n",__progname,name); exit(0); } switch(msg[0]) { case NODEMSG_LINKUP: if (mh.msg_controllen < sizeof(struct cmsghdr)) { fprintf(stderr,"%s: node %s LINKUP controllen (%d) too small (< %d)\n",__progname,name,(int)mh.msg_controllen,(int)sizeof(struct cmsghdr)); exit(0); } bcopy(&ctlbuf[0],&cmh,sizeof(cmh)); if (cmh.cmsg_len < CMLEN(sizeof(int))) { fprintf(stderr,"%s: node %s LINKUP cmsg len (%d) too small (< %d)\n",__progname,name,(int)cmh.cmsg_len,(int)CMLEN(sizeof(int))); exit(0); } bcopy(&ctlbuf[CMSKIP(&cmh)],&fd,sizeof(int)); bcopy(&msg[1],&val,sizeof(int)); outmsg("node %d (%s) link up to node %d\n",myid,name,val); adjacency_up(val,fd); break; default: fprintf(stderr,"%s: node %s message %02x\n",__progname,name,msg[0]); exit(0); break; } } static void rd_node_death(void *arg __attribute__((__unused__))) { fprintf(stderr,"%s: node %s: parent death pipe readable\n",__progname,name); exit(0); } void node_main(int cfdarg, int dfdarg) { int rv; cfd = cfdarg; dfd = dfdarg; setproctitle("node"); msg = 0; msglen = 0; msgspace(sizeof(int)); rv = recv(cfd,msg,msglen,0); if (rv != sizeof(int)) { fprintf(stderr,"%s: node: startup protocol botch: first recv returned %d, not %d\n",__progname,rv,(int)sizeof(int)); exit(0); } bcopy(msg,&rv,sizeof(int)); msgspace(rv); rv = recv(cfd,msg,msglen,0); if (rv < 0) { fprintf(stderr,"%s: node: startup protocol botch: second recv: %s\n",__progname,strerror(errno)); exit(0); } name = malloc(rv+1); if (rv) bcopy(msg,name,rv); name[rv] = '\0'; msgspace(sizeof(int)); rv = recv(cfd,msg,msglen,0); if (rv != sizeof(int)) { fprintf(stderr,"%s: node: startup protocol botch: third recv returned %d, not %d\n",__progname,rv,(int)sizeof(int)); exit(0); } bcopy(msg,&myid,sizeof(int)); setproctitle("node %s (%d)",name,myid); msgspace(1+sizeof(int)); adjs = 0; init_polling(); add_poll_fd(cfd,&rwtest_always,&rwtest_never,&rd_node_ctl,0,0); add_poll_fd(dfd,&rwtest_always,&rwtest_never,&rd_node_death,0,0); poll_run(); }