/* This file is in the public domain. */ #include #include #include #include #include #include "structs.h" #include "pollloop.h" #include "repo.h" /* * Implementation of repo.h's interface. In this implementation, a * repository is a directory. Trying to open a nonexistent directory * will try to mkdir(2) it; if this succeeds, it is set up for use. * Historical files are rolled over, kept in subdirectories if * necessary to cut down on directory loading. Each directory * contains a file called `list'; this file contains an integer level * value, then zero or more data listings. The integer level value is * zero if the list file is sibling to the data files, 1 if data files * appear in directories which are sibling to the list file, 2 for one * more level, etc. (Data subdirectories will themselves have list * files, whose level values will be one less than the parent * directory's list file level.) * * Each data listing consists of a name, sibling to the list file, * followed by two timestamp values, each preceded by a space. These * are the tightest possible timestamp window that includes all data * from that file-or-directory. These names represent the numbers * from 0 through BRANCHING-1 and are formatted with BRANCHFMT_FMT and * BRANCHFMT_ARGS (see below for these #defines). * * Only historical data files are managed using this scheme. The live * data file is kept in the top-level directory in a file called "cur" * and does not appear in any list file. The top-level directory also * contains a "count" file which contains the number of historical * files, in decimal. * * Care needs to be taken when rearranging structure, when rolling over * files. There are two cases: (1) the common case, which consists of * rolling the cur file over into a historical file and opening a new * cur file, and (2) the rare case, when rolling the cur file over * fills up a level and we have to deepen the hierarchy. 
These * require care in case the system, or scd itself, crashes during the * operation. In each case, we put markers in the filesystem which * are then noticed when opening the repository on startup later. * * Case (1) is handled with: * * (1.1) rename(2) "cur" to "rollover". * (1.2) Open a new "cur". fsync(2) it. * (1.3) Create any new directories needed. * (1.4) link(2) "rollover" to its final place. * (1.5) Update the list files, fsync(2)ing each one. * (1.6) Write the new count to "count+", fsync(2)ing it. * (1.7) rename(2) "count+" to "count". * (1.8) unlink(2) "rollover". * * When "rollover" exists on startup, the recovery procedure is: * * (1r.1) If "cur" is a hardlink to "rollover", the system * crashed during the rename(2) in 1.1. Just unlink(2) * "cur" and proceed as above, starting at 1.2. * (1r.2) If "cur" is nonexistent, something crashed between 1.1 * and 1.2. Just pick up the above starting with 1.2. * (1r.3a) If "count+" exists: * (1r.3a.1) If "count" is hardlinked to "count+", the system * crashed during the rename(2) in 1.7. Just unlink * "count+" and proceed with 1.8. * (1r.3a.2) The crash occurred after 1.6 began but before 1.7 * finished. Unlink "count+" and pick up starting with * step 1.6. * (1r.3b) If "count+" does not exist: * (1r.3b.1a) Check the file referred to by the count in "count". * If it's a hardlink to "rollover": * (1r.3b.1a.1) The crash occurred between 1.7 and 1.8. Pick up * starting with 1.8. * (1r.3b.1b) Check the file referred to by a count one more than * the one in "count". If it exists and is a hardlink * to "rollover": * (1r.3b.1b.1) The crash occurred between 1.4 and 1.6. Rebuild * the list files for the relevant directories, then pick * up with 1.6. * (1r.3b.1c) Otherwise: * (1r.3b.1c.1) The crash occurred between 1.2 and 1.4. Pick up * starting with 1.3. * * Case (2) is handled with: * * (2.1) Create a directory "deepen". * (2.2) For each current file/directory, rename(2) it into * deepen/. 
* (2.3) rename(2) the top-level list file into deepen/.
 * (2.4) Write a new top-level list file. fsync(2) it.
 * (2.5) rename(2) "deepen" to its final name.
 * (2.6) Carry on with the rollover as usual, now that there is
 *       room for it in the hierarchy. This includes the full
 *       procedure for case (1).
 *
 * When "deepen" exists on startup, the recovery procedure is:
 *
 * (2r.1a) If deepen/list doesn't exist, the crash occurred during
 *         2.2. Rename any current files/directories which remain
 *         and pick up with 2.3.
 * (2r.1b) If deepen/list exists and is a hardlink to the
 *         top-level list, the system crashed during the rename(2)
 *         for 2.3. Just unlink(2) the top-level list and pick up
 *         with 2.4.
 * (2r.1c) If deepen/list exists and the top-level list does not
 *         exist, the crash occurred between 2.3 and 2.4. Pick up
 *         with 2.4.
 * (2r.1d) If deepen/list exists and the top-level list exists but
 *         is not a hardlink to deepen/list, the crash occurred
 *         after 2.4 started but before 2.5. unlink(2) the
 *         top-level list file and pick up with 2.4.
 */

