Skip to content

Commit

Permalink
Merge pull request #5 from avisi/metadata
Browse files Browse the repository at this point in the history
Add metadata to the analysis map
  • Loading branch information
keerts authored Feb 11, 2020
2 parents 662afb5 + 6d90fef commit e8b6b44
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 49 deletions.
7 changes: 4 additions & 3 deletions build.boot
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

(set-env! :resource-paths #{"resources" "src"}
:source-paths #{"test"}
:dependencies '[[org.clojure/clojure "1.9.0-alpha11"]
:dependencies '[[org.clojure/clojure "1.9.0-alpha13"]
[clj-time "0.12.2"]
[pandect "0.6.0"]
[com.amazonaws/aws-java-sdk-s3 "1.10.61"]
[amazonica "0.3.76" :exclusions [org.clojure/clojure]]
[com.amazonaws/aws-java-sdk-s3 "1.11.50"]
[amazonica "0.3.77" :exclusions [org.clojure/clojure]]
[org.clojure/test.check "0.9.0" :scope "test"]
[org.clojure/tools.logging "0.3.1"]
[adzerk/boot-test "RELEASE" :scope "test"]])
Expand Down
20 changes: 10 additions & 10 deletions src/avisi/rsync/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[avisi.rsync.local-dir :as dir]
[avisi.rsync.s3-bucket :as s3]
[amazonica.aws.s3 :as aws-s3]
[clojure.tools.logging :as log]
[clojure.data :as data]
[clojure.string :as str]))

Expand All @@ -20,13 +21,14 @@

