Note problems with lossless msg-handling alternatives
[khatus.git] / x5 / khatus.c
index 382c873..5928dfb 100644 (file)
@@ -24,6 +24,7 @@
 }
 #define ERRMSG "ERROR"
 
+
 static const char errmsg[] = ERRMSG;
 static const int  errlen   = sizeof(ERRMSG) - 1;
 
@@ -60,8 +61,23 @@ enum read_status {
        FAILURE
 };
 
+Slot *
+slots_rev(Slot *old)
+{
+       Slot *tmp = NULL;
+       Slot *new = NULL;
+
+       while (old) {
+               tmp       = old->next;
+               old->next = new;
+               new       = old;
+               old       = tmp;
+       }
+       return new;
+}
+
 void
-slot_print_one(Slot *s)
+slot_print(Slot *s)
 {
        khlib_info("Slot "
            "{"
@@ -90,13 +106,225 @@ slot_print_one(Slot *s)
 }
 
 void
-slot_print_all(Slot *head)
+slots_print(Slot *head)
 {
        for (Slot *s = head; s; s = s->next) {
-               slot_print_one(s);
+               slot_print(s);
+       }
+}
+
+void
+slot_expire(Slot *s, struct timespec t, char *buf)
+{
+       struct timespec td;
+
+       timespecsub(&t, &(s->in_last_read), &td);
+       if (timespeccmp(&td, &(s->out_ttl), >=)) {
+               /* TODO: Maybe configurable expiry character. */
+               memset(buf + s->out_pos_lo, '_', s->out_width);
+               khlib_warn("Slot expired: \"%s\"\n", s->in_fifo);
+       }
+}
+
+void
+slot_set_error(Slot *s, char *buf)
+{
+       char *b;
+       int i;
+
+       s->in_fd = -1;
+       b = buf + s->out_pos_lo;
+       /* Copy as much of the error message as possible.
+        * EXCLUDING the terminating \0. */
+       for (i = 0; i < errlen && i < s->out_width; i++)
+               b[i] = errmsg[i];
+       /* Any remaining positions: */
+       memset(b + i, '_', s->out_width - i);
+}
+
+enum read_status
+slot_read(Slot *s, struct timespec t, char *buf)
+{
+       char c;  /* Character read. */
+       int  r;  /* Remaining unused positions in buffer range. */
+
+       for (;;) {
+               switch (read(s->in_fd, &c, 1)) {
+               case -1:
+                       khlib_error(
+                           "Failed to read: \"%s\". errno: %d, msg: %s\n",
+                           s->in_fifo,
+                           errno,
+                           strerror(errno)
+                       );
+                       switch (errno) {
+                       case EINTR:
+                       case EAGAIN:
+                               return RETRY;
+                       default:
+                               return FAILURE;
+                       }
+               case  0:
+                       khlib_debug("%s: End of FILE\n", s->in_fifo);
+                       s->out_pos_cur = s->out_pos_lo;
+                       return END_OF_FILE;
+               case  1:
+                       /* TODO: Consider making msg term char a CLI option */
+                       if (c == '\n' || c == '\0') {
+                               r = (s->out_pos_hi - s->out_pos_cur) + 1;
+                               if (r > 0)
+                                       memset(buf + s->out_pos_cur, ' ', r);
+                               s->out_pos_cur = s->out_pos_lo;
+                               s->in_last_read = t;
+                               return END_OF_MESSAGE;
+                       } else {
+                               if (s->out_pos_cur <= s->out_pos_hi)
+                                       buf[s->out_pos_cur++] = c;
+                               /* Drop beyond available range. */
+                               /*
+                                * TODO Define max after which we stop reading.
+                                *      To ensure that a rogue large message
+                                *      doesn't trap us here.
+                                */
+                       }
+                       break;
+               default:
+                       assert(0);
+               }
        }
 }
 
+void
+slots_read(Config *cfg, struct timespec *ti, char *buf)
+{
+       fd_set fds;
+       int maxfd = -1;
+       int ready = 0;
+       struct stat st;
+       struct timespec t;
+       Slot *s;
+
+       FD_ZERO(&fds);
+       for (s = cfg->slots; s; s = s->next) {
+               /* TODO: Create the FIFO if it doesn't already exist. */
+               if (lstat(s->in_fifo, &st) < 0) {
+                       khlib_error(
+                           "Cannot stat \"%s\". Error: %s\n",
+                           s->in_fifo,
+                           strerror(errno)
+                       );
+                       slot_set_error(s, buf);
+                       continue;
+               }
+               if (!(st.st_mode & S_IFIFO)) {
+                       khlib_error("\"%s\" is not a FIFO\n", s->in_fifo);
+                       slot_set_error(s, buf);
+                       continue;
+               }
+               if (s->in_fd < 0) {
+                       khlib_debug(
+                           "%s: closed. opening. in_fd: %d\n",
+                           s->in_fifo,
+                           s->in_fd
+                       );
+                       s->in_fd = open(s->in_fifo, O_RDONLY | O_NONBLOCK);
+               } else {
+                       khlib_debug(
+                           "%s: already openned. in_fd: %d\n",
+                           s->in_fifo,
+                           s->in_fd
+                       );
+               }
+               if (s->in_fd == -1) {
+                       /* TODO Consider backing off retries for failed slots */
+                       khlib_error("Failed to open \"%s\"\n", s->in_fifo);
+                       slot_set_error(s, buf);
+                       continue;
+               }
+               khlib_debug("%s: open. in_fd: %d\n", s->in_fifo, s->in_fd);
+               if (s->in_fd > maxfd)
+                       maxfd = s->in_fd;
+               FD_SET(s->in_fd, &fds);
+       }
+       khlib_debug("selecting...\n");
+       ready = pselect(maxfd + 1, &fds, NULL, NULL, ti, NULL);
+       khlib_debug("ready: %d\n", ready);
+       clock_gettime(CLOCK_MONOTONIC, &t);
+       if (ready == -1) {
+               switch (errno) {
+               case EINTR:
+                       khlib_error(
+                           "pselect temp failure: %d, errno: %d, msg: %s\n",
+                           ready,
+                           errno,
+                           strerror(errno)
+                       );
+                       /* TODO: Reconsider what to do here. */
+                       return;
+               default:
+                       khlib_fatal(
+                           "pselect failed: %d, errno: %d, msg: %s\n",
+                           ready,
+                           errno,
+                           strerror(errno)
+                       );
+               }
+       }
+       /* At-least-once ensures that expiries are still checked on timeouts. */
+       do {
+               for (s = cfg->slots; s; s = s->next) {
+                       if (s->in_fd < 0)
+                               continue;
+                       if (FD_ISSET(s->in_fd, &fds)) {
+                               khlib_debug("reading: %s\n", s->in_fifo);
+                               switch (slot_read(s, t, buf)) {
+                               /*
+                                * ### MESSAGE LOSS ###
+                                * is introduced by closing at EOM in addition
+                                * to EOF, since there may be unread messages
+                                * remaining in the pipe. However,
+                                *
+                                * ### INTER-MESSAGE PUSHBACK ###
+                                * is also gained, since pipes block at the
+                                * "open" call.
+                                *
+                                * This is an acceptable trade-off because we
+                                * are a stateless reporter of a _most-recent_
+                                * status, not a stateful accumulator.
+                                *
+                                * ### LOSSLESS ALTERNATIVES ###
+                                * - Read each pipe until EOF before reading
+                                *   another.
+                                *   PROBLEM: a fast writer can trap us in the
+                                *   read loop.
+                                *
+                                * - Read each pipe until EOM, but close only
+                                *   at EOF.
+                                *   PROBLEM: a fast writer can fill the pipe
+                                *   faster than we can read it and we end-up
+                                *   displaying stale data.
+                                *
+                                */
+                               case END_OF_MESSAGE:
+                               case END_OF_FILE:
+                               case FAILURE:
+                                       close(s->in_fd);
+                                       s->in_fd = -1;
+                                       ready--;
+                                       break;
+                               case RETRY:
+                                       break;
+                               default:
+                                       assert(0);
+                               }
+                       } else {
+                               slot_expire(s, t, buf);
+                       }
+               }
+       } while (ready);
+       assert(ready == 0);
+}
+
 void
 config_print(Config *cfg)
 {
@@ -114,7 +342,7 @@ config_print(Config *cfg)
            cfg->slot_count,
            cfg->total_width
        );
-       slot_print_all(cfg->slots);
+       slots_print(cfg->slots);
 }
 
 int