/*
 * Use an artificially low branching factor to test the deepening code.
 *
 * BRANCHING is the branching factor for the directory tree.
 *
 * BRANCHFMT_FMT is a printf format; BRANCHFMT_ARGS constructs the
 * corresponding argument(s), given an integer argument from 0 through
 * BRANCHING-1. BRANCHFMT_ARGS may expand to multiple arguments and
 * its argument may appear more than once in its expansion.
 * BRANCHFMT_* must be suitable for use in a call like
 *
 *	printf( "..." BRANCHFMT_FMT "...", ..., BRANCHFMT_ARGS(x), ...)
 *
 * With the values below, a name is the two low-order bits of n printed
 * as two decimal digits: 00, 01, 10, 11.
 */
#define BRANCHING 4
#define BRANCHFMT_FMT "%d%d"
#define BRANCHFMT_ARGS(n) (((n)>>1)&1),((n)&1)

/*
 * Declared ahead of struct pendwrite's definition so that struct
 * repo_priv can hold PENDWRITE pointers.
 */
typedef struct pendwrite PENDWRITE;

/*
 * A REPO's private data. In this implementation, this consists of
 *
 * - A FILE * open onto cur.
 *
 * - The path to the directory.
 *
 * - A struct stat for the top-level directory, for comparison with
 *   other REPO_PRIVs.
 *
 * - A count of readers reading this repo.
*
 * - A queue of pending writes (writes and wtail; we use a tconc
 *   structure so as to preserve write order).
 *
 * - The block ID of the block function trying to write, or PL_NOID if
 *   none is active.
 */
struct repo_priv {
    FILE *f;            /* stdio stream repo_open opens; all reads and writes use it */
    char *name;         /* path, per the description above.  NOTE(review): nothing
                           visible in this file assigns it - confirm who does */
    struct stat stb;    /* fstat(2) result from repo_open; repo_same compares
                           its st_dev and st_ino */
    int readers;        /* open cursors: ++ in repo_cursor_open, -- in
                           repo_cursor_close */
    PENDWRITE *writes;  /* head of the pending-write queue, 0 when empty */
    PENDWRITE **wtail;  /* where the next queued write gets linked: &writes
                           when empty, else the last entry's link field */
    int writeid;        /* ID from add_block_fn for repo_write_pending, or
                           PL_NOID */
} ;

/*
 * A REPO_CURSOR. This contains:
 *
 * r, a backpointer to the REPO.
 *
 * loc, where to start reading for the next possible sample.
 *
 * start and stop, the TIMESTAMPs giving its time window.
 *
 * eof, a boolean indicating it's run out of samples.
 *
 * ts, the current sample's timestamp, when it has a sample.
 *
 * lb, la, and ll, which are an elastic buffer into which we read lines
 * from the data file.
 *
 * dataat, which is the offset into lb of the sample's data string,
 * when the cursor has a sample.
 *
 * There is no explicit indication of whether the cursor has a sample;
 * this is a concept introduced to explain when it's meaningful to
 * call repo_cursor_stamp and repo_cursor_data, not anything with a
 * direct representation.
 */
struct repo_cursor {
    REPO *r;            /* repository being read */
    off_t loc;          /* file offset at which repo_cursor_advance resumes */
    TIMESTAMP start;    /* window is half-open: a sample is accepted when */
    TIMESTAMP stop;     /*   start <= ts < stop */
    int eof;            /* set by repo_cursor_advance at end of file, cleared
                           by repo_cursor_rewind */
    TIMESTAMP ts;       /* timestamp parsed from the current line */
    char *lb;           /* line buffer, grown with realloc; 0 until first use */
    int la;             /* bytes allocated for lb */
    int ll;             /* bytes of lb currently in use */
    int dataat;         /* offset in lb of the data text following the stamp */
} ;

