}
}
+void
+slot_expire(Slot *s, struct timespec t, char *buf)
+{
+ struct timespec td;
+
+ timespecsub(&t, &(s->in_last_read), &td);
+ if (timespeccmp(&td, &(s->out_ttl), >=)) {
+ /* TODO: Maybe configurable expiry character. */
+ memset(buf + s->out_pos_lo, '_', s->out_pos_hi - s->out_pos_lo);
+ khlib_warn("Slot expired: \"%s\"\n", s->in_fifo);
+ }
+}
+
+void
+slot_read_error(Slot *s, char *buf)
+{
+ char *b;
+ int i;
+
+ b = buf + s->out_pos_lo;
+ /* Copy as much of the error message as possible.
+ * EXCLUDING the terminating \0. */
+ for (i = 0; i < errlen && i < s->out_width; i++)
+ b[i] = errmsg[i];
+ /* Any remaining positions: */
+ for (; i < s->out_width; i++)
+ b[i] = '_';
+}
+
+enum read_status
+slot_read(Slot *s, struct timespec t, char *buf)
+{
+ char c; /* Character read. */
+ int r; /* Remaining unused positions in buffer range. */
+
+ for (;;) {
+ switch (read(s->in_fd, &c, 1)) {
+ case -1:
+ khlib_error(
+ "Failed to read: \"%s\". errno: %d, msg: %s\n",
+ s->in_fifo,
+ errno,
+ strerror(errno)
+ );
+ switch (errno) {
+ case EINTR:
+ case EAGAIN:
+ return RETRY;
+ default:
+ return FAILURE;
+ }
+ case 0:
+ khlib_debug("%s: End of FILE\n", s->in_fifo);
+ s->out_pos_cur = s->out_pos_lo;
+ return END_OF_FILE;
+ case 1:
+ /* TODO: Consider making msg term char a CLI option */
+ if (c == '\n' || c == '\0') {
+ r = s->out_pos_hi - s->out_pos_cur;
+ if (r > 0)
+ memset(buf + s->out_pos_cur, ' ', r);
+ s->out_pos_cur = s->out_pos_lo;
+ s->in_last_read = t;
+ return END_OF_MESSAGE;
+ } else {
+ if (s->out_pos_cur <= s->out_pos_hi)
+ buf[s->out_pos_cur++] = c;
+ /* Drop beyond available range. */
+ /*
+ * TODO Define max after which we stop reading.
+ * To ensure that a rogue large message
+ * doesn't trap us here.
+ */
+ }
+ break;
+ default:
+ assert(0);
+ }
+ }
+}
+
+void
+slots_read(Config *cfg, struct timespec *ti, char *buf)
+{
+ fd_set fds;
+ int maxfd = -1;
+ int ready = 0;
+ struct stat st;
+ struct timespec t;
+ Slot *s;
+
+ FD_ZERO(&fds);
+ for (s = cfg->slots; s; s = s->next) {
+ /* TODO: Create the FIFO if it doesn't already exist. */
+ if (lstat(s->in_fifo, &st) < 0) {
+ khlib_error(
+ "Cannot stat \"%s\". Error: %s\n",
+ s->in_fifo,
+ strerror(errno)
+ );
+ slot_read_error(s, buf);
+ continue;
+ }
+ if (!(st.st_mode & S_IFIFO)) {
+ khlib_error("\"%s\" is not a FIFO\n", s->in_fifo);
+ slot_read_error(s, buf);
+ continue;
+ }
+ if (s->in_fd < 0) {
+ khlib_debug(
+ "%s: closed. opening. in_fd: %d\n",
+ s->in_fifo,
+ s->in_fd
+ );
+ s->in_fd = open(s->in_fifo, O_RDONLY | O_NONBLOCK);
+ } else {
+ khlib_debug(
+ "%s: already openned. in_fd: %d\n",
+ s->in_fifo,
+ s->in_fd
+ );
+ }
+ if (s->in_fd == -1) {
+ /* TODO Consider backing off retries for failed slots */
+ khlib_error("Failed to open \"%s\"\n", s->in_fifo);
+ slot_read_error(s, buf);
+ continue;
+ }
+ khlib_debug("%s: open. in_fd: %d\n", s->in_fifo, s->in_fd);
+ if (s->in_fd > maxfd)
+ maxfd = s->in_fd;
+ FD_SET(s->in_fd, &fds);
+ }
+ khlib_debug("selecting...\n");
+ ready = pselect(maxfd + 1, &fds, NULL, NULL, ti, NULL);
+ khlib_debug("ready: %d\n", ready);
+ clock_gettime(CLOCK_MONOTONIC, &t);
+ if (ready == -1) {
+ switch (errno) {
+ case EINTR:
+ khlib_error(
+ "pselect temp failure: %d, errno: %d, msg: %s\n",
+ ready,
+ errno,
+ strerror(errno)
+ );
+ /* TODO: Reconsider what to do here. */
+ return;
+ default:
+ khlib_fatal(
+ "pselect failed: %d, errno: %d, msg: %s\n",
+ ready,
+ errno,
+ strerror(errno)
+ );
+ }
+ }
+ /* At-least-once ensures that expiries are still checked on timeouts. */
+ do {
+ for (s = cfg->slots; s; s = s->next) {
+ if (FD_ISSET(s->in_fd, &fds)) {
+ khlib_debug("reading: %s\n", s->in_fifo);
+ switch (slot_read(s, t, buf)) {
+ /*
+ * ### MESSAGE LOSS ###
+ * is introduced by closing at EOM in addition
+ * to EOF, since there may be unread messages
+ * remaining in the pipe. However,
+ *
+ * ### INTER-MESSAGE PUSHBACK ###
+ * is also gained, since pipes block at the
+ * "open" call.
+ *
+ * This is an acceptable trade-off because we
+ * are a stateless reporter of a _most-recent_
+ * status, not a stateful accumulator.
+ */
+ case END_OF_MESSAGE:
+ case END_OF_FILE:
+ case FAILURE:
+ close(s->in_fd);
+ s->in_fd = -1;
+ ready--;
+ break;
+ case RETRY:
+ break;
+ default:
+ assert(0);
+ }
+ } else {
+ slot_expire(s, t, buf);
+ }
+ }
+ } while (ready);
+ assert(ready == 0);
+}
+
void
config_print(Config *cfg)
{
cfg->slots = slots_rev(cfg->slots);
}
-void
-slot_expire(Slot *s, struct timespec t, char *buf)
-{
- struct timespec td;
-
- timespecsub(&t, &(s->in_last_read), &td);
- if (timespeccmp(&td, &(s->out_ttl), >=)) {
- /* TODO: Maybe configurable expiry character. */
- memset(buf + s->out_pos_lo, '_', s->out_pos_hi - s->out_pos_lo);
- khlib_warn("Slot expired: \"%s\"\n", s->in_fifo);
- }
-}
-
-void
-slot_read_error(Slot *s, char *buf)
-{
- char *b;
- int i;
-
- b = buf + s->out_pos_lo;
- /* Copy as much of the error message as possible.
- * EXCLUDING the terminating \0. */
- for (i = 0; i < errlen && i < s->out_width; i++)
- b[i] = errmsg[i];
- /* Any remaining positions: */
- for (; i < s->out_width; i++)
- b[i] = '_';
-}
-
-enum read_status
-slot_read(Slot *s, struct timespec t, char *buf)
-{
- char c; /* Character read. */
- int r; /* Remaining unused positions in buffer range. */
-
- for (;;) {
- switch (read(s->in_fd, &c, 1)) {
- case -1:
- khlib_error(
- "Failed to read: \"%s\". errno: %d, msg: %s\n",
- s->in_fifo,
- errno,
- strerror(errno)
- );
- switch (errno) {
- case EINTR:
- case EAGAIN:
- return RETRY;
- default:
- return FAILURE;
- }
- case 0:
- khlib_debug("%s: End of FILE\n", s->in_fifo);
- s->out_pos_cur = s->out_pos_lo;
- return END_OF_FILE;
- case 1:
- /* TODO: Consider making msg term char a CLI option */
- if (c == '\n' || c == '\0') {
- r = s->out_pos_hi - s->out_pos_cur;
- if (r > 0)
- memset(buf + s->out_pos_cur, ' ', r);
- s->out_pos_cur = s->out_pos_lo;
- s->in_last_read = t;
- return END_OF_MESSAGE;
- } else {
- if (s->out_pos_cur <= s->out_pos_hi)
- buf[s->out_pos_cur++] = c;
- /* Drop beyond available range. */
- /*
- * TODO Define max after which we stop reading.
- * To ensure that a rogue large message
- * doesn't trap us here.
- */
- }
- break;
- default:
- assert(0);
- }
- }
-}
-
-void
-slots_read(Config *cfg, struct timespec *ti, char *buf)
-{
- fd_set fds;
- int maxfd = -1;
- int ready = 0;
- struct stat st;
- struct timespec t;
- Slot *s;
-
- FD_ZERO(&fds);
- for (s = cfg->slots; s; s = s->next) {
- /* TODO: Create the FIFO if it doesn't already exist. */
- if (lstat(s->in_fifo, &st) < 0) {
- khlib_error(
- "Cannot stat \"%s\". Error: %s\n",
- s->in_fifo,
- strerror(errno)
- );
- slot_read_error(s, buf);
- continue;
- }
- if (!(st.st_mode & S_IFIFO)) {
- khlib_error("\"%s\" is not a FIFO\n", s->in_fifo);
- slot_read_error(s, buf);
- continue;
- }
- if (s->in_fd < 0) {
- khlib_debug(
- "%s: closed. opening. in_fd: %d\n",
- s->in_fifo,
- s->in_fd
- );
- s->in_fd = open(s->in_fifo, O_RDONLY | O_NONBLOCK);
- } else {
- khlib_debug(
- "%s: already openned. in_fd: %d\n",
- s->in_fifo,
- s->in_fd
- );
- }
- if (s->in_fd == -1) {
- /* TODO Consider backing off retries for failed slots */
- khlib_error("Failed to open \"%s\"\n", s->in_fifo);
- slot_read_error(s, buf);
- continue;
- }
- khlib_debug("%s: open. in_fd: %d\n", s->in_fifo, s->in_fd);
- if (s->in_fd > maxfd)
- maxfd = s->in_fd;
- FD_SET(s->in_fd, &fds);
- }
- khlib_debug("selecting...\n");
- ready = pselect(maxfd + 1, &fds, NULL, NULL, ti, NULL);
- khlib_debug("ready: %d\n", ready);
- clock_gettime(CLOCK_MONOTONIC, &t);
- if (ready == -1) {
- switch (errno) {
- case EINTR:
- khlib_error(
- "pselect temp failure: %d, errno: %d, msg: %s\n",
- ready,
- errno,
- strerror(errno)
- );
- /* TODO: Reconsider what to do here. */
- return;
- default:
- khlib_fatal(
- "pselect failed: %d, errno: %d, msg: %s\n",
- ready,
- errno,
- strerror(errno)
- );
- }
- }
- /* At-least-once ensures that expiries are still checked on timeouts. */
- do {
- for (s = cfg->slots; s; s = s->next) {
- if (FD_ISSET(s->in_fd, &fds)) {
- khlib_debug("reading: %s\n", s->in_fifo);
- switch (slot_read(s, t, buf)) {
- /*
- * ### MESSAGE LOSS ###
- * is introduced by closing at EOM in addition
- * to EOF, since there may be unread messages
- * remaining in the pipe. However,
- *
- * ### INTER-MESSAGE PUSHBACK ###
- * is also gained, since pipes block at the
- * "open" call.
- *
- * This is an acceptable trade-off because we
- * are a stateless reporter of a _most-recent_
- * status, not a stateful accumulator.
- */
- case END_OF_MESSAGE:
- case END_OF_FILE:
- case FAILURE:
- close(s->in_fd);
- s->in_fd = -1;
- ready--;
- break;
- case RETRY:
- break;
- default:
- assert(0);
- }
- } else {
- slot_expire(s, t, buf);
- }
- }
- } while (ready);
- assert(ready == 0);
-}
-
int
main(int argc, char *argv[])
{