/* This file is in the public domain. */ #include #include #include #include #include #include #include extern const char *__progname; #include "oq.h" #include "util.h" #include "repo.h" #include "time.h" #include "listen.h" #include "structs.h" #include "pollloop.h" #include "netfd.h" /* * Network interface communication code. This manages network * connections once they've come up in the underlying network sense. */ /* * Manifest constants. * * IRINGSIZE is the size of the ring buffer we read stuff from the * network connection into. This does not need to be large for any * reason other than performance. * * ISTRSIZE is the maximum size of a protocol string - a repository * name, a datum, a timestamp, etc. * * NETFD_TIMEOUT is the idle timeout, in seconds. */ #define IRINGSIZE 15000 #define ISTRSIZE 1000 #define NETFD_TIMEOUT 60 typedef struct netfd NETFD; typedef struct iops IOPS; /* * A NETFD is a network connection. * * fd is the underlying socket file descriptor. * * ioid is the poll ID for I/O on the descriptor, as returned by * add_poll_id(). * * l is the LISTEN this descriptor was created from. This is important * because that's how we get hold of permissions and mappings and * repositories. * * text is a text string loosely identifying this connection, for use * in intended-for-humans messages. * * oq is the OQ of data pending to be written to the connection. * * tmofd and tmoid are the file descriptor and poll ID of the AF_TIMER * socket used to impose a timeout on the connection. * * ieof is a boolean indicating whether the connection has reached EOF * on input. * * iring is the first level input data ring buffer, with irh and irt * its head and tail; irn is the current fill count for it. * * procid is the block ID of the function currently being used to * process stuff for this NETFD, as returned by add_block_fn(), or * PL_NOID if there is none at the moment. * * istr is a buffer for acumulating protocol input strings; istrlen is * its current length. Overflowing this is a fatal protocol error. * * now is the current time, in nanoseconds, or zero if it's not * currently known. This is reset when more data is read from the * network connection and set whenever it's needed (see record_data). * * iops and ipriv are an OO dispatch vector and associated instance * data for handling input from this NETFD. This changes depending on * the operation being performed (at the coarse level, as in, "submit * new data"). There's one such per IOPS declared below. */ struct netfd { int fd; int ioid; LISTEN *l; char *text; OQ oq; int tmofd; int tmoid; int ieof; unsigned char iring[IRINGSIZE]; unsigned int irh; unsigned int irt; unsigned int irn; int procid; unsigned char istr[ISTRSIZE]; int istrlen; unsigned long long int now; const IOPS *iops; void *ipriv; } ; /* * Method dispatch vector for iops in NETFD, above. */ struct iops { /* * (*setup)() is called once, when the NETFD's iops is changed. It's * responsible for allocating and initializing any instance-private * data and storing it in the NETFD's ipriv. (Any former instance * has already been torn down.) */ void (*setup)(NETFD *); /* * (*perchar)() is called for each char received; it's passed the * NETFD pointer and the char in question and mus return zero if * nothing special is to be done or nonzero if something special has * been done and we should go around the poll loop again. */ int (*perchar)(NETFD *, unsigned char); /* * (*eof)() is called when EOF is read from the network. (The way * the protocol is designed, it usually won't have much to do.) */ void (*eof)(NETFD *); /* * (*abort)() is called to tear down any private data for the * isntance. THis is called when switching to a different ops * vector and when tearing down the NETFD itself. */ void (*abort)(NETFD *); } ; /* * Initializer for an IOPS, assuming a regular function naming scheme. */ #define IOPS_INIT(name) { \ &iop_##name##_setup, \ &iop_##name##_perchar, \ &iop_##name##_eof, \ &iop_##name##_abort, \ } /* * Reset a NETFD's timeout. This should be called periodically when * the NETFD is active. It is also called once during setup to start * the timeout in the first place. */ static void reset_timeout(NETFD *nf) { struct itimerval itv; itv.it_value.tv_sec = NETFD_TIMEOUT; itv.it_value.tv_usec = 0; itv.it_interval.tv_sec = 0; itv.it_interval.tv_usec = 0; write(nf->tmofd,&itv,sizeof(itv)); } /* * Tear down the NETFD. Close everything, free internal data * structures, etc. All forms of NETFD teardown eventually call this. */ static void netfd_destroy(NETFD *nf) { printf("destroying %s\n",nf->text); // debugging close(nf->fd); remove_poll_id(nf->ioid); listen_deref(nf->l); free(nf->text); oq_flush(&nf->oq); close(nf->tmofd); remove_poll_id(nf->tmoid); if (nf->procid != PL_NOID) remove_block_id(nf->procid); (*nf->iops->abort)(nf); free(nf); } /* * Read test function for a NETFD's network connection. We want to * read input whenever we haven't reached EOF and there's room in the * ring buffer. */ static int rtest_netfd_data(void *nfv) { NETFD *nf; nf = nfv; return( !nf->ieof && (((NETFD *)nfv)->irn < IRINGSIZE) ); } /* * Write test function for a NETFD's network connection. We want to * write whenever anything's queued. */ static int wtest_netfd_data(void *nfv) { return(oq_nonempty(&((NETFD *)nfv)->oq)); } /* * Read function for a NETFD's network connetion. We just try to read * into the ring buffer and do housekeeping like resetting `now'. * This could perhaps be optimized a bit by doing a readv() if the * available space wraps around (which it usually will), but I expect * the available data to wrap around fairly seldom, so I'm not * convinced it's worth it. * * We don't actually do anything with the data here. It's up to the * NETFD's block function to notice the presence of data in the ring * buffer and do something with it (or not, as appropriate). */ static void rd_netfd_data(void *nfv) { NETFD *nf; int want; int got; nf = nfv; if (nf->ieof) return; want = IRINGSIZE - nf->irn; if (want > IRINGSIZE-nf->irh) want = IRINGSIZE - nf->irh; got = read(nf->fd,&nf->iring[nf->irh],want); if (got < 0) { switch (errno) { case EINTR: case NONBLOCKING: return; } fprintf(stderr,"%s: %s: read: %s\n",__progname,nf->text,strerror(errno)); netfd_destroy(nf); return; } nf->now = 0; if (got == 0) { nf->ieof = 1; printf("read EOF from %s\n",nf->text); // debugging return; } nf->irh += got; nf->irn += got; if (nf->irh > IRINGSIZE) abort(); if (nf->irh == IRINGSIZE) nf->irh = 0; printf("read %d from %s\n",got,nf->text); // debugging } /* * Write function for a NETFD's network connection. This is a totally * stock OQ write function, calling oq_writev, handling errors, and * using oq_dropdata to drop any written data. It also resets the * NETFD's timeout; a successful send to the network connection counts * as activity. */ static void wr_netfd_data(void *nfv) { NETFD *nf; int w; nf = nfv; w = oq_writev(&nf->oq,nf->fd); if (w < 0) { switch (errno) { case EINTR: case NONBLOCKING: return; break; } fprintf(stderr,"%s: %s: writev: %s\n",__progname,nf->text,strerror(errno)); netfd_destroy(nf); return; } oq_dropdata(&nf->oq,w); reset_timeout(nf); } /* * Read function for a NETFD's timeout socket. Getting here at all * indicates the timeout has expired, so just tear it down. */ static void rd_netfd_tmo(void *nfv) { netfd_destroy(nfv); } /* * This is the processing block function for a NETFD for most of its * existence. It just pulls chars out of the ring buffer and calls * the current perchar function on each one; if EOF has been reached * and the ring buffer is drained, calls the current EOF function. */ static int netfd_process(void *nfv) { NETFD *nf; unsigned char c; nf = nfv; if (nf->irn < 1) { if (nf->ieof) { remove_block_id(nf->procid); nf->procid = PL_NOID; (*nf->iops->eof)(nf); return(BLOCK_LOOP); } return(BLOCK_NIL); } while (nf->irn > 0) { c = nf->iring[nf->irt]; nf->irt ++; if (nf->irt >= IRINGSIZE) nf->irt = 0; nf->irn --; if ((*nf->iops->perchar)(nf,c)) break; } return(BLOCK_LOOP); } /* * Ensure the NETFD has a valid now value. * * Note that we never leave now set to zero; if we somehow manage to * run at exactly the moment when time is zero, we lie by 1ns. */ static void netfd_get_now(NETFD *nf) { if (nf->now == 0) { struct timeval tv; gettimeofday(&tv,0); nf->now = (tv.tv_sec * 1000000000ULL) + (tv.tv_usec * 1000ULL); if (nf->now == 0) nf->now = 1; } } /* * Internal function to record data. Called from new-data processing. * * Just parse the timestamp, look up the repo, and tell the repo to * record the data. The closest thing to noteworthy here is errwhat, * which records the variant part of the message generated on error. */ static void record_data(NETFD *nf, char *name, char *stamp, char *data) { __label__ errjmp; TIMESTAMP ts; REPO *r; const char *errwhat; char *errstr; int errlen; void err(const char *fmt, ...) { va_list ap; va_start(ap,fmt); errlen = vasprintf(&errstr,fmt,ap); va_end(ap); goto errjmp; } unsigned long long int getnow(void) { netfd_get_now(nf); return(nf->now); } printf("record_data: name=%s stamp=%s data=%s\n",name,stamp,data); // debugging errwhat = "timestamp parse"; ts = timestamp_parse(stamp,&err,&getnow); errwhat = "repo lookup"; r = repo_lookup(nf->l,name,&err); printf("calling repo to record\n"); // debugging errwhat = "data recording"; repo_record(r,ts,data,&err); if (0) { errjmp:; printf("%s error [%s]\n",errwhat,errstr); // debugging oq_queue_free(&nf->oq,errstr,errlen); } oq_queue_point(&nf->oq,"",1); } /* * Set a NETFD's iops. Assumes the previous one, if any, has already * been torn down. Not a lot to do here. */ static void netfd_set_iops(NETFD *nf, const IOPS *ops) { nf->iops = ops; (*ops->setup)(nf); } /* * The common part of perchar functions that want to accumulate a * string in istr/istrlen. */ static int iop_getstr_perchar(NETFD *nf, unsigned char c) { if (nf->istrlen >= ISTRSIZE) { netfd_destroy(nf); return(1); } nf->istr[nf->istrlen++] = c; return(0); } /* * A processing function that's installed when all that remains to be * done is drain queued output to the network and tear the NETFD down. */ static int destroy_when_drained(void *nfv) { NETFD *nf; nf = nfv; if (oq_nonempty(&nf->oq)) return(BLOCK_NIL); printf("output drained, shutting down %s\n",nf->text); remove_block_id(nf->procid); netfd_destroy(nf); return(BLOCK_LOOP); } /* * New-data processing. */ /* * Private data for new-data procesing. All we do is buffer strings in * triples: (repository) name, timestamp, and data. eos is called at * the end of each string; it is changed after each string. */ typedef struct iop_new_priv IOP_NEW_PRIV; struct iop_new_priv { char *name; char *stamp; char *data; void (*eos)(NETFD *, IOP_NEW_PRIV *); } ; /* * The three eos functions. We can't get away without a forward * declaration of at least one of the three, not without switching * paradigms altogether. */ static void iop_new_gotname(NETFD *, IOP_NEW_PRIV *); static void iop_new_gotstamp(NETFD *, IOP_NEW_PRIV *); static void iop_new_gotdata(NETFD *, IOP_NEW_PRIV *); /* * We've just finished a repository name. Save it and arrange to call * iop_new_gotstamp at end of next string - but, first, check; if the * name is zero-length, what we actually have is an end-of-data * indication, which calls for totally different handling. */ static void iop_new_gotname(NETFD *nf __attribute__((__unused__)), IOP_NEW_PRIV *p) { printf("new-data name `%s' on %s\n",&nf->istr[0],nf->text); // debugging if (! nf->istr[0]) { shutdown(nf->fd,SHUT_RD); remove_block_id(nf->procid); nf->procid = add_block_fn(&destroy_when_drained,nf); printf("closing down after drain\n"); // debugging return; } p->name = strdup(&nf->istr[0]); p->eos = &iop_new_gotstamp; } /* * We've just finshed a timestamp. Save it and arrange to call * iop_new_gotdata at end of next string. */ static void iop_new_gotstamp(NETFD *nf __attribute__((__unused__)), IOP_NEW_PRIV *p) { printf("new-data stamp `%s' on %s\n",&nf->istr[0],nf->text); // debugging p->stamp = strdup(&nf->istr[0]); p->eos = &iop_new_gotdata; } /* * We've just finshed a data string, which also means the end of a * triple. Save it, call record_data to save the * item, free and clear all thre strings, and arrange to call * iop_new_gotname at end of next string. */ static void iop_new_gotdata(NETFD *nf, IOP_NEW_PRIV *p) { printf("new-data data `%s' on %s\n",&nf->istr[0],nf->text); // debugging p->data = strdup(&nf->istr[0]); record_data(nf,p->name,p->stamp,p->data); free(p->name); p->name = 0; free(p->stamp); p->stamp = 0; free(p->data); p->data = 0; p->eos = &iop_new_gotname; } /* * This is the setup method for new-data processing. Allocate and set * up the IOP_NEW_PRIV, reset for a new string, and set the NETFD's * ipriv to our IOP_NEW_PRIV. */ static void iop_new_setup(NETFD *nf) { IOP_NEW_PRIV *p; p = malloc(sizeof(IOP_NEW_PRIV)); p->name = 0; p->stamp = 0; p->data = 0; p->eos = &iop_new_gotname; nf->istrlen = 0; nf->ipriv = p; printf("starting new-data on %s\n",nf->text); // debugging } /* * This is the perchar method for new-data processing. Call * iop_getstr_perchar to save the char; if this overruns the buffer, * iop_getstr_perchar will destroy the NETFD; all we need to do is * return in that case. Otherwise, do nothing more unless it's \0, in * which case call the current eos method and reset for a new string. * * This depends on the eos method to never destory the NETFD (because * we access it after that call). */ static int iop_new_perchar(NETFD *nf, unsigned char c) { IOP_NEW_PRIV *p; if (iop_getstr_perchar(nf,c)) return(1); if (c) return(0); p = nf->ipriv; (*p->eos)(nf,p); nf->istrlen = 0; return(1); } /* * EOF method for new-data processing. Not much to do here; the * protocol is self-terminating, so any EOF that makes it through to * here is an unexpected one. Just squawk and destroy the NETFD. */ static void iop_new_eof(NETFD *nf) { printf("unexpected new-data EOF on %s\n",nf->text); netfd_destroy(nf); } /* * Done with new-data processing. Tear down the private data blob. */ static void iop_new_abort(NETFD *nf) { IOP_NEW_PRIV *p; printf("aborting new-data on %s\n",nf->text); // debugging p = nf->ipriv; free(p->name); free(p->stamp); free(p->data); free(p); } /* * Tne new-data IOPS. */ static const IOPS iops_new = IOPS_INIT(new); /* * Old-data fetch processing. */ /* * Private data for fetch procesing. We accumulate a set of repository * names (not a set of repositories; it's possible for two external * names to map to the same internal name), each one with its * repository (and a REPO_CURSOR for actually generating data). We * also have start and stop timestamps (string and parsed forms). * There's a string of accumulated errors, so multiple erroneous * repository names' errors can be collected and reported together in * the single string the protocol provides, and a perchar function, * which serves basically the same purpose as a NETFD's perchar * method: context-dependent per-character handling. We can't make * this totally common the way new-data processing does because some * strings are terminated by space or \0 and some always by \0, so * it's either this or a flag somewhere, and this is more in keeping * with the OO paradigm the rest of the code uses. */ typedef struct iop_fetch_priv IOP_FETCH_PRIV; typedef struct fetch_repo FETCH_REPO; struct fetch_repo { FETCH_REPO *link; char *name; REPO *r; REPO_CURSOR *rc; } ; struct iop_fetch_priv { NETFD *nf; FETCH_REPO *repos; char *tstart; char *tstop; TIMESTAMP start; TIMESTAMP stop; char *errs; int (*perchar)(IOP_FETCH_PRIV *, unsigned char); } ; /* * Processing function for sending data in response to a fetch request. * If the output queue is too full, do nothing; otherwise, crank out * entries until we either run out (all REPO_CURSORs hit EOF) or fill * up the queue. */ static int fetch_send_some(void *pv) { IOP_FETCH_PRIV *p; FETCH_REPO *fr; int any; TIMESTAMP ts; FETCH_REPO *bestfr; TIMESTAMP bestts; p = pv; if (oq_qlen(&p->nf->oq) >= 65536) return(BLOCK_NIL); printf("sending data chunk\n"); // debugging while (oq_qlen(&p->nf->oq) < 65536) { any = 0; bestfr = 0; for (fr=p->repos;fr;fr=fr->link) { if (fr->rc) { if (repo_cursor_eof(fr->rc)) { repo_cursor_close(fr->rc); fr->rc = 0; continue; } any = 1; ts = repo_cursor_stamp(fr->rc); if (!bestfr || (ts < bestts)) { bestfr = fr; bestts = ts; } } } if (! any) { oq_queue_point(&p->nf->oq,"",1); remove_block_id(p->nf->procid); p->nf->procid = add_block_fn(&destroy_when_drained,p->nf); break; } oq_queue_point(&p->nf->oq,bestfr->name,OQ_STRLEN); oq_queue_point(&p->nf->oq,"",1); oq_queue_printf(&p->nf->oq,"%llu",bestts); oq_queue_point(&p->nf->oq,"",1); oq_queue_copy(&p->nf->oq,repo_cursor_data(bestfr->rc),OQ_STRLEN); oq_queue_point(&p->nf->oq,"",1); repo_cursor_advance(bestfr->rc); } return(BLOCK_LOOP); } /* * Processing function for starting to send data in response to a fetch * request. This could and arguably should be inlined in * iop_fetch_tstop.... * * Open and rewind all the cursors, then start cranking entries out via * fetch_send_some. */ static int fetch_start_send(void *pv) { IOP_FETCH_PRIV *p; FETCH_REPO *fr; p = pv; printf("starting to send data\n"); // debugging for (fr=p->repos;fr;fr=fr->link) { fr->rc = repo_cursor_open(fr->r,p->start,p->stop); repo_cursor_rewind(fr->rc); } remove_block_id(p->nf->procid); p->nf->procid = add_block_fn(&fetch_send_some,p); return(BLOCK_LOOP); } /* * Internal utility function to append an error string to the * accumulated-errors string. The input string must be mallocked and * is taken over (either freed or saved and freed later) by this * function. */ static void fetch_add_errstr(IOP_FETCH_PRIV *p, char *errstr) { if (p->errs) { char *s; asprintf(&s,"%s / %s",p->errs,errstr); free(p->errs); p->errs = s; free(errstr); } else { p->errs = errstr; } } /* * Per-char callback for accumulating the ending timestamp of a fetch * operation. When the string ends, we close down the client->server * data stream, parse the timestamps, and then do one of three things: * (1) if we have errors, return the error string and switch to * destroy-when-drained processing; (2) if we have a trivially vacuous * timestamp interval, just send a second \0 and switch to * destroy-when-drained processing; or (3) switch to fetch_start_send * processing to kick off sending the data. In any case, then send a * \0 (all three options call for us to generate at least one) and * return 1 to indicate something has changed. */ static int iop_fetch_tstop(IOP_FETCH_PRIV *p, unsigned char c) { __label__ errjmp; char *errstr; void err(const char *fmt, ...) { va_list ap; va_start(ap,fmt); vasprintf(&errstr,fmt,ap); va_end(ap); goto errjmp; } if (c) return(0); p->tstop = strdup(&p->nf->istr[0]); p->nf->istrlen = 0; shutdown(p->nf->fd,SHUT_RD); remove_block_id(p->nf->procid); p->start = timestamp_parse(p->tstart,&err,0); p->stop = timestamp_parse(p->tstop,&err,0); if (0) { errjmp:; fetch_add_errstr(p,errstr); } if (p->errs) { printf("end of request, errors\n"); // debugging oq_queue_free(&p->nf->oq,p->errs,OQ_STRLEN); p->nf->procid = add_block_fn(&destroy_when_drained,p->nf); } else if (p->stop <= p->start) { printf("end of request, empty time interval\n"); // debugging oq_queue_point(&p->nf->oq,"",1); p->nf->procid = add_block_fn(&destroy_when_drained,p->nf); } else { printf("end of request, sending data [%llu,%llu)\n",(unsigned long long int)p->start,(unsigned long long int)p->stop); // debugging p->nf->procid = add_block_fn(&fetch_start_send,p); } oq_queue_point(&p->nf->oq,"",1); return(1); } /* * Per-char callback for accumulating the starting timestamp of a * fetch. Just wait until it ends, then save it and switch to * accumulating the ending timestamp. */ static int iop_fetch_tstart(IOP_FETCH_PRIV *p, unsigned char c) { if (c) return(0); p->tstart = strdup(&p->nf->istr[0]); p->nf->istrlen = 0; p->perchar = &iop_fetch_tstop; return(1); } /* * Per-char function to accumulate repository names for a fetch * operation. Do nothing special until getting a terminator, at which * point look the repo up and, if the name is one we don't already * have, save it in a new FETCH_REPO. Then either just reset for the * next name (if the terminator was a space) or reset and switch to * getting the start timestamp (if it was a \0). */ static int iop_fetch_repos(IOP_FETCH_PRIV *p, unsigned char c) { __label__ errjmp; REPO *r; FETCH_REPO *fr; char *errstr; char *n; void err(const char *fmt, ...) { va_list ap; va_start(ap,fmt); vasprintf(&errstr,fmt,ap); va_end(ap); goto errjmp; } switch (c) { case ' ': case '\0': n = blk_to_nulterm(&p->nf->istr[0],p->nf->istrlen-1); r = repo_lookup(p->nf->l,n,&err); do <"found"> { for (fr=p->repos;fr;fr=fr->link) { if (! strcmp(fr->name,n)) { free(n); break <"found">; } } fr = malloc(sizeof(FETCH_REPO)); fr->name = n; fr->r = r; fr->rc = 0; fr->link = p->repos; p->repos = fr; } while (0); if (0) { errjmp:; fetch_add_errstr(p,errstr); } p->nf->istrlen = 0; if (! c) p->perchar = &iop_fetch_tstart; return(1); break; } return(0); } /* * This is the setup method for fetch processing. Allocate and set * up the IOP_FETCH_PRIV, reset for a new string, and set the NETFD's * ipriv to our IOP_FETCH_PRIV. */ static void iop_fetch_setup(NETFD *nf) { IOP_FETCH_PRIV *p; p = malloc(sizeof(IOP_FETCH_PRIV)); p->nf = nf; p->repos = 0; p->tstart = 0; p->tstop = 0; p->errs = 0; p->perchar = &iop_fetch_repos; nf->istrlen = 0; nf->ipriv = p; printf("starting fetch on %s\n",nf->text); // debugging } /* * This is the perchar method for data fetch requests. We just call * iop_getstr_perchar to do the common processing, then call out to * the current per-phase per-char function. */ static int iop_fetch_perchar(NETFD *nf, unsigned char c) { IOP_FETCH_PRIV *p; if (iop_getstr_perchar(nf,c)) return(1); p = nf->ipriv; return((*p->perchar)(p,c)); } /* * This is the EOF method for data fetch requests. Because the * protocol is self-terminating, any EOF that reaches here is * unexpected; just squawk and destroy the NETFD. */ static void iop_fetch_eof(NETFD *nf) { printf("unexpected fetch EOF on %s\n",nf->text); netfd_destroy(nf); } /* * This is the abort method for data fetch requests. Tear down the * IOP_FETCH_PRIV, including the list of repos. */ static void iop_fetch_abort(NETFD *nf) { IOP_FETCH_PRIV *p; FETCH_REPO *fr; printf("aborting fetch on %s\n",nf->text); // debugging p = nf->ipriv; while (p->repos) { fr = p->repos; p->repos = fr->link; free(fr->name); if (fr->rc) repo_cursor_close(fr->rc); free(fr); } free(p->tstart); free(p->tstop); free(p); } /* * Tne fetch IOPS. */ static const IOPS iops_fetch = IOPS_INIT(fetch); /* * Setup method for choosing what oepration to perform. Just reset for * a new string - don't even bother with a private data blob. */ static void iop_pickop_setup(NETFD *nf) { nf->istrlen = 0; } /* * Per-character method for choosing what oepration to perform. Use * ipo_getstr_perchar to accumulate the method name; upon geting the * \0, switch out on it and either error out (if it's unknokwn or not * permitted to this LISTEN) or switch to the appropriate methods * (otherwise). */ static int iop_pickop_perchar(NETFD *nf, unsigned char c) { if (iop_getstr_perchar(nf,c)) return(1); if (c) return(0); if (! strcmp(&nf->istr[0],"new-data")) { printf("new-data from %s\n",nf->text); if (nf->l->permit & OP_NEW) { netfd_set_iops(nf,&iops_new); } else { printf("not permitted\n"); netfd_destroy(nf); } } else if (! strcmp(&nf->istr[0],"fetch")) { printf("fetch from %s\n",nf->text); if (nf->l->permit & OP_FETCH) { netfd_set_iops(nf,&iops_fetch); } else { printf("not permitted\n"); netfd_destroy(nf); } } else { printf("bad key `%s' from %s\n",&nf->istr[0],nf->text); netfd_destroy(nf); } return(1); } /* * EOF method for choosing what oepration to perform. EOFs at this * phase are always unpexcted and therefore erroneous. */ static void iop_pickop_eof(NETFD *nf) { printf("EOF while reading operation string from %s\n",nf->text); // debugging netfd_destroy(nf); } /* * Abort method for choosing what oepration to perform. Since we have * no private data blob, there's nothing to do here. */ static void iop_pickop_abort(NETFD *nf __attribute__((__unused__))) { } /* * Tne choose-operation IOPS. */ static const IOPS iops_pickop = IOPS_INIT(pickop); /* * External entry point. Add a ref to the LISTEN, allocate the NETFD, * set it up, and start things off at the choose-operation phase. */ void netfd_new(int fd, LISTEN *l, char *text) { NETFD *nf; listen_ref(l); nf = malloc(sizeof(NETFD)); nf->fd = fd; nf->ioid = add_poll_fd(fd,&rtest_netfd_data,&wtest_netfd_data,&rd_netfd_data,&wr_netfd_data,nf); nf->l = l; nf->text = text; oq_init(&nf->oq); nf->tmofd = socket(AF_TIMER,SOCK_STREAM,0); nf->tmoid = add_poll_fd(nf->tmofd,&rwtest_always,&rwtest_never,&rd_netfd_tmo,0,nf); nf->ieof = 0; nf->irh = 0; nf->irt = 0; nf->irn = 0; nf->procid = add_block_fn(&netfd_process,nf); nf->now = 0; reset_timeout(nf); netfd_set_iops(nf,&iops_pickop); printf("new connection: %s\n",text); // debugging }