/*
 * A pending write. If a repo is being read, repo_record just creates
 * one of these and queues it for later processing.
 *
 * This just contains the linked-list link and the arguments passed to
 * repo_record (copied, in the case of the data).
 */
struct pendwrite {
    PENDWRITE *link;    /* next queued write, 0 at the tail */
    TIMESTAMP ts;       /* timestamp passed to repo_record */
    char *data;         /* strdup'd copy of the data; freed by
                           repo_write_pending once written */
} ;

/*
 * Open a repository. Just do the open(2), error-check, and set up the
 * corresponding REPO_PRIV.
*/ REPO_PRIV *repo_open(const char *fn, void (*err)(const char *, ...)) { int fd; REPO_PRIV *p; fd = open(fn,O_RDWR,0666); if (fd < 0) (*err)("%s: %s",fn,strerror(errno)); p = malloc(sizeof(REPO_PRIV)); p->f = fdopen(fd,"r+"); fstat(fd,&p->stb); p->readers = 0; p->writes = 0; p->wtail = &p->writes; p->writeid = PL_NOID; return(p); } /* * Check of two REPO_PRIVs are open onto the same repository. We just * check if the files are the same by comparing st_dev and st_ino * numbers. */ int repo_same(REPO_PRIV *a, REPO_PRIV *b) { return( (a->stb.st_dev == b->stb.st_dev) && (a->stb.st_ino == b->stb.st_ino) ); } /* * Close down a REPO_PRIV. Pretty simple in this implementation. */ void repo_close(REPO_PRIV *p) { fclose(p->f); free(p); } /* * Free a whole REPO. Nothing noteworthy here. */ void repo_free(REPO *r) { free(r->name); free(r->filename); repo_close(r->priv); free(r); } /* * Free a whoel linked list of REPOs. Nothing noteworthy here. */ void repo_free_chain(REPO *chain) { REPO *r; while (chain) { r = chain; chain = r->link; repo_free(r); } } /* * Look up a REPO by name. * * We go through all mappings, checking their match functions, * remembering the highest priority that matched and the highest * priority that indicated an error. If there was an error and it * wasn't overriddden by a higher-priority non-error, error out; * otherwise, if there were no matches, error out; otherwise, have all * the MAPs at the highest priority map the name and check that they * all match (error if not). Once we have the mapped-to string, look * for the repo and return it (error if not found). 
*/ REPO *repo_lookup(LISTEN *l, const char *name, void (*err)(const char *, ...)) { MAP *m; int maxpri; int errpri; char *mapped; char *mstr; REPO *r; maxpri = -1; errpri = -1; for (m=l->mappings;m;m=m->link) { if (m->pri < maxpri) continue; switch ((*m->ops->match)(m->priv,name)) { case MATCH_NOMATCH: m->flags &= ~MF_MATCHED; break; case MATCH_MATCH: m->flags |= MF_MATCHED; maxpri = m->pri; break; case MATCH_ERROR: errpri = m->pri; break; default: abort(); break; } } if ((errpri >= 0) && (maxpri <= errpri)) { printf("error match\n"); // debugging (*err)("%s: bad repository name",name); } if (maxpri < 0) { printf("no matching mapping\n"); // debugging (*err)("%s: unmatched repository name",name); } mapped = 0; for (m=l->mappings;m;m=m->link) { if ((m->pri != maxpri) || !(m->flags & MF_MATCHED)) continue; mstr = (*m->ops->map)(m->priv,name); printf("mapped to %s\n",mstr); // debugging if (mapped && strcmp(mstr,mapped)) { printf("mapping conflict\n"); free(mapped); free(mstr); (*err)("%s: ambiguous repository name",name); } free(mapped); mapped = mstr; } if (! mapped) abort(); if (l->conf) { for (r=l->conf->repos;r;r=r->link) { if (! strcmp(mapped,r->name)) { printf("repo found\n"); // debugging return(r); } } } else { printf("orphaned LISTEN\n"); // debugging } printf("no repo found\n"); // debugging free(mapped); (*err)("%s: unknown repository name",name); abort(); } /* * Block function for flushing pending writes to a REPO. * * Once we find the repo not being read, we write them (all at once) * and shut down our block ID. */ static int repo_write_pending(void *rv) { REPO *r; REPO_PRIV *p; PENDWRITE *w; r = rv; p = r->priv; if (p->readers) return(BLOCK_NIL); fseek(p->f,0,SEEK_END); while ((w = p->writes)) { p->writes = w->link; fprintf(p->f,"%llu %s\n",w->ts,w->data); free(w->data); free(w); } p->wtail = &p->writes; remove_block_id(p->writeid); p->writeid = PL_NOID; return(BLOCK_LOOP); } /* * Record the data with the associated timestamp to the repository. 
* * Either write the data or queue it for writing, depending on whether * anyone is currently reading. */ void repo_record(REPO *r, unsigned long long int ts, const char *data, void (*err)(const char *, ...)) { REPO_PRIV *p; p = r->priv; if (index(data,'\n')) (*err)("invalid data"); if (p->readers) { PENDWRITE *w; w = malloc(sizeof(PENDWRITE)); w->ts = ts; w->data = strdup(data); w->link = 0; *p->wtail = w; p->wtail = &w->link; if (p->writeid == PL_NOID) p->writeid = add_block_fn(&repo_write_pending,r); return; } else { fseek(p->f,0,SEEK_END); fprintf(p->f,"%llu %s\n",ts,data); fflush(p->f); } } /* * See if we can start reading a REPO. We can iff there are no writes * pending for it. */ int repo_canread(REPO *r) { return(!r->priv->writes); } /* * Open a cursor. All we do here is set up the data structure; opening * does not imply an automatic rewind or advance. * * This should never be called if repo_canread would return false. */ REPO_CURSOR *repo_cursor_open(REPO *r, TIMESTAMP start, TIMESTAMP stop) { REPO_CURSOR *c; printf("repo_cursor_open %s [%llu,%llu)\n",r->name,(unsigned long long int)start,(unsigned long long int)stop); // debugging if (r->priv->writes) abort(); c = malloc(sizeof(REPO_CURSOR)); c->r = r; c->start = start; c->stop = stop; c->lb = 0; c->la = 0; r->priv->readers ++; return(c); } /* * Close down a cursor. Nothing noteworthy here. */ void repo_cursor_close(REPO_CURSOR *c) { printf("repo_cursor_close\n"); // debugging c->r->priv->readers --; free(c->lb); free(c); } /* * Rewind a cursor. We do this by just setting its location to zero * and advancing. Don't forget to clear its EOF indication at the * same time. */ void repo_cursor_rewind(REPO_CURSOR *c) { printf("repo_cursor_rewind\n"); // debugging c->loc = 0; c->eof = 0; repo_cursor_advance(c); } /* * Return EOF indication. We have an instance variable for this. 
*/ int repo_cursor_eof(REPO_CURSOR *c) { printf("repo_cursor_eof -> %d\n",c->eof); // debugging return(c->eof); } /* * Return sample timestamp. We have an instance variable for this. */ TIMESTAMP repo_cursor_stamp(REPO_CURSOR *c) { printf("repo_cursor_stamp -> %llu\n",(unsigned long long int)c->ts); // debugging return(c->ts); } /* * Return sample data. This is just an offset into lb. */ const char *repo_cursor_data(REPO_CURSOR *c) { printf("repo_cursor_data -> %s\n",c->lb+c->dataat); // debugging return(c->lb+c->dataat); } /* * Advance a cursor. Just keep reading samples until we either hit EOF * or we find one within the appropriate range. */ void repo_cursor_advance(REPO_CURSOR *c) { FILE *f; int ch; char *ep; printf("repo_cursor_advance starting at %llu\n",(unsigned long long int)c->loc); // debugging f = c->r->priv->f; while (1) { fseeko(f,c->loc,SEEK_SET); c->ll = 0; while (1) { ch = getc(f); if (ch == EOF) { c->eof = 1; printf("repo_cursor_advance read to EOF\n"); // debugging return; } if (c->ll >= c->la) c->lb = realloc(c->lb,c->la=c->ll+8); c->lb[c->ll++] = ch; if (ch == '\n') { c->lb[c->ll-1] = '\0'; c->ts = strtoull(c->lb,&ep,10); printf("repo_cursor_advance read %s\n",c->lb); // debugging if ((c->ts >= c->start) && (c->ts < c->stop)) { if (*ep == ' ') ep ++; c->dataat = ep - c->lb; c->loc = ftello(f); printf("repo_cursor_advance returning at %llu\n",(unsigned long long int)c->loc); // debugging return; } c->ll = 0; } } } }