- assert(ready != 0);
- if (ready < 0)
- fatal("%s", strerror(errno));
- for (Fifo *f = cfg->fifos; f; f = f->next) {
- if (FD_ISSET(f->fd, &fds)) {
- reading++;
- debug("reading: %s\n", f->name);
- switch (fifo_read_one(f, buf)) {
- /*
- * ### MESSAGE LOSS ###
- * is introduced by closing at EOM in addition to EOF,
- * since there may be unread messages remaining in the
- * pipe. However,
- *
- * ### INTER-MESSAGE PUSHBACK ###
- * is also gained, since pipes block at the "open" call.
- *
- * This is an acceptable trade-off because we are a
- * stateless reporter of a _most-recent_ status, not a
- * stateful accumulator.
- */
- case END_OF_MESSAGE:
- case END_OF_FILE:
- case FAILURE:
- close(f->fd);
- f->fd = -1;
- reading--;
- break;
- case RETRY:
- break;
- default:
- assert(0);
+ assert(ready >= 0);
+ clock_gettime(CLOCK_MONOTONIC, &t);
+ while (ready) {
+ for (Fifo *f = cfg->fifos; f; f = f->next) {
+ if (FD_ISSET(f->fd, &fds)) {
+ debug("reading: %s\n", f->name);
+ switch (fifo_read_one(f, t, buf)) {
+ /*
+ * ### MESSAGE LOSS ###
+ * is introduced by closing at EOM in addition
+ * to EOF, since there may be unread messages
+ * remaining in the pipe. However,
+ *
+ * ### INTER-MESSAGE PUSHBACK ###
+ * is also gained, since pipes block at the
+ * "open" call.
+ *
+ * This is an acceptable trade-off because we
+ * are a stateless reporter of a _most-recent_
+ * status, not a stateful accumulator.
+ */
+ case END_OF_MESSAGE:
+ case END_OF_FILE:
+ case FAILURE:
+ close(f->fd);
+ f->fd = -1;
+ ready--;
+ break;
+ case RETRY:
+ break;
+ default:
+ assert(0);
+ }