X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=tt.rkt;h=19397e2edade88f2a93ab26feefb636579bebbe3;hb=b8b29fbb27481dd8b1475f5714bfbc61d266dacb;hp=f470538faef56e2a9631fd8028e7fc5377a9d10f;hpb=8da029e4d172f6a2b440c3e56d9575ea369c0001;p=tt.git diff --git a/tt.rkt b/tt.rkt index f470538..19397e2 100644 --- a/tt.rkt +++ b/tt.rkt @@ -23,6 +23,10 @@ (U 'old->new 'new->old)) +(define-type Result + (∀ (α β) (U (cons 'ok α) + (cons 'error β)))) + (struct Msg ([ts-epoch : Integer] [ts-orig : String] @@ -32,8 +36,15 @@ [mentions : (Listof Peer)])) (struct Peer - ([nick : (Option String)] - [uri : Url]) + ([nick : (Option String)] + [uri : Url] + [comment : (Option String)]) + #:transparent) + +(struct Resp + ([status-line : String] + [headers : (Listof Bytes)] + [body-input : Input-Port]) #:transparent) (: tt-home-dir Path-String) @@ -142,19 +153,20 @@ local-time?)]) (+ ts-epoch tz-offset))] [_ - (log-error "Invalid timestamp: ~v" ts) + (log-debug "Invalid timestamp: ~v" ts) #f])))) (: str->msg (-> (Option String) Url String (Option Msg))) (define str->msg (let ([re (pregexp "^([^\\s\t]+)[\\s\t]+(.*)$")]) (λ (nick uri str) + (define str-head (substring str 0 (min 100 (string-length str)))) (with-handlers* ([exn:fail? (λ (e) - (log-error + (log-debug "Failed to parse msg: ~v, from: ~v, at: ~v, because: ~v" - str nick (url->string uri) e) + str-head nick (url->string uri) e) #f)]) (match (regexp-match re str) [(list _wholething ts-orig text) @@ -168,12 +180,12 @@ (regexp-match* #px"@<[^\\s]+([\\s]+)?[^>]+>" text))]) (Msg ts-epoch ts-orig nick uri text mentions)) (begin - (log-error + (log-debug "Msg rejected due to invalid timestamp: ~v, nick:~v, uri:~v" - str nick (url->string uri)) + str-head nick (url->string uri)) #f)))] [_ - (log-debug "Non-msg line from nick:~v, line:~a" nick str) + (log-debug "Non-msg line from nick:~v, line:~a" nick str-head) #f]))))) (module+ test @@ -207,7 +219,7 @@ [nick "foo"] [uri "bar"] [actual (str->msg nick uri (string-append ts tab text))] - [expected (Msg 1605756129 ts nick uri text)]) + [expected (Msg 1605756129 ts nick uri text '())]) (check-equal? (Msg-ts-epoch actual) (Msg-ts-epoch expected) @@ -260,14 +272,17 @@ (define url->cache-object-path url->cache-file-path-v2) +(: cache-object-filename->url (-> Path-String Url)) +(define (cache-object-filename->url name) + (string->url (uri-decode (path->string name)))) + (define (url->cache-etag-path uri) (build-path cache-dir "etags" (uri-encode (url->string uri)))) (define (url->cache-lmod-path uri) (build-path cache-dir "lmods" (uri-encode (url->string uri)))) -; TODO Return Option -(: uri-read-cached (-> Url String)) +(: uri-read-cached (-> Url (Option String))) (define (uri-read-cached uri) (define path-v1 (url->cache-file-path-v1 uri)) (define path-v2 (url->cache-file-path-v2 uri)) @@ -276,24 +291,65 @@ (if (file-exists? path-v2) (file->string path-v2) (begin - (log-warning "Cache file not found for URI: ~a" (url->string uri)) - ""))) + (log-debug "Cache file not found for URI: ~a" (url->string uri)) + #f))) + +(: str->url (-> String (Option String))) +(define (str->url s) + (with-handlers* + ([exn:fail? (λ (e) #f)]) + (string->url s))) (: str->peer (String (Option Peer))) (define (str->peer str) (log-debug "Parsing peer string: ~v" str) - (with-handlers* - ([exn:fail? - (λ (e) - (log-error "Invalid URI in string: ~v, exn: ~v" str e) - #f)]) - (match (string-split str) - [(list u) (Peer #f (string->url u))] - [(list n u) (Peer n (string->url u))] - [_ - (log-error "Invalid peer string: ~v" str) - #f]))) + (match + (regexp-match + #px"(([^\\s\t]+)[\\s\t]+)?([a-zA-Z]+://[^\\s\t]*)[\\s\t]*(#\\s*(.*))?" + str) + [(list _wholething + _nick-with-space + nick + url + _comment-with-hash + comment) + (match (str->url url) + [#f + (log-error "Invalid URI in peer string: ~v" str) + #f] + [url (Peer nick url comment)])] + [_ + (log-debug "Invalid peer string: ~v" str) + #f])) +(module+ test + (check-equal? + (str->peer "foo http://bar/file.txt # some rando") + (Peer "foo" (str->url "http://bar/file.txt") "some rando")) + (check-equal? + (str->peer "http://bar/file.txt # some rando") + (Peer #f (str->url "http://bar/file.txt") "some rando")) + (check-equal? + (str->peer "http://bar/file.txt #") + (Peer #f (str->url "http://bar/file.txt") "")) + (check-equal? + (str->peer "http://bar/file.txt#") ; XXX URLs can have #s + (Peer #f (str->url "http://bar/file.txt#") #f)) + (check-equal? + (str->peer "http://bar/file.txt") + (Peer #f (str->url "http://bar/file.txt") #f)) + (check-equal? + (str->peer "foo http://bar/file.txt") + (Peer "foo" (str->url "http://bar/file.txt") #f)) + (check-equal? + (str->peer "foo bar # baz") + #f) + (check-equal? + (str->peer "foo bar://baz # quux") + (Peer "foo" (str->url "bar://baz") "quux")) + (check-equal? + (str->peer "foo bar//baz # quux") + #f)) (: filter-comments (-> (Listof String) (Listof String))) (define (filter-comments lines) @@ -303,14 +359,64 @@ (define (str->peers str) (filter-map str->peer (filter-comments (str->lines str)))) +(: peers->file (-> (Listof Peers) Path-String Void)) +(define (peers->file peers path) + (display-lines-to-file + (map (match-lambda + [(Peer n u c) + (format "~a~a~a" + (if n (format "~a " n) "") + (url->string u) + (if c (format " # ~a" c) ""))]) + peers) + path + #:exists 'replace)) + (: file->peers (-> Path-String (Listof Peer))) (define (file->peers file-path) (if (file-exists? file-path) (str->peers (file->string file-path)) (begin - (log-error "File does not exist: ~v" (path->string file-path)) + (log-warning "File does not exist: ~v" (path->string file-path)) '()))) +(define re-rfc2822 + #px"^(Mon|Tue|Wed|Thu|Fri|Sat|Sun), ([0-9]{2}) (Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) ([0-9]{4}) ([0-2][0-9]):([0-6][0-9]):([0-6][0-9]) GMT") + +(: b->n (-> Bytes (Option Number))) +(define (b->n b) + (string->number (bytes->string/utf-8 b))) + +(: mon->num (-> Bytes Natural)) +(define/match (mon->num mon) + [(#"Jan") 1] + [(#"Feb") 2] + [(#"Mar") 3] + [(#"Apr") 4] + [(#"May") 5] + [(#"Jun") 6] + [(#"Jul") 7] + [(#"Aug") 8] + [(#"Sep") 9] + [(#"Oct") 10] + [(#"Nov") 11] + [(#"Dec") 12]) + +(: rfc2822->epoch (-> Bytes (Option Nonnegative-Integer))) +(define (rfc2822->epoch timestamp) + (match (regexp-match re-rfc2822 timestamp) + [(list _ _ dd mo yyyy HH MM SS) + #:when (and dd mo yyyy HH MM SS) + (find-seconds (b->n SS) + (b->n MM) + (b->n HH) + (b->n dd) + (mon->num mo) + (b->n yyyy) + #f)] + [_ + #f])) + (: user-agent String) (define user-agent (let* @@ -321,8 +427,8 @@ [user (if (file-exists? user-peer-file) (match (first (file->peers user-peer-file)) - [(Peer #f u) (format "+~a" (url->string u) )] - [(Peer n u) (format "+~a; @~a" (url->string u) n)]) + [(Peer #f u _) (format "+~a" (url->string u) )] + [(Peer n u _) (format "+~a; @~a" (url->string u) n)]) (format "+~a" prog-uri))]) (format "~a/~a (~a)" prog-name prog-version user))) @@ -332,46 +438,118 @@ [(list val) val] [_ #f])) -(: uri-download (-> Url Void)) -(define (uri-download u) +(: uri-download-from-port + (-> Url (Listof (U Bytes String)) Input-Port + (U 'skipped-cached 'downloaded-new))) ; TODO 'ok|'error ? +(define (uri-download-from-port u headers body-input) + (define u-str (url->string u)) + (log-debug "uri-download-from-port ~v into ~v" u-str cached-object-path) (define cached-object-path (url->cache-object-path u)) (define cached-etag-path (url->cache-etag-path u)) (define cached-lmod-path (url->cache-lmod-path u)) - (log-debug "uri-download ~v into ~v" u cached-object-path) - (define-values (status-line headers body-input) - ; TODO Timeout. Currently hangs on slow connections. - (http-sendrecv/url u #:headers (list (format "User-Agent: ~a" user-agent)))) - (log-debug "headers: ~v" headers) - (log-debug "status-line: ~v" status-line) - (define status - (string->number (second (string-split (bytes->string/utf-8 status-line))))) - (log-debug "status: ~v" status) - ; TODO Handle redirects - (match status - [200 - (let ([etag (header-get headers #"ETag")] - [lmod (header-get headers #"Last-Modified")]) - (if (and etag - (file-exists? cached-etag-path) - (bytes=? etag (file->bytes cached-etag-path))) - (log-info "ETags match, skipping the rest of ~v" (url->string u)) - (begin - (log-info - "Downloading the rest of ~v. ETag: ~a, Last-Modified: ~v" - (url->string u) etag lmod) - (make-parent-directory* cached-object-path) - (make-parent-directory* cached-etag-path) - (make-parent-directory* cached-lmod-path) - (call-with-output-file cached-object-path - (curry copy-port body-input) - #:exists 'replace) - (when etag - (display-to-file etag cached-etag-path #:exists 'replace)) - (when lmod - (display-to-file etag cached-lmod-path #:exists 'replace)))) - (close-input-port body-input))] - [_ - (raise status)])) + (define etag (header-get headers #"ETag")) + (define lmod (header-get headers #"Last-Modified")) + (define lmod-curr (if lmod (rfc2822->epoch lmod) #f)) + (define lmod-prev (if (file-exists? cached-lmod-path) + (rfc2822->epoch (file->bytes cached-lmod-path)) + #f)) + (log-debug "lmod-curr:~v lmod-prev:~v" lmod-curr lmod-prev) + (define cached? + (or (and etag + (file-exists? cached-etag-path) + (bytes=? etag (file->bytes cached-etag-path)) + (begin + (log-debug "ETags match, skipping the rest of ~v" u-str) + #t)) + (and lmod-curr + lmod-prev + (<= lmod-curr lmod-prev) + (begin + (log-debug "Last-Modified <= current skipping the rest of ~v" u-str) + #t)))) + (if (not cached?) + (begin + (log-debug + "Downloading the rest of ~v. ETag: ~a, Last-Modified: ~v" + u-str etag lmod) + (make-parent-directory* cached-object-path) + (make-parent-directory* cached-etag-path) + (make-parent-directory* cached-lmod-path) + (call-with-output-file cached-object-path + (curry copy-port body-input) + #:exists 'replace) + (when etag + (display-to-file etag cached-etag-path #:exists 'replace)) + (when lmod + (display-to-file lmod cached-lmod-path #:exists 'replace)) + 'downloaded-new) + 'skipped-cached)) + +(: uri-download + (-> Positive-Float Url + (Result (U 'skipped-cached 'downloaded-new) + Any))) ; TODO Maybe more-precise error type? +(define (uri-download timeout u) + (define u-str (url->string u)) + (define timeout-chan (make-channel)) + (define result-chan (make-channel)) + (define timeout-thread + (thread (λ () + ; Doing this instead of sync/timeout to distinguish error values, + ; rather than just have #f to work with. + (sleep timeout) + (channel-put timeout-chan '(error . timeout))))) + (define result-thread + (thread (λ () + ; XXX We timeout getting a response, but body download could + ; also take a long time and we might want to time that out as + ; well, but then we may end-up with partially downloaded + ; objects. But that could happen anyway if the server drops the + ; connection for whatever reason. + ; + ; Maybe that is OK once we start treating the + ; downloaded object as an addition to the stored set of + ; messages, rather than the final set of messages. + + ; TODO message db + ; - 1st try can just be an in-memory set that gets written-to + ; and read-from disk as a whole. + (define result + (with-handlers + ; TODO Maybe name each known errno? (exn:fail:network:errno-errno e) + ([exn:fail:network? + (λ (e) `(error . (net-error . ,e)))] + [exn? + (λ (e) `(error . (other . ,e)))]) + (define-values (status-line headers body-input) + (http-sendrecv/url + u + #:headers (list (format "User-Agent: ~a" user-agent)))) + `(ok . ,(Resp status-line headers body-input)))) + (channel-put result-chan result)))) + (define result + (sync timeout-chan + result-chan)) + (kill-thread result-thread) + (kill-thread timeout-thread) + (match result + [(cons 'error _) + result] + [(cons 'ok (Resp status-line headers body-input)) + (log-debug "headers: ~v" headers) + (log-debug "status-line: ~v" status-line) + (define status + (string->number (second (string-split (bytes->string/utf-8 status-line))))) + (log-debug "status: ~v" status) + ; TODO Handle redirects. Should be within same timeout as req and body. + (let ([result + (match status + [200 + `(ok . ,(uri-download-from-port u headers body-input))] + [_ + `(error . (http . ,status))])]) + (close-input-port body-input) + result)])) (: timeline-print (-> Out-Format (Listof Msg) Void)) (define (timeline-print out-format timeline) @@ -384,42 +562,67 @@ timeline))) (: peer->msgs (-> Peer (Listof Msg))) -(define (peer->msgs f) - (match-define (Peer nick uri) f) - (log-info "Reading peer nick:~v uri:~v" nick (url->string uri)) - (str->msgs nick uri (uri-read-cached uri))) - -(: peer-download (-> Peer Void)) -(define (peer-download f) - (match-define (Peer nick uri) f) +(define (peer->msgs peer) + (match-define (Peer nick uri _) peer) + (log-debug "Reading peer nick:~v uri:~v" nick (url->string uri)) + (define msgs-data (uri-read-cached uri)) + (if msgs-data + (str->msgs nick uri msgs-data) + '())) + +(: peer-download + (-> Positive-Float Peer + (Result (U 'skipped-cached 'downloaded-new) + Any))) +(define (peer-download timeout peer) + (match-define (Peer nick uri _) peer) (define u (url->string uri)) - (log-info "Downloading peer uri:~a" u) - (with-handlers - ([exn:fail? - (λ (e) - (log-error "Network error nick:~v uri:~v exn:~v" nick u e) - #f)] - [integer? - (λ (status) - (log-error "HTTP error nick:~v uri:~a status:~a" nick u status) - #f)]) - (define-values (_result _tm-cpu-ms tm-real-ms _tm-gc-ms) - (time-apply uri-download (list uri))) - (log-info "Peer downloaded in ~a seconds, uri: ~a" (/ tm-real-ms 1000.0) u))) - -(: timeline-download (-> Integer (Listof Peer) Void)) -(define (timeline-download num-workers peers) - ; TODO No need for map - can just iter - (void (concurrent-filter-map num-workers peer-download peers))) - -; TODO timeline contract : time-sorted list of messages -(: timeline-read (-> Timeline-Order (Listof Peer) (Listof Msg))) -(define (timeline-read order peers) + (log-info "Download BEGIN URL:~a" u) + (define-values (results _tm-cpu-ms tm-real-ms _tm-gc-ms) + (time-apply uri-download (list timeout uri))) + (define result (car results)) + (log-info "Download END in ~a seconds, URL:~a, result:~s" + (/ tm-real-ms 1000.0) + u + result) + result) + +(: timeline-download (-> Integer Positive-Float (Listof Peer) Void)) +(define (timeline-download num-workers timeout peers) + (define results + (concurrent-filter-map num-workers + (λ (p) (cons p (peer-download timeout p))) + peers)) + (define peers-ok + (filter-map (match-lambda + [(cons p (cons 'ok _)) p] + [(cons _ (cons 'error e)) #f]) + results)) + (define peers-err + (filter-map (match-lambda + [(cons _ (cons 'ok _)) + #f] + [(cons p (cons 'error e)) + (struct-copy Peer p [comment (format "~s" e)])]) + results)) + (peers->file peers-ok (build-path tt-home-dir "peers-last-downloaded-ok")) + (peers->file peers-err (build-path tt-home-dir "peers-last-downloaded-err"))) + +(: uniq (∀ (α) (-> (Listof α) (Listof α)))) +(define (uniq xs) + (set->list (list->set xs))) + +(: peers->timeline (-> (listof Peer) (listof Msg))) +(define (peers->timeline peers) + (append* (filter-map peer->msgs peers))) + +(: timeline-sort (-> (listof Msg) timeline-order (Listof Msgs))) +(define (timeline-sort msgs order) (define cmp (match order ['old->new <] ['new->old >])) - (sort (append* (filter-map peer->msgs peers)) - (λ (a b) (cmp (Msg-ts-epoch a) (Msg-ts-epoch b))))) + (sort msgs (λ (a b) (cmp (Msg-ts-epoch a) + (Msg-ts-epoch b))))) (: paths->peers (-> (Listof String) (Listof Peer))) (define (paths->peers paths) @@ -435,7 +638,33 @@ (map string->path paths)])] [peers (append* (map file->peers paths))]) (log-info "Read-in ~a peers." (length peers)) - peers)) + (uniq peers))) + +(: mentioned-peers-in-cache (-> (Listof Peer))) +(define (mentioned-peers-in-cache) + (define msgs + (append* (map (λ (filename) + (define path (build-path cache-object-dir filename)) + (define size (/ (file-size path) 1000000.0)) + (log-debug "BEGIN parsing ~a MB from file: ~v" + size + (path->string path)) + (define t0 (current-inexact-milliseconds)) + (define m (filter-map + (λ (line) + (str->msg #f (cache-object-filename->url filename) line)) + (filter-comments + (file->lines path)))) + (define t1 (current-inexact-milliseconds)) + (log-debug "END parsing ~a MB in ~a seconds from file: ~v." + size + (* 0.001 (- t1 t0)) + (path->string path)) + (when (empty? m) + (log-debug "No messages found in ~a" (path->string path))) + m) + (directory-list cache-object-dir)))) + (uniq (append* (map Msg-mentions msgs)))) (: log-writer-stop (-> Thread Void)) (define (log-writer-stop log-writer) @@ -473,10 +702,11 @@ #:help-labels "" "and is one of" - "r, read i : Read the timeline." + "r, read : Read the timeline (offline operation)." "d, download : Download the timeline." ; TODO Add path dynamically - "u, upload : Upload your twtxt file (alias to execute ~/.tt/upload)." + "u, upload : Upload your twtxt file (alias to execute ~/.tt/hooks/upload)." + "c, crawl : Discover new peers mentioned by known peers (offline operation)." "" #:args (command . args) (define log-writer (log-writer-start log-level)) @@ -485,7 +715,8 @@ [(or "d" "download") ; Initially, 15 was fastest out of the tried: 1, 5, 10, 20. Then I ; started noticing significant slowdowns. Reducing to 5 seems to help. - (let ([num-workers 5]) + (let ([num-workers 5] + [timeout 10.0]) (command-line #:program "tt download" @@ -493,21 +724,29 @@ [("-j" "--jobs") njobs "Number of concurrent jobs." (set! num-workers (string->number njobs))] + [("-t" "--timeout") + seconds "Timeout seconds per request." + (set! timeout (string->number seconds))] #:args file-paths - (define-values (_res _cpu real-ms _gc) - (time-apply timeline-download (list num-workers (paths->peers file-paths)))) - (log-info "Timeline downloaded in ~a seconds." (/ real-ms 1000.0))))] + (let ([peers (paths->peers file-paths)]) + (define-values (_res _cpu real-ms _gc) + (time-apply timeline-download (list num-workers timeout peers))) + (log-info "Downloaded timelines from ~a peers in ~a seconds." + (length peers) + (/ real-ms 1000.0)))))] [(or "u" "upload") (command-line #:program "tt upload" #:args () - (if (system (path->string (build-path tt-home-dir "upload"))) + (if (system (path->string (build-path tt-home-dir "hooks" "upload"))) (exit 0) (exit 1)))] [(or "r" "read") (let ([out-format 'multi-line] - [order 'old->new]) + [order 'old->new] + [ts-min #f] + [ts-max #f]) (command-line #:program "tt read" @@ -515,6 +754,12 @@ [("-r" "--rev") "Reverse displayed timeline order." (set! order 'new->old)] + [("-m" "--min") + m "Earliest time to display (ignore anything before it)." + (set! ts-min (rfc3339->epoch m))] + [("-x" "--max") + x "Latest time to display (ignore anything after it)." + (set! ts-max (rfc3339->epoch x))] #:once-any [("-s" "--short") "Short output format" @@ -523,5 +768,72 @@ "Long output format" (set! out-format 'multi-line)] #:args file-paths - (timeline-print out-format (timeline-read order (paths->peers file-paths)))))]) + (let* ([peers + (paths->peers file-paths)] + [timeline + (timeline-sort (peers->timeline peers) order)] + [timeline + (filter (λ (m) (and (if ts-min (>= (Msg-ts-epoch m) + ts-min) + #t) + (if ts-max (<= (Msg-ts-epoch m) + ts-max) + #t))) + timeline)]) + (timeline-print out-format timeline))))] + [(or "c" "crawl") + (command-line + #:program + "tt crawl" + #:args () + (let* ([peers-sort + (λ (peers) (sort peers (match-lambda** + [((Peer n1 _ _) (Peer n2 _ _)) + (stringpeers peers-mentioned-file)] + [peers-mentioned + (peers-sort (uniq (append peers-mentioned-prev + peers-mentioned-curr)))] + [peers-all-prev + (file->peers peers-all-file)] + [peers-all + (list->set (append peers-mentioned + peers-all-prev))] + [peers-discovered + (set-subtract peers-all (list->set peers-all-prev))] + [peers-all + (peers-sort (set->list peers-all))] + [peers-parsed + (filter + (λ (p) (< 0 (length (peer->msgs p)))) + peers-all)]) + (log-info "Known peers mentioned: ~a" (length peers-mentioned)) + (log-info "Known peers parsed ~a" (length peers-parsed)) + (log-info "Known peers total: ~a" (length peers-all)) + (log-info "Discovered ~a new peers:~n~a" + (set-count peers-discovered) + (pretty-format (map + (λ (p) (cons (Peer-nick p) + (url->string (Peer-uri p)))) + (set->list peers-discovered)))) + (peers->file peers-mentioned + peers-mentioned-file) + (peers->file peers-parsed + peers-parsed-file) + (peers->file peers-all + peers-all-file)))] + [command + (eprintf "Error: invalid command: ~v\n" command) + (eprintf "Please use the \"--help\" option to see a list of available commands.\n") + (exit 1)]) (log-writer-stop log-writer))))