timespecsub(&t, &(s->in_last_read), &td);
if (timespeccmp(&td, &(s->out_ttl), >=)) {
/* TODO: Maybe configurable expiry character. */
- memset(buf + s->out_pos_lo, '_', s->out_pos_hi - s->out_pos_lo);
+ memset(buf + s->out_pos_lo, '_', s->out_width);
khlib_warn("Slot expired: \"%s\"\n", s->in_fifo);
}
}
void
-slot_read_error(Slot *s, char *buf)
+slot_set_error(Slot *s, char *buf)
{
char *b;
int i;
+ s->in_fd = -1;
b = buf + s->out_pos_lo;
/* Copy as much of the error message as possible.
* EXCLUDING the terminating \0. */
for (i = 0; i < errlen && i < s->out_width; i++)
b[i] = errmsg[i];
/* Any remaining positions: */
- for (; i < s->out_width; i++)
- b[i] = '_';
+ memset(b + i, '_', s->out_width - i);
}
enum read_status
-slot_read(Slot *s, struct timespec t, char *buf)
+slot_read(Slot *s, char *buf)
{
char c; /* Character read. */
int r; /* Remaining unused positions in buffer range. */
case 1:
/* TODO: Consider making msg term char a CLI option */
if (c == '\n' || c == '\0') {
- r = s->out_pos_hi - s->out_pos_cur;
+ r = (s->out_pos_hi - s->out_pos_cur) + 1;
if (r > 0)
memset(buf + s->out_pos_cur, ' ', r);
- s->out_pos_cur = s->out_pos_lo;
- s->in_last_read = t;
return END_OF_MESSAGE;
} else {
if (s->out_pos_cur <= s->out_pos_hi)
buf[s->out_pos_cur++] = c;
- /* Drop beyond available range. */
- /*
- * TODO Define max after which we stop reading.
- * To ensure that a rogue large message
- * doesn't trap us here.
- */
+ else
+ /*
+ * Force EOM beyond available range.
+ * To ensure that a rogue large message
+ * doesn't trap us here needlessly
+ * long.
+ */
+ return END_OF_MESSAGE;
}
break;
default:
s->in_fifo,
strerror(errno)
);
- slot_read_error(s, buf);
+ slot_set_error(s, buf);
continue;
}
if (!(st.st_mode & S_IFIFO)) {
khlib_error("\"%s\" is not a FIFO\n", s->in_fifo);
- slot_read_error(s, buf);
+ slot_set_error(s, buf);
continue;
}
if (s->in_fd < 0) {
if (s->in_fd == -1) {
/* TODO Consider backing off retries for failed slots */
khlib_error("Failed to open \"%s\"\n", s->in_fifo);
- slot_read_error(s, buf);
+ slot_set_error(s, buf);
continue;
}
khlib_debug("%s: open. in_fd: %d\n", s->in_fifo, s->in_fd);
/* At-least-once ensures that expiries are still checked on timeouts. */
do {
for (s = cfg->slots; s; s = s->next) {
+ if (s->in_fd < 0)
+ continue;
if (FD_ISSET(s->in_fd, &fds)) {
khlib_debug("reading: %s\n", s->in_fifo);
- switch (slot_read(s, t, buf)) {
+ switch (slot_read(s, buf)) {
/*
* ### MESSAGE LOSS ###
* is introduced by closing at EOM in addition
* This is an acceptable trade-off because we
* are a stateless reporter of a _most-recent_
* status, not a stateful accumulator.
+ *
+ * ### LOSSLESS ALTERNATIVES ###
+ * - Read each pipe until EOF before reading
+ * another.
+ * PROBLEM: a fast writer can trap us in the
+ * read loop.
+ *
+ * - Read each pipe until EOM, but close only
+ * at EOF.
+ * PROBLEM: a fast writer can fill the pipe
+ * faster than we can read it and we end-up
+ * displaying stale data.
+ *
*/
case END_OF_MESSAGE:
case END_OF_FILE:
case FAILURE:
close(s->in_fd);
- s->in_fd = -1;
+ s->in_fd = -1;
+ s->in_last_read = t;
+ s->out_pos_cur = s->out_pos_lo;
ready--;
break;
case RETRY: