hey guys, need some advice / code review the code will communicate with sqs ( I’m using cognitect aws ), fetch messages and dispatch to workers ( responsible to do the job )
aws SQS <----------- fetch -------------> workers
(ns accounting.queue
(:require [mount.core :refer [defstate]]
[accounting.config :refer [config]]
[cognitect.aws.client.api :as aws]
[cognitect.aws.credentials :as credentials]
[cheshire.core :as json]
[clojure.tools.logging :as log]
[clj-time.core :as time]))
(defprotocol QueueProtocol
(messages [queue])
(delete [queue receipt])
(run [queue])
(stop [queue]))
(defn build-client [{:keys [aws_region aws_key aws_secret_key]}]
(aws/client {:api :sqs
:region aws_region
:credentials-provider (credentials/basic-credentials-provider
{:access-key-id aws_key
:secret-access-key aws_secret_key})}))
(defn find-queue [client name]
(:QueueUrl (aws/invoke client {:op :GetQueueUrl
:request {:QueueName name}})))
(defn create-queue [client name]
(aws/invoke client {:op :CreateQueue
:request {:QueueName name}}))
(defn init-queue [client queue-name]
(or (find-queue client queue-name)
(create-queue client queue-name)))
(defn delete* [{client :client queue-url :endpoint} receipt]
(aws/invoke client {:op :DeleteMessage
:request {:QueueUrl queue-url
:ReceiptHandle receipt}}))
(defn messages* [{client :client queue-url :endpoint}]
(aws/invoke client {:op :ReceiveMessage
:request {:QueueUrl queue-url}}))
(defmulti worker (fn [{type :type}] (keyword type)))
(defmethod worker :invoice-created [message]
(generate-invoice message))
(defmethod worker :default [envelope]
(log/info "envelope received"))
(defn- handle [queue messages]
(doseq [{receipt :ReceiptHandle body :Body } messages]
(try
(worker (json/parse-string body true))
(delete queue receipt)
(catch Exception e
(log/error (ex-message e))))))
(defn run* [queue]
(future
(loop []
(let [messages (messages queue)]
(doseq [message (:Messages messages)] (handle queue message)))
(recur))))
(defrecord Queue [client endpoint runner]
QueueProtocol
(messages [queue] (messages* queue))
(delete [queue receipt] (delete* queue receipt))
(run [queue] (assoc queue runner (run* queue)))
(stop [_] (when (and runner (not (future-done? runner))
(future-cancel runner)))))
(defn build-queues [{queues :aws_sqs_queues :as cfg}]
(let [client (build-client cfg)]
(mapv #(run (map->Queue {:client client :endpoint (init-queue client %)})) queues)))
(defstate queue
:start {:queues (build-queues config)}
:stop (doseq [q (:queues queue)] (stop q)))
don’t know better way to dispatch messages to worker, my solution was using defmethod/defmulti based on the message type
is that a good usage of defmethod /defmulti ?