off-topic

https://github.com/clojurians/community-development/blob/master/Code-of-Conduct.md Clojurians Slack Community Code of Conduct. Searchable message archives are at https://clojurians-log.clojureverse.org/
emccue 2020-10-09T23:01:43.201500Z

does anyone have any experience using redis pubsub from clojure in a "system" sort of way

emccue 2020-10-09T23:02:12.202100Z

Jedis has a subscribe method that blocks and keeps taking messages from redis

emccue 2020-10-09T23:02:42.202800Z

but because it blocks I need to run it in a seperate thread and my logic for making sure errors aren't fatal feels sloppy

emccue 2020-10-09T23:02:56.203200Z

and also my "shutdown" function doesn't really work

emccue 2020-10-09T23:03:20.203500Z

;; ----------------------------------------------------------------------------
(def post-notification-channel "ABC")

;; ----------------------------------------------------------------------------
(defn create-pub-sub-listener
  "Creates a listener for pub-sub messages. Borrows a connection
  from the pool and doesn't return it until it is shut down."
  [pool]
  (let [client ^Jedis (.getResource pool)
        logic (proxy [JedisPubSub] []
                (onMessage [channel message]
                  (log/info :channel channel :message message)))
        executor ^ExecutorService (Executors/newSingleThreadExecutor)]
    (.submit
      executor
      (reify Runnable
        (run [_]
          (while true
            (try
              (.subscribe client
                          logic
                          ^{:tag "[Ljava.lang.String;"}
                          (into-array String [post-notification-channel]))
              (catch InterruptedException e
                (throw e))
              (catch Exception e
                (log/error ::jedis-pub-sub-error e
                           ::action-to-take "retry subscription")))))))
    {:borrowed-client client
     :pub-sub logic
     :executor executor}))

;; ----------------------------------------------------------------------------
(defn shutdown-pub-sub-listener!
  [pub-sub-listener]
  (let [{:keys [borrowed-client pub-sub executor]} pub-sub-listener]
    (.shutdownNow executor)
    (.unsubscribe pub-sub (into-array String [post-notification-channel]))
    (.close borrowed-client)))

emccue 2020-10-09T23:03:55.203900Z

after shutdown if i send a message to the channel i get

emccue 2020-10-09T23:03:59.204300Z

:cause "ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"
 :via
 [{:type redis.clients.jedis.exceptions.JedisDataException
   :message "ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"
   :at [redis.clients.jedis.Protocol processError "Protocol.java" 132]}]
 :trace ...

emccue 2020-10-09T23:04:14.204700Z

which means its still consuming somehow

emccue 2020-10-09T23:05:44.205100Z

right now I am trying to structure my app in a "system, but doing it all manually" way

emccue 2020-10-09T23:06:03.205600Z

so making sure i can start and stop the mechanism is important

2020-10-09T23:41:41.206200Z

the jedis connection is not multi-thread safe

2020-10-09T23:42:09.206800Z

you are trying to share the connection that the subscribe is getting messages on with your other code doing redis stuff

2020-10-09T23:43:00.207400Z

that error is what happens if you subscribe on a redis connection, and then on the same connection try to do something other than the listed operations

2020-10-09T23:43:56.208300Z

jedis comes with some connection pool stuff which might help you here, but I ended up writing our own because of the need to pull out connections for subscriptions like this

emccue 2020-10-09T23:47:02.208800Z

i thought that if i used .getResource and just didn't return the connection to the pool that it would be fine

emccue 2020-10-09T23:47:26.209300Z

since no other thread is accessing the connection

emccue 2020-10-09T23:47:35.209600Z

except during the shutdown

2020-10-09T23:47:47.210100Z

ah, I didn't realize you are using the pool

emccue 2020-10-09T23:48:25.210600Z

though i guess it makes sense

emccue 2020-10-09T23:48:43.211200Z

i take a connection from the pool, and a thread is doing its busy wait loop on it

emccue 2020-10-09T23:49:00.211800Z

and i try to call .close, but the connection is in an invalid state

emccue 2020-10-09T23:49:27.212900Z

but that doesn't explain why messages are still being consumed when i shutdown the executor

2020-10-09T23:50:39.213300Z

shutdown an executor doesn't stop it

2020-10-09T23:50:55.213700Z

or I should say, it doesn't stop already running tasks

2020-10-09T23:51:05.214Z

it stops it from starting new ones

emccue 2020-10-09T23:51:14.214200Z

/**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  For example, typical
     * implementations will cancel via {@link Thread#interrupt}, so any
     * task that fails to respond to interrupts may never terminate.
     *
     * @return list of tasks that never commenced execution
     * @throws SecurityException if a security manager exists and
     *         shutting down this ExecutorService may manipulate
     *         threads that the caller is not permitted to modify
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")},
     *         or the security manager's {@code checkAccess} method
     *         denies access.
     */
    List<Runnable> shutdownNow();

emccue 2020-10-09T23:51:27.214500Z

the docs for shutdown now says it tries to

2020-10-09T23:51:43.214800Z

This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.

emccue 2020-10-09T23:52:12.215400Z

awaitTermination just waits though, it doesn't do anything to the tasks

emccue 2020-10-09T23:52:18.215800Z

so me physically waiting should be enough

2020-10-09T23:53:20.216Z

you are not waiting

2020-10-09T23:54:29.216500Z

it maybe that calling .close on the connection is what is getting you that error message

emccue 2020-10-09T23:55:33.217200Z

(defn shutdown-pub-sub-listener!
  [pub-sub-listener]
  (let [{:keys [borrowed-client pub-sub executor]} pub-sub-listener]
    (.shutdownNow executor)
    (.awaitTermination executor 2 TimeUnit/MINUTES)
    (.unsubscribe pub-sub (into-array String [post-notification-channel]))
    (.close borrowed-client)))

emccue 2020-10-09T23:55:40.217500Z

adding await terminate just makes it hang

emccue 2020-10-09T23:55:51.217800Z

trying without a .close

2020-10-09T23:57:22.218100Z

right

2020-10-09T23:57:34.218600Z

because your task is not exiting

emccue 2020-10-09T23:57:37.218700Z

without the .close it is still recieving messages

emccue 2020-10-09T23:58:07.219200Z

okay so then the question is, how to kill the task

emccue 2020-10-09T23:58:25.219900Z

I can't add a check to Thread.isInterrupted anywhere obvious

2020-10-09T23:58:36.220200Z

actually, looking at our connection pool what I do to kill the pubsub is just close the connection

2020-10-09T23:58:43.220500Z

and ignore any errors