/* This file is in the public domain. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define BW_TIME_TOLERANCE 100000 /* 10ms, in struct timeval units */ #define FIXisspace(x) isspace((unsigned char)(x)) #define FIXisdigit(x) isdigit((unsigned char)(x)) #include "lb.h" #include "getput.h" #include "backing.h" #include "pollloop.h" #include "timer-socket.h" #ifndef NI_WITHSCOPEID #define NI_WITHSCOPEID 0 #endif extern const char *__progname; static const char *rnddev = "/dev/urandom"; static const unsigned char pre_s2c[2] = { 0x73, 0x2d }; static const unsigned char pre_c2s[2] = { 0x63, 0x2d }; static const unsigned char post_s2c[2] = { 0x3e, 0x63 }; static const unsigned char post_c2s[2] = { 0x3e, 0x73 }; static const unsigned char pingmsg[1] = { LB_PING }; static const unsigned char pongmsg[1] = { LB_PONG }; static const unsigned char abortedmsg[1] = { LB_ABORTED }; static int verbose = 0; static const char *configfile = 0; typedef enum { CS_RR_A = 1, CS_RR_B, CS_RR_C, CS_UP, CS_DEAD } CONNSTATE; typedef struct acc ACC; typedef struct accops ACCOPS; typedef struct accsock ACCSOCK; typedef struct conn CONN; typedef struct oq OQ; typedef struct ob OB; typedef struct client CLIENT; typedef struct conflict CONFLICT; typedef struct cclist CCLIST; typedef struct config CONFIG; typedef struct admin ADMIN; typedef struct bwclass BWCLASS; typedef struct bwcm BWCM; typedef struct backtype BACKTYPE; struct backtype { const char *name; const BACKING *ops; } ; struct config { CLIENT *clients; ACC *admin; BWCLASS *bwclasses; } ; struct bwclass { BWCLASS *link; char *name; int namelen; unsigned long long int Bps; unsigned int tolerance; int nmembers; BWCM *members; unsigned int ioflags; #define BWIOF_RBLOCK 0x00000001 #define BWIOF_WBLOCK 0x00000002 } ; /* the _bw elements form a DLL rooted in the bwclass (.members); the _cl elements form a DLL rooted in the client (.bwclasses). */ struct bwcm { BWCM *f_bw; BWCM *b_bw; BWCM *f_cl; BWCM *b_cl; CLIENT *client; BWCLASS *bwclass; unsigned int r_debt; unsigned int w_debt; unsigned int ioflags; #define BWCMIO_RBLOCK 0x00000001 #define BWCMIO_WBLOCK 0x00000002 } ; /* the _cf elements form a DLL rooted in the conflict (.clients); the _cl elements form a DLL rooted in the client (.conflicts). */ struct cclist { CCLIST *f_cf; CCLIST *b_cf; CCLIST *f_cl; CCLIST *b_cl; CLIENT *client; CONFLICT *conflict; } ; struct conflict { CONFLICT *link; char *tag; int taglen; CONN *busy; CCLIST *clients; } ; struct client { CLIENT *link; char *backingname; const BACKING *backops; void *backvp; char *keyfile; CCLIST *conflicts; ACC *accs; char *keydata; int keylen; CONN *conns; void *tmp; BWCM *bwclasses; } ; struct acc { ACC *link; char *hoststr; char *portstr; ACCSOCK *socks; ACCOPS *ops; void *private; } ; struct accops { void (*dbgprint)(FILE *, ACC *, CLIENT *); void (*accept)(ACCSOCK *, int, struct sockaddr_storage *, int); } ; struct accsock { ACCSOCK *link; ACC *a; struct sockaddr *addr; int addrlen; int fd; int id; char *txt; } ; struct oq { OB *q; OB **qt; int qlen; } ; struct ob { OB *link; const char *data; int left; char *free; } ; struct admin { ADMIN *flink; ADMIN *blink; struct sockaddr_storage from; int fromlen; int fd; char *b; int a; int l; OQ oq; int ioid; int bid; unsigned int flags; #define AF_DEAD 0x00000001 #define AF_DRAIN 0x00000002 char *prompt; } ; struct conn { CONN *link; ACCSOCK *s; CLIENT *c; CONNSTATE state; struct sockaddr_storage from; int fromlen; int fd; int id; int crid; OQ oq; int cfill; int clen; int sumid; unsigned int start; unsigned int hand; unsigned int end; unsigned int bf; unsigned int ckt; unsigned int cklen; int tmofd; int tmoid; int rbfill; int rbptr; unsigned int countdown; ARC4_STATE s2cenc; ARC4_STATE c2senc; unsigned char srnd[16]; unsigned char crbuf[517]; unsigned char rbuf[8192]; char *status; } ; static const BACKTYPE backtypes[] = { { "simple", &backing_simple }, { "snapshot", &backing_snapshot } }; #define N_BACKTYPES (sizeof(backtypes)/sizeof(backtypes[0])) #define DEFAULT_BACKOPS (&backing_simple) static int backtype_namelens[N_BACKTYPES] = { -1 }; static int rndfd; static CONFIG config; static int sighup_pipe_r; static volatile int sighup_pipe_w; static int sighup_pipe_id; static int sighup_block_id; static CONFLICT *conflicts; static ADMIN *admins; int cwd_fd; static void *dequal(volatile const void *v) { void *tmp; tmp = &v; return(*(void **)tmp); } static void vprf(int, const char *, ...) __attribute__((__format__(__printf__,2,3))); static void vprf(int level, const char *fmt, ...) { va_list ap; if (verbose < level) return; va_start(ap,fmt); vprintf(fmt,ap); va_end(ap); } #define ISV(n) (verbose >= (n)) static void panic(const char *, ...) __attribute__((__format__(__printf__,1,2),__noreturn__)); static void panic(const char *fmt, ...) { va_list ap; static int panicking = 0; fprintf(stderr,"%s: panic: ",__progname); va_start(ap,fmt); vfprintf(stderr,fmt,ap); va_end(ap); fprintf(stderr,"\n"); if (ISV(1) && !panicking) { CLIENT *c; ACC *a; CONN *n; CCLIST *l; CCLIST *lb; CONFLICT *f; fflush(0); panicking = 1; for (c=config.clients;c;c=c->link) { printf("client %p: %s\n",(void *)c,c->backingname); for (a=c->accs;a;a=a->link) { printf(" acc %p: %s/%s\n",(void *)a,a->hoststr,a->portstr); (*a->ops->dbgprint)(stdout,a,c); } for (n=c->conns;n;n=n->link) { printf(" conn %p: sock %p state ",(void *)n->s,(void *)n); switch (n->state) { case CS_RR_A: printf("RR_A"); break; case CS_RR_B: printf("RR_B"); break; case CS_RR_C: printf("RR_C"); break; case CS_UP: printf("UP"); break; case CS_DEAD: printf("DEAD"); break; default: printf("?%d",n->state); break; } printf(" fd %d oq %d cfill %d/%d\n",n->fd,n->oq.qlen,n->cfill,n->clen); } printf(" conflicts\n"); lb = 0; for (l=c->conflicts;l;l=l->f_cl) { printf(" %p %p\n",(void *)l,(void *)l->conflict); if (l->client != c) printf("*** client backlink %p\n",(void *)l->client); if (l->b_cl != lb) printf("*** b_cl %p\n",(void *)l->b_cl); lb = l; } } for (f=conflicts;f;f=f->link) { printf("conflict %p: %.*s busy=%p\n",(void *)f,f->taglen,f->tag,(void *)f->busy); lb = 0; for (l=f->clients;l;l=l->f_cf) { printf(" %p %p\n",(void *)l,(void *)l->client); if (l->conflict != f) printf("*** conflict backlink %p\n",(void *)l->conflict); if (l->b_cf != lb) printf("*** b_cf %p\n",(void *)l->b_cf); lb = l; } } } abort(); } static void handleargs(int ac, char **av) { int skip; int errs; void setconfig(const char *s) { if (configfile) { fprintf(stderr,"%s: config file already specified, using later value\n",__progname); } configfile = s; } skip = 0; errs = 0; for (ac--,av++;ac;ac--,av++) { if (skip > 0) { skip --; continue; } if (**av != '-') { if (! configfile) { setconfig(*av); } else { fprintf(stderr,"%s: stray argument `%s'\n",__progname,*av); errs ++; } continue; } if (0) { needarg:; fprintf(stderr,"%s: %s needs a following argument\n",__progname,*av); errs ++; continue; } #define WANTARG() do { if (++skip >= ac) goto needarg; } while (0) if (!strcmp(*av,"-v")) { verbose ++; continue; } if (!strcmp(*av,"-config")) { WANTARG(); setconfig(av[skip]); continue; } #undef WANTARG fprintf(stderr,"%s: unrecognized option `%s'\n",__progname,*av); errs ++; } if (errs) exit(1); } static void oq_init(OQ *q) { q->q = 0; q->qt = &q->q; q->qlen = 0; } static void oq_flusho(OQ *q) { OB *b; while (q->q) { b = q->q; q->q = b->link; q->qlen -= b->left; free(b->free); free(b); } q->qt = &q->q; if (q->qlen) panic("qlen != 0 after flush"); } static void oq_queue_copy(OQ *q, const void *data, int len) { OB *b; if (len < 0) len = strlen(data); if (len < 1) return; b = malloc(sizeof(OB)+len); b->link = 0; bcopy(data,b+1,len); *q->qt = b; q->qt = &b->link; b->data = (void *)(b+1); b->left = len; b->free = 0; q->qlen += len; } static void oq_queue_free(OQ *q, void *data, int len) { OB *b; if (len < 0) len = strlen(data); if (len < 1) return; b = malloc(sizeof(OB)); b->link = 0; *q->qt = b; q->qt = &b->link; b->data = data; b->left = len; b->free = data; q->qlen += len; } static void oq_queue_point(OQ *q, const void *data, int len) { OB *b; if (len < 0) len = strlen(data); if (len < 1) return; b = malloc(sizeof(OB)); b->link = 0; *q->qt = b; q->qt = &b->link; b->data = data; b->left = len; b->free = 0; q->qlen += len; } static void oq_printf(OQ *, const char *, ...) __attribute__((__format__(__printf__,2,3))); static void oq_printf(OQ *q, const char *fmt, ...) { va_list ap; char *s; int l; va_start(ap,fmt); l = vasprintf(&s,fmt,ap); va_end(ap); oq_queue_free(q,s,l); } static int oq_write(OQ *q, int fd) { static int maxiov = -1; static struct iovec *iov; static int iovn; int iovl; OB *b; int rv; int w; if (! q->q) return(0); if (maxiov < 0) { int mib[2]; size_t oldlen; mib[0] = CTL_KERN; mib[1] = KERN_IOV_MAX; oldlen = sizeof(maxiov); if (sysctl(&mib[0],2,&maxiov,&oldlen,0,0) < 0) { fprintf(stderr,"%s: can't get kern.iov_max: %s\n",__progname,strerror(errno)); exit(1); } if (maxiov > 64) maxiov = 64; if (maxiov < 1) maxiov = 1; iov = 0; iovn = 0; } iovl = 0; for (b=q->q;b;b=b->link) { if (iovl >= maxiov) break; if (iovl >= iovn) iov = realloc(iov,(iovn=iovl+4)*sizeof(*iov)); iov[iovl++] = (struct iovec){ .iov_base=dequal(b->data), .iov_len=b->left }; } rv = writev(fd,iov,iovl); if (rv < 0) { switch (errno) { case EWOULDBLOCK: case EINTR: return(0); break; } return(-1); } w = rv; while ((b=q->q) && w && (w >= b->left)) { q->q = b->link; q->qlen -= b->left; w -= b->left; free(b->free); free(b); } if (b) { if (w) { b->data += w; b->left -= w; q->qlen -= w; } } else { q->qt = &q->q; } if (q->qlen < 0) panic("queue underflow"); if (q->qlen && !q->q) panic("queue lost"); return(rv); } static void wcrypted(CONN *c, const void *data, int len) { unsigned char *buf; buf = malloc(len); arc4_crypt(&c->s2cenc,data,len,buf); oq_queue_free(&c->oq,buf,len); } static void crypto_step1(CONN *c) { void *h; int i; int j; int k; unsigned char m[20]; unsigned char key[257]; bzero(&key[0],237); for (i=0;i<32;i++) { h = sha1_init(); sha1_process_bytes(h,c->c->keydata,c->c->keylen); sha1_process_bytes(h,&c->srnd[0],16); sha1_process_bytes(h,&pre_s2c[0],2); if (i) sha1_process_bytes(h,&m[0],20); sha1_process_bytes(h,&post_s2c[0],2); sha1_process_bytes(h,&c->crbuf[0],16); sha1_process_bytes(h,c->c->keydata,c->c->keylen); sha1_result(h,&m[0]); for (j=i*7,k=0;k<20;j++,k++) key[j] += m[k]; } h = sha1_init(); sha1_process_bytes(h,&key[0],237); sha1_result(h,&key[237]); arc4_init(&c->s2cenc); arc4_setkey(&c->s2cenc,&key[0],256,65536); bzero(&key[0],237); for (i=0;i<32;i++) { h = sha1_init(); sha1_process_bytes(h,c->c->keydata,c->c->keylen); sha1_process_bytes(h,&c->crbuf[0],16); sha1_process_bytes(h,&pre_c2s[0],2); if (i) sha1_process_bytes(h,&m[0],20); sha1_process_bytes(h,&post_c2s[0],2); sha1_process_bytes(h,&c->srnd[0],16); sha1_process_bytes(h,c->c->keydata,c->c->keylen); sha1_result(h,&m[0]); for (j=i*7,k=0;k<20;j++,k++) key[j] += m[k]; } h = sha1_init(); sha1_process_bytes(h,&key[0],237); sha1_result(h,&key[237]); arc4_init(&c->c2senc); arc4_setkey(&c->c2senc,&key[0],256,65536); read(rndfd,&c->srnd[0],16); bzero(&c->crbuf[0],16); bzero(&key[0],sizeof(key)); wcrypted(c,&c->srnd[0],16); c->cfill = 0; c->state = CS_RR_B; } static void crypto_step2(CONN *c) { wcrypted(c,&c->crbuf[0],16); c->cfill = 0; c->clen = 16; c->state = CS_RR_C; } static void crypto_step3(CONN *c) { CONN *c2; unsigned char versmsg[5]; if (bcmp(&c->crbuf[0],&c->srnd[0],16)) { c->state = CS_DEAD; return; } vprf(1,"%p (%d %s) now up\n",(void *)c,c->fd,c->c->backingname); for (c2=c->c->conns;c2;c2=c2->link) { if (c2->state == CS_UP) { vprf(1,"killing %p (fd %d)\n",(void *)c2,c2->fd); c2->state = CS_DEAD; } } c->cfill = 0; c->clen = 1; c->state = CS_UP; versmsg[0] = LB_VERSION; put_4(&versmsg[1],CURRENT_VERSION); wcrypted(c,&versmsg[0],5); } static int rtest_conn(RWTESTFN_ARGS) { CONN *c; BWCM *m; c = arg; if ((c->state == CS_DEAD) || (c->rbptr < c->rbfill)) { if (ISV(2)) printf("client %s R not interested\n",c->c->backingname); return(0); } if (c->state != CS_UP) { if (ISV(2)) printf("client %s R not yet up\n",c->c->backingname); return(1); } for (m=c->c->bwclasses;m;m=m->f_cl) { if (m->r_debt > m->bwclass->tolerance) { if (c->c != m->client) abort(); m->bwclass->ioflags |= BWIOF_RBLOCK; m->ioflags |= BWCMIO_RBLOCK; if (ISV(2)) { printf("client %s R blocking on %.*s (debt %u tolerance %u)\n", m->client->backingname,m->bwclass->namelen,m->bwclass->name, m->r_debt,m->bwclass->tolerance); } return(0); } } if (ISV(2)) printf("client %s R not blocking\n",c->c->backingname); return(1); } static int wtest_conn(RWTESTFN_ARGS) { CONN *c; BWCM *m; c = arg; if (!c->oq.q || (c->state == CS_DEAD)) { if (ISV(2)) printf("client %s W not interested\n",c->c->backingname); return(0); } if (c->state != CS_UP) { if (ISV(2)) printf("client %s W not yet up\n",c->c->backingname); return(1); } for (m=c->c->bwclasses;m;m=m->f_cl) { if (m->w_debt > m->bwclass->tolerance) { if (c->c != m->client) abort(); m->bwclass->ioflags |= BWIOF_WBLOCK; m->ioflags |= BWCMIO_WBLOCK; if (ISV(2)) { printf("client %s W blocking on %.*s (debt %u tolerance %u)\n", m->client->backingname,m->bwclass->namelen,m->bwclass->name, m->w_debt,m->bwclass->tolerance); } return(0); } } if (ISV(2)) printf("client %s W not blocking\n",c->c->backingname); return(1); } static void save_data(CLIENT *c, unsigned int bno, const void *data) { int w; w = (*c->backops->write)(c->backvp,bno,data); if (w < 0) { fprintf(stderr,"%s: %s: write block %u: %s\n",__progname,c->backingname,bno,strerror(errno)); exit(1); } if (w != 512) { fprintf(stderr,"%s: %s: write block %u: wanted %d, did %d\n",__progname,c->backingname,bno,512,w); exit(1); } } static void compute_cksum(int ckt, const void *data, void *into) { void *h; switch (ckt) { case CKT_SHA1: h = sha1_init(); sha1_process_bytes(h,data,512); sha1_result(h,into); break; #if 0 case CKT_SUM_SHA1: { unsigned char s; int i; s = 0; for (i=0;i<512;i++) s += ((const unsigned char *)data)[i]; ((unsigned char *)into)[0] = s; h = sha1_init(); sha1_process_bytes(h,data,512); sha1_result(h,((unsigned char *)into)+1); } break; #endif default: panic("impossible ckt %d",ckt); break; } } static void stop_sums(CONN *c) { CCLIST *cl; if (c->sumid != PL_NOID) { remove_block_id(c->sumid); c->sumid = PL_NOID; for (cl=c->c->conflicts;cl;cl=cl->f_cl) { /* cl->conflict->busy != c can happen; consider a client that is blocked because of a conflict disconnecting or going passive. */ if (cl->conflict->busy == c) cl->conflict->busy = 0; } } } static int sum_block(BLOCKFN_ARGS) { CONN *c; int n; int i; char buf[256*512]; c = arg; if (c->oq.qlen > 1048576) return(0); if (c->hand >= c->end) { stop_sums(c); return(1); } n = c->bf; if (c->end-c->hand < n) n = c->end - c->hand; i = (*c->c->backops->read)(c->c->backvp,c->hand,n,&buf[0]); if (i < 0) { fprintf(stderr,"%s: backing file read error: %s\n",__progname,strerror(errno)); exit(1); } if (i < n*512) bzero(&buf[i],(n*512)-i); for (i=0;ickt,&buf[i*512],&buf[5+(i*c->cklen)]); } buf[0] = LB_SUMS; vprf(2,"%s sending %s %u\n",c->c->backingname,msg_name(LB_SUMS),c->hand); put_4(&buf[1],c->hand); wcrypted(c,&buf[0],5+(n*20)); c->hand += n; return(1); } static int await_start_sums(BLOCKFN_ARGS) { CONN *c; CCLIST *cl; c = arg; if (! c->c->conflicts) panic("await_start_sums no conflicts"); for (cl=c->c->conflicts;cl;cl=cl->f_cl) if (cl->conflict->busy) return(0); remove_block_id(c->sumid); c->sumid = add_block_fn(&sum_block,c); for (cl=c->c->conflicts;cl;cl=cl->f_cl) cl->conflict->busy = c; return(1); } static void start_sums(CONN *c) { CCLIST *cl; c->start = get_4(&c->crbuf[1]); c->hand = c->start; c->end = c->hand + get_4(&c->crbuf[5]); c->bf = c->crbuf[9]; c->ckt = c->crbuf[10]; switch (c->ckt) { case CKT_SHA1: c->cklen = 20; break; #if 0 case CKT_SUM_SHA1: c->cklen = 21; break; #endif default: fprintf(stderr,"%s: (%s) unknown checksum type 0x%02x\n",__progname,c->c->backingname,c->ckt); c->state = CS_DEAD; return; break; } do <"blocked"> { for (cl=c->c->conflicts;cl;cl=cl->f_cl) { if (cl->conflict->busy) { if (cl->conflict->busy == c) panic("duplicate busy"); if ( (cl->conflict->busy->c == c->c) && (cl->conflict->busy->state != CS_DEAD) ) panic("busy conflict"); c->sumid = add_block_fn(&await_start_sums,c); break <"blocked">; } } c->sumid = add_block_fn(&sum_block,c); for (cl=c->c->conflicts;cl;cl=cl->f_cl) cl->conflict->busy = c; } while (0); } static void set_size(CONN *c) { (*c->c->backops->set_size)(c->c->backvp,get_4(&c->crbuf[1])); /* ignore errors, in case it's really a disk or some such */ } static void set_status(CONN *c) { free(c->status); c->status = malloc(c->crbuf[1]+1); if (c->crbuf[1]) bcopy(&c->crbuf[2],c->status,c->crbuf[1]); c->status[c->crbuf[1]] = '\0'; } static void rd_conn(RWFN_ARGS) { CONN *c; int r; BWCM *m; c = arg; if (c->state == CS_DEAD) return; if (c->rbptr < c->rbfill) return; r = read(c->fd,&c->rbuf[0],sizeof(c->rbuf)); if (r < 0) { switch (errno) { case EWOULDBLOCK: case EINTR: return; break; } fprintf(stderr,"%s: client read error: %s\n",__progname,strerror(errno)); c->state = CS_DEAD; return; } if (r == 0) { if ((c->state != CS_UP) || (c->cfill > 0)) { fprintf(stderr,"%s: unexpected client EOF\n",__progname); } vprf(1,"client EOF fd %d, conn %p, %s\n",c->fd,(void *)c,c->c->backingname); c->state = CS_DEAD; return; } c->rbfill = r; c->rbptr = 0; for (m=c->c->bwclasses;m;m=m->f_cl) { if (ISV(2)) { printf("client %s class %.*s r_debt += %d\n", m->client->backingname,m->bwclass->namelen,m->bwclass->name,r); } m->r_debt += r; } } static void wr_conn(RWFN_ARGS) { CONN *c; int n; BWCM *m; c = arg; if (c->state == CS_DEAD) return; n = oq_write(&c->oq,c->fd); if (n < 0) { fprintf(stderr,"%s: network write [%s]: %s\n",__progname,c->c->backingname,strerror(errno)); c->state = CS_DEAD; } for (m=c->c->bwclasses;m;m=m->f_cl) { if (ISV(2)) { printf("client %s class %.*s w_debt += %d\n", m->client->backingname,m->bwclass->namelen,m->bwclass->name,n); } m->w_debt += n; } } static void client_version(CONN *c, unsigned int vers) { if (vers != CURRENT_VERSION) { fprintf(stderr,"%s: version mismatch (us %u, %s %u)\n",__progname,CURRENT_VERSION,c->c->backingname,vers); c->state = CS_DEAD; } } static void capmsg(CONN *c, int subtype, const unsigned char *capname, int capnamelen, const unsigned char *rest) { unsigned char replyhdr[3]; switch (subtype) { case LB_CAP_QUERY: replyhdr[0] = LB_CAP; replyhdr[1] = LB_CAP_NEGATIVE; replyhdr[2] = capnamelen; wcrypted(c,&replyhdr[0],3); if (capnamelen > 0) wcrypted(c,capname,capnamelen); break; case LB_CAP_NEGATIVE: case LB_CAP_POSITIVE: rest=rest; fprintf(stderr,"%s: incorrect capmsg reply\n",c->c->backingname); c->state = CS_DEAD; break; default: panic("impossible capmsg subtype %02x",subtype); break; } } static int block_conn(BLOCKFN_ARGS) { CONN *c; int l; int r; c = arg; switch (c->state) { case CS_RR_A: case CS_RR_B: case CS_RR_C: l = 16 - c->cfill; break; case CS_UP: l = c->clen - c->cfill; break; case CS_DEAD: return(0); break; default: panic("block_conn bad state 1"); break; } if (c->rbptr >= c->rbfill) return(0); r = c->rbfill - c->rbptr; if (r > l) r = l; switch (c->state) { case CS_RR_A: bcopy(&c->rbuf[c->rbptr],&c->crbuf[c->cfill],r); break; default: arc4_crypt(&c->c2senc,&c->rbuf[c->rbptr],r,&c->crbuf[c->cfill]); break; } c->rbptr += r; c->cfill += r; if (r < l) return(1); switch (c->state) { case CS_RR_A: crypto_step1(c); break; case CS_RR_B: crypto_step2(c); break; case CS_RR_C: crypto_step3(c); break; case CS_UP: vprf(2,"%s msg %s fill %d\n",c->c->backingname,msg_name(c->crbuf[0]),c->cfill); switch (c->crbuf[0]) { case LB_VERSION: if (c->cfill == 1) { c->clen = 5; return(1); } client_version(c,get_4(&c->crbuf[1])); break; case LB_DATA: if (c->cfill == 1) { c->clen = 517; return(1); } save_data(c->c,get_4(&c->crbuf[1]),&c->crbuf[5]); break; case LB_RQSUMS: if (c->sumid != PL_NOID) { fprintf(stderr,"%s: RQSUMS during another RQSUMS\n",__progname); c->state = CS_DEAD; return(1); } if (c->cfill == 1) { c->clen = 11; return(1); } start_sums(c); break; case LB_STOPSUM: stop_sums(c); vprf(2,"%s sending %s\n",c->c->backingname,msg_name(abortedmsg[0])); wcrypted(c,&abortedmsg[0],1); break; case LB_SIZE: if (c->cfill == 1) { c->clen = 5; return(1); } set_size(c); break; case LB_STATUS: switch (c->cfill) { case 1: c->clen = 2; return(1); break; case 2: c->clen += c->crbuf[1]; if (c->clen > 2) return(1); /* fall through */ } set_status(c); break; case LB_PING: vprf(2,"%s sending %s\n",c->c->backingname,msg_name(pongmsg[0])); wcrypted(c,&pongmsg[0],1); break; case LB_PONG: break; case LB_CAP: switch (c->cfill) { case 1: c->clen = 3; return(1); break; case 3: switch (c->crbuf[1]) { case LB_CAP_QUERY: case LB_CAP_NEGATIVE: break; case LB_CAP_POSITIVE: c->clen ++; break; default: fprintf(stderr,"%s: bad LB_CAP subtype %02x\n",c->c->backingname,c->crbuf[1]); c->state = CS_DEAD; return(1); break; } c->clen += c->crbuf[2]; if (c->clen > 3) return(1); break; } capmsg(c,c->crbuf[1],&c->crbuf[3],c->crbuf[2],&c->crbuf[3+c->crbuf[2]]); break; default: fprintf(stderr,"%s: bad packet type %02x\n",c->c->backingname,c->crbuf[0]); c->state = CS_DEAD; break; } c->cfill = 0; c->clen = 1; break; default: panic("block_conn bad state 2"); break; } return(1); } static void free_conn(CONN *n) { CCLIST *l; for (l=n->c->conflicts;l;l=l->f_cl) { if (l->conflict->busy == n) { l->conflict->busy = 0; } } remove_poll_id(n->id); remove_block_id(n->crid); close(n->fd); if (n->sumid != PL_NOID) remove_block_id(n->sumid); remove_poll_id(n->tmoid); close(n->tmofd); oq_flusho(&n->oq); free(n); } static int weed_conns(BLOCKFN_ARGS) { CLIENT *c; CONN *n; CONN **np; int rv; rv = 0; for (c=config.clients;c;c=c->link) { np = &c->conns; while ((n = *np)) { if (n->state == CS_DEAD) { *np = n->link; free_conn(n); rv = 1; } else { np = &n->link; } } } return(rv); } static void rd_tmo(RWFN_ARGS) { CONN *c; struct timersock_event e; c = arg; read(c->tmofd,&e,sizeof(e)); if (c->state == CS_DEAD) return; if (c->countdown > 0) { c->countdown --; } else { switch (c->state) { case CS_RR_A: case CS_RR_B: case CS_RR_C: vprf(1,"dropping %p fd %d, timeout\n",(void *)c,c->fd); c->state = CS_DEAD; break; case CS_UP: vprf(2,"%s sending %s\n",c->c->backingname,msg_name(pingmsg[0])); wcrypted(c,&pingmsg[0],1); c->countdown = 10; break; default: abort(); break; } } } static void acc_accept(RWFN_ARGS) { ACCSOCK *s; int new; struct sockaddr_storage ss; int sslen; int on; s = arg; sslen = sizeof(ss); new = accept(s->fd,(void *)&ss,&sslen); if (new < 0) { switch (errno) { case EWOULDBLOCK: case EINTR: return; break; } fprintf(stderr,"%s: accept (%s): %s\n",__progname,s->txt,strerror(errno)); return; } fcntl(new,F_SETFL,fcntl(new,F_GETFL,0)|O_NONBLOCK); on = 1; setsockopt(new,SOL_SOCKET,SO_KEEPALIVE,&on,sizeof(on)); (*s->a->ops->accept)(s,new,&ss,sslen); } static char *blk_to_str(const void *data, int len) { char *t; t = malloc(len+1); bcopy(data,t,len); t[len] = '\0'; return(t); } static void free_accsock_list(ACCSOCK *l) { ACCSOCK *s; while (l) { s = l; l = s->link; free(s->addr); if (s->fd >= 0) close(s->fd); if (s->id != PL_NOID) remove_poll_id(s->id); free(s->txt); free(s); } } static void free_one_acc(ACC *a) { free_accsock_list(a->socks); free(a->hoststr); free(a->portstr); free(a); } static void free_acc_list(ACC *l) { ACC *a; while (l) { a = l; l = a->link; free_one_acc(a); } } static void free_one_client(CLIENT *c) { CONN *n; CCLIST *cl; BWCM *m; free_acc_list(c->accs); while (c->conns) { n = c->conns; c->conns = n->link; free_conn(n); } while ((cl = c->conflicts)) { if (cl->f_cf) cl->f_cf->b_cf = cl->b_cf; if (cl->b_cf) cl->b_cf->f_cf = cl->f_cf; else cl->conflict->clients = cl->f_cf; if (!cl->conflict->clients && cl->conflict->busy) panic("free_one_client: busy"); c->conflicts = cl->f_cl; free(cl); } while ((m = c->bwclasses)) { if (m->f_bw) m->f_bw->b_bw = m->b_bw; if (m->b_bw) m->b_bw->f_bw = m->f_bw; else m->bwclass->members = m->f_bw; c->bwclasses = m->f_cl; free(m); } if (c->backvp) (*c->backops->done)(c->backvp); free(c->backingname); free(c->keyfile); free(c->keydata); free(c->tmp); free(c); } static void free_client_list(CLIENT *l) { CLIENT *c; while (l) { c = l; l = l->link; free_one_client(c); } } static void free_bwclass_list(BWCLASS *l) { BWCLASS *b; while (l) { b = l; l = b->link; if (b->members) panic("free_bwclass_list: has members"); free(b->name); free(b); } } static void clear_config(CONFIG *cf) { free_client_list(cf->clients); free_acc_list(cf->admin); free_bwclass_list(cf->bwclasses); } static void add_conflict_tag(CLIENT *c, char *tag, int len) { CCLIST *cl; CONFLICT *f; do <"havecf"> { for (f=conflicts;f;f=f->link) { if ((f->taglen == len) && !bcmp(f->tag,tag,len)) break <"havecf">; } f = malloc(sizeof(CONFLICT)); f->tag = malloc(len); bcopy(tag,f->tag,len); f->taglen = len; f->busy = 0; f->clients = 0; f->link = conflicts; conflicts = f; } while (0); cl = malloc(sizeof(CCLIST)); cl->conflict = f; cl->client = c; cl->f_cf = f->clients; cl->b_cf = 0; if (f->clients) f->clients->b_cf = cl; f->clients = cl; cl->f_cl = c->conflicts; cl->b_cl = 0; if (c->conflicts) c->conflicts->b_cl = cl; c->conflicts = cl; } static void add_bwcm(CLIENT *c, BWCLASS *b) { BWCM *m; for (m=c->bwclasses;m;m=m->f_cl) if (m->bwclass == b) return; m = malloc(sizeof(BWCM)); m->client = c; m->bwclass = b; m->f_bw = b->members; m->b_bw = 0; b->members = m; m->f_cl = c->bwclasses; m->b_cl = 0; c->bwclasses = m; m->r_debt = 0; m->w_debt = 0; } static void err(const char *, ...) __attribute__((__format__(__printf__,1,2))); static void err(const char *fmt, ...) { va_list ap; fprintf(stderr,"%s: ",__progname); va_start(ap,fmt); vfprintf(stderr,fmt,ap); va_end(ap); fprintf(stderr,"\n"); } static int set_Bps(BWCLASS *b, const char *limstr, int limlen) { unsigned long long int v; int i; int dv; v = 0; for <"digits"> (i=0;i; break; } v = (v * 10) + dv; } if (i == 0) { err("%s: missing number in bandwidth value `%.*s'",configfile,limlen,limstr); return(1); } if (i < limlen) { switch (limstr[i]) { case 'k': case 'K': i ++; v *= 1000ULL; break; case 'm': case 'M': i ++; v *= 1000000ULL; break; case 'g': case 'G': i ++; v *= 1000000000ULL; break; case 't': case 'T': i ++; v *= 1000000000000ULL; break; } } if ((i+3 <= limlen) && !strncasecmp(limstr+i,"bps",3)) { i += 3; v /= 8; } if (i < limlen) { err("junk after bandwidth value `%.*s'",limlen,limstr); return(1); } b->Bps = v; return(0); } static BWCLASS *new_bwclass(void) { BWCLASS *b; b = malloc(sizeof(BWCLASS)); b->Bps = 0; b->nmembers = 0; b->members = 0; return(b); } static BWCLASS *find_named_bwclass(CONFIG *cf, const char *name, int namelen) { BWCLASS *b; for (b=cf->bwclasses;b;b=b->link) { if ((b->namelen == namelen) && !bcmp(b->name,name,namelen)) return(b); } b = new_bwclass(); b->name = malloc(namelen); bcopy(name,b->name,namelen); b->namelen = namelen; b->link = cf->bwclasses; cf->bwclasses = b; return(b); } static int add_named_bwclass(CONFIG *cf, CLIENT *c, const char *name, int namelen) { add_bwcm(c,find_named_bwclass(cf,name,namelen)); return(0); } static int add_anon_bwclass(CONFIG *cf, CLIENT *c, const char *limstr, int limlen) { BWCLASS *b; b = new_bwclass(); if (set_Bps(b,limstr,limlen)) { free(b); return(1); } b->name = 0; b->namelen = -1; b->link = cf->bwclasses; cf->bwclasses = b; add_bwcm(c,b); return(0); } static void accops_client_dbgprint(FILE *to, ACC *a, CLIENT *c) { ACCSOCK *s; if (a->private != c) fprintf(to,"*** client backlink %p\n",a->private); for (s=a->socks;s;s=s->link) { fprintf(to," sock %p: %d %s\n",(void *)s,s->fd,s->txt); } } static void accops_client_accept( ACCSOCK *s, int new, struct sockaddr_storage *from, int fromlen ) { CLIENT *c; CONN *n; struct itimerval itv; c = s->a->private; n = malloc(sizeof(CONN)); n->link = c->conns; c->conns = n; n->s = s; n->c = c; n->state = CS_RR_A; n->fd = new; n->from = *from; n->fromlen = fromlen; oq_init(&n->oq); n->sumid = PL_NOID; n->tmofd = timer_socket(); if (n->tmofd < 0) { fprintf(stderr,"%s: timer socket: %s\n",__progname,strerror(errno)); exit(1); } itv.it_interval.tv_sec = 60; itv.it_interval.tv_usec = 0; itv.it_value = itv.it_interval; write(n->tmofd,&itv,sizeof(itv)); n->tmoid = add_poll_fd(n->tmofd,&rwtest_always,&rwtest_never,&rd_tmo,0,n); n->countdown = 10; read(rndfd,&n->srnd[0],16); oq_queue_copy(&n->oq,&n->srnd[0],16); n->cfill = 0; n->rbfill = 0; n->rbptr = 0; n->id = add_poll_fd(n->fd,&rtest_conn,&wtest_conn,&rd_conn,&wr_conn,n); n->crid = add_block_fn(&block_conn,n); n->status = 0; vprf(1,"accepted %p fd %d for %s\n",(void *)n,n->fd,c->backingname); } /* * ACC ops vector for ACCs corresponding to clients. */ static ACCOPS accops_client = { &accops_client_dbgprint, &accops_client_accept }; static int admin_iline(ADMIN *a) { int x; int x0; int (*ckfn)(CLIENT *c); int ck_all(CLIENT *c __attribute__((__unused__))) { return(1); } int ck_up(CLIENT *c) { CONN *n; for (n=c->conns;n;n=n->link) if (n->state == CS_UP) return(1); return(0); } for (x=0;(xl)&&FIXisspace(a->b[x]);x++) ; if (x >= a->l) return(1); x0 = x; for (;(xl)&&!FIXisspace(a->b[x]);x++) ; if ((x-x0 == 4) && (!bcmp(a->b+x0,"quit",4) || !bcmp(a->b+x0,"exit",4))) { a->flags |= AF_DEAD; return(0); } if ( ((x-x0 == 1) && (a->b[x0] == '?')) || ((x-x0 == 4) && !bcmp(a->b+x0,"help",4)) ) { oq_queue_point(&a->oq, "?, help Print this help\n" "exit, quit Disconnect\n" "all Print status for all filesystems\n" "up Print status for all filesystems with a connection\n" "drain Block until all queued output has been sent\n" "prompt Change prompt to \n" "echo Print \n" "detail Print a detailed data dump for client using \n" ,-1); return(1); } if ( ((x-x0 == 3) && !bcmp(a->b+x0,"all",3) && ((ckfn=&ck_all),1)) || ((x-x0 == 2) && !bcmp(a->b+x0,"up",2) && ((ckfn=&ck_up),1)) ) { CLIENT *cl; CONN *cn; for (cl=config.clients;cl;cl=cl->link) { if ((*ckfn)(cl)) { for (cn=cl->conns;cn;cn=cn->link) { if (cn->state == CS_UP) break; } if (cn) { oq_printf(&a->oq,"%s [%s]: fd %d - %s\n", cl->backingname,(*cl->backops->detail)(cl->backvp), cn->fd,cn->status?:"(no status)"); } else { oq_printf(&a->oq,"%s [%s]: disconnected\n", cl->backingname,(*cl->backops->detail)(cl->backvp)); } } } return(1); } if ((x-x0 == 5) && !bcmp(a->b+x0,"drain",5)) { a->flags |= AF_DRAIN; return(0); } if ((x-x0 == 6) && !bcmp(a->b+x0,"prompt",6)) { for (;(xl)&&FIXisspace(a->b[x]);x++) ; free(a->prompt); a->prompt = strdup(a->b+x); return(1); } if ((x-x0 == 4) && !bcmp(a->b+x0,"echo",4)) { for (;(xl)&&FIXisspace(a->b[x]);x++) ; oq_queue_copy(&a->oq,a->b+x,a->l-x); oq_queue_point(&a->oq,"\n",1); return(1); } if ((x-x0 == 6) && !bcmp(a->b+x0,"detail",6)) { CLIENT *cl; CONN *cn; ACC *acc; for (;(xl)&&FIXisspace(a->b[x]);x++) ; do <"found"> { for (cl=config.clients;cl;cl=cl->link) { if (! strcmp(cl->backingname,a->b+x)) { oq_printf(&a->oq,"Backing file: %s\n",cl->backingname); oq_printf(&a->oq,"Key file: %s\n",cl->keyfile); oq_printf(&a->oq,"Conflicts: "); if (cl->conflicts) { CCLIST *l; CONFLICT *f; for (l=cl->conflicts;l;l=l->f_cl) { f = l->conflict; oq_printf(&a->oq," %.*s [",f->taglen,f->tag); if (f->busy) { oq_printf(&a->oq,"%p:%s",(void *)f->busy,f->busy->c->backingname); } else { oq_printf(&a->oq,"free"); } oq_printf(&a->oq,"]"); } } else { oq_printf(&a->oq,"(none)"); } oq_printf(&a->oq,"\n"); oq_printf(&a->oq,"Accs:"); for (acc=cl->accs;acc;acc=acc->link) oq_printf(&a->oq," %s/%s",acc->hoststr,acc->portstr); oq_printf(&a->oq,"\n"); oq_printf(&a->oq,"Backing detail: %s\n",(*cl->backops->detail)(cl->backvp)); oq_printf(&a->oq,"Bandwidth classes:"); if (cl->bwclasses) { BWCM *bm; BWCLASS *bc; for (bm=cl->bwclasses;bm;bm=bm->f_cl) { bc = bm->bwclass; oq_printf(&a->oq," %.*s (Bps %llu, debt %u/%u)", bc->namelen,bc->name,bc->Bps,bm->r_debt,bm->w_debt); } } else { oq_printf(&a->oq," (none)"); } oq_printf(&a->oq,"\n"); oq_printf(&a->oq,"Connections:"); if (! cl->conns) oq_printf(&a->oq," (none)"); oq_printf(&a->oq,"\n"); for (cn=cl->conns;cn;cn=cn->link) { char pbuf[256]; const char *pp; oq_printf(&a->oq," %p:\n State: ",(void *)cn); switch (cn->state) { case CS_RR_A: oq_printf(&a->oq,"RR_A"); break; case CS_RR_B: oq_printf(&a->oq,"RR_B"); break; case CS_RR_C: oq_printf(&a->oq,"RR_C"); break; case CS_UP: oq_printf(&a->oq,"UP"); break; case CS_DEAD: oq_printf(&a->oq,"DEAD"); break; default: oq_printf(&a->oq,"?%d",(int)cn->state); break; } oq_printf(&a->oq,"\n"); switch (cn->from.ss_family) { case AF_INET: pp = inet_ntop(AF_INET,&((struct sockaddr_in *)&cn->from)->sin_addr,&pbuf[0],sizeof(pbuf)); if (pp) { oq_printf(&a->oq," From: %s/%d\n",pp,((struct sockaddr_in *)&cn->from)->sin_port); } else { oq_printf(&a->oq," From: (%s)\n",strerror(errno)); } break; case AF_INET6: pp = inet_ntop(AF_INET6,&((struct sockaddr_in6 *)&cn->from)->sin6_addr,&pbuf[0],sizeof(pbuf)); if (pp) { oq_printf(&a->oq," From: %s/%d\n",pp,((struct sockaddr_in6 *)&cn->from)->sin6_port); } else { oq_printf(&a->oq," From: (%s)\n",strerror(errno)); } break; default: oq_printf(&a->oq," From: ?af%d\n",cn->from.ss_family); break; } oq_printf(&a->oq," FD %d, OQ length %d\n",cn->fd,cn->oq.qlen); oq_printf(&a->oq," cfill/clen: %d/%d\n",cn->cfill,cn->clen); oq_printf(&a->oq," start=%u, hand=%u, end=%u, bf=%u, ckt=%u, cklen=%u\n",cn->start,cn->hand,cn->end,cn->bf,cn->ckt,cn->cklen); oq_printf(&a->oq," rbfill=%d rbptr=%d countdown=%u\n",cn->rbfill,cn->rbptr,cn->countdown); oq_printf(&a->oq," Status: %s\n",cn->status?:"(nil)"); } break <"found">; } } oq_printf(&a->oq,"No client found for `%s'\n",a->b+x); } while (0); return(1); } oq_queue_point(&a->oq,"? for help\n",-1); return(1); } static void admin_prompt(ADMIN *a) { oq_queue_copy(&a->oq,a->prompt,-1); } static void accops_admin_dbgprint( FILE *to __attribute__((__unused__)), ACC *a __attribute__((__unused__)), CLIENT *c __attribute__((__unused__)) ) { fprintf(to,"*** not a client acc\n"); } static int rtest_admin(RWTESTFN_ARGS) { ADMIN *a; a = arg; return(!(a->flags&(AF_DRAIN|AF_DEAD))); } static int wtest_admin(RWTESTFN_ARGS) { ADMIN *a; a = arg; return(!(a->flags&AF_DEAD) && a->oq.q); } static void rd_admin(RWFN_ARGS) { ADMIN *a; char rbuf[64]; int n; int i; a = arg; if (a->flags & AF_DEAD) return; n = read(a->fd,&rbuf[0],sizeof(rbuf)); if (n < 0) { switch (errno) { case EINTR: case EWOULDBLOCK: break; default: a->flags |= AF_DEAD; break; } return; } if (n == 0) { a->flags |= AF_DEAD; return; } for (i=0;il >= a->a) a->b = realloc(a->b,a->a=a->l+8); a->b[a->l++] = ch; } switch (rbuf[i]) { case '\n': save('\0'); a->l --; if (admin_iline(a)) admin_prompt(a); if (a->flags & AF_DEAD) return; a->l = 0; break; case '\r': break; default: save(rbuf[i]); break; } } } static void wr_admin(RWFN_ARGS) { ADMIN *a; a = arg; if (a->flags & AF_DEAD) return; if (oq_write(&a->oq,a->fd) < 0) { a->flags |= AF_DEAD; return; } if ((a->flags & AF_DRAIN) && !a->oq.q) { a->flags &= ~AF_DRAIN; admin_prompt(a); } } static int block_admin(BLOCKFN_ARGS) { ADMIN *a; a = arg; if (a->flags & AF_DEAD) { if (a->flink) a->flink->blink = a->blink; if (a->blink) a->blink->flink = a->flink; else admins = a->flink; close(a->fd); free(a->b); oq_flusho(&a->oq); remove_poll_id(a->ioid); remove_block_id(a->bid); free(a->prompt); free(a); return(1); } return(0); } static void accops_admin_accept( ACCSOCK *s __attribute__((__unused__)), int new, struct sockaddr_storage *from, int fromlen ) { ADMIN *a; int on; on = 1; setsockopt(new,SOL_SOCKET,SO_KEEPALIVE,&on,sizeof(on)); a = malloc(sizeof(ADMIN)); a->flink = admins; a->blink = 0; admins = a; if (a->flink) a->flink->blink = a; a->from = *from; a->fromlen = fromlen; a->fd = new; a->b = 0; a->a = 0; a->l = 0; oq_init(&a->oq); a->ioid = add_poll_fd(a->fd,&rtest_admin,&wtest_admin,&rd_admin,&wr_admin,a); a->bid = add_block_fn(&block_admin,a); a->flags = 0; a->prompt = strdup("lbd> "); admin_prompt(a); vprf(1,"accepted admin fd %d, %p\n",new,(void *)a); } /* * ACC ops vector for administrative ACCs. */ static ACCOPS accops_admin = { &accops_admin_dbgprint, &accops_admin_accept }; static ACC *listen_for(char *spec, int speclen, ACCOPS *ops, void *priv) { char *slash; ACC *a; slash = memchr(spec,'/',speclen); if (slash == 0) { err("%s: no slash in listen spec `%.*s'",configfile,speclen,spec); return(0); } a = malloc(sizeof(ACC)); a->hoststr = blk_to_str(spec,slash-spec); a->portstr = blk_to_str(slash+1,speclen-((slash+1)-spec)); a->socks = 0; a->ops = ops; a->private = priv; return(a); } static void ensure_backtypes_lens(void) { int i; if (backtype_namelens[0] >= 0) return; for (i=N_BACKTYPES-1;i>=0;i--) backtype_namelens[i] = strlen(backtypes[i].name); } static const BACKING *lookup_backops(const void *name, int namelen) { int i; ensure_backtypes_lens(); for (i=N_BACKTYPES-1;i>=0;i--) { if ( (backtype_namelens[i] == namelen) && !bcmp(backtypes[i].name,name,namelen) ) return(backtypes[i].ops); } return(0); } typedef enum { KVKT_ONE = 1, KVKT_ONE_OR_MORE, KVKT_ZERO_OR_MORE, KVKT_ZERO_OR_ONE } KVKTYPE; typedef struct kvkey KVKEY; typedef struct clause CLAUSE; typedef struct kv KV; struct kvkey { const char *key; KVKTYPE type; int keylen; int n; int a; int *v; } ; struct clause { const char *type; KVKEY *keys; int (*process)(CONFIG *, CLAUSE *, KV *, char *); int typelen; } ; struct kv { int kx; int kl; int vx; int vl; } ; static int clause_client(CONFIG *, CLAUSE *, KV *, char *); /* forward */ static int clause_bwclass(CONFIG *, CLAUSE *, KV *, char *); /* forward */ static int clause_admin(CONFIG *, CLAUSE *, KV *, char *); /* forward */ static KVKEY keys_client[] = { { "file", KVKT_ONE }, #define KVX_CLIENT_FILE 0 { "key", KVKT_ONE }, #define KVX_CLIENT_KEY (KVX_CLIENT_FILE+1) { "listen", KVKT_ONE_OR_MORE }, #define KVX_CLIENT_LISTEN (KVX_CLIENT_KEY+1) { "conflict", KVKT_ZERO_OR_MORE }, #define KVX_CLIENT_CONFLICT (KVX_CLIENT_LISTEN+1) { "bwclass", KVKT_ZERO_OR_MORE }, #define KVX_CLIENT_BWCLASS (KVX_CLIENT_CONFLICT+1) { "bandwidth", KVKT_ZERO_OR_MORE }, #define KVX_CLIENT_BANDWIDTH (KVX_CLIENT_BWCLASS+1) { "type", KVKT_ZERO_OR_ONE }, #define KVX_CLIENT_TYPE (KVX_CLIENT_BANDWIDTH+1) { 0 } }; static KVKEY keys_bwclass[] = { { "name", KVKT_ONE }, #define KVX_BWCLASS_NAME 0 { "bandwidth", KVKT_ONE }, #define KVX_BWCLASS_BANDWIDTH (KVX_BWCLASS_NAME+1) { 0 } }; static KVKEY keys_admin[] = { { "listen", KVKT_ONE_OR_MORE }, #define KVX_ADMIN_LISTEN 0 { 0 } }; static CLAUSE clauses[] = { { "client", &keys_client[0], &clause_client }, { "bwclass", &keys_bwclass[0], &clause_bwclass }, { "admin", &keys_admin[0], &clause_admin }, { 0 } }; static int need_clause_init = 1; static int clause_client(CONFIG *cf, CLAUSE *l, KV *kv, char *s) { int n; int i; int j; CLIENT *c; ACC *a; KVKEY *kvk; struct stat stb; int fd; vprf(1,"new client\n"); c = malloc(sizeof(CLIENT)); c->backingname = 0; c->backvp = 0; c->keyfile = 0; c->conflicts = 0; c->accs = 0; c->keydata = 0; c->conns = 0; c->tmp = 0; c->bwclasses = 0; do <"fail"> { kvk = &l->keys[KVX_CLIENT_TYPE]; if (kvk->n) { j = kvk->v[0]; c->backops = lookup_backops(s+kv[j].vx,kv[j].vl); if (! c->backops) { err("%.*s: unrecognized backing type",kv[j].vl,s+kv[j].vx); break <"fail">; } } else { c->backops = DEFAULT_BACKOPS; } i = l->keys[KVX_CLIENT_KEY].v[0]; c->keyfile = blk_to_str(s+kv[i].vx,kv[i].vl); if (stat(c->keyfile,&stb) < 0) { err("stat %s: %s",c->keyfile,strerror(errno)); break <"fail">; } if (stb.st_size > 65536) { err("%s: unreasonably large (%llu)",c->keyfile,(unsigned long long int)stb.st_size); break <"fail">; } c->keylen = stb.st_size; c->keydata = malloc(c->keylen+1); fd = open(c->keyfile,O_RDONLY,0); if (fd < 0) { err("%s: %s",c->keyfile,strerror(errno)); break <"fail">; } n = read(fd,c->keydata,c->keylen+1); if ( ( (n < 0) && ( err("reading key data from %s: %s", c->keyfile, strerror(errno)),1 ) ) || ( (n > c->keylen) && ( err("%s: file grew (size %d, read %d)", c->keyfile, c->keylen, n),1 ) ) || ( (n != c->keylen) && ( err("reading key data from %s: wanted %d, got %d", c->keyfile, c->keylen, n),1 ) ) ) { close(fd); break <"fail">; } close(fd); vprf(1,"key data len %d\n",c->keylen); i = l->keys[KVX_CLIENT_FILE].v[0]; c->backingname = blk_to_str(s+kv[i].vx,kv[i].vl); c->backvp = (*c->backops->open)(c->backingname,&err); if (! c->backvp) break <"fail">; vprf(1,"backing file %s, %s\n",c->backingname,(*c->backops->detail)(c->backvp)); kvk = &l->keys[KVX_CLIENT_CONFLICT]; for (i=kvk->n-1;i>=0;i--) { j = kvk->v[i]; add_conflict_tag(c,s+kv[j].vx,kv[j].vl); } if (kvk->n == 0) { i = l->keys[KVX_CLIENT_FILE].v[0]; j = kv[i].vl; if ( (j >= 2) && (c->backingname[j-1] >= 'a') && (c->backingname[j-1] <= 'z') && FIXisdigit(c->backingname[j-2]) ) { add_conflict_tag(c,c->backingname,j-1); } else { vprf(1,"no conflict tag\n"); } } kvk = &l->keys[KVX_CLIENT_LISTEN]; for (i=kvk->n-1;i>=0;i--) { j = kvk->v[i]; a = listen_for(s+kv[j].vx,kv[j].vl,&accops_client,c); if (! a) break <"fail">; a->link = c->accs; c->accs = a; vprf(1,"acc %s/%s\n",a->hoststr,a->portstr); } kvk = &l->keys[KVX_CLIENT_BWCLASS]; for (i=kvk->n-1;i>=0;i--) { j = kvk->v[i]; if (add_named_bwclass(cf,c,s+kv[j].vx,kv[j].vl)) break <"fail">; } kvk = &l->keys[KVX_CLIENT_BANDWIDTH]; for (i=kvk->n-1;i>=0;i--) { j = kvk->v[i]; if (add_anon_bwclass(cf,c,s+kv[j].vx,kv[j].vl)) break <"fail">; } c->link = cf->clients; cf->clients = c; return(0); } while (0); free_one_client(c); return(1); } static int clause_bwclass(CONFIG *cf, CLAUSE *l, KV *kv, char *s) { BWCLASS *b; int i; vprf(1,"new bwclass\n"); i = l->keys[KVX_BWCLASS_NAME].v[0]; b = find_named_bwclass(cf,s+kv[i].vx,kv[i].vl); if (b->Bps) { err("%s: bwclass %.*s already specified",configfile,b->namelen,b->name); return(1); } i = l->keys[KVX_BWCLASS_BANDWIDTH].v[0]; return(set_Bps(b,s+kv[i].vx,kv[i].vl)); } static int clause_admin(CONFIG *cf, CLAUSE *l, KV *kv, char *s) { int i; int j; CLIENT *c; ACC *a; KVKEY *kvk; kvk = &l->keys[KVX_ADMIN_LISTEN]; for (i=kvk->n-1;i>=0;i--) { j = kvk->v[i]; a = listen_for(s+kv[j].vx,kv[j].vl,&accops_admin,c); if (! a) return(1); a->link = cf->admin; cf->admin = a; vprf(1,"new admin acc %s/%s\n",a->hoststr,a->portstr); } return(0); } static int check_config(CONFIG *cf) { BWCLASS *b; BWCM *m; CLIENT *c; CLIENT **cp; int errs; errs = 0; for (b=cf->bwclasses;b;b=b->link) { if (! b->Bps) { errs = 1; err("%s: bandwidth class %.*s never defined\n",configfile,b->namelen,b->name); cp = &cf->clients; while <"classes"> ((c = *cp)) { for (m=c->bwclasses;m;m=m->f_cl) { if (m->bwclass == b) { *cp = c->link; free_one_client(c); continue <"classes">; } } cp = &c->link; } } } return(errs); } /* * Load the config from the file into in-core data structures. This * creates the clients list, with accs but not accsocks. It also * opens all backing files, but does not lock them. Return value is * one of the LOAD_ constants: * * LOAD_OK * All went well. * LOAD_EMPTY * The file was completely empty, not even a comment. * LOAD_ERROR * An error occurred, but it was one that can be recovered * from by ignoring the line that provoked it. * LOAD_FATAL * An error occurred, and it is one that cannot be tied to * a specific line (eg, file not found on open). */ static int loadconfig(CONFIG *cf) #define LOAD_OK 1 #define LOAD_EMPTY 2 #define LOAD_ERROR 3 #define LOAD_FATAL 4 { FILE *f; char *l; int alloc; int n; int ch; int i; int j; int x; int rv; int x0; int xe; KV *kv; int akv; int nkv; CLAUSE *clause; KVKEY *kvk; void savec(int c) { if (n >= alloc) l = realloc(l,alloc=n+16); l[n++] = c; } void save_kv(int kx, int kl, int vx, int vl) { if (nkv >= akv) kv = realloc(kv,(akv=nkv+8)*sizeof(KV)); kv[nkv++] = (KV) { .kx=kx, .kl=kl, .vx=vx, .vl=vl }; } void save_kvx(KVKEY *k, int x) { if (k->n >= k->a) k->v = realloc(k->v,(k->a=k->n+8)*sizeof(int)); k->v[k->n++] = x; } if (need_clause_init) { for (i=0;(clause=&clauses[i])->type;i++) { clause->typelen = strlen(clause->type); for (j=0;(kvk=&clause->keys[j])->key;j++) { kvk->keylen = strlen(kvk->key); kvk->a = 0; kvk->v = 0; } } need_clause_init = 0; } cf->clients = 0; cf->admin = 0; cf->bwclasses = 0; rv = LOAD_OK; f = fopen(configfile,"r"); if (f == 0) { err("%s: %s",configfile,strerror(errno)); return(LOAD_FATAL); } ch = getc(f); if (ch == EOF) { fclose(f); return(LOAD_EMPTY); } ungetc(ch,f); l = 0; alloc = 0; n = 0; akv = 0; kv = 0; while <"readloop"> (1) { ch = getc(f); if (ch == EOF) { if (ferror(f)) { err("%s: read error: %s",configfile,strerror(errno)); free(l); clear_config(cf); return(LOAD_FATAL); } if (! n) break; ch = '\n'; } if (ch != '\n') { savec(ch); continue; } do <"nextline"> { savec('\0'); n --; nkv = 0; for (x=0;;x++) { if (x >= n) break <"nextline">; if (l[x] == '#') break <"nextline">; if (! FIXisspace(l[x])) break; } if ((n > 0) && (l[n-1] == '\\')) { if (x == n-1) n = 0; else n --; continue <"readloop">; } x0 = x; while ((x < n) && !FIXisspace(l[x])) x ++; save_kv(-1,0,x0,x-x0); while (1) { while ((x < n) && FIXisspace(l[x])) x ++; if (x >= n) break; xe = -1; x0 = x; while ((x < n) && !FIXisspace(l[x])) { if ((xe < 0) && (l[x] == '=')) xe = x; x ++; } if (xe < 0) { err("%s: no = in field `%.*s'",configfile,x0-x,l+x0); rv = LOAD_ERROR; break <"nextline">; } save_kv(x0,xe-x0,xe+1,x-(xe+1)); } if (ISV(1)) { printf("line:"); printf(" %.*s",kv[0].vl,l+kv[0].vx); for (i=1;i { for (i=0;clauses[i].type;i++) { if ( (kv[0].vl == clauses[i].typelen) && !bcmp(l+kv[0].vx,clauses[i].type,kv[0].vl) ) { clause = &clauses[i]; break <"foundclause">; } } err("%s: unknown clause keyword `%.*s'",configfile,kv[0].vl,l+kv[0].vx); rv = LOAD_ERROR; break <"nextline">; } while (0); for (i=0;clause->keys[i].key;i++) clause->keys[i].n = 0; for <"nextkey"> (i=nkv-1;i>0;i--) { for (j=0;(kvk=&clause->keys[j])->key;j++) { if ( (kv[i].kl == kvk->keylen) && !bcmp(l+kv[i].kx,kvk->key,kvk->keylen) ) { save_kvx(kvk,i); continue <"nextkey">; } } err("%s: unknown key `%.*s' in `%.*s' clause",configfile,kv[i].kl,l+kv[i].kx,kv[0].vl,l+kv[0].vx); rv = LOAD_ERROR; break <"nextline">; } for (j=0;(kvk=&clause->keys[j])->key;j++) { switch (kvk->type) { case KVKT_ONE: if (kvk->n < 1) { err("%s: missing key `%.*s' in `%.*s' clause",configfile,kvk->keylen,kvk->key,clause->typelen,clause->type); rv = LOAD_ERROR; break <"nextline">; } else if (kvk->n > 1) { err("%s: multiple keys `%.*s' in `%.*s' clause",configfile,kvk->keylen,kvk->key,clause->typelen,clause->type); rv = LOAD_ERROR; break <"nextline">; } break; case KVKT_ONE_OR_MORE: if (kvk->n < 1) { err("%s: missing key `%.*s' in `%.*s' clause",configfile,kvk->keylen,kvk->key,clause->typelen,clause->type); rv = LOAD_ERROR; break <"nextline">; } break; case KVKT_ZERO_OR_MORE: break; case KVKT_ZERO_OR_ONE: if (kvk->n > 1) { err("%s: multiple keys `%.*s' in `%.*s' clause",configfile,kvk->keylen,kvk->key,clause->typelen,clause->type); rv = LOAD_ERROR; break <"nextline">; } break; default: abort(); break; } } if ((*clause->process)(cf,clause,kv,l)) { rv = LOAD_ERROR; break <"nextline">; } } while (0); n = 0; } fclose(f); free(l); if (check_config(cf)) rv = LOAD_ERROR; return(rv); } static int known_af(const struct sockaddr *sa, int salen) { switch (sa->sa_family) { case AF_INET: return(salen==sizeof(struct sockaddr_in)); break; case AF_INET6: return(salen==sizeof(struct sockaddr_in6)); break; } return(0); } static int same_sockaddr(const struct sockaddr *a, int alen, const struct sockaddr *b, int blen) { if (alen != blen) return(0); if (a->sa_family != b->sa_family) return(0); switch (a->sa_family) { case AF_INET: if (alen != sizeof(struct sockaddr_in)) return(0); return( ( ((const struct sockaddr_in *)a)->sin_addr.s_addr == ((const struct sockaddr_in *)b)->sin_addr.s_addr ) && ( ((const struct sockaddr_in *)a)->sin_port == ((const struct sockaddr_in *)b)->sin_port ) ); break; case AF_INET6: if (alen != sizeof(struct sockaddr_in6)) return(0); return( ( ((const struct sockaddr_in6 *)a)->sin6_port == ((const struct sockaddr_in6 *)b)->sin6_port ) && !bcmp( &((const struct sockaddr_in6 *)a)->sin6_addr.s6_addr[0], &((const struct sockaddr_in6 *)a)->sin6_addr.s6_addr[0], 16 ) ); break; } return(0); } static int same_file(const struct stat *a, const struct stat *b) { return( (a->st_dev == b->st_dev) && (a->st_ino == b->st_ino) && (a->st_gen == b->st_gen) ); } static void dump_client_list(CLIENT *c, FILE *to) { ACC *a; ACCSOCK *s; CONN *n; for (;c;c=c->link) { fprintf(to," %p: %s %s (detail %s, keylen %d)\n",(void *)c,c->backingname,c->keyfile,(*c->backops->detail)(c->backvp),c->keylen); for (a=c->accs;a;a=a->link) { fprintf(to," acc %p: %s/%s\n",(void *)a,a->hoststr,a->portstr); for (s=a->socks;s;s=s->link) { fprintf(to," sock %p: fd=%d id=%d txt=%s\n",(void *)s,s->fd,s->id,s->txt); } } for (n=c->conns;n;n=n->link) { printf(" conn %p: s=%p c=%p state=%d[",(void *)n,(void *)n->s,(void *)n->c,n->state); switch (n->state) { case CS_RR_A: printf("RR_A"); break; case CS_RR_B: printf("RR_B"); break; case CS_RR_C: printf("RR_C"); break; case CS_UP: printf("UP"); break; case CS_DEAD: printf("DEAD"); break; default: printf("?""?"); break; } printf("] fd=%d id=%d\n",n->fd,n->id); } } } static int make_accsocks(ACC *a) { ACCSOCK *s; int e; int gnie; struct addrinfo hints; struct addrinfo *ai0; struct addrinfo *ai; char hn[NI_MAXHOST]; char sn[NI_MAXSERV]; hints.ai_flags = AI_PASSIVE; hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = 0; hints.ai_addrlen = 0; hints.ai_canonname = 0; hints.ai_addr = 0; hints.ai_next = 0; e = getaddrinfo(strcmp(a->hoststr,"*")?a->hoststr:0,a->portstr,&hints,&ai0); if (e) { err("%s/%s: %s",a->hoststr,a->portstr,gai_strerror(e)); return(1); } if (! ai0) { err("%s/%s: successful lookup but no addresses?",a->hoststr,a->portstr); return(1); } for (ai=ai0;ai;ai=ai->ai_next) { int fd; if (! known_af(ai->ai_addr,ai->ai_addrlen)) continue; gnie = getnameinfo(ai->ai_addr,ai->ai_addrlen,&hn[0],NI_MAXHOST,&sn[0],NI_MAXSERV,NI_NUMERICHOST|NI_NUMERICSERV|NI_WITHSCOPEID); fd = socket(ai->ai_family,ai->ai_socktype,ai->ai_protocol); if (fd < 0) { e = errno; if (gnie) { err("socket [af %d - can't get address text: %s]: %s",ai->ai_family,strerror(gnie),strerror(e)); } else { err("socket (%s/%s): %s",&hn[0],&sn[0],strerror(e)); } continue; } s = malloc(sizeof(ACCSOCK)); s->link = a->socks; a->socks = s; s->a = a; s->addr = malloc(ai->ai_addrlen); bcopy(ai->ai_addr,s->addr,ai->ai_addrlen); s->addrlen = ai->ai_addrlen; s->fd = fd; s->id = PL_NOID; if (gnie) { asprintf(&s->txt,"[af%d: can't get address text: %s]",ai->ai_family,strerror(gnie)); } else { asprintf(&s->txt,"%s/%s",&hn[0],&sn[0]); } vprf(1,"added accsock %p [fd=%d txt=%s] to acc %p\n",(void *)s,s->fd,s->txt,(void *)a); } if (a->socks == 0) { err("%s/%s: no listening sockets",a->hoststr,a->portstr); return(1); } return(0); } static int maybe_move_accsock(ACCSOCK *old, ACCSOCK *new) { if (same_sockaddr(old->addr,old->addrlen,new->addr,new->addrlen)) { vprf(1,"moving fd %d from %p to %p, replacing %d\n",old->fd,(void *)old,(void *)new,new->fd); if (new->fd >= 0) close(new->fd); new->fd = old->fd; old->fd = -1; if (new->id != PL_NOID) remove_poll_id(new->id); new->id = old->id; old->id = PL_NOID; return(1); } return(0); } static void finish_accsock(ACCSOCK *s) { if (s->id == PL_NOID) { int v; v = 1; vprf(1,"doing bind-&-listen on %d, to %s\n",s->fd,s->txt); setsockopt(s->fd,SOL_SOCKET,SO_REUSEADDR,&v,sizeof(v)); if (bind(s->fd,(void *)s->addr,s->addrlen) < 0) { err("bind %s: %s",s->txt,strerror(errno)); close(s->fd); s->fd = -1; } else { listen(s->fd,10); fcntl(s->fd,F_SETFL,fcntl(s->fd,F_GETFL,0)|O_NONBLOCK); } } else { remove_poll_id(s->id); } s->id = (s->fd >= 0) ? add_poll_fd(s->fd,&rwtest_always,&rwtest_never,&acc_accept,0,s) : PL_NOID; } static void finalize_config(CONFIG *cf) { BWCLASS *b; for (b=cf->bwclasses;b;b=b->link) { b->tolerance = (BW_TIME_TOLERANCE * b->Bps) / 1000000; } } /* * We have a new config in clients and an old config in oldclients. * Switch over to using the new config. Return value is one of the * LOAD_ constants, as for loadconfig, except that LOAD_EMPTY's * meaning is different: it means that (possibly after discarding * clients because of errors) the new config is empty. The old config * is left untouched in this case, unless fstat() on an old backing * file returns an error. Also, LOAD_FATAL is not possible. * * The reason we restrict ourselves to address families we recognize is * that we need to be able to tell when two addresses match, and to do * that we have to go into the interior of the struct sockaddr. (If * all AFs used structs sockaddr such that one could just bcmp() them * to get a useful equality test, we could do that AF-independently. * But that's not promised, and I think it's even not so.) */ static int new_config(CONFIG *oldcf) { CLIENT **cp; CLIENT *c; CLIENT *oc; ACC **ap; ACC *a; ACC *oa; ACCSOCK *s; ACCSOCK *os; CONN **onp; CONN *on; int i; int rv; if (ISV(1)) { printf("new_config\nold client list:\n"); dump_client_list(oldcf?oldcf->clients:0,stdout); printf("new client list:\n"); dump_client_list(config.clients,stdout); } /* * Allocate a struct stat for each client's backing file. */ for (i=1;i>=0;i--) { if (i) { cp = &config.clients; } else { if (! oldcf) continue; cp = &oldcf->clients; } while ((c = *cp)) { free(c->tmp); c->tmp = malloc(sizeof(struct stat)); if ((*c->backops->fstat)(c->backvp,c->tmp) < 0) { err("fstat %s: %s (disabling client)",c->backingname,strerror(errno)); *cp = c->link; free_one_client(c); } else { cp = &c->link; } } } vprf(1,"allocated structs stat\n"); /* * Create accsocks for the new config. Also create the sockets, but * do not bind() or listen() (don't do that until we're sure we want * to use this socket, instead of moving one from the old config). * In the process, discard any references to AFs we don't recognize * or can't create sockets for. */ rv = LOAD_OK; cp = &config.clients; while <"nextclient"> ((c = *cp)) { ap = &c->accs; while <"nextacc"> ((a = *ap)) { if (make_accsocks(a)) { rv = LOAD_ERROR; vprf(1,"error, dropping acc %p\n",(void *)a); *ap = a->link; free_one_acc(a); } else { ap = &a->link; } } if (c->accs == 0) { err("%s %s: no listening sockets",c->backingname,c->keyfile); vprf(1,"deleting client %p\n",(void *)c); *cp = c->link; free_one_client(c); } else { cp = &c->link; } } ap = &config.admin; while <"nextacc"> ((a = *ap)) { if (make_accsocks(a)) { vprf(1,"error, dropping admin acc %p\n",(void *)a); *ap = a->link; free_one_acc(a); } else { ap = &a->link; } } /* * If no clients left, bail. */ if ((rv != LOAD_OK) && !config.clients) { vprf(1,"bailing (empty with errors)\n"); return(LOAD_EMPTY); } /* * We are now committed to switching configs. * * Move over existing connections when possible. */ if (oldcf) { for (oc=oldcf->clients;oc;oc=oc->link) { onp = &oc->conns; while <"nextconn"> ((on = *onp)) { vprf(1,"considering conn %p (s=%p c=%p)\n",(void *)on,(void *)on->s,(void *)on->c); for (c=config.clients;c;c=c->link) { if ( !same_file(c->tmp,on->c->tmp) || (c->keylen != on->c->keylen) || bcmp(c->keydata,on->c->keydata,c->keylen) ) continue; for (a=c->accs;a;a=a->link) { for (s=a->socks;s;s=s->link) { if (same_sockaddr(on->s->addr,on->s->addrlen,s->addr,s->addrlen)) { vprf(1,"moving conn %p (was s=%p c=%p) to s=%p c=%p\n",(void *)on,(void *)on->s,(void *)on->c,(void *)s,(void *)c); *onp = on->link; on->s = s; on->c = c; on->link = c->conns; c->conns = on; continue <"nextconn">; } } } } onp = &on->link; } } } /* * Move over existing listening sockets. We want to do this to * preserve queues of pending connections if nothing else. The code * duplication here is a little annoying. Given the way the loops * have to work, though, I don't see a better way short of doing * something like implementing generators, which strikes me as * overkill. After doing this, the poll IDs may be a bit broken, in * that their cookies are the old ACCSOCKs, but that's fixed up * below. */ if (oldcf) { for (oc=oldcf->clients;oc;oc=oc->link) { for (oa=oc->accs;oa;oa=oa->link) { for <"nextsock"> (os=oa->socks;os;os=os->link) { vprf(1,"considering accsock %p (acc=%p client=%p)\n",(void *)os,(void *)oa,(void *)oc); for (c=config.clients;c;c=c->link) { for (a=c->accs;a;a=a->link) { for (s=a->socks;s;s=s->link) { if (maybe_move_accsock(os,s)) continue <"nextsock">; } } } } } } for (oa=oldcf->admin;oa;oa=oa->link) { for <"nextsock"> (os=oa->socks;os;os=os->link) { vprf(1,"considering accsock %p (acc=%p admin)\n",(void *)os,(void *)oa); for (a=config.admin;a;a=a->link) { for (s=a->socks;s;s=s->link) { if (maybe_move_accsock(os,s)) continue <"nextsock">; } } } } } /* * Move over backing-file file descriptors when possible. */ if (oldcf) { for (oc=oldcf->clients;oc;oc=oc->link) { for (c=config.clients;c;c=c->link) { if (! c->tmp) continue; if ( same_file(oc->tmp,c->tmp) && (oc->backops == c->backops) && (oc->keylen == c->keylen) && !bcmp(oc->keydata,c->keydata,c->keylen) ) { vprf(1,"moving %s from %p to %p, replacing %s\n", (*oc->backops->detail)(oc->backvp),(void *)oc,(void *)c,(*c->backops->detail)(c->backvp)); (*c->backops->move)(&oc->backvp,&c->backvp); free(c->tmp); c->tmp = 0; } } } } /* * Okay, everything we can move over has been moved over. Create * anything that's missing - specifically, finish setting up * listening sockets for accsocks that didn't get them moved, lock * backing files for which we didn't move a descriptor, and (re)set * poll IDs for ACCSOCKs. * * Conflicts do not get moved, because they are shared between the old * and new configs. Let old CLIENTs get removed from their conflicts * inside clear_config. Old CONNs that get moved to the new config * can stay where they are; CONNs that don't get moved will be closed * when their CLIENTs are destroyed. */ cp = &config.clients; while <"nextclient"> ((c = *cp)) { for (a=c->accs;a;a=a->link) { for (s=a->socks;s;s=s->link) { finish_accsock(s); } } if (c->tmp) { vprf(1,"flocking %s (%s)\n",(*c->backops->detail)(c->backvp),c->backingname); if ((*c->backops->flock)(c->backvp,LOCK_EX|LOCK_NB) < 0) { err("%s is already locked",c->backingname); *cp = c->link; free_one_client(c); continue <"nextclient">; } } cp = &c->link; } for (a=config.admin;a;a=a->link) { for (s=a->socks;s;s=s->link) { finish_accsock(s); } } if (ISV(1)) { printf("done\nold client list:\n"); dump_client_list(oldcf?oldcf->clients:0,stdout); printf("new client list:\n"); dump_client_list(config.clients,stdout); } if (oldcf) clear_config(oldcf); finalize_config(&config); return(LOAD_OK); } static void weed_conflicts(void) { CONFLICT **cp; CONFLICT *c; for (cp=&conflicts;(c=*cp);) { if (c->clients) { cp = &c->link; } else { if (c->busy) panic("weed_conflicts busy"); *cp = c->link; free(c->tag); free(c); } } } static int block_timeout(BLOCKFN_ARGS) { BWCLASS *b; BWCM *m; int n; int nb; int us; for (b=config.bwclasses;b;b=b->link) { if (ISV(2)) printf("block_timeout %.*s rblock %s wblock %s\n", b->namelen,b->name, (b->ioflags & BWIOF_RBLOCK) ? "Y" : "N", (b->ioflags & BWIOF_WBLOCK) ? "Y" : "N" ); if (b->ioflags & BWIOF_RBLOCK) { n = 0; for (m=b->members;m;m=m->f_bw) if (m->ioflags & BWCMIO_RBLOCK) n ++; for (m=b->members;m;m=m->f_bw) { if (m->ioflags & BWCMIO_RBLOCK) { nb = (m->r_debt >= m->bwclass->tolerance) ? m->bwclass->tolerance/2 : 0; us = ceil((m->r_debt-nb)/(m->bwclass->Bps/(n*1e3))); if (ISV(2)) { printf("rblock %s %.*s setting us = %d\n", m->client->backingname,m->bwclass->namelen, m->bwclass->name,us); } block_lower_timeout(ctx,us); } } } if (b->ioflags & BWIOF_WBLOCK) { n = 0; for (m=b->members;m;m=m->f_bw) if (m->ioflags & BWCMIO_WBLOCK) n ++; for (m=b->members;m;m=m->f_bw) { if (m->ioflags & BWCMIO_WBLOCK) { nb = (m->w_debt >= m->bwclass->tolerance) ? m->bwclass->tolerance/2 : 0; us = ceil((m->w_debt-nb)/(m->bwclass->Bps/(n*1e3))); if (ISV(2)) { printf("wblock %s %.*s setting us = %d\n", m->client->backingname,m->bwclass->namelen, m->bwclass->name,us); } block_lower_timeout(ctx,us); } } } } return(0); } static void setup(void) { int rv; struct rlimit rl; getrlimit(RLIMIT_NOFILE,&rl); if (rl.rlim_cur < rl.rlim_max) { rl.rlim_cur = rl.rlim_max; setrlimit(RLIMIT_NOFILE,&rl); } conflicts = 0; rndfd = open(rnddev,O_RDONLY,0); if (rndfd < 0) { fprintf(stderr,"%s: %s: %s\n",__progname,rnddev,strerror(errno)); exit(1); } signal(SIGPIPE,SIG_IGN); add_block_fn(&weed_conns,0); add_block_fn(&block_timeout,0); rv = loadconfig(&config); switch (rv) { default: panic("setup: bad loadconfig return value"); break; case LOAD_OK: break; case LOAD_EMPTY: fprintf(stderr,"%s: %s: empty config\n",__progname,configfile); /* fall through */ case LOAD_ERROR: case LOAD_FATAL: exit(1); break; } rv = new_config(0); switch (rv) { default: panic("setup: bad new_config return value"); break; case LOAD_OK: break; case LOAD_EMPTY: fprintf(stderr,"%s: %s: empty config\n",__progname,configfile); /* fall through */ case LOAD_ERROR: exit(1); break; } weed_conflicts(); admins = 0; } static void sighup_reload(void) { int rv; CONFIG oldconfig; oldconfig = config; rv = loadconfig(&config); switch (rv) { default: panic("sighup_reload: bad loadconfig return value"); break; case LOAD_OK: break; case LOAD_ERROR: if (config.clients) break; if (0) { case LOAD_EMPTY: fprintf(stderr,"%s: %s: empty config\n",__progname,configfile); } case LOAD_FATAL: clear_config(&config); config = oldconfig; return; break; } rv = new_config(&oldconfig); switch (rv) { default: panic("sighup_reload: bad new_config return value"); break; case LOAD_OK: break; case LOAD_EMPTY: fprintf(stderr,"%s: %s: empty config\n",__progname,configfile); clear_config(&config); config = oldconfig; return; break; } weed_conflicts(); } static void create_sighup_pipe(void); /* forward */ static int do_sighup(BLOCKFN_ARGS) { vprf(1,"reloading config\n"); sighup_reload(); remove_block_id(sighup_block_id); sighup_block_id = PL_NOID; create_sighup_pipe(); return(1); } static void close_sighup_pipe(void) { int w; if (sighup_pipe_r >= 0) close(sighup_pipe_r); sighup_pipe_r = -1; w = sighup_pipe_w; /* If a SIGHUP arrives here, we may close the write end twice. But since no other fds can be opened in between, that doesn't hurt anythng; the later close will show EBADF, but we don't mind. */ if (w >= 0) close(w); sighup_pipe_w = -1; if (sighup_pipe_id != PL_NOID) remove_poll_id(sighup_pipe_id); sighup_pipe_id = PL_NOID; } static void rd_sighup(RWFN_ARGS) { vprf(1,"SIGHUP received\n"); close_sighup_pipe(); sighup_block_id = add_block_fn(&do_sighup,0); } static void create_sighup_pipe(void) { int p[2]; if (pipe(&p[0]) < 0) { fprintf(stderr,"%s: pipe: %s\n",__progname,strerror(errno)); exit(1); } sighup_pipe_r = p[0]; sighup_pipe_w = p[1]; sighup_pipe_id = add_poll_fd(sighup_pipe_r,&rwtest_always,&rwtest_never,&rd_sighup,0,0); } static void sighup_handler(int sig __attribute__((__unused__))) { int w; if (ISV(1)) write(1,"[SIGHUP]",8); w = sighup_pipe_w; if (w >= 0) { close(w); sighup_pipe_w = -1; } } static void setup_sighup(void) { struct sigaction sa; sa.sa_handler = &sighup_handler; sigemptyset(&sa.sa_mask); sa.sa_flags = 0; sigaction(SIGHUP,&sa,0); create_sighup_pipe(); } static int flush_stdio(BLOCKFN_ARGS) { fflush(0); return(0); } static void init_bw(void) { BWCLASS *b; BWCM *m; for (b=config.bwclasses;b;b=b->link) { b->ioflags = 0; for (m=b->members;m;m=m->f_bw) m->ioflags = 0; } } static void credit_bw(void) { static struct timeval lasttv = {0,0}; struct timeval now; double us; BWCLASS *b; BWCM *m; BWCM *minm; int n; unsigned int credit; if (! config.bwclasses) return; gettimeofday(&now,0); if (lasttv.tv_sec == 0) { lasttv = now; return; } us = (((now.tv_sec - lasttv.tv_sec) * 1e6) + now.tv_usec) - lasttv.tv_usec; lasttv = now; if (ISV(2)) { printf("credit_bw lasttv=%lu.%06lu now=%lu.%06lu us=%g\n", (unsigned long int)lasttv.tv_sec, (unsigned long int)lasttv.tv_usec, (unsigned long int)now.tv_sec, (unsigned long int)now.tv_usec, us); } for (b=config.bwclasses;b;b=b->link) { while <"again"> (1) { minm = 0; n = 0; for (m=b->members;m;m=m->f_bw) { if (m->r_debt) { if (!minm || (m->r_debt < minm->r_debt)) minm = m; n ++; } } if (minm) { credit = (b->Bps * us) / (n * 1e6); if (ISV(2)) printf("credit_bw R bwclass %.*s credit %u: Bps %llu us %g n %d\n", b->namelen,b->name,credit,b->Bps,us,n); if (credit > minm->r_debt) { if (ISV(2)) { printf("credit_bw R wiping out %s (%u)\n", minm->client->backingname,minm->r_debt); } minm->r_debt = 0; continue <"again">; } if (ISV(2)) printf("credit_bw R crediting"); for (m=b->members;m;m=m->f_bw) { if (m->r_debt) { m->r_debt -= credit; if (ISV(2)) printf(" %s(%u)",m->client->backingname,m->r_debt); } } if (ISV(2)) printf("\n"); } else { if (ISV(2)) { printf("credit_bw R bwclass %.*s nothing to do\n", b->namelen,b->name); } } break; } while <"again"> (1) { minm = 0; n = 0; for (m=b->members;m;m=m->f_bw) { if (m->w_debt) { if (!minm || (m->w_debt < minm->w_debt)) minm = m; n ++; } } if (minm) { credit = (b->Bps * us) / (n * 1e6); if (ISV(2)) printf("credit_bw W bwclass %.*s credit %u: Bps %llu us %g n %d\n", b->namelen,b->name,credit,b->Bps,us,n); if (credit > minm->w_debt) { if (ISV(2)) { printf("credit_bw W wiping out %s (%u)\n", minm->client->backingname,minm->w_debt); } minm->w_debt = 0; continue <"again">; } if (ISV(2)) printf("credit_bw W crediting"); for (m=b->members;m;m=m->f_bw) { if (m->w_debt) { m->w_debt -= credit; if (ISV(2)) printf(" %s(%u)",m->client->backingname,m->w_debt); } } if (ISV(2)) printf("\n"); } else { if (ISV(2)) { printf("credit_bw W bwclass %.*s nothing to do\n", b->namelen,b->name); } } break; } } } static void setup_cwd(void) { cwd_fd = open(".",O_RDONLY,0); if (cwd_fd < 0) { fprintf(stderr,"%s: can't open .: %s\n",__progname,strerror(errno)); exit(1); } } int main(int, char **); int main(int ac, char **av) { handleargs(ac,av); timer_socket_init(); if (ISV(1)) { fflush(stdout); setlinebuf(stdout); } setup_cwd(); poll_init(); if (ISV(1)) add_block_fn(&flush_stdio,0); setup_sighup(); setup(); while (1) { init_bw(); pre_poll(); if (do_poll() < 0) { if (errno == EINTR) continue; fprintf(stderr,"%s: poll: %s\n",__progname,strerror(errno)); exit(1); } post_poll(); credit_bw(); } }