Implement concurrency 0.1.0
authorSiraaj Khandkar <siraaj@khandkar.net>
Wed, 11 Nov 2020 15:04:09 +0000 (10:04 -0500)
committerSiraaj Khandkar <siraaj@khandkar.net>
Wed, 11 Nov 2020 15:04:45 +0000 (10:04 -0500)
tt

diff --git a/tt b/tt
index 4605943..e20cf8b 100755 (executable)
--- a/tt
+++ b/tt
@@ -11,6 +11,8 @@
 ; TODO named timelines
 ; TODO CLI params
 ; TODO config files
+; TODO parse "following" from feed
+; - following = <nick> <uri>
 ; TODO parse mentions:
 ; - @<source.nick source.url> | @<source.url>
 ; TODO highlight mentions
@@ -19,7 +21,6 @@
 ; TODO filter on hashtags
 ; TODO hashtags as channels? initial hashtag special?
 ; TODO query language
-; TODO concurrency
 ; TODO console logger colors by level ('error)
 ; TODO file logger ('debug)
 ; TODO commands:
 (struct msg  (tm_epoch tm_rfc3339 nick uri text))
 (struct feed (nick uri))
 
+(define (concurrent-filter-map num_workers f xs)
+  (define (make-worker id f)
+    (define parent (current-thread))
+    (λ ()
+       (define self (current-thread))
+       (define (work)
+         (thread-send parent (cons 'next self))
+         (match (thread-receive)
+           ['done          (thread-send parent (cons 'exit id))]
+           [(cons 'unit x) (begin
+                             (define y (f x))
+                             (when y (thread-send parent (cons 'result y)))
+                             (work))]))
+       (work)))
+  (define (dispatch ws xs ys)
+    (if (empty? ws)
+      ys
+      (match (thread-receive)
+        [(cons 'exit w)   (dispatch (remove w ws =) xs ys)]
+        [(cons 'result y) (dispatch ws xs (cons y ys))]
+        [(cons 'next thd) (match xs
+                            ['()         (begin
+                                           (thread-send thd 'done)
+                                           (dispatch ws xs ys))]
+                            [(cons x xs) (begin
+                                           (thread-send thd (cons 'unit x))
+                                           (dispatch ws xs ys))])])))
+  (define workers
+    (range 1 (add1 num_workers)))
+  (define threads
+    (map (λ (id) (thread (make-worker id f))) workers))
+  (define results
+    (dispatch workers xs '()))
+  (for-each thread-wait threads)
+  results)
+
 (define (msg-print out-format odd msg)
   (printf
     (match out-format
     (str->msgs [feed-nick feed] uri [uri-fetch uri])))
 
 ; TODO timeline contract : time-sorted list of messages
-(define (timeline feeds)
-  (sort (append* (filter-map feed->msgs feeds))
+(define (timeline num_workers feeds)
+  (sort (append* (concurrent-filter-map num_workers feed->msgs feeds))
         (λ (a b) [< (msg-tm_epoch a) (msg-tm_epoch b)])))
 
 (define (we-are-twtxt)
 (define (main)
   (setup-logging)
   (current-http-response-auto #f)
-  (current-http-user-agent "xandkar/tt 0.0.0")
+  (current-http-user-agent "xandkar/tt 0.1.0")
   (date-display-format 'rfc2822)
 
   (define feeds (we-are-twtxt))
   (define out-format 'multi-line)
-  (timeline-print out-format (timeline feeds)))
+  (define num_workers 15) ; 15 was fastest out of the tried 1, 5, 10, 15 and 20.
+  (timeline-print out-format (timeline num_workers feeds)))
 
 (main)
This page took 0.038909 seconds and 4 git commands to generate.