Fix queueing-up unhandled messages without pushback
authorSiraaj Khandkar <siraaj@khandkar.net>
Tue, 24 Mar 2020 21:12:01 +0000 (17:12 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Tue, 24 Mar 2020 21:16:06 +0000 (17:16 -0400)
by skipping sleep when there're open FDs (which can be written to during
sleep).

While this doesn't grow the queue, this still prevents pushback by
allowing data to be written before reader can reach EOF.

x5/khatus.c

index fad3994..d1574a5 100644 (file)
@@ -368,11 +368,12 @@ fifo_read_one(Fifo *f, char *buf)
        /* TODO Record timestamp read */
 }
 
-void
+int
 fifo_read_all(Config *cfg, char *buf)
 {
        fd_set fds;
        int maxfd = -1;
+       int reading = 0;  /* Number of FDs with unfinished reads */
        int ready;
        struct stat st;
 
@@ -414,12 +415,14 @@ fifo_read_all(Config *cfg, char *buf)
                fatal("%s", strerror(errno));
        for (Fifo *f = cfg->fifos; f; f = f->next) {
                if (FD_ISSET(f->fd, &fds)) {
+                       reading++;
                        debug("reading: %s\n", f->name);
                        switch (fifo_read_one(f, buf)) {
                        case END_OF_FILE:
                        case FAILURE:
                                close(f->fd);
                                f->fd = -1;
+                               reading--;
                                break;
                        case END_OF_MESSAGE:
                        case RETRY:
@@ -429,6 +432,8 @@ fifo_read_all(Config *cfg, char *buf)
                        }
                }
        }
+       assert(reading >= 0);
+       return reading;
 }
 
 int
@@ -439,6 +444,7 @@ main(int argc, char *argv[])
        int seplen = 0;
        int prefix = 0;
        int errors = 0;
+       int reading = 0;
        char *buf;
        Config cfg0 = defaults;
        Config *cfg = &cfg0;
@@ -514,7 +520,8 @@ main(int argc, char *argv[])
                 *       fifo_read_all and desired time of next TTL check.
                 */
                /* TODO: How long to wait on IO? Max TTL? */
-               fifo_read_all(cfg, buf);
+               reading = fifo_read_all(cfg, buf);
+               debug("reading: %d\n", reading);
                if (cfg->output_to_x_root_window) {
                        if (XStoreName(display, DefaultRootWindow(display), buf) < 0)
                                fatal("XStoreName failed.\n");
@@ -526,10 +533,11 @@ main(int argc, char *argv[])
                clock_gettime(CLOCK_MONOTONIC, &t1); // FIXME: check errors
                timespecsub(&t1, &t0, &td);
                debug("td {tv_sec = %ld, tv_nsec = %ld}\n", td.tv_sec, td.tv_nsec);
-               if (timespeccmp(&td, &ti, <)) {
+               if (!reading && timespeccmp(&td, &ti, <)) {
                        /* Pushback on data producers by refusing to read the
                         * pipe more frequently than the interval.
                         */
+                       /* FIXME: Client is never blocked after initial open. */
                        timespecsub(&ti, &td, &tc);
                        debug("snooze YES\n");
                        snooze(&tc);
This page took 0.021421 seconds and 4 git commands to generate.