off-topic Clojurians Slack Community Code of Conduct. Searchable message archives are at
emccue 2020-10-09T23:01:43.201500Z

does anyone have any experience using redis pubsub from clojure in a "system" sort of way

emccue 2020-10-09T23:02:12.202100Z

Jedis has a subscribe method that blocks and keeps taking messages from redis

emccue 2020-10-09T23:02:42.202800Z

but because it blocks I need to run it in a separate thread and my logic for making sure errors aren't fatal feels sloppy

emccue 2020-10-09T23:02:56.203200Z

and also my "shutdown" function doesn't really work

emccue 2020-10-09T23:03:20.203500Z

;; ----------------------------------------------------------------------------
(def post-notification-channel "ABC")

;; ----------------------------------------------------------------------------
(defn create-pub-sub-listener
  "Creates a listener for pub-sub messages. Borrows a connection
  from the pool and doesn't return it until it is shut down."
  [pool]
  (let [client ^Jedis (.getResource pool)
        logic (proxy [JedisPubSub] []
                (onMessage [channel message]
                  (log/info :channel channel :message message)))
        executor ^ExecutorService (Executors/newSingleThreadExecutor)]
    (.submit executor
      (reify Runnable
        (run [_]
          (while true
            (try
              (.subscribe client
                          logic
                          ^{:tag "[Ljava.lang.String;"}
                          (into-array String [post-notification-channel]))
              (catch InterruptedException e
                (throw e))
              (catch Exception e
                (log/error ::jedis-pub-sub-error e
                           ::action-to-take "retry subscription")))))))
    {:borrowed-client client
     :pub-sub logic
     :executor executor}))

;; ----------------------------------------------------------------------------
(defn shutdown-pub-sub-listener!
  [pub-sub-listener]
  (let [{:keys [borrowed-client pub-sub executor]} pub-sub-listener]
    (.shutdownNow executor)
    (.unsubscribe pub-sub (into-array String [post-notification-channel]))
    (.close borrowed-client)))

emccue 2020-10-09T23:03:55.203900Z

after shutdown if i send a message to the channel i get

emccue 2020-10-09T23:03:59.204300Z

:cause "ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"
 [{:type redis.clients.jedis.exceptions.JedisDataException
   :message "ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"
   :at [redis.clients.jedis.Protocol processError "" 132]}]
 :trace ...

emccue 2020-10-09T23:04:14.204700Z

which means it's still consuming somehow

emccue 2020-10-09T23:05:44.205100Z

right now I am trying to structure my app in a "system, but doing it all manually" way

emccue 2020-10-09T23:06:03.205600Z

so making sure i can start and stop the mechanism is important


the jedis connection is not multi-thread safe


you are trying to share the connection that the subscribe is getting messages on with your other code doing redis stuff


that error is what happens if you subscribe on a redis connection, and then on the same connection try to do something other than the listed operations


jedis comes with some connection pool stuff which might help you here, but I ended up writing our own because of the need to pull out connections for subscriptions like this

emccue 2020-10-09T23:47:02.208800Z

i thought that if i used .getResource and just didn't return the connection to the pool that it would be fine

emccue 2020-10-09T23:47:26.209300Z

since no other thread is accessing the connection

emccue 2020-10-09T23:47:35.209600Z

except during the shutdown


ah, I didn't realize you are using the pool

emccue 2020-10-09T23:48:25.210600Z

though i guess it makes sense

emccue 2020-10-09T23:48:43.211200Z

i take a connection from the pool, and a thread is doing its busy wait loop on it

emccue 2020-10-09T23:49:00.211800Z

and i try to call .close, but the connection is in an invalid state

emccue 2020-10-09T23:49:27.212900Z

but that doesn't explain why messages are still being consumed when i shut down the executor


shutting down an executor doesn't stop it


or I should say, it doesn't stop already running tasks


it stops it from starting new ones
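The point about running tasks can be checked with a small sketch that uses only `java.util.concurrent`. The names `stubborn-task` and `demo` are hypothetical, and the task is a stand-in for any loop that never reacts to an interrupt. It is not the Jedis code from the thread. `shutdownNow` interrupts the worker, but the task only ends once it chooses to leave its loop.

```clojure
(import '(java.util.concurrent Executors ExecutorService TimeUnit)
        '(java.util.concurrent.atomic AtomicBoolean))

(defn stubborn-task
  "Hypothetical task that swallows interrupts and keeps looping
  until the stop? flag is set."
  [^AtomicBoolean stop?]
  (reify Runnable
    (run [_]
      (while (not (.get stop?))
        (try
          (Thread/sleep 50)
          (catch InterruptedException _
            ;; Interrupt is ignored, so shutdownNow alone cannot end this task.
            nil))))))

(defn demo []
  (let [^ExecutorService executor (Executors/newSingleThreadExecutor)
        stop? (AtomicBoolean. false)]
    (.execute executor (stubborn-task stop?))
    (Thread/sleep 100)              ; give the task time to start
    (.shutdownNow executor)         ; best effort: interrupts the worker thread
    (let [stopped-by-interrupt? (.awaitTermination executor 300 TimeUnit/MILLISECONDS)]
      (.set stop? true)             ; cooperative stop signal
      {:stopped-by-interrupt? stopped-by-interrupt?
       :stopped-by-flag?      (.awaitTermination executor 2 TimeUnit/SECONDS)})))
```

`awaitTermination` only reports whether the executor finished within the timeout. It returns `false` while the task is still looping and does nothing to the task itself.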

emccue 2020-10-09T23:51:14.214200Z

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution.
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  For example, typical
     * implementations will cancel via {@link Thread#interrupt}, so any
     * task that fails to respond to interrupts may never terminate.
     * @return list of tasks that never commenced execution
     * @throws SecurityException if a security manager exists and
     *         shutting down this ExecutorService may manipulate
     *         threads that the caller is not permitted to modify
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")},
     *         or the security manager's {@code checkAccess} method
     *         denies access.
     */
    List<Runnable> shutdownNow();

emccue 2020-10-09T23:51:27.214500Z

the docs for shutdownNow say it tries to


This method does not wait for actively executing tasks to terminate. Use {@link #awaitTermination awaitTermination} to do that.

emccue 2020-10-09T23:52:12.215400Z

awaitTermination just waits though, it doesn't do anything to the tasks

emccue 2020-10-09T23:52:18.215800Z

so me physically waiting should be enough


you are not waiting


it may be that calling .close on the connection is what is getting you that error message

emccue 2020-10-09T23:55:33.217200Z

(defn shutdown-pub-sub-listener!
  [pub-sub-listener]
  (let [{:keys [borrowed-client pub-sub executor]} pub-sub-listener]
    (.shutdownNow executor)
    (.awaitTermination executor 2 TimeUnit/MINUTES)
    (.unsubscribe pub-sub (into-array String [post-notification-channel]))
    (.close borrowed-client)))

emccue 2020-10-09T23:55:40.217500Z

adding awaitTermination just makes it hang

emccue 2020-10-09T23:55:51.217800Z

trying without a .close




because your task is not exiting

emccue 2020-10-09T23:57:37.218700Z

without the .close it is still receiving messages

emccue 2020-10-09T23:58:07.219200Z

okay so then the question is, how to kill the task

emccue 2020-10-09T23:58:25.219900Z

I can't add a check to Thread.isInterrupted anywhere obvious
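One place a stop check can go is the retry loop itself rather than inside the blocking call. The sketch below is a minimal illustration under assumptions. `run-until-stopped`, `running?`, `blocking-fn` and `on-error` are hypothetical names, and `blocking-fn` stands in for whatever call blocks (for example a subscribe). The loop retries on failure, but once `running?` is false it exits instead of retrying, and the error raised by the shutdown is dropped quietly.

```clojure
(defn run-until-stopped
  "Calls blocking-fn in a loop, reporting failures to on-error and retrying,
  until the running? atom is set to false."
  [running? blocking-fn on-error]
  (while @running?
    (try
      (blocking-fn)
      (catch Exception e
        ;; A failure that arrives after a stop was requested is expected
        ;; (e.g. the connection was closed on purpose), so it is not reported.
        (when @running?
          (on-error e))))))
```

A shutdown function would then `(reset! running? false)` before doing whatever makes the blocking call return or throw.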


actually, looking at our connection pool what I do to kill the pubsub is just close the connection


and ignore any errors
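Why closing the connection works can be shown without Redis, assuming the blocked subscribe call is ultimately waiting in an ordinary blocking socket read on a regular (platform) thread. The sketch below uses a plain `java.net.Socket` as a stand-in for the Jedis connection. `demo-close-unblocks-read` is a hypothetical name. Interrupting the reader via `shutdownNow` leaves it blocked, while closing the socket from another thread makes the read fail and lets the task end.

```clojure
(import '(java.io IOException)
        '(java.net InetAddress ServerSocket Socket)
        '(java.util.concurrent Executors ExecutorService TimeUnit))

(defn demo-close-unblocks-read []
  (with-open [server (ServerSocket. 0)]            ; port 0 = any free port
    (let [client   (Socket. (InetAddress/getLoopbackAddress) (.getLocalPort server))
          accepted (.accept server)                ; peer that never writes anything
          ^ExecutorService executor (Executors/newSingleThreadExecutor)
          outcome  (promise)]
      (.execute executor
                (fn []
                  (try
                    (.read (.getInputStream client)) ; blocks waiting for data
                    (deliver outcome :read-returned)
                    (catch IOException _
                      (deliver outcome :read-aborted)))))
      (Thread/sleep 100)                           ; let the reader block
      (.shutdownNow executor)                      ; interrupts the reader thread
      (let [after-interrupt (deref outcome 300 :still-blocked)]
        (.close client)                            ; closing the socket wakes the reader
        (let [after-close (deref outcome 2000 :still-blocked)]
          (.close accepted)
          {:after-interrupt after-interrupt
           :after-close     after-close
           :terminated?     (.awaitTermination executor 2 TimeUnit/SECONDS)})))))
```

The exception the reader gets when the socket is closed is the kind of error that would be ignored during a deliberate shutdown.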