@@ -252,35 +480,41 @@ parse_opts_opt(Config *cfg, int argc, char *argv[], int i)
 void
 parse_opts_spec(Config *cfg, int argc, char *argv[], int i)
 {
+       char *n;
+       char *w;
+       char *t;
+       struct timespec in_last_read;
+       Slot *s;
+
        if ((i + 3) > argc)
                usage(
                    "[spec] Parameter(s) missing for fifo \"%s\".\n",
                    argv[i]
                );
 
-       char *n = argv[i++];
-       char *w = argv[i++];
-       char *t = argv[i++];
-
-       struct timespec in_last_read;
+       n = argv[i++];
+       w = argv[i++];
+       t = argv[i++];
 
        if (!is_pos_num(w))
                usage("[spec] Invalid width: \"%s\", for fifo \"%s\"\n", w, n);
        if (!is_decimal(t))
                usage("[spec] Invalid TTL: \"%s\", for fifo \"%s\"\n", t, n);
+
        in_last_read.tv_sec  = 0;
        in_last_read.tv_nsec = 0;
-       Slot *s = calloc(1, sizeof(struct Slot));
+       s = calloc(1, sizeof(struct Slot));
+
        if (s) {
                s->in_fifo      = n;
                s->in_fd        = -1;
-               s->out_width     = atoi(w);
-               s->out_ttl       = khlib_timespec_of_float(atof(t));
+               s->out_width    = atoi(w);
+               s->out_ttl      = khlib_timespec_of_float(atof(t));
                s->in_last_read = in_last_read;
-               s->out_pos_lo  = cfg->total_width;
+               s->out_pos_lo   = cfg->total_width;
                s->out_pos_cur  = s->out_pos_lo;
-               s->out_pos_hi = s->out_pos_lo + s->out_width - 1;
-               s->next      = cfg->slots;
+               s->out_pos_hi   = s->out_pos_lo + s->out_width - 1;
+               s->next         = cfg->slots;
 
                cfg->slots        = s;
                cfg->total_width += s->out_width;
@@ -309,211 +543,7 @@ void
 opts_parse(Config *cfg, int argc, char *argv[])
 {
        opts_parse_any(cfg, argc, argv, 1);
-
-       Slot *last = cfg->slots;
-       cfg->slots = NULL;
-       for (Slot *s = last; s; ) {
-               Slot *next = s->next;
-               s->next = cfg->slots;
-               cfg->slots = s;
-               s = next;
-       }
-}
-
-void
-slot_expire(Slot *s, struct timespec t, char *buf)
-{
-       struct timespec td;
-
-       timespecsub(&t, &(s->in_last_read), &td);
-       if (timespeccmp(&td, &(s->out_ttl), >=)) {
-               /* TODO: Maybe configurable expiry character. */
-               memset(buf + s->out_pos_lo, '_', s->out_pos_hi - s->out_pos_lo);
-               khlib_warn("Slot expired: \"%s\"\n", s->in_fifo);
-       }
-}
-
-void
-slot_read_error(Slot *s, char *buf)
-{
-       char *b;
-       int i;
-
-       b = buf + s->out_pos_lo;
-       /* Copy as much of the error message as possible.
-        * EXCLUDING the terminating \0. */
-       for (i = 0; i < errlen && i < s->out_width; i++)
-               b[i] = errmsg[i];
-       /* Any remaining slots: */
-       for (; i < s->out_width; i++)
-               b[i] = '_';
-}
-
-enum read_status
-slot_read_one(Slot *s, struct timespec t, char *buf)
-{
-       char c;  /* Character read. */
-       int  r;  /* Remaining unused slots in buffer range. */
-
-       for (;;) {
-               switch (read(s->in_fd, &c, 1)) {
-               case -1:
-                       khlib_error(
-                           "Failed to read: \"%s\". errno: %d, msg: %s\n",
-                           s->in_fifo,
-                           errno,
-                           strerror(errno)
-                       );
-                       switch (errno) {
-                       case EINTR:
-                       case EAGAIN:
-                               return RETRY;
-                       default:
-                               return FAILURE;
-                       }
-               case  0:
-                       khlib_debug("%s: End of FILE\n", s->in_fifo);
-                       s->out_pos_cur = s->out_pos_lo;
-                       return END_OF_FILE;
-               case  1:
-                       /* TODO: Consider making msg term char a CLI option */
-                       if (c == '\n' || c == '\0') {
-                               r = s->out_pos_hi - s->out_pos_cur;
-                               if (r > 0)
-                                       memset(buf + s->out_pos_cur, ' ', r);
-                               s->out_pos_cur = s->out_pos_lo;
-                               s->in_last_read = t;
-                               return END_OF_MESSAGE;
-                       } else {
-                               if (s->out_pos_cur <= s->out_pos_hi)
-                                       buf[s->out_pos_cur++] = c;
-                               /* Drop beyond available range. */
-                               /*
-                                * TODO Define max after which we stop reading.
-                                *      To ensure that a rogue large message
-                                *      doesn't trap us here.
-                                */
-                       }
-                       break;
-               default:
-                       assert(0);
-               }
-       }
-}
-
-void
-slot_read_all(Config *cfg, struct timespec *ti, char *buf)
-{
-       fd_set fds;
-       int maxfd = -1;
-       int ready = 0;
-       struct stat st;
-       struct timespec t;
-
-       FD_ZERO(&fds);
-       for (Slot *s = cfg->slots; s; s = s->next) {
-               /* TODO: Create the FIFO if it doesn't already exist. */
-               if (lstat(s->in_fifo, &st) < 0) {
-                       khlib_error(
-                           "Cannot stat \"%s\". Error: %s\n",
-                           s->in_fifo,
-                           strerror(errno)
-                       );
-                       slot_read_error(s, buf);
-                       continue;
-               }
-               if (!(st.st_mode & S_IFIFO)) {
-                       khlib_error("\"%s\" is not a FIFO\n", s->in_fifo);
-                       slot_read_error(s, buf);
-                       continue;
-               }
-               if (s->in_fd < 0) {
-                       khlib_debug(
-                           "%s: closed. opening. in_fd: %d\n",
-                           s->in_fifo,
-                           s->in_fd
-                       );
-                       s->in_fd = open(s->in_fifo, O_RDONLY | O_NONBLOCK);
-               } else {
-                       khlib_debug(
-                           "%s: already openned. in_fd: %d\n",
-                           s->in_fifo,
-                           s->in_fd
-                       );
-               }
-               if (s->in_fd == -1) {
-                       /* TODO Consider backing off retries for failed slots */
-                       khlib_error("Failed to open \"%s\"\n", s->in_fifo);
-                       slot_read_error(s, buf);
-                       continue;
-               }
-               khlib_debug("%s: open. in_fd: %d\n", s->in_fifo, s->in_fd);
-               if (s->in_fd > maxfd)
-                       maxfd = s->in_fd;
-               FD_SET(s->in_fd, &fds);
-       }
-       khlib_debug("selecting...\n");
-       ready = pselect(maxfd + 1, &fds, NULL, NULL, ti, NULL);
-       khlib_debug("ready: %d\n", ready);
-       clock_gettime(CLOCK_MONOTONIC, &t);
-       if (ready == -1) {
-               switch (errno) {
-               case EINTR:
-                       khlib_error(
-                           "pselect temp failure: %d, errno: %d, msg: %s\n",
-                           ready,
-                           errno,
-                           strerror(errno)
-                       );
-                       /* TODO: Reconsider what to do here. */
-                       return;
-               default:
-                       khlib_fatal(
-                           "pselect failed: %d, errno: %d, msg: %s\n",
-                           ready,
-                           errno,
-                           strerror(errno)
-                       );
-               }
-       }
-       /* At-least-once ensures that expiries are still checked on timeouts. */
-       do {
-               for (Slot *s = cfg->slots; s; s = s->next) {
-                       if (FD_ISSET(s->in_fd, &fds)) {
-                               khlib_debug("reading: %s\n", s->in_fifo);
-                               switch (slot_read_one(s, t, buf)) {
-                               /*
-                                * ### MESSAGE LOSS ###
-                                * is introduced by closing at EOM in addition
-                                * to EOF, since there may be unread messages
-                                * remaining in the pipe. However,
-                                *
-                                * ### INTER-MESSAGE PUSHBACK ###
-                                * is also gained, since pipes block at the
-                                * "open" call.
-                                *
-                                * This is an acceptable trade-off because we
-                                * are a stateless reporter of a _most-recent_
-                                * status, not a stateful accumulator.
-                                */
-                               case END_OF_MESSAGE:
-                               case END_OF_FILE:
-                               case FAILURE:
-                                       close(s->in_fd);
-                                       s->in_fd = -1;
-                                       ready--;
-                                       break;
-                               case RETRY:
-                                       break;
-                               default:
-                                       assert(0);
-                               }
-                       } else {
-                               slot_expire(s, t, buf);
-                       }
-               }
-       } while (ready);
-       assert(ready == 0);
+       cfg->slots = slots_rev(cfg->slots);
 }
 
 int
@@ -542,6 +572,7 @@ main(int argc, char *argv[])
                ti,  /* time interval desired    (t1 - t0) */
                td,  /* time interval measured   (t1 - t0) */
                tc;  /* time interval correction (ti - td) when td < ti */
+       Slot *s;
 
        argv0 = argv[0];
 
@@ -555,7 +586,7 @@ main(int argc, char *argv[])
                usage("No slot specs were given!\n");
 
        /* 1st pass to check file existence and type */
-       for (Slot *s = cfg.slots; s; s = s->next) {
+       for (s = cfg.slots; s; s = s->next) {
                if (lstat(s->in_fifo, &st) < 0) {
                        khlib_error(
                            "Cannot stat \"%s\". Error: %s\n",
@@ -580,7 +611,7 @@ main(int argc, char *argv[])
        seplen = strlen(cfg.separator);
 
        /* 2nd pass to make space for separators */
-       for (Slot *s = cfg.slots; s; s = s->next) {
+       for (s = cfg.slots; s; s = s->next) {
                s->out_pos_lo  += prefix;
                s->out_pos_hi += prefix;
                s->out_pos_cur = s->out_pos_lo;
@@ -597,7 +628,7 @@ main(int argc, char *argv[])
        memset(buf, ' ', width);
        buf[width] = '\0';
        /* 3rd pass to set the separators */
-       for (Slot *s = cfg.slots; s; s = s->next) {
+       for (s = cfg.slots; s; s = s->next) {
                if (s->out_pos_lo) {  /* Skip the first, left-most */
                        /* Copying only seplen ensures we omit the '\0' byte. */
                        strncpy(
@@ -613,7 +644,7 @@ main(int argc, char *argv[])
        /* TODO: Handle signals */
        for (;;) {
                clock_gettime(CLOCK_MONOTONIC, &t0); // FIXME: check errors
-               slot_read_all(&cfg, &ti, buf);
+               slots_read(&cfg, &ti, buf);
                if (cfg.output_to_x_root_window) {
                        if (XStoreName(d, DefaultRootWindow(d), buf) < 0)
                                khlib_fatal("XStoreName failed.\n");
@@ -630,16 +661,13 @@ main(int argc, char *argv[])
                    td.tv_nsec
                );
                if (timespeccmp(&td, &ti, <)) {
-                       /* Pushback on data producers by refusing to read the
+                       /*
+                        * Pushback on data producers by refusing to read the
                         * pipe more frequently than the interval.
                         */
                        timespecsub(&ti, &td, &tc);
-                       khlib_debug("khlib_sleep YES\n");
                        khlib_sleep(&tc);
-               } else {
-                       khlib_debug("khlib_sleep NO\n");
                }
        }
-
        return EXIT_SUCCESS;
 }
This page took 0.044344 seconds and 4 git commands to generate.