reading++;
debug("reading: %s\n", f->name);
switch (fifo_read_one(f, buf)) {
+ /*
+ * ### MESSAGE LOSS ###
+ * is introduced by closing at EOM in addition to EOF,
+ * since there may be unread messages remaining in the
+ * pipe. However,
+ *
+ * ### INTER-MESSAGE PUSHBACK ###
+ * is also gained, since pipes block at the "open" call.
+ *
+ * This is an acceptable trade-off because we are a
+ * stateless reporter of a _most-recent_ status, not a
+ * stateful accumulator.
+ */
+ case END_OF_MESSAGE:
case END_OF_FILE:
case FAILURE:
close(f->fd);
f->fd = -1;
reading--;
break;
- case END_OF_MESSAGE:
case RETRY:
break;
default:
clock_gettime(CLOCK_MONOTONIC, &t1); // FIXME: check errors
timespecsub(&t1, &t0, &td);
debug("td {tv_sec = %ld, tv_nsec = %ld}\n", td.tv_sec, td.tv_nsec);
+ /*
+ * Skip sleep when we aren't done reading,
+ * to avoid filling-up the pipe during sleep.
+ */
if (!reading && timespeccmp(&td, &ti, <)) {
/* Pushback on data producers by refusing to read the
* pipe more frequently than the interval.