#include #include #include #include #include #include #include extern const char *__progname; #include "es.h" #include "prf.h" #include "mp4.h" #include "vars.h" #include "worker.h" #include "decode.h" typedef struct dec DEC; typedef struct pip PIP; struct pip { char *file; MP4 *mp4; int ax; int vx; MP4TRK *a; MP4TRK *v; int dist_id; int maxvi; int nvi; } ; struct dec { pid_t proc; int fd; int id; AIO_OQ oq; DEC *freelink; ES db; } ; static WORKER *decode_worker; static DEC *decs; static int ndecs; static DEC *freedecs; static int parent_fd; static int parent_id; static AIO_OQ parent_oq; static ES parent_in; static void set_nonblock(int fd) { fcntl(fd,F_SETFL,fcntl(fd,F_GETFL,0)|O_NONBLOCK); } static void pip_destroy(PIP *p) { free(p->file); if (p->mp4) mp4_close(p->mp4); free(p); } static int dist_next(void *pv) { PIP *p; DEC *d; p = pv; if (! freedecs) return(AIO_BLOCK_NIL); if (p->nvi >= p->maxvi) { aio_remove_block(p->dist_id); return(AIO_BLOCK_LOOP); } d = freedecs; freedecs = d->freelink; prf("Telling %d to decode %d",(int)(d-decs),p->nvi); aio_oq_queue_point(&d->oq,"d",1); aio_oq_queue_copy(&d->oq,&p->nvi,sizeof(int)); p->nvi ++; return(AIO_BLOCK_LOOP); } static void start_playing(const char *fnp, int fnl) { char *fn; PIP *p; int i; const char *e; int c; FILE *f; fn = malloc(fnl+1); bcopy(fnp,fn,fnl); fn[fnl] = '\0'; p = malloc(sizeof(PIP)); p->file = fn; p->mp4 = mp4_open(fn,MP4F_ERRMSG); if (! p->mp4) { prf("Can't open %s",fn); pip_destroy(p); return; } e = mp4_scan(p->mp4); if (e) { prf("mp4 reader: %s",e); pip_destroy(p); return; } p->ax = -1; p->vx = -1; for (i=mp4_ntracks(p->mp4)-1;i>=0;i--) { switch (mp4_track_type(p->mp4,i)) { default: abort(); break; case MP4_TT_AUDIO: p->ax = i; break; case MP4_TT_VIDEO: p->vx = i; break; } } if (p->vx < 0) { prf("No video found"); pip_destroy(p); return; } if (p->ax < 0) { prf("Note: no audio found"); return; } p->a = (p->ax < 0) ? 0 : mp4_open_audio(p->mp4,p->ax); p->v = mp4_open_video(p->mp4,p->vx); p->maxvi = mp4_max_seekx(p->v); prf("seek indices: 0..%d:",p->maxvi); f = 0; for (i=0;i<=p->maxvi;i++) { if (! f) { f = prf_open(); c = 0; } c += fprintf(f,"%*s%d: %.2f",c?(((c/14)+1)*14)-c:0,"",i,mp4_seekx_time(p->v,i)); if (c > 64) { fclose(f); f = 0; } } if (f) fclose(f); p->nvi = 0; p->dist_id = aio_add_block(&dist_next,p); } static void decode_cmd(const char *s, int l) { if (l < 1) abort(); switch (s[0]) { case 'Q': exit(0); break; case 'p': start_playing(s+1,l-1); return; break; } printf("decode command from parent: %.*s\n",l,s); } static void parent_process(const unsigned char *buf, int len) { prf("Read %d",len); es_append_n(&parent_in,buf,len); if (es_len(&parent_in) < 1) abort(); buf = es_ptr(&parent_in); switch (buf[0]) { case 'd': prf("Unimplemented decode worker command `%c'",buf[0]); break; default: prf("Unrecognized decode worker command `%c'",buf[0]); break; } es_clear(&parent_in); } static int wtest_parent(void *arg __attribute__((__unused__))) { return(aio_oq_nonempty(&parent_oq)); } static void rd_parent(void *arg __attribute__((__unused__))) { int nr; unsigned char rbuf[512]; nr = read(parent_fd,&rbuf[0],sizeof(rbuf)); if (nr < 0) { switch (errno) { case EINTR: case EWOULDBLOCK: return; break; } fprintf(stderr,"%s: decode worker read: %s\n",__progname,strerror(errno)); exit(1); } if (nr == 0) exit(0); parent_process(&rbuf[0],nr); } static void wr_parent(void *arg __attribute__((__unused__))) { int nw; nw = aio_oq_writev(&parent_oq,parent_fd,-1); switch (nw) { case AIO_WRITEV_ERROR: switch (errno) { case EINTR: case EWOULDBLOCK: return; break; } fprintf(stderr,"%s: decoder worker write: %s\n",__progname,strerror(errno)); exit(1); break; } if (nw < 0) abort(); aio_oq_dropdata(&parent_oq,nw); } static void decoder(int fd) { aio_oq_init(&parent_oq); prf_setup(&parent_oq); parent_fd = fd; set_nonblock(fd); es_init(&parent_in); parent_id = aio_add_poll(fd,&aio_rwtest_always,&wtest_parent,&rd_parent,&wr_parent,0); aio_event_loop(); } static int wtest_decode(void *dv) { return(aio_oq_nonempty(&((DEC *)dv)->oq)); } static void wr_decode(void *dv) { DEC *d; int nw; d = dv; nw = aio_oq_writev(&d->oq,d->fd,-1); switch (nw) { case AIO_WRITEV_ERROR: switch (errno) { case EINTR: case EWOULDBLOCK: return; break; } fprintf(stderr,"%s: write to decoder: %s\n",__progname,strerror(errno)); exit(1); break; } if (nw < 0) abort(); aio_oq_dropdata(&d->oq,nw); } static void decwork_process_2(DEC *d, const unsigned char *data1, int len1, const unsigned char *data2, int len2) { unsigned int gc(int o) { if (o >= len1) { o -= len1; if (o >= len2) return(-1); return(data2[o]); } return(data1[o]); } if (len1 < 1) abort(); switch (gc(0)) { case 'P': aio_oq_queue_printf(&decode_worker->oq,"P[worker %d] ",(int)(d-decs)); aio_oq_queue_copy(&decode_worker->oq,data1+1,len1-1); aio_oq_queue_copy(&decode_worker->oq,data2,len2); break; default: abort(); break; } } static void decwork_process_1(DEC *d, const unsigned char *data, int len) { decwork_process_2(d,data,len,0,0); } static void rd_decode(void *dv) { DEC *d; int nr; int o; unsigned char *vp; int l; unsigned char rbuf[512]; d = dv; nr = read(d->fd,&rbuf[0],sizeof(rbuf)); if (nr < 0) { switch (errno) { case EINTR: case EWOULDBLOCK: return; break; } fprintf(stderr,"%s: read from decoder: %s\n",__progname,strerror(errno)); exit(1); } if (nr == 0) { fprintf(stderr,"%s: read EOF from decoder\n",__progname); exit(1); } o = 0; while (o < nr) { vp = memchr(&rbuf[o],'\0',nr-o); if (vp) { l = (((const unsigned char *)vp) - &rbuf[o]) + 1; if (es_len(&d->db)) { decwork_process_2(d,es_ptr(&d->db),es_len(&d->db),&rbuf[o],l); es_clear(&d->db); } else { decwork_process_1(d,&rbuf[o],l); } o += l; } else { es_append_n(&d->db,&rbuf[o],nr-o); o = nr; } } } static void fork_procs(void) { int i; DEC *d; pid_t kid; int p[2]; decs = malloc(ndecs*sizeof(DEC)); fflush(0); for (i=ndecs-1;i>=0;i--) { d = &decs[i]; es_init(&d->db); if (socketpair(AF_LOCAL,SOCK_STREAM,0,&p[0]) < 0) { fprintf(stderr,"%s: socketpair: %s",__progname,strerror(errno)); exit(1); } // so socket fds are visible in syscall traces write(-3,0,p[0]); write(-4,0,p[1]); kid = fork(); if (kid < 0) { fprintf(stderr,"%s: fork: %s",__progname,strerror(errno)); exit(1); } if (kid == 0) { aio_poll_reinit(); close(workerfd); for (i++;iproc = kid; close(p[1]); d->fd = p[0]; d->id = aio_add_poll(d->fd,&aio_rwtest_always,&wtest_decode,&rd_decode,&wr_decode,d); aio_oq_init(&d->oq); } freedecs = 0; for (i=ndecs-1;i>=0;i--) { d = &decs[i]; d->freelink = freedecs; freedecs = d; } } void decode_main(WORKER *w) { decode_worker = w; worker_common_startup(w,&decode_cmd); ndecs = threads; if (ndecs < 1) ndecs = 1; if (ndecs > 100) { prf("Unreasonable decoder thread count %d, using 1",ndecs); ndecs = 1; } fork_procs(); aio_event_loop(); }