mvxcvi/blocks

2.0.2


Content-addressed data storage interface.

dependencies

org.clojure/clojure 1.10.1
org.clojure/data.priority-map 0.0.10
org.clojure/tools.logging 0.5.0
byte-streams 0.2.4
com.stuartsierra/component 0.4.0
commons-io 2.6
manifold 0.1.8
mvxcvi/multiformats 0.2.0



 

Block type and constructor functions.

Blocks have two primary attributes, :id and :size. The block identifier is a multihash with the digest identifying the content. The size is the number of bytes in the block content. Blocks also have a :stored-at value giving the instant they were persisted, but this does not affect equality or the block's hash code.

Internally, blocks may reference their content in-memory as a byte array, or a content reader which constructs new input streams for the block data on demand. A block with in-memory content is considered a loaded block, while blocks with readers are lazy blocks.

(ns ^:no-doc blocks.data
  (:require
    [byte-streams :as bytes]
    [multiformats.hash :as multihash])
  (:import
    blocks.data.PersistentBytes
    (java.io
      InputStream
      IOException)
    java.time.Instant
    multiformats.hash.Multihash
    org.apache.commons.io.input.BoundedInputStream))

Block Type

(deftype Block
  [^Multihash id
   ^long size
   ^Instant stored-at
   content
   _meta]
  :load-ns true
  java.lang.Object
  (toString
    [this]
    (format "Block[%s %s %s]" id size stored-at))
  (equals
    [this that]
    (boolean
      (or (identical? this that)
          (when (identical? (class this) (class that))
            (let [that ^Block that]
              (and (= id   (.id   that))
                   (= size (.size that))))))))
  (hashCode
    [this]
    (-> (hash (class this))
        (hash-combine (hash id))
        (hash-combine size)))
  java.lang.Comparable
  (compareTo
    [this that]
    (if (= id (:id that))
      (if (= size (:size that))
        (if (= stored-at (:stored-at that))
          0
          (compare stored-at (:stored-at that)))
        (compare size (:size that)))
      (compare id (:id that))))
  clojure.lang.IObj
  (meta
    [this]
    _meta)
  (withMeta
    [this meta-map]
    (Block. id size stored-at content meta-map))
  clojure.lang.ILookup
  (valAt
    [this k]
    (.valAt this k nil))
  (valAt
    [this k not-found]
    (case k
      :id id
      :size size
      :stored-at stored-at
      not-found)))
(defmethod print-method Block
  [v ^java.io.Writer w]
  (.write w (str v)))

Content Readers

Content readers provide functions for repeatably reading byte streams from some backing data source.

(defprotocol ContentReader
  (read-all
    [reader]
    "Open an input stream that returns all bytes of the content.")
  (read-range
    [reader start end]
    "Open an input stream that reads just the bytes from `start` (inclusive)
    to `end` (exclusive). A `nil` for either value implies the beginning or
    end of the stream, respectively."))
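To illustrate what "repeatably reading" means, here is a minimal standalone sketch of the function-style reader that the `clojure.lang.Fn` case of `ContentReader` handles: a no-arg function that opens a fresh stream over the same backing data on every call. It does not use the `blocks` library; `backing-bytes` and `reader` are hypothetical names, and a real reader would more likely open a file or remote object.

```clojure
(ns example.content-reader
  (:import (java.io ByteArrayInputStream)))

;; Hypothetical backing data for the reader.
(def ^:private backing-bytes (.getBytes "block content" "UTF-8"))

(defn reader
  "No-arg content reader: each call opens a new stream over the same bytes,
  so the content can be read any number of times."
  []
  (ByteArrayInputStream. backing-bytes))

;; Each read gets its own stream; slurp closes it when finished.
(slurp (reader))
(slurp (reader))
```

Because every call returns an independent stream, consumers never have to coordinate over a shared read position.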

Wrap an input stream such that it only returns the bytes in the half-open range from start (inclusive) to end (exclusive).

(defn- bounded-input-stream
  ^java.io.InputStream
  [^InputStream input start end]
  (when (pos-int? start)
    (.skip input start))
  (if (pos-int? end)
    (BoundedInputStream. input (- end (or start 0)))
    input))
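The half-open `[start, end)` behavior of `bounded-input-stream` can be checked with a small standalone sketch that uses only `java.io`. Here `read-bounded` is a hypothetical helper that skips `start` bytes and then reads at most `end - start` bytes one at a time, instead of wrapping the stream in commons-io's `BoundedInputStream`.

```clojure
(ns example.bounded-range
  (:import (java.io ByteArrayInputStream InputStream)))

(defn read-bounded
  "Skip `start` bytes, then return at most (end - start) bytes as a vector.
  A nil `start` or `end` means the beginning or end of the stream."
  [^InputStream input start end]
  (when (pos-int? start)
    (.skip input (long start)))
  (let [limit (if (pos-int? end)
                (- end (or start 0))
                Long/MAX_VALUE)]
    (loop [acc [], remaining limit]
      (if (zero? remaining)
        acc
        (let [b (.read input)]
          ;; .read returns -1 at end of stream.
          (if (neg? b)
            acc
            (recur (conj acc b) (dec remaining))))))))

;; Bytes at index 1 and 2 are returned; index 3 is excluded.
(read-bounded (ByteArrayInputStream. (byte-array [10 11 12 13 14])) 1 3)
```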
(extend-protocol ContentReader

  PersistentBytes

  (read-all
    [^PersistentBytes this]
    (.open this))


  (read-range
    [^PersistentBytes this start end]
    (bounded-input-stream (.open this) start end))


  clojure.lang.Fn

  (read-all
    [this]
    (this))


  (read-range
    [this start end]
    ; Ranged open not supported for generic functions, use naive approach.
    (bounded-input-stream (this) start end)))

Open an input stream to read the contents of the block.

(defn content-stream
  ^java.io.InputStream
  [^Block block start end]
  (let [content (.content block)]
    (if (or start end)
      (read-range content start end)
      (read-all content))))

True if the argument is a persistent byte array.

(defn persistent-bytes?
  [x]
  (instance? PersistentBytes x))

True if the block has content loaded into memory as persistent bytes.

(defn byte-content?
  [^Block block]
  (persistent-bytes? (.content block)))

Constructors

Make the automatic constructor function private.

(alter-meta! #'->Block assoc :private true)

Return the current instant in time.

This is mostly useful for rebinding during tests.

(defn- now
  ^Instant
  []
  (Instant/now))

Return the hashing function for an algorithm keyword, or throw an exception if no supported function is available.

(defn hasher
  [algorithm]
  (or (multihash/functions algorithm)
      (throw (IllegalArgumentException.
               (str "No digest function found for algorithm "
                    algorithm)))))

Create a block from a content reader. The simplest version is a no-arg function which should return a new InputStream to read the full block content. The block is given the id and size directly, without being checked.

(defn create-block
  ([id size content]
   (create-block id size (now) content))
  ([id size stored-at content]
   (when-not (instance? Multihash id)
     (throw (ex-info "Block id must be a multihash"
                     {:id id, :size size, :stored-at stored-at})))
   (when-not (pos-int? size)
     (throw (ex-info "Block size must be a positive integer"
                     {:id id, :size size, :stored-at stored-at})))
   (when-not (instance? Instant stored-at)
     (throw (ex-info "Block must have a stored-at instant"
                     {:id id, :size size, :stored-at stored-at})))
   (when-not content
     (throw (ex-info "Block must have a content reader"
                     {:id id, :size size, :stored-at stored-at})))
   (->Block id size stored-at content nil)))

Create a block by reading the source into memory and hashing it. Returns nil if the source is empty.

(defn read-block
  [algorithm source]
  (let [hash-fn (hasher algorithm)
        content (PersistentBytes/wrap (bytes/to-byte-array source))
        size (count content)]
    (when (pos? size)
      (create-block (hash-fn (read-all content)) size (now) content))))
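The shape of `read-block` (load into memory, hash, return nil for empty content) can be sketched without the library. This is only an approximation under stated assumptions: the real function produces a multihash through `multiformats.hash`, while this sketch uses `java.security.MessageDigest` SHA-256 and a plain hex string as a stand-in id. `read-block-sketch` and `hex` are hypothetical names.

```clojure
(ns example.read-block
  (:import java.security.MessageDigest))

(defn- hex
  "Encode digest bytes as a lowercase hex string."
  [^bytes digest]
  (apply str (map #(format "%02x" (bit-and % 0xff)) digest)))

(defn read-block-sketch
  "Hash in-memory `content` and return a map describing the block,
  or nil when the content is empty."
  [^bytes content]
  (when (pos? (alength content))
    {:id (hex (.digest (MessageDigest/getInstance "SHA-256") content))
     :size (alength content)
     :content content}))

(read-block-sketch (.getBytes "abc" "UTF-8"))
```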

Create a new block by merging together two blocks representing the same content. Block ids and sizes must match. The new block's content and timestamp come from the second block, and any metadata is merged together.

(defn merge-blocks
  [^Block left ^Block right]
  (when (not= (.id left) (.id right))
    (throw (ex-info
             (str "Cannot merge blocks with differing ids " (.id left)
                  " and " (.id right))
             {:left left, :right right})))
  (when (not= (.size left) (.size right))
    (throw (ex-info
             (str "Cannot merge blocks with differing sizes " (.size left)
                  " and " (.size right))
             {:left left, :right right})))
  (->Block
    (.id right)
    (.size right)
    (.stored-at right)
    (.content right)
    (not-empty (merge (._meta left) (._meta right)))))

Wrap a block's content by calling f on it, returning a new block with the same id and size.

(defn wrap-content
  ^blocks.data.Block
  [^Block block f]
  (->Block
    (.id block)
    (.size block)
    (.stored-at block)
    (f (.content block))
    (._meta block)))
 

A 'summary' represents a collection of blocks, including certain statistics about the aggregate count and sizes. These are useful for returning from certain operations to represent the set of blocks acted upon.

The following fields are present in a summary:

  • :count The total number of blocks added to the summary.
  • :size The total size of blocks added to the summary, in bytes.
  • :sizes A histogram map from bucket exponent to a count of the blocks in that bucket (see size->bucket and bucket->range).

(ns blocks.summary
  (:refer-clojure :exclude [update merge]))

Construct a new, empty summary.

(defn init
  []
  {:count 0
   :size 0
   :sizes {}})

Assigns a block size to an exponential histogram bucket. Given a size s, returns n such that 2^(n-1) <= s < 2^n; that is, n is the number of bits needed to represent s.

(defn size->bucket
  [size]
  (loop [s size
         n 0]
    (if (pos? s)
      (recur (bit-shift-right s 1) (inc n))
      n)))

Returns a vector [lower upper] with the boundaries covered by a given size bucket; lower is inclusive and upper is exclusive.

(defn bucket->range
  [n]
  [(bit-shift-left 1 (dec n))
   (bit-shift-left 1 n)])
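A quick standalone check of the bucket arithmetic, with `size->bucket` and `bucket->range` copied from above into their own namespace so the example runs without the library:

```clojure
(ns example.buckets)

;; Copied from blocks.summary: counts how many right shifts reach zero,
;; i.e. the bit length of `size`.
(defn size->bucket
  [size]
  (loop [s size, n 0]
    (if (pos? s)
      (recur (bit-shift-right s 1) (inc n))
      n)))

;; Copied from blocks.summary: [2^(n-1) 2^n].
(defn bucket->range
  [n]
  [(bit-shift-left 1 (dec n))
   (bit-shift-left 1 n)])

(size->bucket 1000)                  ; bucket 10
(bucket->range (size->bucket 1000))  ; covers 512 up to (not including) 1024
```

Powers of two land at the bottom of their bucket: a 1024-byte block falls in bucket 11, whose range starts at 1024.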

Update the summary with the stats from the given block.

(defn update
  [summary block]
  (when (instance? Throwable block)
    (throw block))
  (-> summary
      (clojure.core/update :count inc)
      (clojure.core/update :size + (:size block))
      (clojure.core/update :sizes clojure.core/update (size->bucket (:size block)) (fnil inc 0))))

Merge two summaries together.

(defn merge
  [a b]
  (-> a
      (clojure.core/update :count + (:count b))
      (clojure.core/update :size + (:size b))
      (clojure.core/update :sizes (partial merge-with +) (:sizes b))
      (clojure.core/merge (dissoc b :count :size :sizes))))
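As a sketch of how summaries compose, the following standalone copy folds plain maps with a `:size` key into two summaries and then merges them. The names `empty-summary`, `summary-update`, and `summary-merge` are hypothetical stand-ins for `init`, `update`, and `merge` (renamed so they do not shadow `clojure.core`), and the `Throwable` check from `update` is omitted.

```clojure
(ns example.summary)

(defn size->bucket
  [size]
  (loop [s size, n 0]
    (if (pos? s) (recur (bit-shift-right s 1) (inc n)) n)))

(def empty-summary {:count 0, :size 0, :sizes {}})

(defn summary-update
  "Fold one block (any map with a :size) into the summary."
  [summary block]
  (-> summary
      (update :count inc)
      (update :size + (:size block))
      (update :sizes update (size->bucket (:size block)) (fnil inc 0))))

(defn summary-merge
  "Add counts, total sizes, and histogram buckets; other keys in `b` win."
  [a b]
  (-> a
      (update :count + (:count b))
      (update :size + (:size b))
      (update :sizes (partial merge-with +) (:sizes b))
      (merge (dissoc b :count :size :sizes))))

;; 100 and 120 bytes both fall in bucket 7; 1000 bytes falls in bucket 10.
(def left  (reduce summary-update empty-summary [{:size 100} {:size 120}]))
(def right (reduce summary-update empty-summary [{:size 1000}]))

(summary-merge left right)
```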
 

Core block storage API.

Functions which may cause side effects or IO are marked with bangs. For example, (read! "foo") doesn't have side effects, but (read! some-input-stream) will consume bytes from the stream.

(ns blocks.core
  (:refer-clojure :exclude [get list])
  (:require
    [blocks.data :as data]
    [blocks.meter :as meter]
    [blocks.store :as store]
    [blocks.summary :as sum]
    [byte-streams :as bytes]
    [clojure.java.io :as io]
    [clojure.set :as set]
    [clojure.string :as str]
    [manifold.deferred :as d]
    [manifold.stream :as s]
    [multiformats.hash :as multihash])
  (:import
    (blocks.data
      Block
      PersistentBytes)
    (java.io
      File
      FileInputStream)
    java.time.Instant
    multiformats.hash.Multihash
    org.apache.commons.io.input.CountingInputStream))

Utilities

The hashing algorithm used if not specified in functions which create blocks.

(def default-algorithm
  :sha2-256)

Accept arguments and return a map corresponding to the input. Accepts either a single map argument or kw-args.

(defn- args->map
  [args]
  (when (seq args)
    (if (and (= 1 (count args))
             (or (map? (first args))
                 (nil? (first args))))
      (first args)
      (apply hash-map args))))
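Both calling conventions accepted by `args->map` can be seen with a standalone copy of the function and a hypothetical variadic caller, `list-opts`, standing in for functions like `list`:

```clojure
(ns example.args)

;; Copied from blocks.core.
(defn args->map
  [args]
  (when (seq args)
    (if (and (= 1 (count args))
             (or (map? (first args))
                 (nil? (first args))))
      (first args)
      (apply hash-map args))))

(defn list-opts
  "Hypothetical variadic function accepting either calling style."
  [& opts]
  (args->map opts))

(list-opts :limit 10 :after "1220")    ; kw-args
(list-opts {:limit 10, :after "1220"}) ; single map, same result
(list-opts)                            ; no options yields nil
```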

True if the value is a hexadecimal string.

(defn- hex-string?
  [x]
  (and (string? x) (re-matches #"[0-9a-fA-F]*" x)))

True if the value is a multihash.

(defn- multihash?
  [x]
  (instance? Multihash x))

Block IO

True if the block's content is already loaded into memory.

(defn loaded?
  [block]
  (data/byte-content? block))

True if the given block reads its content on-demand.

(defn lazy?
  [block]
  (not (data/byte-content? block)))

Create a lazy block from a local file. Returns the block, or nil if the file does not exist or is empty.

The file is read once to calculate the identifier.

(defn from-file
  ([file]
   (from-file file default-algorithm))
  ([file algorithm]
   (let [file (io/file file)
         hash-fn (data/hasher algorithm)]
     (io!
       (when (and (.exists file) (pos? (.length file)))
         (data/create-block
           ; Close the stream once the file has been hashed.
           (with-open [input (FileInputStream. file)]
             (hash-fn input))
           (.length file)
           (Instant/ofEpochMilli (.lastModified file))
           (fn reader [] (FileInputStream. file))))))))

Open an input stream to read the contents of the block.

If an options map with :start or :end is given, the input stream will only return content from the starting index byte to the byte before the end index. For example, opening a block of size n with these options would return the full block contents:

  (open block {:start 0, :end n})

Omitting either boundary will read from the beginning or to the end of the block, respectively.

(defn open
  (^java.io.InputStream
   [block]
   (open block nil))
  (^java.io.InputStream
   [block opts]
   (let [{:keys [start end]} opts]
     (when (and start (or (not (nat-int? start))
                          (<= (:size block) start)))
       (throw (IllegalArgumentException.
                (format "Range start must be an integer within block size %d: %s"
                        (:size block) start))))
     (when (and end (or (not (pos-int? end))
                        (< (:size block) end)))
       (throw (IllegalArgumentException.
                (format "Range end must be an integer within block size %d: %s"
                        (:size block) end))))
     (when (and start end (not (< start end)))
       (throw (IllegalArgumentException.
                (format "Range start %d must be less than range end %d"
                        start end))))
     (io! (data/content-stream block start end)))))

Read data into memory from the given source and hash it to identify the block. Returns nil if the source is empty.

(defn read!
  ([source]
   (read! source default-algorithm))
  ([source algorithm]
   (io! (data/read-block algorithm source))))

Write a block's content to an output stream.

(defn write!
  [block out]
  (with-open [stream (open block)]
    (bytes/transfer stream out)))

Ensure the block's content is loaded into memory. Returns a loaded version of the given block.

If the block is lazy, the stream is read into memory and returned as a new block. If the block is already loaded, it is returned unchanged. The returned block will have the same metadata as the one given.

(defn load!
  [block]
  (io!
    (if (lazy? block)
      ; Close the content stream once it has been read into memory.
      (with-open [content (data/content-stream block nil nil)]
        (with-meta
          (data/read-block (:algorithm (:id block)) content)
          (meta block)))
      block)))

Check a block to verify that it has the correct identifier and size for its content. Returns true if the block is valid, or throws an exception on any error.

(defn validate!
  [block]
  (let [id (:id block)
        size (:size block)]
    (when-not (multihash? id)
      (throw (ex-info
               (str "Block id is not a multihash: " (pr-str id))
               {:id id})))
    (when-not (pos-int? size)
      (throw (ex-info
               (str "Block " id " has an invalid size: " (pr-str size))
               {:id id, :size size})))
    (with-open [stream (CountingInputStream. (open block))]
      (let [hash-fn (data/hasher (:algorithm id))
            actual-id (hash-fn stream)
            actual-size (.getByteCount stream)]
        (when (not= id actual-id)
          (throw (ex-info
                   (str "Block " id " has mismatched id and content")
                   {:id id, :actual-id actual-id})))
        (when (not= size actual-size)
          (throw (ex-info
                   (str "Block " id " reports size " size
                        " but has actual size " actual-size)
                   {:id id, :size size, :actual-size actual-size})))))
    true))
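`validate!` wraps the block stream in commons-io's `CountingInputStream` so that hashing and size counting share a single pass over the content. The same idea can be sketched with only the JDK, assuming SHA-256 hex digests as a stand-in for multihashes; `digest-and-count` and `check-content` are hypothetical names.

```clojure
(ns example.validate
  (:import (java.io ByteArrayInputStream InputStream)
           java.security.MessageDigest))

(defn digest-and-count
  "Read `input` to the end exactly once, returning a map with the SHA-256
  digest (as a hex string) and the number of bytes read."
  [^InputStream input]
  (let [md (MessageDigest/getInstance "SHA-256")
        buf (byte-array 4096)]
    (loop [total 0]
      (let [n (.read input buf)]
        (if (neg? n)
          {:digest (apply str (map #(format "%02x" (bit-and % 0xff)) (.digest md)))
           :size total}
          ;; Feed the chunk to the digest and keep a running byte count.
          (do (.update md buf 0 n)
              (recur (+ total n))))))))

(defn check-content
  "Return true if `content` matches the claimed digest and size, else throw."
  [expected-digest expected-size ^bytes content]
  (let [{:keys [digest size]} (with-open [in (ByteArrayInputStream. content)]
                                (digest-and-count in))]
    (when (not= expected-digest digest)
      (throw (ex-info "Content does not match claimed digest"
                      {:expected expected-digest, :actual digest})))
    (when (not= expected-size size)
      (throw (ex-info "Content does not match claimed size"
                      {:expected expected-size, :actual size})))
    true))
```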

Storage API

Constructs a new block store from a URI by dispatching on the scheme. The store will be returned in an initialized (but not started) state.

(defn ->store
  [uri]
  (store/initialize uri))

Enumerate the stored blocks, returning a stream of blocks ordered by their multihash id. The store will continue listing blocks until the stream is closed or there are no more matching blocks to return. Accepts the following options:

  • :algorithm Only return blocks identified by this hash algorithm.
  • :after Return blocks whose id (in hex) lexically follows this string. A multihash may also be provided and will be coerced to hex.
  • :before Return blocks whose id (in hex) lexically precedes this string. A multihash may also be provided and will be coerced to hex.
  • :limit Restrict the maximum number of blocks returned on the stream.

(defn list
  [store & opts]
  (let [opts (args->map opts)
        opts (merge
               ; Validate algorithm option.
               (when-let [algorithm (:algorithm opts)]
                 (if (keyword? algorithm)
                   {:algorithm algorithm}
                   (throw (IllegalArgumentException.
                            (str "Option :algorithm is not a keyword: "
                                 (pr-str algorithm))))))
               ; Validate 'after' boundary.
               (when-let [after (:after opts)]
                 (cond
                   (hex-string? after)
                   {:after (str/lower-case after)}
                   (multihash? after)
                   {:after (multihash/hex after)}
                   :else
                   (throw (IllegalArgumentException.
                            (str "Option :after is not a hex string or multihash: "
                                 (pr-str after))))))
               ; Validate 'before' boundary.
               (when-let [before (:before opts)]
                 (cond
                   (hex-string? before)
                   {:before (str/lower-case before)}
                   (multihash? before)
                   {:before (multihash/hex before)}
                   :else
                   (throw (IllegalArgumentException.
                            (str "Option :before is not a hex string or multihash: "
                                 (pr-str before))))))
               ; Validate query limit.
               (when-let [limit (:limit opts)]
                 (if (pos-int? limit)
                   {:limit limit}
                   (throw (IllegalArgumentException.
                            (str "Option :limit is not a positive integer: "
                                  (pr-str limit))))))
               ; Ensure no other options.
               (when-let [bad-opts (not-empty (dissoc opts :algorithm :after :before :limit))]
                 (throw (IllegalArgumentException.
                          (str "Unknown options passed to list: " (pr-str bad-opts))))))]
    (meter/measure-stream
      store :list nil
      (io! (store/select-blocks opts (store/-list store opts))))))

Enumerate the stored blocks, returning a sequence of blocks ordered by their multihash id. This wraps the list method and consumes the stream lazily, terminating when the stream is drained, a timeout is encountered, or an exception is observed on the stream. Timeouts and stream exceptions are thrown to the consumer of the sequence.

Accepts the same options as list, plus:

  • :timeout Millisecond duration to wait for new blocks to arrive on the stream. (default: 10000)

(defn list-seq
  [store & opts]
  (let [opts (args->map opts)
        timeout (:timeout opts 10000)]
    (when-not (pos-int? timeout)
      (throw (IllegalArgumentException.
               (str "Option :timeout is not a positive integer: "
                     (pr-str timeout)))))
    (letfn [(stream->seq
              [s]
              (lazy-seq
                (let [x @(s/try-take! s ::drained timeout ::timeout)]
                  (when (instance? Throwable x)
                    (throw x))
                  (when (identical? ::timeout x)
                    (throw (ex-info
                             (format "Block stream consumption timed out after %d ms"
                                     timeout)
                             {:opts opts})))
                  (when-not (identical? ::drained x)
                    (cons x (stream->seq s))))))]
      (stream->seq (list store (dissoc opts :timeout))))))
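The pattern used by `list-seq` (a lazy sequence that takes one item at a time with a timeout, ends on a drained sentinel, and rethrows errors found on the stream) can be sketched without manifold. This sketch swaps in `java.util.concurrent.LinkedBlockingQueue` for the manifold stream, so it is an analogue rather than the library's code; `queue->seq`, `drained`, and `example-queue` are hypothetical names.

```clojure
(ns example.queue-seq
  (:import (java.util.concurrent LinkedBlockingQueue TimeUnit)))

(def drained
  "Hypothetical sentinel a producer puts on the queue when it is finished."
  ::drained)

(defn queue->seq
  "Lazily take items from `queue`, waiting up to `timeout-ms` for each one.
  Ends at the `drained` sentinel, rethrows Throwables, and throws on timeout."
  [^LinkedBlockingQueue queue timeout-ms]
  (lazy-seq
    ;; .poll returns nil when the timeout elapses (the queue rejects nil items).
    (let [x (.poll queue (long timeout-ms) TimeUnit/MILLISECONDS)]
      (cond
        (nil? x) (throw (ex-info (format "Timed out after %d ms" timeout-ms)
                                 {:timeout timeout-ms}))
        (instance? Throwable x) (throw x)
        (identical? drained x) nil
        :else (cons x (queue->seq queue timeout-ms))))))

(def example-queue (LinkedBlockingQueue.))
(doseq [x [:a :b :c drained]] (.put example-queue x))
(doall (queue->seq example-queue 100))
```

As in `list-seq`, nothing is taken from the source until the sequence is realized, and each element gets its own timeout window.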

Load metadata about a block if the store contains it. Returns a deferred which yields a map with block information but no content, or nil if the store does not contain the identified block.

The block stats include the :id, :size, and :stored-at fields. The returned map may also have additional implementation-specific storage metadata, similar to returned blocks.

(defn stat
  [store id]
  (when-not (multihash? id)
    (throw (IllegalArgumentException.
             (str "Block id must be a multihash, got: " (pr-str id)))))
  (meter/measure-method
    store :stat
    {:block-id id}
    (io! (store/-stat store id))))

Load a block from the store. Returns a deferred which yields the block if the store contains it, or nil if no block is stored for that id.

(defn get
  [store id]
  (when-not (multihash? id)
    (throw (IllegalArgumentException.
             (str "Block id must be a multihash, got: " (pr-str id)))))
  (d/chain
    (meter/measure-method
      store :get
      {:block-id id}
      (io! (store/-get store id)))
    (fn validate-block
      [block]
      (when block
        (when-not (= id (:id block))
          (throw (RuntimeException.
                   (str "Asked for block " id " but got " (:id block)))))
        (meter/metered-block store ::meter/io-read block)))))

Save a block into the store. Returns a deferred which yields the stored block, which may have already been present in the store.

(defn put!
  [store block]
  (when-not (instance? Block block)
    (throw (IllegalArgumentException.
             (str "Argument must be a block, got: " (pr-str block)))))
  (d/chain
    (meter/measure-method
      store :put!
      {:block-id (:id block)
       :block-size (:size block)}
      (->> block
           (meter/metered-block store ::meter/io-write)
           (store/-put! store)
           (io!)))
    (fn meter-block
      [block]
      (meter/metered-block store ::meter/io-read block))))

Store content from a byte source in a block store. Returns a deferred which yields the stored block, or nil if the source was empty.

If the source is a file, it will be streamed into the store; otherwise, the content is read into memory.

(defn store!
  ([store source]
   (store! store source default-algorithm))
  ([store source algorithm]
   (d/chain
     (io!
       (store/future'
         (if (instance? File source)
           (from-file source algorithm)
           (read! source algorithm))))
     (fn put-block
       [block]
       (when block
         (put! store block))))))

Remove a block from the store. Returns a deferred which yields true if the block was found and removed.

(defn delete!
  [store id]
  (when-not (multihash? id)
    (throw (IllegalArgumentException.
             (str "Block id must be a multihash, got: " (pr-str id)))))
  (meter/measure-method
    store :delete!
    {:block-id id}
    (io! (store/-delete! store id))))

Batch API

Retrieve a batch of blocks identified by a collection of multihashes. Returns a deferred which yields a collection of the blocks which were found.

The blocks are returned in no particular order, and any missing blocks are omitted from the result.

(defn get-batch
  [store ids]
  (d/chain
    (->> (distinct ids)
         (map (partial get store))
         (apply d/zip))
    (fn omit-missing
      [blocks]
      (into [] (remove nil?) blocks))))

Save a collection of blocks into the store. Returns a deferred which yields a collection of stored blocks.

This is not guaranteed to be atomic; readers may see the store in a partially updated state.

(defn put-batch!
  [store blocks]
  (if-let [blocks (seq (remove nil? blocks))]
    (apply d/zip (map (partial put! store) blocks))
    (d/success-deferred [])))

Remove a batch of blocks from the store, identified by a collection of multihashes. Returns a deferred which yields a set of ids for the blocks which were found and deleted.

This is not guaranteed to be atomic; readers may see the store in a partially deleted state.

(defn delete-batch!
  [store ids]
  (if-let [ids (not-empty (into [] (comp (remove nil?) (distinct)) ids))]
    (d/chain
      (apply d/zip (map (partial delete! store) ids))
      (fn match-ids
        [results]
        (into #{}
              (comp
                (filter first)
                (map second))
              (map vector results ids))))
    (d/success-deferred #{})))
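The `match-ids` step relies on `d/zip` yielding results in the same order as `ids`. The pairing logic on its own, with a plain vector standing in for the zipped deferred results and hypothetical keyword ids:

```clojure
(ns example.match-ids)

(defn deleted-ids
  "Pair each delete result with its id, keeping the ids whose result was truthy."
  [results ids]
  (into #{}
        (comp (filter first)
              (map second))
        (map vector results ids)))

;; Only the first and third deletes found a block.
(deleted-ids [true false true] [:id-a :id-b :id-c])
```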

Storage Utilities

Scan blocks in the store, building up a summary. Returns a deferred which yields the summary map when the scan is complete.

Accepts the same arguments as list, plus:

  • :filter A predicate function which will be used to filter blocks listed by the store. By default, all blocks are included.

(defn scan
  [store & opts]
  (let [opts (args->map opts)]
    (->
      (list store (dissoc opts :filter))
      (cond->>
        (:filter opts) (s/filter (:filter opts)))
      (->>
        (s/reduce sum/update (sum/init))))))

Completely remove all data associated with the store. After this call, the store will be empty. Returns a deferred which yields true once the store has been erased.

This is not guaranteed to be atomic; readers may see the store in a partially erased state.

(defn erase!
  [store]
  (io!
    (if (satisfies? store/ErasableStore store)
      (meter/measure-method
        store :erase! nil
        (store/-erase! store))
      ; TODO: should be able to parallelize this - how to communicate errors?
      (s/consume-async
        (fn erase-block
          [block]
          (if (instance? Throwable block)
            (d/error-deferred block)
            (delete! store (:id block))))
        (list store)))))

Synchronize blocks from the source store to the dest store. Returns a deferred which yields a summary of the copied blocks. Options may include:

  • :filter A function to run on every block before it is synchronized. The block will only be copied if the filter returns a truthy value.

(defn sync!
  [source dest & opts]
  (let [opts (args->map opts)
        stream (cond->> (io! (store/missing-blocks
                               (store/-list source nil)
                               (store/-list dest nil)))
                 (:filter opts) (s/filter (:filter opts)))]
    (d/loop [summary (sum/init)]
      (d/chain'
        (s/take! stream ::drained)
        (fn copy-next
          [block]
          (cond
            (identical? ::drained block)
            summary
            (instance? Throwable block)
            (d/error-deferred block)
            :else
            (d/chain
              (put! dest block)
              (fn update-sum
                [block']
                (d/recur (sum/update summary block'))))))))))
 

Block storage protocols. Typically, clients of the library should use the API wrapper functions in blocks.core instead of using these methods directly.

(ns ^:no-doc blocks.store
  (:require
    [blocks.data :as data]
    [clojure.string :as str]
    [manifold.deferred :as d]
    [manifold.executor]
    [manifold.stream :as s]
    [manifold.utils]
    [multiformats.hash :as multihash]))

Storage Protocols

Protocol for content-addressable storage keyed by multihash identifiers.

(defprotocol BlockStore
  (-list
    [store opts]
    "List the blocks contained in the store. This method should return a stream
    of blocks ordered by multihash id. See `blocks.core/list` for the supported
    options.
    The method must return _at least_ the blocks which match the query options,
    and _should_ optimize the results by omitting unmatched blocks. The
    returned stream may be closed preemptively if the consumer is done, which
    should terminate the list thread.
    If the listing thread encounters an exception, the error should be placed
    on the stream and the stream should be closed to indicate no further blocks
    will be coming. Consumers must handle exceptions propagated on the stream.")
  (-stat
    [store id]
    "Load a block's metadata if the store contains it. Returns a deferred which
    yields a map with block information but no content, or nil if the store
    does not contain the identified block.")
  (-get
    [store id]
    "Fetch a block from the store. Returns a deferred which yields the block,
    or nil if not present.")
  (-put!
    [store block]
    "Persist a block into the store. Returns a deferred which yields the
    stored block, which may have already been present in the store.")
  (-delete!
    [store id]
    "Remove a block from the store. Returns a deferred which yields true if the
    block was stored, false if it was not."))

An erasable store has some notion of being removed in its entirety, often atomically. For example, a file system might unlink the root directory rather than deleting each individual file.

(defprotocol ErasableStore
  (-erase!
    [store]
    "Completely removes any data associated with the store. Returns a deferred
    value which yields when the store is erased."))

Store Construction

Parse a URI string into a map of keywords to URI parts.

(defn parse-uri
  [location]
  (let [uri (java.net.URI. location)]
    (->>
      {:scheme (.getScheme uri)
       :name (and (nil? (.getAuthority uri))
                  (nil? (.getPath uri))
                  (.getSchemeSpecificPart uri))
       :user-info (when-let [info (.getUserInfo uri)]
                    (zipmap [:id :secret] (str/split info #":" 2)))
       :host (.getHost uri)
       :port (when (not= (.getPort uri) -1)
               (.getPort uri))
       :path (.getPath uri)
       :query (when-let [query (.getQuery uri)]
                (->> (str/split query #"&")
                     (map #(let [[k v] (str/split % #"=")]
                             [(keyword k) v]))
                     (into {})))
       :fragment (.getFragment uri)}
      (filter val)
      (into {}))))
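To show what `parse-uri` produces, and therefore what `initialize` dispatches on, here is a standalone copy with two example URIs. The URIs are hypothetical inputs chosen for illustration. Note that the dispatch value is the scheme string (for example `"s3"`), not a keyword, and that keys whose value is nil or false are dropped from the result.

```clojure
(ns example.parse-uri
  (:require [clojure.string :as str]))

;; Copied from blocks.store.
(defn parse-uri
  [location]
  (let [uri (java.net.URI. location)]
    (->>
      {:scheme (.getScheme uri)
       :name (and (nil? (.getAuthority uri))
                  (nil? (.getPath uri))
                  (.getSchemeSpecificPart uri))
       :user-info (when-let [info (.getUserInfo uri)]
                    (zipmap [:id :secret] (str/split info #":" 2)))
       :host (.getHost uri)
       :port (when (not= (.getPort uri) -1)
               (.getPort uri))
       :path (.getPath uri)
       :query (when-let [query (.getQuery uri)]
                (->> (str/split query #"&")
                     (map #(let [[k v] (str/split % #"=")]
                             [(keyword k) v]))
                     (into {})))
       :fragment (.getFragment uri)}
      (filter val)
      (into {}))))

;; An opaque URI has no authority or path, so the scheme-specific part becomes :name.
(parse-uri "mem:-")
;; A hierarchical URI yields host, path, and a keywordized query map.
(parse-uri "s3://my-bucket/prefix?region=us-west-2")
```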

Constructs a new block store from a URI by dispatching on the scheme. The store will be returned in an initialized but not started state.

(defmulti initialize
  (comp :scheme parse-uri))
(defmethod initialize :default
  [uri]
  (throw (IllegalArgumentException.
           (str "Unsupported block-store URI scheme: " (pr-str uri)))))

Alters the metadata on the given var symbol to change the visibility to private.

(defmacro privatize!
  [var-sym]
  `(alter-meta! #'~var-sym assoc :private true))

Alters the metadata on the automatic record constructor functions to set their visibility to private.

(defmacro privatize-constructors!
  [record-name]
  `(do (privatize! ~(symbol (str "->" record-name)))
       (privatize! ~(symbol (str "map->" record-name)))))

Async Utilities

A helper for the future macro which wraps some submission logic in a common function.

(defn ^:no-doc schedule-future!
  [d body-fn]
  (manifold.utils/future-with
    (manifold.executor/execute-pool)
    (when-not (d/realized? d)
      (try
        (d/success! d (body-fn))
        (catch Throwable ex
          (d/error! d ex))))))

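Alternative to d/future that has better coverage. Evaluates body on manifold's execute pool and returns a deferred which yields the result, or yields an error if body throws.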

(defmacro future'
  [& body]
  `(let [d# (d/deferred)]
     (schedule-future! d# (fn future# [] ~@body))
     d#))

Apply a function to each of the given block stores in parallel. Returns a deferred which yields the vector of results.

(defn zip-stores
  [stores f & args]
  (apply d/zip (map #(apply f % args) stores)))

Apply a function to each of the given block stores in order until one returns a non-nil result. Returns a deferred which yields the result, or nil if all stores returned nil.

(defn some-store
  [stores f & args]
  (d/loop [stores stores]
    (when-let [store (first stores)]
      (d/chain
        (apply f store args)
        (fn check-result
          [result]
          (if (nil? result)
            (d/recur (rest stores))
            result))))))
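`some-store` threads its loop through deferreds with `d/loop`, which can obscure the control flow. A synchronous sketch of the same fallback logic, with hypothetical stores modeled as plain maps and `clojure.core/get` standing in for the store operation (`some-store-sync`, `cache`, and `backing` are illustrative names):

```clojure
(ns example.some-store)

(defn some-store-sync
  "Synchronous sketch: call (f store & args) on each store in order and return
  the first non-nil result, or nil if every store returns nil."
  [stores f & args]
  (loop [stores stores]
    (when-let [store (first stores)]
      (let [result (apply f store args)]
        (if (nil? result)
          (recur (rest stores))
          result)))))

;; Hypothetical stores: an empty cache in front of a backing store.
(def cache {})
(def backing {:id-1 {:id :id-1, :size 42}})

(some-store-sync [cache backing] get :id-1)
```

Unlike `clojure.core/some`, only nil means "try the next store"; a `false` result is returned as-is.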

Stream Utilities

Choose among multiple blocks to determine the optimal one to use for copying into a new store. Returns the first loaded block, if any of the blocks hold their content in memory; otherwise, returns the first non-nil block.

(defn preferred-block
  [& blocks]
  (when-let [blocks (seq (remove nil? blocks))]
    (or (first (filter data/byte-content? blocks))
        (first blocks))))

Select blocks from a stream based on the criteria supported in -list. Returns a filtered view of the block stream that will close the source once the relevant blocks have been read.

(defn select-blocks
  [opts blocks]
  (let [{:keys [algorithm after before limit]} opts
        counter (atom 0)
        out (s/stream)]
    (s/connect-via
      blocks
      (fn test-block
        [block]
        (if (instance? Throwable block)
          ; Propagate error on the stream.
          (do (s/put! out block)
              (s/close! out)
              (d/success-deferred false))
          ; Determine if block matches query criteria.
          (let [id (:id block)
                hex (multihash/hex id)]
            (cond
              ; Ignore any blocks which don't match the algorithm.
              (and algorithm (not= algorithm (:algorithm id)))
              (d/success-deferred true)
              ; Drop blocks until an id later than `after`.
              (and after (not (neg? (compare after hex))))
              (d/success-deferred true)
              ; Terminate the stream if block is later than `before` or `limit`
              ; blocks have already been returned.
              (or (and before (not (pos? (compare before hex))))
                  (and (pos-int? limit) (< limit (swap! counter inc))))
              (do (s/close! out)
                  (d/success-deferred false))
              ; Otherwise, pass the block along.
              :else
              (s/put! out block)))))
      out
      {:description {:op "select-blocks"}})
    (s/source-only out)))
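The criteria in select-blocks are easier to see over an ordinary sorted sequence. This is a minimal sketch that assumes blocks are plain maps with hypothetical :algorithm and :hex keys, already sorted by :hex. The name select-ids is illustrative only.

```clojure
; Hypothetical seq-based analogue of select-blocks. Blocks are maps with
; :algorithm and :hex keys, already sorted by :hex.
(defn select-ids
  [{:keys [algorithm after before limit]} blocks]
  (cond->> blocks
    ; Ignore blocks hashed with a different algorithm.
    algorithm (filter #(= algorithm (:algorithm %)))
    ; Drop blocks until one sorts strictly after the marker.
    after (drop-while #(not (neg? (compare after (:hex %)))))
    ; Stop at the first block that is not strictly before the marker.
    before (take-while #(pos? (compare before (:hex %))))
    ; Emit at most limit blocks.
    (pos-int? limit) (take limit)))

(def blocks
  [{:algorithm :sha1, :hex "1114aa"}
   {:algorithm :sha1, :hex "1114cc"}
   {:algorithm :sha1, :hex "1114dd"}
   {:algorithm :sha2-256, :hex "1220bb"}])

(map :hex (select-ids {:after "1114aa", :before "1114dd"} blocks))
; => ("1114cc")
(map :hex (select-ids {:algorithm :sha1, :limit 2} blocks))
; => ("1114aa" "1114cc")
```

Because the input is sorted, the before and limit criteria can end the traversal early instead of scanning everything. This is why the stream version closes its output at that point rather than skipping the block.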

Merge multiple streams of blocks and return a stream with one block per unique id, maintaining sorted order. The input streams are consumed incrementally and must already be sorted.

(defn merge-blocks
  [& streams]
  (if (= 1 (count streams))
    (first streams)
    (let [intermediates (mapv
                          (fn hook-up
                            [a]
                            (let [b (s/stream)]
                              (s/connect-via
                                a #(s/put! b %) b
                                {:description {:op "merge-blocks"}})
                              b))
                          streams)
          out (s/stream)]
      (d/loop [inputs (map vector intermediates (repeat nil))]
        (d/chain
          ; Take the head value from each stream we don't already have.
          (->>
            inputs
            (map (fn take-next
                   [[input head :as pair]]
                   (if (nil? head)
                     (d/chain
                       (s/take! input ::drained)
                       (partial vector input))
                     pair)))
            (apply d/zip))
          ; Remove drained streams from consideration.
          (fn remove-drained
            [inputs]
            (remove #(identical? ::drained (second %)) inputs))
          ; Find the next earliest block to return.
          (fn find-next
            [inputs]
            (if (empty? inputs)
              ; Every input is drained.
              (s/close! out)
              ; Check inputs for errors.
              (if-let [error (->> (map second inputs)
                                  (filter #(instance? Throwable %))
                                  (first))]
                ; Propagate error.
                (d/finally
                  (s/put! out error)
                  #(s/close! out))
                ; Determine the next block to output.
                (let [earliest (first (sort-by :id (map second inputs)))]
                  (d/chain
                    (s/put! out earliest)
                    (fn check-put
                      [result]
                      (if result
                        ; Remove any blocks matching the one emitted.
                        (d/recur (mapv (fn remove-earliest
                                         [[input head :as pair]]
                                         (if (= (:id earliest) (:id head))
                                           [input nil]
                                           pair))
                                       inputs))
                        ; Out was closed on us.
                        false)))))))))
      (s/source-only out))))
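The merge step can be sketched over lazy sequences instead of Manifold streams. This assumes each input is a sequence of maps already sorted by :id; merge-sorted is a hypothetical name. Like the stream version, it picks the earliest head with a stable sort-by, so on a tie the earlier input's block wins.

```clojure
; Hypothetical seq-based analogue of merge-blocks: lazily merges seqs of
; maps that are each already sorted by :id, emitting one map per id.
(defn merge-sorted
  [& colls]
  (lazy-seq
    (let [colls (remove empty? colls)]
      (when (seq colls)
        ; sort-by is stable, so on a tie the head of the earliest input
        ; is chosen.
        (let [earliest (first (sort-by :id (map first colls)))
              id (:id earliest)]
          (cons earliest
                ; Advance every input whose head has the emitted id.
                (apply merge-sorted
                       (map #(if (= id (:id (first %))) (rest %) %)
                            colls))))))))

(merge-sorted [{:id 1, :from :buffer} {:id 3, :from :buffer}]
              [{:id 1, :from :primary} {:id 2, :from :primary}])
; => ({:id 1, :from :buffer} {:id 2, :from :primary} {:id 3, :from :buffer})
```

This tie-break is what lets the buffer and cache stores list their own copy of a block ahead of the primary's, since they pass their local store as the first stream.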

Compare two block streams and generate a derived stream of the blocks in source which are not present in dest. Both input streams must already be sorted by block id.

(defn missing-blocks
  [source dest]
  (let [src (s/stream)
        dst (s/stream)
        out (s/stream)
        close-all! (fn close-all!
                     []
                     (s/close! src)
                     (s/close! dst)
                     (s/close! out))]
    (s/connect-via
      source #(s/put! src %) src
      {:description {:op "missing-blocks"}})
    (s/connect-via
      dest #(s/put! dst %) dst
      {:description {:op "missing-blocks"}})
    (d/loop [s nil
             d nil]
      (d/chain
        (d/zip
          (if (nil? s)
            (s/take! src ::drained)
            s)
          (if (nil? d)
            (s/take! dst ::drained)
            d))
        (fn compare-next
          [[s d]]
          (cond
            ; Source stream exhausted; terminate sequence.
            (identical? ::drained s)
            (close-all!)
            ; Destination stream exhausted; return remaining blocks in source.
            (identical? ::drained d)
            (-> (s/put! out s)
                (d/chain
                  (fn [_] (s/drain-into src out)))
                (d/finally close-all!))
            ; Source threw an error; propagate it.
            (instance? Throwable s)
            (d/finally
              (s/put! out s)
              close-all!)
            ; Dest threw an error; propagate it.
            (instance? Throwable d)
            (d/finally
              (s/put! out d)
              close-all!)
            ; Block is present in both streams; drop and continue.
            (= (:id s) (:id d))
            (d/recur nil nil)
            ; Source has a block not in dest.
            (neg? (compare (:id s) (:id d)))
            (d/chain
              (s/put! out s)
              (fn onwards
                [result]
                (when result
                  (d/recur nil d))))
            ; Next source block comes after some dest blocks; skip forward.
            :else
            (d/recur s nil)))))
    (s/source-only out)))
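The comparison in missing-blocks is a merge-style walk over two sorted inputs. Here is a minimal sketch of the same case analysis over lazy sequences of comparable ids; missing-sorted is a hypothetical name, and error propagation is omitted.

```clojure
; Hypothetical seq-based analogue of missing-blocks: returns the ids in
; source that are absent from dest. Both inputs must be sorted.
(defn missing-sorted
  [source dest]
  (lazy-seq
    (cond
      ; Source exhausted: nothing more can be missing.
      (empty? source) nil
      ; Dest exhausted: everything left in source is missing.
      (empty? dest) source
      :else
      (let [s (first source)
            d (first dest)
            c (compare s d)]
        (cond
          ; Present in both: drop from each and continue.
          (zero? c) (missing-sorted (rest source) (rest dest))
          ; Source id sorts first, so dest cannot contain it.
          (neg? c) (cons s (missing-sorted (rest source) dest))
          ; Dest is behind: skip it forward.
          :else (missing-sorted source (rest dest)))))))

(missing-sorted [1 2 4 5 7] [2 3 5]) ; => (1 4 7)
```

Each input is walked once, so the comparison never needs to hold the full set of destination ids in memory.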
 

Instrumentation for block stores to measure data flows, call latencies, and other metrics.

The logic in this namespace is built around the notion of a metric event and an associated recording function on the store, to which events are passed. Each event has at least a namespaced :type keyword, a :label associated with the store, and a numeric :value.

Events may contain other information like the block id or method name as well, and it is up to the receiver to interpret them.

(ns blocks.meter
  (:require
    [blocks.data :as data]
    [clojure.tools.logging :as log]
    [manifold.deferred :as d]
    [manifold.stream :as s])
  (:import
    java.io.InputStream
    java.util.concurrent.atomic.AtomicLong
    org.apache.commons.io.input.ProxyInputStream))

Utilities

Create a delay expression which will return the number of milliseconds elapsed between its creation and its first dereference. Later dereferences return the same cached value.

(defn- stopwatch
  []
  (let [start (System/nanoTime)]
    (delay (/ (- (System/nanoTime) start) 1e6))))

Format a byte value as a human-readable string, scaling it by powers of 1024 into a K, M, or G prefix and appending the given unit suffix.

(defn- format-bytes
  [value unit]
  (loop [value value
         prefixes ["" "K" "M" "G"]]
    (if (and (< 1024 value) (next prefixes))
      (recur (/ value 1024) (next prefixes))
      (if (nat-int? value)
        (format "%d %s%s" value (first prefixes) unit)
        (format "%.1f %s%s" (double value) (first prefixes) unit)))))
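To try the scaling loop at a REPL without loading blocks.meter, here is a standalone sketch of it. It stops scaling once the largest prefix, G, is reached, so very large values stay in gigabytes.

```clojure
; Standalone sketch of the scaling loop in format-bytes.
(defn format-bytes
  [value unit]
  (loop [value value
         prefixes ["" "K" "M" "G"]]
    ; Divide by 1024 while the value exceeds 1024 and a larger prefix remains.
    (if (and (< 1024 value) (next prefixes))
      (recur (/ value 1024) (next prefixes))
      ; Non-negative integers print without decimals; ratios and doubles
      ; get one decimal place.
      (if (nat-int? value)
        (format "%d %s%s" value (first prefixes) unit)
        (format "%.1f %s%s" (double value) (first prefixes) unit)))))

(format-bytes 512 "B")           ; => "512 B"
(format-bytes 2048 "B")          ; => "2 KB"
(format-bytes 5497558138880 "B") ; => "5120 GB"
(format-bytes 1536 "B")          ; => "1.5 KB" (decimal separator follows the default locale)
```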

Construct a string to label the metered store.

(defn- meter-label
  [store]
  (str (or (::label store) (.getSimpleName (class store)))))

True if the store has a recorder function set, which enables metering.

(defn- enabled?
  [store]
  (boolean (::recorder store)))

Helper to record an event to the metered store if a recording function is present.

(defn- record!
  [store metric-type value attrs]
  (when-let [recorder (::recorder store)]
    (try
      (recorder
        store
        (assoc attrs
               :type metric-type
               :label (meter-label store)
               :value value))
      (catch Exception ex
        (log/warn ex "Failure while recording metric")))))
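A recorder is any function called as (recorder store event). Since record! reads (::recorder store) in this namespace, a store would presumably carry it under the :blocks.meter/recorder key. As a hypothetical example, the recorder below sums event values per label and type into an atom. The event maps are hand-built to match the shape record! produces.

```clojure
; Hypothetical recorder that sums event values per [label type].
(def totals (atom {}))

(defn summing-recorder
  [_store event]
  (swap! totals update [(:label event) (:type event)] (fnil + 0) (:value event)))

; Example events shaped like the ones record! builds.
(summing-recorder nil {:type :blocks.meter/list-stream, :label "MemoryBlockStore", :value 10})
(summing-recorder nil {:type :blocks.meter/list-stream, :label "MemoryBlockStore", :value 5})
(summing-recorder nil {:type :blocks.meter/method-time, :label "MemoryBlockStore", :value 2.5, :method :get})

@totals
; => {["MemoryBlockStore" :blocks.meter/list-stream] 15,
;     ["MemoryBlockStore" :blocks.meter/method-time] 2.5}
```

Because record! catches and logs any exception thrown by the recorder, a failing recorder cannot break the store operation being measured.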

Stream Metering

Record incremental IO metrics every N seconds.

(def ^:dynamic *io-report-period*
  10)

Wrap the given stream in an intermediate stream which will record metric events with the number of blocks which passed through the stream.

(defn- metering-block-stream
  [store metric-type attrs stream]
  (let [counter (AtomicLong. 0)
        period *io-report-period*
        label (meter-label store)
        out (s/map #(do (.incrementAndGet counter) %) stream)
        reports (s/periodically
                  (* period 1000)
                  #(.getAndSet counter 0))
        flush! (fn flush!
                 [sum]
                 (when (pos? sum)
                   (log/tracef "Metered %s of %d blocks through stream %s (%.2f/sec)"
                               (name metric-type) sum label
                               (double (/ sum period)))
                   (record! store metric-type sum attrs)))]
    (s/consume flush! reports)
    (s/on-closed
      stream
      (fn report-final
        []
        (flush! (.getAndSet counter 0))
        (s/close! reports)))
    out))

Wrap the given input stream in a proxy which will record metric events with the given type and number of bytes read.

(defn- metering-input-stream
  [store metric-type block-id ^InputStream input-stream]
  (let [meter (volatile! [(System/nanoTime) 0])]
    (letfn [(flush!
              []
              (let [[last-time sum] @meter
                    elapsed (/ (- (System/nanoTime) last-time) 1e9)
                    label (meter-label store)]
                (when (pos? sum)
                  (log/tracef "Metered %s of %s block %s: %s (%s)"
                              (name metric-type) label block-id
                              (format-bytes sum "B")
                              (format-bytes (/ sum elapsed) "Bps"))
                  (record! store metric-type sum {:block block-id})
                  (vreset! meter [(System/nanoTime) 0]))))]
      (proxy [ProxyInputStream] [input-stream]
        (afterRead
          [n]
          (when (pos? n)
            (let [[last-time sum] (vswap! meter update 1 + n)
                  elapsed (/ (- (System/nanoTime) last-time) 1e9)]
              (when (<= *io-report-period* elapsed)
                (flush!)))))
        (close
          []
          (flush!)
          (.close input-stream))))))
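The afterRead hook above comes from commons-io's ProxyInputStream. As a dependency-free sketch of the same byte-counting idea, the hypothetical counting-input-stream below swaps in java.io.FilterInputStream and overrides each read arity. It only counts bytes into an atom and does none of the periodic flushing or metric recording.

```clojure
(import '(java.io ByteArrayInputStream FilterInputStream InputStream))

; Hypothetical counting wrapper using only java.io: every byte read through
; the proxy is added to counter.
(defn counting-input-stream
  [^InputStream in counter]
  (letfn [(tally [n]
            ; Reads return -1 at end of stream; only count real bytes.
            (when (pos? n) (swap! counter + n))
            n)]
    (proxy [FilterInputStream] [in]
      (read
        ([]
         (let [b (.read in)]
           (when-not (neg? b) (tally 1))
           b))
        ([bs]
         (tally (.read in ^bytes bs)))
        ([bs off len]
         (tally (.read in ^bytes bs (int off) (int len))))))))

(let [counter (atom 0)
      buf (byte-array 16)]
  (with-open [s (counting-input-stream (ByteArrayInputStream. (byte-array 100)) counter)]
    ; Drain the stream in 16-byte chunks.
    (while (not (neg? (.read s buf)))))
  @counter)
; => 100
```

Each override delegates to the wrapped stream directly rather than to the superclass, so a bulk read is counted exactly once.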

Metered Content

(deftype MeteredContentReader
  [store metric-type block-id content]
  data/ContentReader
  (read-all
    [this]
    (metering-input-stream
      store metric-type block-id
      (data/read-all content)))
  (read-range
    [this start end]
    (metering-input-stream
      store metric-type block-id
      (data/read-range content start end))))
(alter-meta! #'->MeteredContentReader assoc :private true)

Wrap the block with a lazy constructor for a metered input stream which will report metrics for the given type. If the store does not have a recorder, the block will be returned unchanged.

(defn metered-block
  [store metric-type block]
  (when block
    (if (enabled? store)
      (data/wrap-content
        block
        (partial ->MeteredContentReader
                 store
                 metric-type
                 (:id block)))
      block)))

Method Wrappers

Measure the flow of blocks through a manifold stream. Returns the wrapped stream, or the original if the store does not have metering enabled.

(defn measure-stream
  [store method-kw attrs stream]
  (cond->> stream
    (enabled? store)
    (metering-block-stream
      store ::list-stream
      (assoc attrs :method method-kw))))

Measure the end-to-end elapsed time for a block store method. Returns a deferred with a final report hook if the store has metering enabled.

(defn measure-method
  [store method-kw attrs body-deferred]
  (let [elapsed (stopwatch)]
    (cond-> body-deferred
      (enabled? store)
      (d/finally
        (fn record-elapsed
          []
          (log/tracef "Method %s of %s block store on %s took %.1f ms"
                      (name method-kw)
                      (meter-label store)
                      attrs
                      @elapsed)
          (record!
            store ::method-time @elapsed
            (assoc attrs :method method-kw)))))))
 

Buffer stores provide logical block storage which uses two backing stores to implement a buffer. New blocks are written to the buffer store, which can be flushed to write all of the blocks to the primary store. Reads return a unified view of the existing and buffered blocks.

(ns blocks.store.buffer
  (:require
    [blocks.core :as block]
    [blocks.store :as store]
    [blocks.summary :as sum]
    [com.stuartsierra.component :as component]
    [manifold.deferred :as d]
    [manifold.stream :as s]))
(defrecord BufferBlockStore
  [primary buffer predicate]
  component/Lifecycle
  (start
    [this]
    (when-not (satisfies? store/BlockStore primary)
      (throw (IllegalStateException.
               (str "Cannot start buffer block store without a backing primary store: "
                    (pr-str primary)))))
    (when-not (satisfies? store/BlockStore buffer)
      (throw (IllegalStateException.
               (str "Cannot start buffer block store without a backing buffer store: "
                    (pr-str buffer)))))
    this)
  (stop
    [this]
    this)
  store/BlockStore
  (-list
    [this opts]
    (store/merge-blocks
      (block/list buffer opts)
      (block/list primary opts)))
  (-stat
    [this id]
    (store/some-store [buffer primary] block/stat id))
  (-get
    [this id]
    (store/some-store [buffer primary] block/get id))
  (-put!
    [this block]
    (d/chain
      (block/get primary (:id block))
      (fn store-block
        [stored]
        (or stored
            (if (or (nil? predicate) (predicate block))
              (block/put! buffer block)
              (block/put! primary block))))))
  (-delete!
    [this id]
    (d/chain
      (d/zip
        (block/delete! buffer id)
        (block/delete! primary id))
      (fn result
        [[buffered? stored?]]
        (boolean (or buffered? stored?))))))

Remove all blocks from the buffer. Returns a deferred which yields a summary of the deleted blocks.

(defn clear!
  [store]
  (d/chain
    (s/reduce
      sum/update
      (sum/init)
      (block/list (:buffer store)))
    (fn clear-buffer
      [summary]
      (d/chain
        (block/erase! (:buffer store))
        (constantly summary)))))

Flush the store, writing all buffered blocks to the primary store. Returns a deferred which yields a summary of the flushed blocks.

(defn flush!
  [store]
  (->>
    (block/list (:buffer store))
    (s/map (fn copy
             [block]
             (d/chain
               (block/put! (:primary store) block)
               (fn delete
                 [block']
                 (d/chain
                   (block/delete! (:buffer store) (:id block))
                   (constantly block'))))))
    (s/realize-each)
    (s/reduce sum/update (sum/init))))
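To make the write and flush paths concrete, here is a minimal synchronous model that assumes atoms holding maps of id to block in place of real block stores. The names buffer-put! and buffer-flush! are hypothetical, and the {:count :size} result is a stand-in for whatever blocks.summary actually produces.

```clojure
; Hypothetical model of the buffer store's write path, with atoms holding
; maps of id -> block in place of real block stores.
(defn buffer-put!
  [{:keys [primary buffer predicate]} block]
  (or
    ; Blocks already in the primary store are returned as-is.
    (get @primary (:id block))
    ; Otherwise buffer the block, unless the predicate rejects it.
    (let [target (if (or (nil? predicate) (predicate block)) buffer primary)]
      (swap! target assoc (:id block) block)
      block)))

(defn buffer-flush!
  [{:keys [primary buffer]}]
  (let [blocks (vals @buffer)]
    ; Copy each buffered block to primary, then drop it from the buffer.
    (doseq [block blocks]
      (swap! primary assoc (:id block) block)
      (swap! buffer dissoc (:id block)))
    ; Hypothetical summary shape.
    {:count (count blocks)
     :size (reduce + 0 (map :size blocks))}))

(def store {:primary (atom {}), :buffer (atom {}), :predicate #(< (:size %) 100)})

(buffer-put! store {:id "a", :size 10})  ; buffered
(buffer-put! store {:id "b", :size 500}) ; rejected by predicate, written to primary
(buffer-flush! store)                    ; => {:count 1, :size 10}
```

As in -put! above, only the primary store is checked for an existing copy, so writing a block that is already buffered simply overwrites the buffered entry.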

Constructors

(store/privatize-constructors! BufferBlockStore)

Create a new buffering block store.

  • :buffer Block store to use for new writes.
  • :primary Block store to use for flushed blocks.
  • :predicate (optional) A predicate function which returns false for blocks that should not be buffered; those blocks are written directly to the primary store.
(defn buffer-block-store
  [& {:as opts}]
  (map->BufferBlockStore opts))
 

Memory stores provide process-local storage backed by a map in a ref. Blocks put into this store will be fully read into memory to ensure the content is present locally. Memory block stores may be constructed using the mem:- URI form.

This store is most suitable for testing, caches, and other situations which call for an ephemeral block store.

(ns blocks.store.memory
  (:require
    [blocks.data :as data]
    [blocks.store :as store]
    [manifold.deferred :as d]
    [manifold.stream :as s])
  (:import
    blocks.data.Block))

Prepare a new block for storage based on the given block. This ensures the content is loaded into memory and cleans the block metadata.

(defn- load-block
  [^Block block]
  (if (data/byte-content? block)
    (data/create-block
      (:id block)
      (:size block)
      (.content block))
    (data/read-block
      (:algorithm (:id block))
      (data/read-all (.content block)))))

Block records in a memory store are held in a sorted map keyed by block id, inside a ref.

(defrecord MemoryBlockStore
  [memory]
  store/BlockStore
  (-list
    [this opts]
    (s/->source (or (vals @memory) [])))
  (-stat
    [this id]
    (d/success-deferred
      (when-let [block (get @memory id)]
        {:id (:id block)
         :size (:size block)
         :stored-at (:stored-at block)})))
  (-get
    [this id]
    (d/success-deferred
      (get @memory id)))
  (-put!
    [this block]
    (let [id (:id block)]
      (store/future'
        (dosync
          (if-let [extant (get @memory id)]
            extant
            (let [block (load-block block)]
              (alter memory assoc id block)
              block))))))
  (-delete!
    [this id]
    (store/future'
      (dosync
        (let [existed? (contains? @memory id)]
          (alter memory dissoc id)
          existed?))))
  store/ErasableStore
  (-erase!
    [this]
    (store/future'
      (dosync
        (alter memory empty)
        true))))
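The put path above is a put-if-absent inside a dosync transaction. Here is a minimal sketch of just that transaction, assuming string ids in place of Multihash ids and skipping the step that loads content into memory. The name mem-put! is hypothetical.

```clojure
; Hypothetical put-if-absent against a ref holding a sorted map.
(defn mem-put!
  [memory block]
  (dosync
    (if-let [extant (get @memory (:id block))]
      ; An existing block wins; the new copy is discarded.
      extant
      (do (alter memory assoc (:id block) block)
          block))))

(def memory (ref (sorted-map) :validator map?))

(mem-put! memory {:id "1220bb", :data "first"})
(mem-put! memory {:id "1114aa", :data "second"})
(mem-put! memory {:id "1220bb", :data "ignored"}) ; => {:id "1220bb", :data "first"}

; vals of a sorted map come back in key order.
(map :id (vals @memory)) ; => ("1114aa" "1220bb")
```

Running the lookup and the alter in one transaction means two concurrent puts of the same id cannot both insert; one will retry and see the other's block.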

Constructors

(store/privatize-constructors! MemoryBlockStore)

Creates a new in-memory block store.

(defn memory-block-store
  [& {:as opts}]
  (map->MemoryBlockStore
    (assoc opts :memory (ref (sorted-map) :validator map?))))
(defmethod store/initialize "mem"
  [location]
  (memory-block-store))
 

File stores provide block storage backed by a local filesystem. Each block is stored in a separate file under the root. File block stores may be constructed using a file://<path-to-root-dir> URI. Both relative and absolute paths are supported.

Under the root directory, the store keeps block data in a subdirectory alongside some layout metadata and a landing directory:

  $ROOT/meta.properties
  $ROOT/blocks/111497df/35011497df3588b5a3...
  $ROOT/landing/block.123456789.tmp

In many filesystems, performance degrades as the number of files in a directory grows. To reduce this impact and make navigating the blocks more efficient, block files are spread across subdirectories named after the first four bytes (eight hex characters) of the ids of the blocks stored in them. Within each subdirectory, each block is stored in a file named with the remaining hex characters of its id.

In addition to the blocks, a meta.properties file at the root holds information about the current storage layout for future-proofing. This currently holds a single layout version property, which is always "v1".

(ns blocks.store.file
  (:require
    [blocks.data :as data]
    [blocks.store :as store]
    [byte-streams :as bytes]
    [clojure.java.io :as io]
    [clojure.string :as str]
    [clojure.tools.logging :as log]
    [com.stuartsierra.component :as component]
    [manifold.deferred :as d]
    [manifold.stream :as s]
    [multiformats.hash :as multihash])
  (:import
    (java.io
      File
      FileInputStream)
    java.time.Instant))

Storage Layout

The currently supported storage layout version.

(def layout-version
  "v1")

Metadata

Construct the store-level metadata properties file from the store root.

(defn- meta-file
  ^File
  [root]
  (io/file root "meta.properties"))

Read the store's metadata file if it exists.

(defn- read-meta-properties
  [^File root]
  (let [props-file (meta-file root)]
    (when (.exists props-file)
      ; Close the reader once the properties are loaded.
      (with-open [in (io/reader props-file)]
        (into {}
              (map (juxt (comp keyword key) val))
              (doto (java.util.Properties.)
                (.load in)))))))

Write a metadata properties file and return the data map.

(defn- write-meta-properties
  [^File root]
  (let [props-file (meta-file root)
        props (doto (java.util.Properties.)
                (.setProperty "version" layout-version))]
    (.mkdirs root)
    (with-open [out (io/writer props-file)]
      (.store props out " blocks.store.file"))
    {:version layout-version}))
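The metadata file is a standard Java properties file. To see the round trip without touching the filesystem, this sketch uses the same Properties calls against a StringWriter and StringReader. The helper name props->map is hypothetical.

```clojure
(import '(java.io StringReader StringWriter) 'java.util.Properties)

; Hypothetical in-memory round trip of the meta.properties format.
(defn props->map
  [^Properties props]
  (into {} (map (juxt (comp keyword key) val)) props))

(def written
  (let [out (StringWriter.)]
    ; Writes a comment line, a timestamp comment, and "version=v1".
    (.store (doto (Properties.) (.setProperty "version" "v1"))
            out
            " blocks.store.file")
    (str out)))

(props->map (doto (Properties.) (.load (StringReader. written))))
; => {:version "v1"}
```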

Landing Area

Construct the landing directory from the store root.

(defn- landing-dir
  ^File
  [root]
  (io/file root "landing"))

Create an empty temporary file to land block data into. The file is marked for deletion when the JVM exits, so it is cleaned up automatically if it is never moved into place.

(defn- landing-file
  ^File
  [^File root]
  (let [tmp-dir (landing-dir root)]
    (.mkdirs tmp-dir)
    (doto (File/createTempFile "block" ".tmp" tmp-dir)
      (.deleteOnExit))))

Block Files

Number of characters to use as a prefix for top-level directory names.

(def ^:private prefix-length
  8)

Construct the block directory from the store root.

(defn- blocks-dir
  ^File
  [root]
  (io/file root "blocks"))

Walks a block directory tree depth first, returning a sequence of files found in lexical order. When an after marker is given, skips the leading subdirectories whose names sort before the marker's prefix.

(defn- block-files
  [^File root after]
  (->
    (.listFiles (blocks-dir root))
    (sort)
    (cond->>
      after
      (drop-while
        #(let [subdirname (.getName ^File %)
               len (min (count after) (count subdirname))]
           (pos? (compare (subs after 0 len) (subs subdirname 0 len))))))
    (->>
      (mapcat
        (fn list-blocks
          [^File subdir]
          (sort (.listFiles subdir)))))))
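The subdirectory skipping compares only as many characters as the marker and the directory name share. Here is a standalone sketch of that predicate applied to plain directory names instead of File objects; skip-subdirs is a hypothetical name.

```clojure
; Hypothetical standalone copy of the subdirectory-skipping logic in
; block-files, applied to directory names instead of Files.
(defn skip-subdirs
  [after subdirs]
  (drop-while
    (fn [subdir]
      ; Compare only as many characters as both strings have.
      (let [len (min (count after) (count subdir))]
        (pos? (compare (subs after 0 len) (subs subdir 0 len)))))
    (sort subdirs)))

(skip-subdirs "1114a0ff01" ["12200000" "11149000" "1114a0ff" "1114b000"])
; => ("1114a0ff" "1114b000" "12200000")
```

The subdirectory sharing the marker's prefix is kept, since it may still hold ids after the marker. Finer-grained filtering within that directory is left to later per-block checks.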

Calculate a map of statistics about a block file.

(defn- file-stats
  [^File file]
  (with-meta
    {:size (.length file)
     :stored-at (Instant/ofEpochMilli (.lastModified file))}
    {::source (.toURI file)}))

Determine the filesystem path for a block of content with the given hash identifier.

(defn- id->file
  ^File
  [^File root id]
  (let [hex (multihash/hex id)
        len (min (dec (count hex)) prefix-length)
        subdir (subs hex 0 len)
        fragment (subs hex len)]
    (io/file (blocks-dir root) subdir fragment)))

Reconstruct the hash identifier represented by the given file path. Returns nil if the file is not a proper block.

(defn- file->id
  [^File root ^File file]
  (let [prefix (str (blocks-dir root))
        path (.getPath file)
        hex (str/replace (subs path (inc (count prefix))) File/separator "")]
    (if (re-matches #"[0-9a-fA-F]+" hex)
      (multihash/parse hex)
      (log/warnf "File %s did not form valid hex entry: %s" file hex))))
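The mapping between ids and paths is a simple split of the hex string. This sketch works on strings only, with a made-up hex id; hex->parts and parts->hex are hypothetical names. Note that the prefix length is capped at one less than the id length, so the file name is never empty.

```clojure
; Hypothetical string-only version of the path mapping in id->file and
; file->id: split a hex id into [subdir fragment] and join it back.
(def prefix-length 8)

(defn hex->parts
  [hex]
  ; Always leave at least one character for the file name.
  (let [len (min (dec (count hex)) prefix-length)]
    [(subs hex 0 len) (subs hex len)]))

(defn parts->hex
  [[subdir fragment]]
  (str subdir fragment))

(hex->parts "1220abcdef0123456789") ; => ["1220abcd" "ef0123456789"]
(hex->parts "abc")                  ; => ["ab" "c"]
```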

Creates a lazy block to read from the given file.

(defn- file->block
  [id ^File file]
  (let [stats (file-stats file)]
    (with-meta
      (data/create-block
        id (:size stats) (:stored-at stats)
        ; OPTIMIZE: use java.io.RandomAccessFile to read subranges
        (fn reader [] (FileInputStream. file)))
      (meta stats))))

Initialization

True if the given directory is a v0 block subdirectory.

(defn- v0-subdir?
  [^File subdir]
  (and (.isDirectory subdir)
       (= prefix-length (count (.getName subdir)))
       (re-matches #"[0-9a-f]+" (.getName subdir))))

Migrate an existing v0 layout to v1.

(defn- migrate-v0!
  [^File root]
  (let [blocks (blocks-dir root)]
    (.mkdirs blocks)
    (run!
      (fn move-block-dir
        [^File subdir]
        (when (v0-subdir? subdir)
          (.renameTo subdir (io/file blocks (.getName subdir)))))
      (.listFiles root))))

Initialize the block store layout by writing out metadata and pre-creating some directories. Returns the layout meta-properties.

(defn- initialize-layout!
  [store]
  (let [^File root (:root store)]
    (if (empty? (.listFiles root))
      ; Root doesn't exist or is empty, so initialize the storage layout.
      (write-meta-properties root)
      ; Try loading store metadata.
      (let [properties (read-meta-properties root)]
        (if (nil? properties)
          ; No meta-properties file; check for v0 layout.
          (do
            ; Check for unknown file content in root.
            (when-not (every? v0-subdir? (.listFiles root))
              (throw (ex-info
                       (str "Detected unknown files in block store at " root)
                       {:files (vec (.listFiles root))})))
            ; Possible v0 store. Abort unless configured to migrate.
            (when-not (:auto-migrate? store)
              (throw (ex-info
                       (str "Detected v0 file block store layout at " root)
                       {:root root})))
            ; Migrate to v1 layout.
            (log/warn "Automatically migrating file block store layout at"
                      (.getPath root) "from v0 ->" layout-version)
            (migrate-v0! root)
            (write-meta-properties root))
          ; Check for known layout version.
          (let [version (:version properties)]
            (when (not= layout-version version)
              (throw (ex-info
                       (str "Unknown storage layout version " (pr-str version)
                            " does not match supported version "
                            (pr-str layout-version))
                       {:supported layout-version
                        :properties properties})))
            ; Layout matches the expected version.
            properties))))))

Recursively removes a directory of files.

(defn- rm-r
  [^File path]
  (when (.isDirectory path)
    (run! rm-r (.listFiles path)))
  (.delete path))

File Store

Block content is stored as files in a multi-level hierarchy under the given root directory.

(defrecord FileBlockStore
  [^File root]
  component/Lifecycle
  (start
    [this]
    (let [properties (initialize-layout! this)
          version (:version properties)]
      ;(log/debug "Using storage layout version" version)
      (assoc this :version version)))
  (stop
    [this]
    this)
  store/BlockStore
  (-list
    [this opts]
    (let [out (s/stream 1000)]
      (store/future'
        (try
          (loop [files (block-files root (:after opts))]
            (when-let [file (first files)]
              (if-let [id (file->id root file)]
                ; Check that the id is still before the marker, if set.
                (when (or (nil? (:before opts))
                          (pos? (compare (:before opts) (multihash/hex id))))
                  ; Process next block.
                  (when @(s/put! out (file->block id file))
                    (recur (next files))))
                ; Not a valid block file, skip.
                (recur (next files)))))
          (catch Exception ex
            (log/error ex "Failure listing file blocks")
            (s/put! out ex))
          (finally
            (s/close! out))))
      (s/source-only out)))
  (-stat
    [this id]
    (store/future'
      (let [file (id->file root id)]
        (when (.exists file)
          (assoc (file-stats file) :id id)))))
  (-get
    [this id]
    (store/future'
      (let [file (id->file root id)]
        (when (.exists file)
          (file->block id file)))))
  (-put!
    [this block]
    (store/future'
      (let [id (:id block)
            file (id->file root id)]
        (when-not (.exists file)
          (let [tmp (landing-file root)]
            (with-open [content (data/content-stream block nil nil)]
              (io/copy content tmp))
            (io/make-parents file)
            (.setWritable tmp false false)
            (.renameTo tmp file)))
        (file->block id file))))
  (-delete!
    [this id]
    (store/future'
      (let [file (id->file root id)]
        (if (.exists file)
          (do (.delete file) true)
          false))))
  store/ErasableStore
  (-erase!
    [this]
    (store/future'
      (rm-r (landing-dir root))
      (rm-r (blocks-dir root))
      true)))

Constructors

(store/privatize-constructors! FileBlockStore)

Creates a new local file-based block store.

(defn file-block-store
  [root & {:as opts}]
  (map->FileBlockStore
    (assoc opts :root (io/file root))))
(defmethod store/initialize "file"
  [location]
  (let [uri (store/parse-uri location)]
    (file-block-store
      (if (:host uri)
        (io/file (:host uri) (subs (:path uri) 1))
        (io/file (:path uri))))))
 

Cache stores provide logical block storage backed by two other stores, a primary store and a cache.

Blocks are added to the cache on reads and writes, and evicted with a least-recently-used strategy to keep the cache under a certain total size. Operations on this store will prefer to look up blocks in the cache, and fall back to the primary store when not available.

Because the caching logic runs locally, the backing cache storage should not be shared among multiple concurrent processes.

(ns blocks.store.cache
  (:require
    [blocks.core :as block]
    [blocks.store :as store]
    [blocks.summary :as sum]
    [clojure.data.priority-map :refer [priority-map]]
    [clojure.tools.logging :as log]
    [com.stuartsierra.component :as component]
    [manifold.deferred :as d])
  (:import
    java.time.Instant))

Computes the state of a cache, including priorities for all stored blocks and the total size of block content stored.

(defn- scan-state
  [store]
  (reduce
    (fn [state block]
      (let [tick (if-let [stored-at (:stored-at block)]
                   (long (/ (.toEpochMilli ^Instant stored-at) 1000))
                   0)]
        (-> state
            (update :priorities assoc (:id block) [tick (:size block)])
            (update :total-size + (:size block))
            (update :tick max tick))))
    {:priorities (priority-map)
     :total-size 0
     :tick 0}
    (block/list-seq store)))

True if the block may be cached in this store.

(defn- cacheable?
  [store block]
  (let [{:keys [size-limit predicate]} store]
    (and (<= (:size block) size-limit)
         (or (nil? predicate) (predicate block)))))

Update the cache state to account for the usage (fetch or store) of a block.

(defn- touch-block
  [state block]
  (let [id (:id block)
        size (:size block)
        priorities (:priorities state)]
    (-> state
        (update :tick inc)
        (update :priorities assoc id [(:tick state) size])
        (cond->
          (not (contains? priorities id))
          (update :total-size + size)))))

Update the cache state to remove a block from it by id.

(defn- remove-block
  [state id]
  (if-let [[tick size] (get-in state [:priorities id])]
    (-> state
        (update :total-size - size)
        (update :priorities dissoc id))
    state))

Given a target amount of space to free and a cache store, deletes the least-recently-used blocks from the cache until the desired amount of space is free. Returns a deferred which yields a summary of the deleted entries.

(defn reap!
  [store target-free]
  (let [{:keys [cache state size-limit]} store]
    (d/loop [deleted (sum/init)]
      (let [{:keys [priorities total-size]} @state]
        (if (and (< (- size-limit total-size) target-free)
                 (not (empty? priorities)))
          ; Need to delete the next block.
          (let [[id [tick size]] (peek priorities)]
            (swap! state remove-block id)
            (d/chain
              (block/delete! cache id)
              (fn next-delete
                [deleted?]
                (d/recur (if deleted?
                           (sum/update deleted {:id id, :size size})
                           deleted)))))
          ; Enough free space, or no more blocks to delete.
          deleted)))))
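The cache bookkeeping can be modeled synchronously. In the store, priorities is a priority-map of id to [tick size], and peek returns the entry with the lowest tick. This sketch assumes plain maps instead and finds that entry with min-key. The names touch, reap, and empty-state are hypothetical.

```clojure
; Hypothetical synchronous model of the cache bookkeeping.
(def empty-state {:priorities {}, :total-size 0, :tick 0})

(defn touch
  [state id size]
  (-> state
      (update :tick inc)
      ; Stamp the entry with the tick from before the increment.
      (update :priorities assoc id [(:tick state) size])
      ; Only count the size once per id.
      (cond-> (not (contains? (:priorities state) id))
        (update :total-size + size))))

(defn reap
  "Evict least-recently-used ids until target-free bytes are available.
  Returns [state evicted-ids]."
  [state size-limit target-free]
  (loop [state state, evicted []]
    (let [{:keys [priorities total-size]} state]
      (if (and (< (- size-limit total-size) target-free) (seq priorities))
        ; The entry with the smallest tick is the least recently used.
        (let [[id [_ size]] (apply min-key (comp first val) priorities)]
          (recur (-> state
                     (update :total-size - size)
                     (update :priorities dissoc id))
                 (conj evicted id)))
        [state evicted]))))

(def state
  (-> empty-state
      (touch :a 40)
      (touch :b 30)
      (touch :c 20)
      (touch :a 40))) ; :a becomes the most recently used

(second (reap state 100 50)) ; => [:b :c]
```

Re-touching :a moves it to the back of the eviction order without counting its size twice, so only :b and :c are evicted to free 50 of the 100 bytes.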

Store a block in the cache and update the internal tracking state.

(defn- cache-block!
  [store block]
  (swap! (:state store) touch-block block)
  (d/chain
    (reap! store (:size block))
    (fn cache-block
      [_]
      (block/put! (:cache store) block))))
(defrecord CachingBlockStore
  [size-limit predicate primary cache state]
  component/Lifecycle
  (start
    [this]
    (when-not (satisfies? store/BlockStore primary)
      (throw (IllegalStateException.
               (str "Cannot start caching block store without a backing primary store: "
                    (pr-str primary)))))
    (when-not (satisfies? store/BlockStore cache)
      (throw (IllegalStateException.
               (str "Cannot start caching block store without a backing cache store: "
                    (pr-str cache)))))
    (when-not @state
      (let [initial-state (scan-state cache)
            cached-bytes (:total-size initial-state)]
        (reset! state initial-state)
        (when (pos? cached-bytes)
          (log/infof "Cache has %d bytes in %d blocks"
                     (:total-size initial-state)
                     (count (:priorities initial-state))))))
    this)
  (stop
    [this]
    this)
  store/BlockStore
  (-list
    [this opts]
    (store/merge-blocks
      (block/list cache opts)
      (block/list primary opts)))
  (-stat
    [this id]
    (store/some-store [cache primary] block/stat id))
  (-get
    [this id]
    (d/chain
      (block/get cache id)
      (fn check-cache
        [block]
        (if block
          (vary-meta block assoc ::cached? true)
          (block/get primary id)))
      (fn recache
        [block]
        (cond
          ; Block not present in cache or primary.
          (nil? block)
          nil
          ; Block is already cached.
          (::cached? (meta block))
          (do (swap! state touch-block block)
              block)
          ; Determine whether to cache the primary block.
          (cacheable? this block)
          (cache-block! this block)
          ; Non cacheable block from the primary store.
          :else block))))
  (-put!
    [this block]
    (d/chain
      (d/zip
        (block/put! primary block)
        (when (cacheable? this block)
          (cache-block! this block)))
      (fn return-preferred
        [[stored cached]]
        (store/preferred-block
          stored
          (when cached
            (vary-meta cached assoc ::cached? true))))))
  (-delete!
    [this id]
    (d/chain
      (d/zip
        (block/delete! primary id)
        (block/delete! cache id))
      (fn result
        [[stored? cached?]]
        (boolean (or stored? cached?)))))
  store/ErasableStore
  (-erase!
    [this]
    (d/chain
      (d/zip
        (block/erase! primary)
        (block/erase! cache))
      (constantly true))))
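
The -get method above is a read-through cache: check the cache, fall back to the primary store, and copy cacheable primary hits into the cache. A simplified sketch of that control flow, assuming atoms holding maps of id to block in place of real stores; read-through is a hypothetical name, it marks cache hits with a plain :cached? metadata key where the original uses ::cached?, and it omits the size limit, reaping, and manifold deferreds.

```clojure
;; Simplified, synchronous read-through lookup. `cache` and `primary` are
;; atoms holding maps of id -> block (an assumption for this sketch).

(defn read-through
  "Looks up `id` in `cache` first, then `primary`. A primary hit that
  satisfies `cacheable?` is copied into the cache. Returns the block, or
  nil if neither store has it."
  [cache primary cacheable? id]
  (if-let [block (get @cache id)]
    ;; Cache hit: tag the block so callers can tell where it came from.
    (vary-meta block assoc :cached? true)
    (when-let [block (get @primary id)]
      ;; Primary hit: populate the cache only for cacheable blocks.
      (when (cacheable? block)
        (swap! cache assoc id block))
      block)))
```

The first lookup of a cacheable block is served from the primary store and fills the cache; the second is served from the cache.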

Constructors

(store/privatize-constructors! CachingBlockStore)

Creates a new logical block store which uses one block store to cache up to a certain size of content for another. The store needs a :primary and a :cache associated with it for backing block storage; starting the store fails if either one is not a block store.

  • :primary Backing store with the primary block data.
  • :cache Store to cache blocks in and prefer for reads.
  • :size-limit Maximum total size of blocks, in bytes, to keep in the cache store. This is passed as the first positional argument rather than as a keyword option.
  • :predicate (optional) A function of one block which returns false for blocks that should not be cached; those blocks are written only to the primary store.
(defn caching-block-store
  [size-limit & {:as opts}]
  (when-not (pos-int? size-limit)
    (throw (IllegalArgumentException.
             (str "Cache store size-limit must be a positive integer: "
                  (pr-str size-limit)))))
  (map->CachingBlockStore
    (assoc opts
           :size-limit size-limit
           :state (atom nil))))
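
The constructor takes the size limit positionally and collects the remaining keyword arguments into a map. The stand-in below mirrors that validation and option handling without building a record; caching-config and small-block? are hypothetical names, and the 4096-byte threshold is an arbitrary example of a :predicate.

```clojure
(defn caching-config
  "Hypothetical stand-in for caching-block-store: validates the positional
  size limit and merges the keyword options into one map."
  [size-limit & {:as opts}]
  (when-not (pos-int? size-limit)
    (throw (IllegalArgumentException.
             (str "Cache store size-limit must be a positive integer: "
                  (pr-str size-limit)))))
  ;; With no keyword args `opts` is nil, and (assoc nil ...) yields a map.
  (assoc opts :size-limit size-limit))

(defn small-block?
  "Example predicate: only cache blocks of at most 4 KiB."
  [block]
  (<= (:size block) 4096))

;; Usage: (caching-config 1048576 :predicate small-block?)
```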
 

Replica stores provide logical block storage which writes to multiple backing stores. Lookups try the backing stores in order and return the first block found.

Replicas are useful for ensuring durability across stores and for shared caches, where some external process controls cache eviction.

(ns blocks.store.replica
  (:require
    [blocks.core :as block]
    [blocks.store :as store]
    [com.stuartsierra.component :as component]
    [manifold.deferred :as d]))

Resolves the configured replica keys to the backing stores attached to this record, preserving their order.

(defn- resolve-stores
  ([store]
   (resolve-stores store (:replicas store)))
  ([store replicas]
   (mapv (partial get store) replicas)))
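
Because records behave like maps, resolution is one lookup per replica key. The same logic applied to a plain map standing in for the record shows that order is preserved and that an absent key resolves to nil, which is why start checks every key up front. The name resolve-replicas, the keys :mem, :disk, and :s3, and the string values are placeholders.

```clojure
(defn resolve-replicas
  "Returns the value stored under each replica key, in replica order."
  ([store]
   (resolve-replicas store (:replicas store)))
  ([store replicas]
   (mapv (partial get store) replicas)))

;; (resolve-replicas {:replicas [:disk :mem], :mem "m", :disk "d"})
;; returns ["d" "m"]
```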

(defrecord ReplicaBlockStore
  [replicas]
  component/Lifecycle
  (start
    [this]
    (when-let [missing (seq (remove (partial contains? this) replicas))]
      (throw (IllegalStateException.
               (str "Replica block store is missing configured keys: "
                    (pr-str missing)))))
    this)
  (stop
    [this]
    this)
  store/BlockStore
  (-list
    [this opts]
    (->> (resolve-stores this)
         (map #(block/list % opts))
         (apply store/merge-blocks)))
  (-stat
    [this id]
    (store/some-store (resolve-stores this) block/stat id))
  (-get
    [this id]
    ; OPTIMIZE: query in parallel, use `d/alt`?
    (store/some-store (resolve-stores this) block/get id))
  (-put!
    [this block]
    (d/chain
      (block/put! (get this (first replicas)) block)
      (fn keep-preferred
        [stored]
        (let [block (store/preferred-block block stored)]
          (d/chain
            (store/zip-stores (resolve-stores this (rest replicas)) block/put! block)
            (partial apply store/preferred-block stored))))))
  (-delete!
    [this id]
    (d/chain
      (store/zip-stores (resolve-stores this) block/delete! id)
      (partial some true?)
      boolean)))
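
The replica semantics can be summarized with an in-memory model: reads take the first hit in replica order, writes go to every replica, and a delete reports true if any replica held the block. This sketch assumes each replica is an atom holding a map of id to block and runs synchronously rather than returning manifold deferreds; the function names are hypothetical, and the preferred-block selection done by -put! is omitted.

```clojure
;; Hypothetical in-memory model of the replica store operations.

(defn replica-get
  "Tries each store in order and returns the first block found for `id`."
  [stores id]
  (some #(get @% id) stores))

(defn replica-put!
  "Writes `block` to every store, starting with the first replica."
  [stores block]
  (doseq [store stores]
    (swap! store assoc (:id block) block))
  block)

(defn replica-delete!
  "Deletes `id` from every store. Returns true if any store held it."
  [stores id]
  ;; mapv is eager, so every store is cleared regardless of which one
  ;; reports a hit. swap-vals! returns [old new], so the check is atomic.
  (let [results (mapv (fn [store]
                        (let [[old _] (swap-vals! store dissoc id)]
                          (contains? old id)))
                      stores)]
    (boolean (some true? results))))
```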

Constructors

(store/privatize-constructors! ReplicaBlockStore)

Creates a new replica block store which persists blocks to multiple backing stores. Block operations are performed on the stores in the order given in replicas, where each entry is a key looked up in the store record; writes go to the first replica before the rest.

(defn replica-block-store
  [replicas & {:as opts}]
  (map->ReplicaBlockStore
    (assoc opts :replicas (vec replicas))))
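
Since each entry in replicas is a key on the record, every key must have a store attached, for example as a Component dependency, before start succeeds. The check that start performs can be reproduced on a plain map; missing-replicas is a hypothetical helper name and :mem and :disk are placeholder keys.

```clojure
(defn missing-replicas
  "Returns a seq of replica keys that have no value in `store`, or nil
  when every configured replica is present."
  [store]
  (seq (remove (partial contains? store) (:replicas store))))

;; (missing-replicas {:replicas [:mem :disk], :mem {}}) returns (:disk)
```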