-(define (msg<-toks nick toks)
- (define tm_rfc3339 (list-ref toks 0))
- (define tok_text (list-ref toks 1))
- (define t (string->rfc3339-record tm_rfc3339))
- ; TODO handle tz offset
- (define tm_epoch (find-seconds [rfc3339-record:second t]
- [rfc3339-record:minute t]
- [rfc3339-record:hour t]
- [rfc3339-record:mday t]
- [rfc3339-record:month t]
- [rfc3339-record:year t]))
- (msg tm_epoch tm_rfc3339 nick tok_text))
+(define (concurrent-filter-map num_workers f xs)
+ ; TODO switch from mailboxes to channels
+ (define (make-worker id f)
+ (define parent (current-thread))
+ (λ ()
+ (define self (current-thread))
+ (define (work)
+ (thread-send parent (cons 'next self))
+ (match (thread-receive)
+ ['done (thread-send parent (cons 'exit id))]
+ [(cons 'unit x) (begin
+ (define y (f x))
+ (when y (thread-send parent (cons 'result y)))
+ (work))]))
+ (work)))
+ (define (dispatch ws xs ys)
+ (if (empty? ws)
+ ys
+ (match (thread-receive)
+ [(cons 'exit w) (dispatch (remove w ws =) xs ys)]
+ [(cons 'result y) (dispatch ws xs (cons y ys))]
+ [(cons 'next thd) (match xs
+ ['() (begin
+ (thread-send thd 'done)
+ (dispatch ws xs ys))]
+ [(cons x xs) (begin
+ (thread-send thd (cons 'unit x))
+ (dispatch ws xs ys))])])))
+ (define workers
+ (range 1 (add1 num_workers)))
+ (define threads
+ (map (λ (id) (thread (make-worker id f))) workers))
+ (define results
+ (dispatch workers xs '()))
+ (for-each thread-wait threads)
+ results)
+
+(define (msg-print out-format odd msg)
+ (printf
+ (match out-format
+ ['single-line "~a \033[1;37m<~a ~a>\033[0m \033[0;~am~a\033[0m~n"]
+ ['multi-line "~a~n\033[1;37m<~a ~a>\033[0m~n\033[0;~am~a\033[0m~n~n"]
+ [_ (raise (format "Invalid output format: ~a" out-format))])
+ (date->string (seconds->date [msg-ts_epoch msg]) #t)
+ (msg-nick msg)
+ (msg-uri msg)
+ (if odd 36 33)
+ (msg-text msg)))
+
+(define re-msg-begin
+ ; TODO Zulu offset. Maybe in several formats. Which ones?
+ (pregexp "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}"))
+
+(define (str->msg nick uri str)
+ (if (not (regexp-match? re-msg-begin str))
+ (begin
+ (log-debug "Non-msg line from nick:~a, line:~a" nick str)
+ #f)
+ (let ([toks (string-split str (regexp "\t+"))])
+ (if (not (= 2 (length toks)))
+ (begin
+ (log-warning "Invalid msg line from nick:~a, msg:~a" nick str)
+ #f)
+ (let*
+ ([ts_rfc3339 (first toks)]
+ [text (second toks)]
+ [t (string->rfc3339-record ts_rfc3339)]
+ ; TODO handle tz offset
+ [ts_epoch (find-seconds [rfc3339-record:second t]
+ [rfc3339-record:minute t]
+ [rfc3339-record:hour t]
+ [rfc3339-record:mday t]
+ [rfc3339-record:month t]
+ [rfc3339-record:year t])])
+ (msg ts_epoch ts_rfc3339 nick uri text))))))
+
+(define (str->lines str)
+ (string-split str (regexp "[\r\n]+")))
+
+(define (str->msgs nick uri str)
+ (filter-map (λ (line) (str->msg nick uri line)) (str->lines str)))