/* This file is in the public domain. */ #include #include #include #include #include #include #include extern const char *__progname; #include "oq.h" #include "util.h" #include "repo.h" #include "time.h" #include "listen.h" #include "structs.h" #include "pollloop.h" #include "netfd.h" #define IRINGSIZE 15000 #define ISTRSIZE 1000 typedef struct netfd NETFD; typedef struct iops IOPS; struct netfd { int fd; int ioid; LISTEN *l; char *text; OQ oq; int tmofd; int tmoid; int ieof; unsigned char iring[IRINGSIZE]; unsigned int irh; unsigned int irt; unsigned int irn; int procid; unsigned char istr[ISTRSIZE]; int istrlen; const IOPS *iops; void *ipriv; } ; struct iops { void (*setup)(NETFD *); int (*perchar)(NETFD *, unsigned char); void (*eof)(NETFD *); void (*abort)(NETFD *); } ; #define IOPS_INIT(name) { \ &iop_##name##_setup, \ &iop_##name##_perchar, \ &iop_##name##_eof, \ &iop_##name##_abort, \ } static void reset_timeout(NETFD *nf) { struct itimerval itv; itv.it_value.tv_sec = 60; itv.it_value.tv_usec = 0; itv.it_interval.tv_sec = 0; itv.it_interval.tv_usec = 0; write(nf->tmofd,&itv,sizeof(itv)); } static void netfd_destroy(NETFD *nf) { printf("destroying %s\n",nf->text); // debugging close(nf->fd); remove_poll_id(nf->ioid); listen_deref(nf->l); free(nf->text); oq_flush(&nf->oq); close(nf->tmofd); remove_poll_id(nf->tmoid); if (nf->procid != PL_NOID) remove_block_id(nf->procid); (*nf->iops->abort)(nf); free(nf); } static int rtest_netfd_data(void *nfv) { NETFD *nf; nf = nfv; return( !nf->ieof && (((NETFD *)nfv)->irn < IRINGSIZE) ); } static int wtest_netfd_data(void *nfv) { return(oq_nonempty(&((NETFD *)nfv)->oq)); } static void rd_netfd_data(void *nfv) { NETFD *nf; int want; int got; nf = nfv; if (nf->ieof) return; want = IRINGSIZE - nf->irn; if (want > IRINGSIZE-nf->irh) want = IRINGSIZE - nf->irh; got = read(nf->fd,&nf->iring[nf->irh],want); if (got < 0) { switch (errno) { case EINTR: case NONBLOCKING: return; } fprintf(stderr,"%s: %s: read: %s\n",__progname,nf->text,strerror(errno)); netfd_destroy(nf); return; } if (got == 0) { nf->ieof = 1; printf("read EOF from %s\n",nf->text); // debugging return; } nf->irh += got; nf->irn += got; if (nf->irh > IRINGSIZE) abort(); if (nf->irh == IRINGSIZE) nf->irh = 0; printf("read %d from %s\n",got,nf->text); // debugging } static void wr_netfd_data(void *nfv) { NETFD *nf; int w; nf = nfv; w = oq_writev(&nf->oq,nf->fd); if (w < 0) { switch (errno) { case EINTR: case NONBLOCKING: return; break; } fprintf(stderr,"%s: %s: writev: %s\n",__progname,nf->text,strerror(errno)); netfd_destroy(nf); return; } oq_dropdata(&nf->oq,w); reset_timeout(nf); } static void rd_netfd_tmo(void *nfv) { netfd_destroy(nfv); } static int netfd_process(void *nfv) { NETFD *nf; unsigned char c; nf = nfv; if (nf->irn < 1) { if (nf->ieof) { remove_block_id(nf->procid); nf->procid = PL_NOID; (*nf->iops->eof)(nf); return(BLOCK_LOOP); } return(BLOCK_NIL); } while (nf->irn > 0) { c = nf->iring[nf->irt]; nf->irt ++; if (nf->irt >= IRINGSIZE) nf->irt = 0; nf->irn --; if ((*nf->iops->perchar)(nf,c)) break; } return(BLOCK_LOOP); } static void record_data(NETFD *nf, char *name, char *stamp, char *data) { __label__ errjmp; TIMESTAMP ts; REPO *r; const char *errwhat; char *errstr; int errlen; void err(const char *fmt, ...) { va_list ap; va_start(ap,fmt); errlen = vasprintf(&errstr,fmt,ap); va_end(ap); goto errjmp; } printf("record_data: name=%s stamp=%s data=%s\n",name,stamp,data); // debugging errwhat = "timestamp parse"; ts = timestamp_parse(stamp,&err); errwhat = "repo lookup"; r = repo_lookup(nf->l,name,&err); printf("calling repo to record\n"); // debugging errwhat = "data recording"; repo_record(r,ts,data,&err); if (0) { errjmp:; printf("%s error [%s]\n",errwhat,errstr); // debugging oq_queue_free(&nf->oq,errstr,errlen); } oq_queue_point(&nf->oq,"",1); } static void netfd_set_iops(NETFD *nf, const IOPS *ops) { nf->iops = ops; (*ops->setup)(nf); } static int iop_getstr_perchar(NETFD *nf, unsigned char c) { if (nf->istrlen >= ISTRSIZE) { netfd_destroy(nf); return(1); } nf->istr[nf->istrlen++] = c; return(0); } static int destroy_when_drained(void *nfv) { NETFD *nf; nf = nfv; if (oq_nonempty(&nf->oq)) return(BLOCK_NIL); printf("output drained, shutting down %s\n",nf->text); remove_block_id(nf->procid); netfd_destroy(nf); return(BLOCK_LOOP); } typedef struct iop_new_priv IOP_NEW_PRIV; struct iop_new_priv { char *name; char *stamp; char *data; void (*eos)(NETFD *, IOP_NEW_PRIV *); } ; static void iop_new_gotname(NETFD *, IOP_NEW_PRIV *); static void iop_new_gotstamp(NETFD *, IOP_NEW_PRIV *); static void iop_new_gotdata(NETFD *, IOP_NEW_PRIV *); static void iop_new_gotname(NETFD *nf __attribute__((__unused__)), IOP_NEW_PRIV *p) { printf("new-data name `%s' on %s\n",&nf->istr[0],nf->text); // debugging if (! nf->istr[0]) { shutdown(nf->fd,SHUT_RD); remove_block_id(nf->procid); nf->procid = add_block_fn(&destroy_when_drained,nf); printf("closing down after drain\n"); // debugging return; } p->name = strdup(&nf->istr[0]); nf->istrlen = 0; p->eos = &iop_new_gotstamp; } static void iop_new_gotstamp(NETFD *nf __attribute__((__unused__)), IOP_NEW_PRIV *p) { printf("new-data stamp `%s' on %s\n",&nf->istr[0],nf->text); // debugging p->stamp = strdup(&nf->istr[0]); p->eos = &iop_new_gotdata; } static void iop_new_gotdata(NETFD *nf, IOP_NEW_PRIV *p) { printf("new-data data `%s' on %s\n",&nf->istr[0],nf->text); // debugging p->data = strdup(&nf->istr[0]); record_data(nf,p->name,p->stamp,p->data); free(p->name); p->name = 0; free(p->stamp); p->stamp = 0; free(p->data); p->data = 0; p->eos = &iop_new_gotname; } static void iop_new_setup(NETFD *nf) { IOP_NEW_PRIV *p; p = malloc(sizeof(IOP_NEW_PRIV)); p->name = 0; p->stamp = 0; p->data = 0; p->eos = &iop_new_gotname; nf->istrlen = 0; nf->ipriv = p; printf("starting new-data on %s\n",nf->text); // debugging } static int iop_new_perchar(NETFD *nf, unsigned char c) { IOP_NEW_PRIV *p; if (iop_getstr_perchar(nf,c)) return(1); if (c) return(0); p = nf->ipriv; (*p->eos)(nf,p); nf->istrlen = 0; return(1); } static void iop_new_eof(NETFD *nf) { printf("unexpected new-data EOF on %s\n",nf->text); netfd_destroy(nf); } static void iop_new_abort(NETFD *nf) { IOP_NEW_PRIV *p; printf("aborting new-data on %s\n",nf->text); // debugging p = nf->ipriv; free(p->name); free(p->stamp); free(p->data); free(p); } static const IOPS iops_new = IOPS_INIT(new); typedef struct iop_fetch_priv IOP_FETCH_PRIV; typedef struct fetch_repo FETCH_REPO; struct fetch_repo { FETCH_REPO *link; char *name; REPO *r; REPO_CURSOR *rc; } ; struct iop_fetch_priv { NETFD *nf; FETCH_REPO *repos; char *tstart; char *tstop; TIMESTAMP start; TIMESTAMP stop; char *errs; int (*perchar)(IOP_FETCH_PRIV *, unsigned char); } ; static int fetch_send_some(void *pv) { IOP_FETCH_PRIV *p; FETCH_REPO *fr; int any; TIMESTAMP ts; FETCH_REPO *bestfr; TIMESTAMP bestts; p = pv; if (oq_qlen(&p->nf->oq) >= 65536) return(BLOCK_NIL); printf("sending data chunk\n"); // debugging while (oq_qlen(&p->nf->oq) < 65536) { any = 0; bestfr = 0; for (fr=p->repos;fr;fr=fr->link) { if (fr->rc) { if (repo_cursor_eof(fr->rc)) { repo_cursor_close(fr->rc); fr->rc = 0; continue; } any = 1; ts = repo_cursor_stamp(fr->rc); if (!bestfr || (ts < bestts)) { bestfr = fr; bestts = ts; } } } if (! any) { oq_queue_point(&p->nf->oq,"",1); remove_block_id(p->nf->procid); p->nf->procid = add_block_fn(&destroy_when_drained,p->nf); break; } oq_queue_point(&p->nf->oq,bestfr->name,OQ_STRLEN); oq_queue_point(&p->nf->oq,"",1); oq_queue_printf(&p->nf->oq,"%llu",bestts); oq_queue_point(&p->nf->oq,"",1); oq_queue_copy(&p->nf->oq,repo_cursor_data(bestfr->rc),OQ_STRLEN); oq_queue_point(&p->nf->oq,"",1); repo_cursor_advance(bestfr->rc); } return(BLOCK_LOOP); } static int fetch_start_send(void *pv) { IOP_FETCH_PRIV *p; FETCH_REPO *fr; p = pv; printf("starting to send data\n"); // debugging for (fr=p->repos;fr;fr=fr->link) { fr->rc = repo_cursor_open(fr->r,p->start,p->stop); repo_cursor_rewind(fr->rc); } remove_block_id(p->nf->procid); p->nf->procid = add_block_fn(&fetch_send_some,p); return(1); } static void fetch_add_errstr(IOP_FETCH_PRIV *p, char *errstr) { if (p->errs) { char *s; asprintf(&s,"%s / %s",p->errs,errstr); free(p->errs); p->errs = s; free(errstr); } else { p->errs = errstr; } } static int iop_fetch_tstop(IOP_FETCH_PRIV *p, unsigned char c) { __label__ errjmp; char *errstr; void err(const char *fmt, ...) { va_list ap; va_start(ap,fmt); vasprintf(&errstr,fmt,ap); va_end(ap); goto errjmp; } if (c) return(0); p->tstop = strdup(&p->nf->istr[0]); p->nf->istrlen = 0; shutdown(p->nf->fd,SHUT_RD); remove_block_id(p->nf->procid); p->start = timestamp_parse(p->tstart,&err); p->stop = timestamp_parse(p->tstop,&err); if (0) { errjmp:; fetch_add_errstr(p,errstr); } if (p->errs) { printf("end of request, errors\n"); // debugging oq_queue_free(&p->nf->oq,p->errs,OQ_STRLEN); p->nf->procid = add_block_fn(&destroy_when_drained,p->nf); } else if (p->stop <= p->start) { printf("end of request, empty time interval\n"); // debugging oq_queue_point(&p->nf->oq,"",1); p->nf->procid = add_block_fn(&destroy_when_drained,p->nf); } else { printf("end of request, sending data [%llu,%llu)\n",(unsigned long long int)p->start,(unsigned long long int)p->stop); // debugging p->nf->procid = add_block_fn(&fetch_start_send,p); } oq_queue_point(&p->nf->oq,"",1); return(1); } static int iop_fetch_tstart(IOP_FETCH_PRIV *p, unsigned char c) { if (c) return(0); p->tstart = strdup(&p->nf->istr[0]); p->nf->istrlen = 0; p->perchar = &iop_fetch_tstop; return(1); } static int iop_fetch_repos(IOP_FETCH_PRIV *p, unsigned char c) { __label__ errjmp; REPO *r; FETCH_REPO *fr; char *errstr; char *n; void err(const char *fmt, ...) { va_list ap; va_start(ap,fmt); vasprintf(&errstr,fmt,ap); va_end(ap); goto errjmp; } switch (c) { case ' ': case '\0': n = blk_to_nulterm(&p->nf->istr[0],p->nf->istrlen-1); r = repo_lookup(p->nf->l,n,&err); do <"found"> { for (fr=p->repos;fr;fr=fr->link) { if (! strcmp(fr->name,n)) { free(n); break <"found">; } } fr = malloc(sizeof(FETCH_REPO)); fr->name = n; fr->r = r; fr->rc = 0; fr->link = p->repos; p->repos = fr; } while (0); if (0) { errjmp:; fetch_add_errstr(p,errstr); } p->nf->istrlen = 0; if (! c) p->perchar = &iop_fetch_tstart; return(1); break; } return(0); } static void iop_fetch_setup(NETFD *nf) { IOP_FETCH_PRIV *p; p = malloc(sizeof(IOP_FETCH_PRIV)); p->nf = nf; p->repos = 0; p->tstart = 0; p->tstop = 0; p->errs = 0; p->perchar = &iop_fetch_repos; nf->istrlen = 0; nf->ipriv = p; printf("starting fetch on %s\n",nf->text); // debugging } static int iop_fetch_perchar(NETFD *nf, unsigned char c) { IOP_FETCH_PRIV *p; if (iop_getstr_perchar(nf,c)) return(1); p = nf->ipriv; return((*p->perchar)(p,c)); } static void iop_fetch_eof(NETFD *nf) { printf("unexpected fetch EOF on %s\n",nf->text); netfd_destroy(nf); } static void iop_fetch_abort(NETFD *nf) { IOP_FETCH_PRIV *p; FETCH_REPO *fr; printf("aborting fetch on %s\n",nf->text); // debugging p = nf->ipriv; while (p->repos) { fr = p->repos; p->repos = fr->link; free(fr->name); if (fr->rc) repo_cursor_close(fr->rc); free(fr); } free(p->tstart); free(p->tstop); free(p); } static const IOPS iops_fetch = IOPS_INIT(fetch); static void iop_pickop_setup(NETFD *nf) { nf->istrlen = 0; } static int iop_pickop_perchar(NETFD *nf, unsigned char c) { if (iop_getstr_perchar(nf,c)) return(1); if (c) return(0); if (! strcmp(&nf->istr[0],"new-data")) { printf("new-data from %s\n",nf->text); if (nf->l->permit & OP_NEW) { netfd_set_iops(nf,&iops_new); } else { printf("--> not permitted\n"); netfd_destroy(nf); } } else if (! strcmp(&nf->istr[0],"fetch")) { printf("fetch from %s\n",nf->text); if (nf->l->permit & OP_FETCH) { netfd_set_iops(nf,&iops_fetch); } else { printf("--> not permitted\n"); netfd_destroy(nf); } } else { printf("bad key `%s' from %s\n",&nf->istr[0],nf->text); netfd_destroy(nf); } return(1); } static void iop_pickop_eof(NETFD *nf) { printf("EOF while reading pickop string from %s\n",nf->text); // debugging netfd_destroy(nf); } static void iop_pickop_abort(NETFD *nf __attribute__((__unused__))) { } static const IOPS iops_pickop = IOPS_INIT(pickop); void netfd_new(int fd, LISTEN *l, char *text) { NETFD *nf; listen_ref(l); nf = malloc(sizeof(NETFD)); nf->fd = fd; nf->ioid = add_poll_fd(fd,&rtest_netfd_data,&wtest_netfd_data,&rd_netfd_data,&wr_netfd_data,nf); nf->l = l; nf->text = text; oq_init(&nf->oq); nf->tmofd = socket(AF_TIMER,SOCK_STREAM,0); nf->tmoid = add_poll_fd(nf->tmofd,&rwtest_always,&rwtest_never,&rd_netfd_tmo,0,nf); nf->ieof = 0; nf->irh = 0; nf->irt = 0; nf->irn = 0; nf->procid = add_block_fn(&netfd_process,nf); reset_timeout(nf); netfd_set_iops(nf,&iops_pickop); printf("new connection: %s\n",text); // debugging }