(defn left-to-right
[paths left-location right-location]
(doseq [{path :path} paths]
(doseq [path paths]
(log/debug "copying file" path)
(with-open [input-stream (l/read left-location path)]
(l/write right-location path input-stream))))

(defn delete
[paths location]
(doseq [{path :path} paths]
(doseq [path paths]
(l/delete location path)))

(defn dry-run?
Expand All @@ -38,25 +40,23 @@
[from-url to-url options]
(let [from-location (location from-url)
to-location (location to-url)
_ (log/debug "analysing from location")
from-set (l/analyse from-location)
_ (log/debug "analysing to location")
to-set (l/analyse to-location)
_ (log/debug "diffing results")
diff (data/diff from-set to-set)
to-be-deleted (filter #(not (contains-path? (:path %) (first diff))) (second diff))
to-be-copied (filter #(not (contains-path? (:path %) (second diff))) (first diff))
to-be-updated (filter #(contains-path? (:path %) (second diff)) (first diff))]
(if (not (dry-run? options))
(do
(log/debug "copying new files")
(left-to-right to-be-copied from-location to-location)
(log/debug "updating existing files")
(left-to-right to-be-updated from-location to-location)
(log/debug "deleting redundant files")
(delete to-be-deleted to-location)))
{:deleted to-be-deleted
:copied to-be-copied
:updated to-be-updated}))

(comment
(aws-s3/get-object :bucket-name "dev-rfj-files" :key "/test-logo")
(clojure.spec.test/instrument)
(sync! "s3://dev-rfj-files" "file:///tmp/ff1" {:dry-run false})
(str/join "/" (filter #(not (nil? %)) ["koud" "whatever"]))
()
(str/join "/" [nil "whatever"]))
22 changes: 17 additions & 5 deletions src/avisi/rsync/local_dir.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(:require [avisi.rsync.location :as l]
[clojure.string :as str]
[clojure.java.io :as io]
[clojure.tools.logging :as log]
[pandect.algo.md5 :as md5])
(:import [java.io File]
[java.util.regex Pattern]))
Expand Down Expand Up @@ -38,13 +39,20 @@

(defn read
[file-path]
(log/debug "reading from" file-path)
(io/input-stream (io/file file-path)))

(defn write
[file-path input-stream]
(log/debug "writing to" file-path)
(io/make-parents file-path)
(with-open [output-stream (io/output-stream (io/file file-path))]
(io/copy input-stream output-stream)))

(defn delete
[file-path]
(io/delete-file file-path))

;; Private Helper Functions

(defn- root-path-regex [root]
Expand All @@ -59,17 +67,21 @@
(defn- path->file-details [root-path file]
(let [absolute-path (.getAbsolutePath file)
rel-path (relative-path root-path absolute-path)
md5 (md5/md5-file absolute-path)]
{:path rel-path :md5 md5}))
md5 (md5/md5-file absolute-path)
last-modified (.lastModified file)
size (.length file)]
{:path rel-path :md5 md5 :meta {:size size}}))

(defrecord DirectoryLocation [directory]
l/Location
(analyse [this]
(analyse-local-directory directory))
(write [this path stream]
(write [this {:keys [path]} stream]
(write (str directory "/" path) stream))
(read [this path]
(read (str directory "/" path))))
(read [this {:keys [path]}]
(read (str directory "/" path)))
(delete [this {:keys [path]}]
(delete (str directory "/" path))))

(defn new-directory-location
[directory]
Expand Down
6 changes: 3 additions & 3 deletions src/avisi/rsync/location.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
(defprotocol Location
"A location holding files that can be pulled and pushed"
(analyse [this])
(delete [this path])
(read [this path])
(write [this path stream]))
(delete [this file])
(read [this file])
(write [this file stream]))

9 changes: 8 additions & 1 deletion src/avisi/rsync/location_spec.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns avisi.rsync.location-spec
(:require [clojure.spec :as s]
[clj-time.core :as t]
[avisi.rsync.location :as l]))

(def md5-regex #"^[a-fA-F0-9]{32}$")
Expand All @@ -14,6 +15,12 @@

(s/def ::md5 (s/and string? #(re-matches md5-regex %)))

(s/def ::analysis-entry (s/keys :req-un [::path ::md5]))
(s/def ::size (s/and number? pos?))

(s/def ::timestamp (inst? org.joda.time.DateTime))

(s/def ::meta (s/keys :opt-un [::size ::timestamp]))

(s/def ::analysis-entry (s/keys :req-un [::path ::md5 ::meta]))

(s/def ::analysis (s/coll-of ::analysis-entry :kind set?))
55 changes: 34 additions & 21 deletions src/avisi/rsync/s3_bucket.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,18 @@
(defn analyse-s3-bucket
"Pass a request map with :bucket-name and possibly a :prefix"
[request]
(let [paths (->> (list-all-objects request)
(filter #(not (str/ends-with? (:key %) "/"))) ;; filter folders
(map #(select-keys % [:key :etag]))
(map #(clojure.set/rename-keys % {:key :path :etag :md5})))]

;; after listing the objects, reduce and restructure the information to conform to the spec
(let [paths (into []
(comp (filter #(not (str/ends-with? (:key %) "/")))
(map #(select-keys % [:key :etag :size]))
(map #(clojure.set/rename-keys % {:key :path :etag :md5 :last-modified :timestamp}))
(map #(reduce-kv (fn [m k v]
(if (contains? #{:path :md5} k)
(assoc m k v)
(assoc-in m [:meta k] v)))
{} %)))
(list-all-objects request))]
(if (str/blank? (:prefix request))
(set paths)
(set (map #(assoc-in % [:path] (subs (:path %) (inc (count (:prefix request))))) paths)))))
Expand All @@ -33,39 +41,44 @@
(s3/delete-object bucket key))

(defn write
[bucket key stream]
(s3/put-object :bucket-name bucket :key key :input-stream stream))
[bucket key stream meta-data]
(log/debug "writing to key" key "meta data supplied" meta-data)
(let [s3-meta-data (-> meta-data
(select-keys [:size])
(clojure.set/rename-keys {:size :content-length}))]
(log/debug "writing to key" key "with meta data" s3-meta-data)
(s3/put-object :bucket-name bucket :key key :input-stream stream :meta-data s3-meta-data)))

(defn read
[bucket key]
(:input-stream (s3/get-object :bucket-name bucket :key key)))

(defn s3-url->bucket-name
[url]
[url]
(as-> (second (str/split url #"://")) v
(first (str/split v #"/"))))
(first (str/split v #"/"))))

(defn s3-url->key
[url]
[url]
(as-> (second (str/split url #"://")) v
(rest (str/split v #"/"))
(str/join "/" v)))
(rest (str/split v #"/"))
(str/join "/" v)))

(defn prefix&path->key
[prefix path]
(str/join "/" (remove empty? [prefix path])))

(defrecord S3Location [bucket prefix]
l/Location
(analyse [this]
(analyse-s3-bucket {:bucket-name bucket :prefix prefix}))
(delete [this path]
(delete bucket (prefix&path->key prefix path)))
(read [this path]
(log/info "attempting to read path" path "prefix is" prefix)
(read bucket (prefix&path->key prefix path)))
(write [this path stream]
(write bucket (prefix&path->key prefix path) stream)))
l/Location
(analyse [this]
(analyse-s3-bucket {:bucket-name bucket :prefix prefix}))
(delete [this file]
(delete bucket (prefix&path->key prefix (:path file))))
(read [this file]
(log/debug "attempting to read file" (:path file) "prefix is" prefix)
(read bucket (prefix&path->key prefix (:path file))))
(write [this file stream]
(write bucket (prefix&path->key prefix (:path file)) stream (:meta file))))

(defn new-s3-location
[bucket prefix]
Expand Down
16 changes: 10 additions & 6 deletions test/avisi/rsync/local_dir_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@
(deftest analyse
(testing "analyse"
(let [file (java.nio.file.Files/createTempDirectory "test" (into-array java.nio.file.attribute.FileAttribute []))
location (dir/new-directory-location (.toString file))]
(l/write location "test" (io/input-stream (.getBytes "text")))
location (dir/new-directory-location (.toString file))
test-file {:path "test"}
test-content "text"]
(l/write location test-file (io/input-stream (.getBytes test-content)))
(let [analysis (l/analyse location)]
(is (s/valid? ::location-spec/analysis analysis)))
(is (= "text" (with-open [input-stream (l/read location "test")] (slurp input-stream)))))))
(is (= test-content (with-open [input-stream (l/read location test-file)] (slurp input-stream)))))))

(deftest read-write
(testing "write / read back"
(let [file (java.nio.file.Files/createTempDirectory "test" (into-array java.nio.file.attribute.FileAttribute []))
location (dir/new-directory-location (.toString file))]
(l/write location "test" (io/input-stream (.getBytes "text")))
(is (= "text" (with-open [input-stream (l/read location "test")] (slurp input-stream)))))))
location (dir/new-directory-location (.toString file))
test-file {:path "test"}
test-content "text"]
(l/write location test-file (io/input-stream (.getBytes test-content)))
(is (= test-content (with-open [input-stream (l/read location test-file)] (slurp input-stream)))))))
1 change: 1 addition & 0 deletions test/avisi/rsync/s3_bucket_test.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns avisi.rsync.s3-bucket-test
(:require [avisi.rsync.s3-bucket :as sut]
[avisi.rsync.s3-bucket-spec :as sut-spec]
[clojure.spec.test :as stest]
[clojure.test :refer :all]))

Expand Down

0 comments on commit e8b6b44

Please sign in to comment.