Fix queueing-up unhandled messages without pushback
[khatus.git] / x5 / khatus.c
index fad3994..d1574a5 100644 (file)
@@ -368,11 +368,12 @@ fifo_read_one(Fifo *f, char *buf)
        /* TODO Record timestamp read */
 }
 
-void
+int
 fifo_read_all(Config *cfg, char *buf)
 {
        fd_set fds;
        int maxfd = -1;
+       int reading = 0;  /* Number of FDs with unfinished reads */
        int ready;
        struct stat st;
 
@@ -414,12 +415,14 @@ fifo_read_all(Config *cfg, char *buf)
                fatal("%s", strerror(errno));
        for (Fifo *f = cfg->fifos; f; f = f->next) {
                if (FD_ISSET(f->fd, &fds)) {
+                       reading++;
                        debug("reading: %s\n", f->name);
                        switch (fifo_read_one(f, buf)) {
                        case END_OF_FILE:
                        case FAILURE:
                                close(f->fd);
                                f->fd = -1;
+                               reading--;
                                break;
                        case END_OF_MESSAGE:
                        case RETRY:
@@ -429,6 +432,8 @@ fifo_read_all(Config *cfg, char *buf)
                        }
                }
        }
+       assert(reading >= 0);
+       return reading;
 }
 
 int
@@ -439,6 +444,7 @@ main(int argc, char *argv[])
        int seplen = 0;
        int prefix = 0;
        int errors = 0;
+       int reading = 0;
        char *buf;
        Config cfg0 = defaults;
        Config *cfg = &cfg0;
@@ -514,7 +520,8 @@ main(int argc, char *argv[])
                 *       fifo_read_all and desired time of next TTL check.
                 */
                /* TODO: How long to wait on IO? Max TTL? */
-               fifo_read_all(cfg, buf);
+               reading = fifo_read_all(cfg, buf);
+               debug("reading: %d\n", reading);
                if (cfg->output_to_x_root_window) {
                        if (XStoreName(display, DefaultRootWindow(display), buf) < 0)
                                fatal("XStoreName failed.\n");
@@ -526,10 +533,11 @@ main(int argc, char *argv[])
                clock_gettime(CLOCK_MONOTONIC, &t1); // FIXME: check errors
                timespecsub(&t1, &t0, &td);
                debug("td {tv_sec = %ld, tv_nsec = %ld}\n", td.tv_sec, td.tv_nsec);
-               if (timespeccmp(&td, &ti, <)) {
+               if (!reading && timespeccmp(&td, &ti, <)) {
                        /* Pushback on data producers by refusing to read the
                         * pipe more frequently than the interval.
                         */
+                       /* FIXME: Client is never blocked after initial open. */
                        timespecsub(&ti, &td, &tc);
                        debug("snooze YES\n");
                        snooze(&tc);
This page took 0.02462 seconds and 4 git commands to generate.