+(define (concurrent-filter-map num_workers f xs)
+ ; TODO switch from mailboxes to channels
+ (define (make-worker id f)
+ (define parent (current-thread))
+ (λ ()
+ (define self (current-thread))
+ (define (work)
+ (thread-send parent (cons 'next self))
+ (match (thread-receive)
+ ['done (thread-send parent (cons 'exit id))]
+ [(cons 'unit x) (begin
+ (define y (f x))
+ (when y (thread-send parent (cons 'result y)))
+ (work))]))
+ (work)))
+ (define (dispatch ws xs ys)
+ (if (empty? ws)
+ ys
+ (match (thread-receive)
+ [(cons 'exit w) (dispatch (remove w ws =) xs ys)]
+ [(cons 'result y) (dispatch ws xs (cons y ys))]
+ [(cons 'next thd) (match xs
+ ['() (begin
+ (thread-send thd 'done)
+ (dispatch ws xs ys))]
+ [(cons x xs) (begin
+ (thread-send thd (cons 'unit x))
+ (dispatch ws xs ys))])])))
+ (define workers
+ (range 1 (add1 num_workers)))
+ (define threads
+ (map (λ (id) (thread (make-worker id f))) workers))
+ (define results
+ (dispatch workers xs '()))
+ (for-each thread-wait threads)
+ results)
+