X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=tt.rkt;h=d6218b8305d4c116014f189dc36e08f5a51ae822;hb=d718efc4fa25667b6c42c97b8f0998ff3eb9e09c;hp=59214ffaa17c126401fe35f6206325ee6b99a0ef;hpb=3a4b22332acd7b76bcbe4150b5eba9fc80b7f459;p=tt.git diff --git a/tt.rkt b/tt.rkt index 59214ff..d6218b8 100644 --- a/tt.rkt +++ b/tt.rkt @@ -1,24 +1,55 @@ -#lang racket +#lang typed/racket/no-check (require openssl/sha1) (require racket/date) +(require + net/head + net/http-client + net/uri-codec + net/url-string + net/url-structs) -(require http-client) -(require rfc3339-old) +(require (prefix-in info: "info.rkt")) (module+ test (require rackunit)) -(struct msg (ts_epoch ts_rfc3339 nick uri text)) -(struct feed (nick uri)) +(define-type Url + net/url-structs:url) -(define (concurrent-filter-map num_workers f xs) +(define-type Out-Format + (U 'single-line + 'multi-line)) + +(define-type Timeline-Order + (U 'old->new + 'new->old)) + +(struct Msg + ([ts-epoch : Integer] + [ts-orig : String] + [nick : (Option String)] + [uri : Url] + [text : String] + [mentions : (Listof Peer)])) + +(struct Peer + ([nick : (Option String)] + [uri : Url]) + #:transparent) + +(: tt-home-dir Path-String) +(define tt-home-dir (build-path (expand-user-path "~") ".tt")) + +(: concurrent-filter-map (∀ (α β) (-> Natural (-> α β) (Listof α) (Listof β)))) +(define (concurrent-filter-map num-workers f xs) ; TODO preserve order of elements OR communicate that reorder is expected ; TODO switch from mailboxes to channels (define (make-worker id f) (define parent (current-thread)) (λ () - (define self (current-thread)) + (define self : Thread (current-thread)) + (: work (∀ (α) (-> α))) (define (work) (thread-send parent (cons 'next self)) (match (thread-receive) @@ -28,6 +59,7 @@ (when y (thread-send parent (cons 'result y))) (work))])) (work))) + (: dispatch (∀ (α β) (-> (Listof Nonnegative-Integer) (Listof α) (Listof β)))) (define (dispatch ws xs ys) (if (empty? ws) ys @@ -41,7 +73,7 @@ [(cons x xs) (begin (thread-send thd (cons 'unit x)) (dispatch ws xs ys))])]))) - (define workers (range num_workers)) + (define workers (range num-workers)) (define threads (map (λ (id) (thread (make-worker id f))) workers)) (define results (dispatch workers xs '())) (for-each thread-wait threads) @@ -54,55 +86,100 @@ [expected (sort ( filter-map f xs) <)]) (check-equal? actual expected "concurrent-filter-map"))) +(: msg-print (-> Out-Format Integer Msg Void)) (define msg-print (let* ([colors (vector 36 33)] [n (vector-length colors)]) (λ (out-format color-i msg) (let ([color (vector-ref colors (modulo color-i n))] - [nick (msg-nick msg)] - [uri (msg-uri msg)] - [text (msg-text msg)]) + [nick (Msg-nick msg)] + [uri (url->string (Msg-uri msg))] + [text (Msg-text msg)] + [mentions (Msg-mentions msg)]) (match out-format ['single-line - (printf "~a \033[1;37m<~a>\033[0m \033[0;~am~a\033[0m~n" - (parameterize ([date-display-format 'iso-8601]) - (date->string (seconds->date [msg-ts_epoch msg]) #t)) - nick color text)] + (let ([nick (if nick nick uri)]) + (printf "~a \033[1;37m<~a>\033[0m \033[0;~am~a\033[0m~n" + (parameterize + ([date-display-format 'iso-8601]) + (date->string (seconds->date (Msg-ts-epoch msg)) #t)) + nick color text))] ['multi-line - (printf "~a~n\033[1;37m<~a ~a>\033[0m~n\033[0;~am~a\033[0m~n~n" - (parameterize ([date-display-format 'rfc2822]) - (date->string (seconds->date [msg-ts_epoch msg]) #t)) - nick uri color text)]))))) - -; TODO Implement rfc3339->epoch and remove dependency on rfc3339-old - + (let ([nick (if nick (string-append nick " ") "")]) + (printf "~a (~a)~n\033[1;37m<~a~a>\033[0m~n\033[0;~am~a\033[0m~n~n" + (parameterize + ([date-display-format 'rfc2822]) + (date->string (seconds->date (Msg-ts-epoch msg)) #t)) + (Msg-ts-orig msg) + nick uri color text))]))))) + +(: rfc3339->epoch (-> String (Option Nonnegative-Integer))) +(define rfc3339->epoch + (let ([re (pregexp "^([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2}):([0-9]{2})(:([0-9]{2}))?(\\.[0-9]+)?(Z|([+-])([0-9]{1,2}):?([0-9]{2}))?$")]) + (λ (ts) + (match (regexp-match re ts) + [(list _wholething yyyy mm dd HH MM _:SS SS _fractional tz-whole tz-sign tz-HH tz-MM) + (let* + ([tz-offset + (match* (tz-whole tz-sign tz-HH tz-MM) + [("Z" #f #f #f) + 0] + [(_ (or "-" "+") (? identity h) (? identity m)) + (let ([h (string->number h)] + [m (string->number m)] + ; Reverse to get back to UTC: + [op (match tz-sign ["+" -] ["-" +])]) + (op 0 (+ (* 60 m) (* 60 (* 60 h)))))] + [(a b c d) + (log-warning "Impossible TZ string: ~v, components: ~v ~v ~v ~v" tz-whole a b c d) + 0])] + [ts-orig ts] + [local-time? #f] + [ts-epoch (find-seconds (if SS (string->number SS) 0) + (string->number MM) + (string->number HH) + (string->number dd) + (string->number mm) + (string->number yyyy) + local-time?)]) + (+ ts-epoch tz-offset))] + [_ + (log-error "Invalid timestamp: ~v" ts) + #f])))) + +(: str->msg (-> (Option String) Url String (Option Msg))) (define str->msg - (let ([re (pregexp "^([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}(:[0-9]{2})?)(\\.[0-9]+)?([^\\s\t]*)[\\s\t]+(.*)$")]) + (let ([re (pregexp "^([^\\s\t]+)[\\s\t]+(.*)$")]) (λ (nick uri str) (with-handlers* ([exn:fail? (λ (e) - (log-error "Failed to parse msg: ~v, from: ~v, at: ~v, because: ~v" str nick uri e) + (log-error + "Failed to parse msg: ~v, from: ~v, at: ~v, because: ~v" + str nick (url->string uri) e) #f)]) (match (regexp-match re str) - [(list _wholething ts s _fractional tz text) - (let* - ([ts_rfc3339 (string-append ts (if s "" ":00") (if tz tz ""))] - [t (string->rfc3339-record ts_rfc3339)] - [s (rfc3339-record:second t)] - ; TODO handle tz offset - [ts_epoch (find-seconds [if s s 0] - [rfc3339-record:minute t] - [rfc3339-record:hour t] - [rfc3339-record:mday t] - [rfc3339-record:month t] - [rfc3339-record:year t])]) - (msg ts_epoch ts_rfc3339 nick uri text))] + [(list _wholething ts-orig text) + (let ([ts-epoch (rfc3339->epoch ts-orig)]) + (if ts-epoch + (let ([mentions + (filter-map + (λ (m) (match (regexp-match #px"@<([^>]+)>" m) + [(list _wholething nick-uri) + (str->peer nick-uri)])) + (regexp-match* #px"@<[^\\s]+([\\s]+)?[^>]+>" text))]) + (Msg ts-epoch ts-orig nick uri text mentions)) + (begin + (log-error + "Msg rejected due to invalid timestamp: ~v, nick:~v, uri:~v" + str nick (url->string uri)) + #f)))] [_ - (log-debug "Non-msg line from nick:~a, line:~a" nick str) + (log-debug "Non-msg line from nick:~v, line:~a" nick str) #f]))))) (module+ test + ; TODO Test for when missing-nick case (let* ([tzs (for*/list ([d '("-" "+")] [h '("5" "05")] [m '("00" ":00" "57" ":57")]) @@ -120,10 +197,10 @@ z)] [m (str->msg n u (string-append ts sep txt))]) (check-not-false m) - (check-equal? (msg-nick m) n) - (check-equal? (msg-uri m) u) - (check-equal? (msg-text m) txt) - (check-equal? (msg-ts_rfc3339 m) ts (format "Given: ~v" ts)) + (check-equal? (Msg-nick m) n) + (check-equal? (Msg-uri m) u) + (check-equal? (Msg-text m) txt) + (check-equal? (Msg-ts-orig m) ts (format "Given: ~v" ts)) ))) (let* ([ts "2020-11-18T22:22:09-0500"] @@ -132,166 +209,272 @@ [nick "foo"] [uri "bar"] [actual (str->msg nick uri (string-append ts tab text))] - [expected (msg 1605756129 ts nick uri text)]) - ; FIXME re-enable after handling tz offset - ;(check-equal? - ; (msg-ts_epoch actual) - ; (msg-ts_epoch expected) - ; "str->msg ts_epoch") + [expected (Msg 1605756129 ts nick uri text)]) + (check-equal? + (Msg-ts-epoch actual) + (Msg-ts-epoch expected) + "str->msg ts-epoch") (check-equal? - (msg-ts_rfc3339 actual) - (msg-ts_rfc3339 expected) - "str->msg ts_rfc3339") + (Msg-ts-orig actual) + (Msg-ts-orig expected) + "str->msg ts-orig") (check-equal? - (msg-nick actual) - (msg-nick expected) + (Msg-nick actual) + (Msg-nick expected) "str->msg nick") (check-equal? - (msg-uri actual) - (msg-uri expected) + (Msg-uri actual) + (Msg-uri expected) "str->msg uri") (check-equal? - (msg-text actual) - (msg-text expected) + (Msg-text actual) + (Msg-text expected) "str->msg text"))) +(: str->lines (-> String (Listof String))) (define (str->lines str) (string-split str (regexp "[\r\n]+"))) (module+ test (check-equal? (str->lines "abc\ndef\n\nghi") '("abc" "def" "ghi"))) +(: str->msgs (-> (Option String) Url String (Listof Msg))) (define (str->msgs nick uri str) - (filter-map (λ (line) (str->msg nick uri line)) (str->lines str))) + (filter-map (λ (line) (str->msg nick uri line)) (filter-comments (str->lines str)))) + +(: cache-dir Path-String) +(define cache-dir (build-path tt-home-dir "cache")) + +(define cache-object-dir (build-path cache-dir "objects")) + +(: url->cache-file-path-v1 (-> Url Path-String)) +(define (url->cache-file-path-v1 uri) + (define (hash-sha1 str) : (-> String String) + (define in (open-input-string str)) + (define digest (sha1 in)) + (close-input-port in) + digest) + (build-path cache-object-dir (hash-sha1 (url->string uri)))) -(define (hash-sha1 str) - (define in (open-input-string str)) - (define digest (sha1 in)) - (close-input-port in) - digest) +(: url->cache-file-path-v2 (-> Url Path-String)) +(define (url->cache-file-path-v2 uri) + (build-path cache-object-dir (uri-encode (url->string uri)))) +(define url->cache-object-path url->cache-file-path-v2) + +(define (url->cache-etag-path uri) + (build-path cache-dir "etags" (uri-encode (url->string uri)))) + +(define (url->cache-lmod-path uri) + (build-path cache-dir "lmods" (uri-encode (url->string uri)))) + +; TODO Return Option +(: uri-read-cached (-> Url String)) (define (uri-read-cached uri) - (define cache-file-path - (expand-user-path (string-append "~/.tt/cache/" (hash-sha1 uri)))) - (if (file-exists? cache-file-path) - (file->string cache-file-path) + (define path-v1 (url->cache-file-path-v1 uri)) + (define path-v2 (url->cache-file-path-v2 uri)) + (when (file-exists? path-v1) + (rename-file-or-directory path-v1 path-v2 #t)) + (if (file-exists? path-v2) + (file->string path-v2) (begin - (log-warning "Cache file not found for URI: ~a" uri) + (log-warning "Cache file not found for URI: ~a" (url->string uri)) ""))) -; uri-download : String -> Void -(define (uri-download uri) - (define cache-file-path - (expand-user-path (string-append "~/.tt/cache/" (hash-sha1 uri)))) - (log-info "uri-download ~a" uri) - ; TODO Timeout. Currently hangs on slow connections. - (let* ([resp (http-get uri)] - [status (http-response-code resp)] - [body (http-response-body resp)]) - (log-debug "finished GET ~a status:~a body length:~a" - uri status (string-length body)) - ; TODO Handle redirects - (if (= status 200) - (display-to-file body cache-file-path #:exists 'replace) - ; TODO A more-informative exception - (raise status)))) +(: str->peer (String (Option Peer))) +(define (str->peer str) + (log-debug "Parsing peer string: ~v" str) + (with-handlers* + ([exn:fail? + (λ (e) + (log-error "Invalid URI in string: ~v, exn: ~v" str e) + #f)]) + (match (string-split str) + [(list u) (Peer #f (string->url u))] + [(list n u) (Peer n (string->url u))] + [_ + (log-error "Invalid peer string: ~v" str) + #f]))) + + +(: filter-comments (-> (Listof String) (Listof String))) +(define (filter-comments lines) + (filter-not (λ (line) (string-prefix? line "#")) lines)) + +(: str->peers (-> String (Listof Peer))) +(define (str->peers str) + (filter-map str->peer (filter-comments (str->lines str)))) + +(: file->peers (-> Path-String (Listof Peer))) +(define (file->peers file-path) + (if (file-exists? file-path) + (str->peers (file->string file-path)) + (begin + (log-error "File does not exist: ~v" (path->string file-path)) + '()))) + +(: user-agent String) +(define user-agent + (let* + ([prog-name "tt"] + [prog-version (info:#%info-lookup 'version)] + [prog-uri "https://github.com/xandkar/tt"] + [user-peer-file (build-path tt-home-dir "me")] + [user + (if (file-exists? user-peer-file) + (match (first (file->peers user-peer-file)) + [(Peer #f u) (format "+~a" (url->string u) )] + [(Peer n u) (format "+~a; @~a" (url->string u) n)]) + (format "+~a" prog-uri))]) + (format "~a/~a (~a)" prog-name prog-version user))) +(: header-get (-> (Listof Bytes) Bytes (Option Bytes))) +(define (header-get headers name) + (match (filter-map (curry extract-field name) headers) + [(list val) val] + [_ #f])) + +(: uri-download (-> Url Void)) +(define (uri-download u) + (define cached-object-path (url->cache-object-path u)) + (define cached-etag-path (url->cache-etag-path u)) + (define cached-lmod-path (url->cache-lmod-path u)) + (log-debug "uri-download ~v into ~v" u cached-object-path) + (match* ((url-scheme u) (url-host u) (url-port u)) + [(s h p) + #:when (and s h) + (define ssl? (string=? s "https")) + (define-values (status-line headers body-input) + ; TODO Timeout. Currently hangs on slow connections. + (http-sendrecv + h + (url->string (struct-copy url u [scheme #f] [host #f])) + #:ssl? ssl? + #:port (cond [p p] [ssl? 443] [else 80]) + #:headers (list (format "User-Agent: ~a" user-agent)))) + (log-debug "headers: ~v" headers) + (log-debug "status-line: ~v" status-line) + (define status + (string->number (second (string-split (bytes->string/utf-8 status-line))))) + (log-debug "status: ~v" status) + ; TODO Handle redirects + (match status + [200 + (let ([etag (header-get headers #"ETag")] + [lmod (header-get headers #"Last-Modified")]) + (if (and etag + (file-exists? cached-etag-path) + (bytes=? etag (file->bytes cached-etag-path))) + (log-info "ETags match, skipping the rest of ~v" (url->string u)) + (begin + (log-info + "Downloading the rest of ~v. ETag: ~a, Last-Modified: ~v" + (url->string u) etag lmod) + (make-parent-directory* cached-object-path) + (make-parent-directory* cached-etag-path) + (make-parent-directory* cached-lmod-path) + (call-with-output-file cached-object-path + (curry copy-port body-input) + #:exists 'replace) + (when etag + (display-to-file etag cached-etag-path #:exists 'replace)) + (when lmod + (display-to-file etag cached-lmod-path #:exists 'replace)))) + (close-input-port body-input))] + [_ + (raise status)])] + [(_ _ _) + (log-error "Invalid URI: ~v" u)])) + +(: timeline-print (-> Out-Format (Listof Msg) Void)) (define (timeline-print out-format timeline) (void (foldl (match-lambda** - [((and m (msg _ _ nick _ _)) (cons prev-nick i)) - (let ([i (if (string=? prev-nick nick) i (+ 1 i))]) + [((and m (Msg _ _ nick _ _ _)) (cons prev-nick i)) + (let ([i (if (equal? prev-nick nick) i (+ 1 i))]) (msg-print out-format i m) (cons nick i))]) (cons "" 0) timeline))) -; feed->msgs : Feed -> (Listof Msg) -(define (feed->msgs feed) - (log-info "Reading feed nick:~a uri:~a" - (feed-nick feed) - (feed-uri feed)) - (define uri (feed-uri feed)) - (str->msgs (feed-nick feed) uri (uri-read-cached uri))) - -; feed-download : Feed -> Void -(define (feed-download feed) - (log-info "Downloading feed nick:~a uri:~a" - (feed-nick feed) - (feed-uri feed)) +(: peer->msgs (-> Peer (Listof Msg))) +(define (peer->msgs f) + (match-define (Peer nick uri) f) + (log-info "Reading peer nick:~v uri:~v" nick (url->string uri)) + (str->msgs nick uri (uri-read-cached uri))) + +(: peer-download (-> Peer Void)) +(define (peer-download f) + (match-define (Peer nick uri) f) + (define u (url->string uri)) + (log-info "Downloading peer uri:~a" u) (with-handlers - ([exn:fail:network? + ([exn:fail? (λ (e) - (log-error "Network error nick:~a uri:~a exn:~a" - (feed-nick feed) - (feed-uri feed) - e) + (log-error "Network error nick:~v uri:~v exn:~v" nick u e) #f)] [integer? (λ (status) - (log-error "HTTP error nick:~a uri:~a status:~a" - (feed-nick feed) - (feed-uri feed) - status) + (log-error "HTTP error nick:~v uri:~a status:~a" nick u status) #f)]) - (uri-download (feed-uri feed)))) + (define-values (_result _tm-cpu-ms tm-real-ms _tm-gc-ms) + (time-apply uri-download (list uri))) + (log-info "Peer downloaded in ~a seconds, uri: ~a" (/ tm-real-ms 1000.0) u))) -; timeline-download : Integer -> (Listof Feed) -> Void -(define (timeline-download num_workers feeds) +(: timeline-download (-> Integer (Listof Peer) Void)) +(define (timeline-download num-workers peers) ; TODO No need for map - can just iter - (void (concurrent-filter-map num_workers feed-download feeds))) + (void (concurrent-filter-map num-workers peer-download peers))) ; TODO timeline contract : time-sorted list of messages -; timeline-read : (U 'old->new 'new->old) -> (Listof Feeds) -> (Listof Msg) -(define (timeline-read order feeds) +(: timeline-read (-> Timeline-Order (Listof Peer) (Listof Msg))) +(define (timeline-read order peers) (define cmp (match order ['old->new <] ['new->old >])) - (sort (append* (filter-map feed->msgs feeds)) - (λ (a b) (cmp (msg-ts_epoch a) (msg-ts_epoch b))))) - -(define (str->feed str) - ; TODO validation - (define toks (string-split str)) - (apply feed toks)) - -(define (filter-comments lines) - (filter-not (λ (line) (string-prefix? line "#")) lines)) - -(define (str->feeds str) - (map str->feed (filter-comments (str->lines str)))) - -(define (file->feeds filename) - (str->feeds (file->string filename))) - -(define (user-agent prog-name prog-version) - (let* - ([prog-uri "https://github.com/xandkar/tt"] - [user-feed-file (expand-user-path "~/twtxt-me.txt")] - [user - (if (file-exists? user-feed-file) - (let ([user (first (file->feeds user-feed-file))]) - (format "+~a; @~a" (feed-uri user) (feed-nick user))) - (format "+~a" prog-uri))]) - (format "~a/~a (~a)" prog-name prog-version user))) - -(define (start-logger level) - (let* ([logger (make-logger #f #f level #f)] - [log-receiver (make-log-receiver logger level)]) - (void (thread (λ () - (parameterize - ([date-display-format 'iso-8601]) - (let loop () - (define data (sync log-receiver)) - (define level (vector-ref data 0)) - (define msg (vector-ref data 1)) - (define ts (date->string (current-date) #t)) - (eprintf "~a [~a] ~a~n" ts level msg) - (loop)))))) - (current-logger logger))) + (sort (append* (filter-map peer->msgs peers)) + (λ (a b) (cmp (Msg-ts-epoch a) (Msg-ts-epoch b))))) + +(: paths->peers (-> (Listof String) (Listof Peer))) +(define (paths->peers paths) + (let* ([paths (match paths + ['() + (let ([peer-refs-file (build-path tt-home-dir "peers")]) + (log-debug + "No peer ref file paths provided, defaulting to ~v" + (path->string peer-refs-file)) + (list peer-refs-file))] + [paths + (log-debug "Peer ref file paths provided: ~v" paths) + (map string->path paths)])] + [peers (append* (map file->peers paths))]) + (log-info "Read-in ~a peers." (length peers)) + peers)) + +(: log-writer-stop (-> Thread Void)) +(define (log-writer-stop log-writer) + (log-message (current-logger) 'fatal 'stop "Exiting." #f) + (thread-wait log-writer)) + +(: log-writer-start (-> Log-Level Thread)) +(define (log-writer-start level) + (let* ([logger + (make-logger #f #f level #f)] + [log-receiver + (make-log-receiver logger level)] + [log-writer + (thread + (λ () + (parameterize + ([date-display-format 'iso-8601]) + (let loop () + (match-define (vector level msg _ topic) (sync log-receiver)) + (unless (equal? topic 'stop) + (eprintf "~a [~a] ~a~n" (date->string (current-date) #t) level msg) + (loop))))))]) + (current-logger logger) + log-writer)) (module+ main - (require (prefix-in info: setup/getinfo)) - (let ([log-level 'info]) (command-line #:program @@ -305,37 +488,34 @@ "and is one of" "r, read i : Read the timeline." "d, download : Download the timeline." + ; TODO Add path dynamically "u, upload : Upload your twtxt file (alias to execute ~/.tt/upload)." "" #:args (command . args) - (start-logger log-level) + (define log-writer (log-writer-start log-level)) (current-command-line-arguments (list->vector args)) (match command [(or "d" "download") - (let ([num_workers 15]) ; 15 was fastest out of the tried: 1, 5, 10, 20. + ; Initially, 15 was fastest out of the tried: 1, 5, 10, 20. Then I + ; started noticing significant slowdowns. Reducing to 5 seems to help. + (let ([num-workers 5]) (command-line #:program "tt download" #:once-each [("-j" "--jobs") njobs "Number of concurrent jobs." - (set! num_workers (string->number njobs))] - - #:args (filename) - - (current-http-client/response-auto #f) - (let* ([prog-name "tt"] - [prog-version ((info:get-info (list prog-name)) 'version)] - [user-agent (user-agent prog-name prog-version)]) - (current-http-client/user-agent user-agent)) - (timeline-download num_workers (file->feeds filename)) - ))] + (set! num-workers (string->number njobs))] + #:args file-paths + (define-values (_res _cpu real-ms _gc) + (time-apply timeline-download (list num-workers (paths->peers file-paths)))) + (log-info "Timeline downloaded in ~a seconds." (/ real-ms 1000.0))))] [(or "u" "upload") (command-line - #:program - "tt upload" - #:args () - (if (system (path->string (expand-user-path "~/.tt/upload"))) + #:program + "tt upload" + #:args () + (if (system (path->string (build-path tt-home-dir "upload"))) (exit 0) (exit 1)))] [(or "r" "read") @@ -355,6 +535,6 @@ [("-l" "--long") "Long output format" (set! out-format 'multi-line)] - #:args (filename) - (timeline-print out-format (timeline-read order (file->feeds filename)))))] - )))) + #:args file-paths + (timeline-print out-format (timeline-read order (paths->peers file-paths)))))]) + (log-writer-stop log-writer))))