does anyone have any experience using redis pubsub from clojure in a "system" sort of way
Jedis has a subscribe method that blocks and keeps taking messages from redis
but because it blocks I need to run it in a seperate thread and my logic for making sure errors aren't fatal feels sloppy
and also my "shutdown" function doesn't really work
;; ----------------------------------------------------------------------------
(def post-notification-channel "ABC")
;; ----------------------------------------------------------------------------
(defn create-pub-sub-listener
"Creates a listener for pub-sub messages. Borrows a connection
from the pool and doesn't return it until it is shut down."
[pool]
(let [client ^Jedis (.getResource pool)
logic (proxy [JedisPubSub] []
(onMessage [channel message]
(log/info :channel channel :message message)))
executor ^ExecutorService (Executors/newSingleThreadExecutor)]
(.submit
executor
(reify Runnable
(run [_]
(while true
(try
(.subscribe client
logic
^{:tag "[Ljava.lang.String;"}
(into-array String [post-notification-channel]))
(catch InterruptedException e
(throw e))
(catch Exception e
(log/error ::jedis-pub-sub-error e
::action-to-take "retry subscription")))))))
{:borrowed-client client
:pub-sub logic
:executor executor}))
;; ----------------------------------------------------------------------------
(defn shutdown-pub-sub-listener!
[pub-sub-listener]
(let [{:keys [borrowed-client pub-sub executor]} pub-sub-listener]
(.shutdownNow executor)
(.unsubscribe pub-sub (into-array String [post-notification-channel]))
(.close borrowed-client)))
after shutdown if i send a message to the channel i get
:cause "ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"
:via
[{:type redis.clients.jedis.exceptions.JedisDataException
:message "ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"
:at [redis.clients.jedis.Protocol processError "Protocol.java" 132]}]
:trace ...
which means its still consuming somehow
right now I am trying to structure my app in a "system, but doing it all manually" way
so making sure i can start and stop the mechanism is important
the jedis connection is not multi-thread safe
you are trying to share the connection that the subscribe is getting messages on with your other code doing redis stuff
that error is what happens if you subscribe on a redis connection, and then on the same connection try to do something other than the listed operations
jedis comes with some connection pool stuff which might help you here, but I ended up writing our own because of the need to pull out connections for subscriptions like this
i thought that if i used .getResource
and just didn't return the connection to the pool that it would be fine
since no other thread is accessing the connection
except during the shutdown
ah, I didn't realize you are using the pool
though i guess it makes sense
i take a connection from the pool, and a thread is doing its busy wait loop on it
and i try to call .close, but the connection is in an invalid state
but that doesn't explain why messages are still being consumed when i shutdown the executor
shutdown an executor doesn't stop it
or I should say, it doesn't stop already running tasks
it stops it from starting new ones
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. For example, typical
* implementations will cancel via {@link Thread#interrupt}, so any
* task that fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
List<Runnable> shutdownNow();
the docs for shutdown now says it tries to
This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
awaitTermination just waits though, it doesn't do anything to the tasks
so me physically waiting should be enough
you are not waiting
it maybe that calling .close on the connection is what is getting you that error message
(defn shutdown-pub-sub-listener!
[pub-sub-listener]
(let [{:keys [borrowed-client pub-sub executor]} pub-sub-listener]
(.shutdownNow executor)
(.awaitTermination executor 2 TimeUnit/MINUTES)
(.unsubscribe pub-sub (into-array String [post-notification-channel]))
(.close borrowed-client)))
adding await terminate just makes it hang
trying without a .close
right
because your task is not exiting
without the .close
it is still recieving messages
okay so then the question is, how to kill the task
I can't add a check to Thread.isInterrupted
anywhere obvious
actually, looking at our connection pool what I do to kill the pubsub is just close the connection
and ignore any errors