-(define (feed->msgs use-cache feed)
- (log-info "downloading feed nick:~a uri:~a"
- (feed-nick feed)
- (feed-uri feed))
- (with-handlers
- ([exn:fail:network?
- (λ (e)
- (log-error "network error nick:~a uri:~a exn:~a"
- (feed-nick feed)
- (feed-uri feed)
- e)
- #f)]
- [integer?
- (λ (status)
- (log-error "http error nick:~a uri:~a status:~a"
- (feed-nick feed)
- (feed-uri feed)
- status)
- #f)])
- (define uri (feed-uri feed))
- (str->msgs [feed-nick feed] uri [uri-fetch use-cache uri])))
-
-; TODO timeline contract : time-sorted list of messages
-(define (timeline use-cache num_workers feeds)
- (sort (append* (concurrent-filter-map num_workers (curry feed->msgs use-cache) feeds))
- (λ (a b) [< (msg-ts_epoch a) (msg-ts_epoch b)])))
-
-(define (str->feed str)
- ; TODO validation
- (define toks (string-split str))
- (apply feed toks))
-
-(define (filter-comments lines)
- (filter-not (λ (line) (string-prefix? line "#")) lines))
-
-(define (str->feeds str)
- (map str->feed (filter-comments (str->lines str))))
-
-(define (file->feeds filename)
- (str->feeds (file->string filename)))
-
-(define (user-agent prog-name prog-version)
- (let*
- ([prog-uri "https://github.com/xandkar/tt"]
- [user-feed-file (expand-user-path "~/twtxt-me.txt")]
- [user
- (if (file-exists? user-feed-file)
- (let ([user (first (file->feeds user-feed-file))])
- (format "+~a; @~a" (feed-uri user) (feed-nick user)))
- (format "+~a" prog-uri))]
- )
- (format "~a/~a (~a)" prog-name prog-version user)))
-
-(define (start-logger level)
- (let* ([logger (make-logger #f #f level #f)]
- [log-receiver (make-log-receiver logger level)])
- (void (thread (λ ()
- (parameterize
- ([date-display-format 'iso-8601])
- (let loop ()
- (define data (sync log-receiver))
- (define level (vector-ref data 0))
- (define msg (vector-ref data 1))
- (define ts (date->string (current-date) #t))
- (eprintf "~a [~a] ~a~n" ts level msg)
- (loop))))))
- (current-logger logger)))
+(: peer->msgs (-> Peer (Listof Msg)))
+(define (peer->msgs peer)
+ (match-define (Peer nick uri _) peer)
+ (log-info "Reading peer nick:~v uri:~v" nick (url->string uri))
+ (define msgs-data (uri-read-cached uri))
+ (if msgs-data
+ (str->msgs nick uri msgs-data)
+ '()))
+
+(: peer-download
+ (-> Positive-Float Peer
+ (Result (U 'skipped-cached 'downloaded-new)
+ Any)))
+(define (peer-download timeout peer)
+ (match-define (Peer nick uri _) peer)
+ (define u (url->string uri))
+ (log-info "Download BEGIN URL:~a" u)
+ (define-values (results _tm-cpu-ms tm-real-ms _tm-gc-ms)
+ (time-apply uri-download (list timeout uri)))
+ (define result (car results))
+ (log-info "Download END in ~a seconds, URL:~a, result:~s"
+ (/ tm-real-ms 1000.0)
+ u
+ result)
+ result)
+
+(: timeline-download (-> Integer Positive-Float (Listof Peer) Void))
+(define (timeline-download num-workers timeout peers)
+ (define results
+ (concurrent-filter-map num-workers
+ (λ (p) (cons p (peer-download timeout p)))
+ peers))
+ (define peers-ok
+ (filter-map (match-lambda
+ [(cons p (cons 'ok _)) p]
+ [(cons _ (cons 'error e)) #f])
+ results))
+ (define peers-err
+ (filter-map (match-lambda
+ [(cons _ (cons 'ok _))
+ #f]
+ [(cons p (cons 'error e))
+ (struct-copy Peer p [comment (format "~s" e)])])
+ results))
+ (peers->file peers-ok (build-path tt-home-dir "peers-last-downloaded-ok"))
+ (peers->file peers-err (build-path tt-home-dir "peers-last-downloaded-err")))
+
+(: uniq (∀ (α) (-> (Listof α) (Listof α))))
+(define (uniq xs)
+ (set->list (list->set xs)))
+
+(: peers->timeline (-> (listof Peer) (listof Msg)))
+(define (peers->timeline peers)
+ (append* (filter-map peer->msgs peers)))
+
+(: timeline-sort (-> (listof Msg) timeline-order (Listof Msgs)))
+(define (timeline-sort msgs order)
+ (define cmp (match order
+ ['old->new <]
+ ['new->old >]))
+ (sort msgs (λ (a b) (cmp (Msg-ts-epoch a)
+ (Msg-ts-epoch b)))))
+
+(: paths->peers (-> (Listof String) (Listof Peer)))
+(define (paths->peers paths)
+ (let* ([paths (match paths
+ ['()
+ (let ([peer-refs-file (build-path tt-home-dir "peers")])
+ (log-debug
+ "No peer ref file paths provided, defaulting to ~v"
+ (path->string peer-refs-file))
+ (list peer-refs-file))]
+ [paths
+ (log-debug "Peer ref file paths provided: ~v" paths)
+ (map string->path paths)])]
+ [peers (append* (map file->peers paths))])
+ (log-info "Read-in ~a peers." (length peers))
+ (uniq peers)))
+
+(: mentioned-peers-in-cache (-> (Listof Peer)))
+(define (mentioned-peers-in-cache)
+ (define msgs
+ (append* (map (λ (filename)
+ (define path (build-path cache-object-dir filename))
+ (define size (/ (file-size path) 1000000.0))
+ (log-info "BEGIN parsing ~a MB from file: ~v"
+ size
+ (path->string path))
+ (define t0 (current-inexact-milliseconds))
+ (define m (filter-map
+ (λ (line)
+ (str->msg #f (cache-object-filename->url filename) line))
+ (filter-comments
+ (file->lines path))))
+ (define t1 (current-inexact-milliseconds))
+ (log-info "END parsing ~a MB in ~a seconds from file: ~v."
+ size
+ (* 0.001 (- t1 t0))
+ (path->string path))
+ (when (empty? m)
+ (log-warning "No messages found in ~a" (path->string path)))
+ m)
+ (directory-list cache-object-dir))))
+ (uniq (append* (map Msg-mentions msgs))))
+
+(: log-writer-stop (-> Thread Void))
+(define (log-writer-stop log-writer)
+ (log-message (current-logger) 'fatal 'stop "Exiting." #f)
+ (thread-wait log-writer))
+
+(: log-writer-start (-> Log-Level Thread))
+(define (log-writer-start level)
+ (let* ([logger
+ (make-logger #f #f level #f)]
+ [log-receiver
+ (make-log-receiver logger level)]
+ [log-writer
+ (thread
+ (λ ()
+ (parameterize
+ ([date-display-format 'iso-8601])
+ (let loop ()
+ (match-define (vector level msg _ topic) (sync log-receiver))
+ (unless (equal? topic 'stop)
+ (eprintf "~a [~a] ~a~n" (date->string (current-date) #t) level msg)
+ (loop))))))])
+ (current-logger logger)
+ log-writer))