jackdaw

https://github.com/FundingCircle/jackdaw
abdullahibra 2020-04-30T14:02:24.019600Z

Hi everyone,

abdullahibra 2020-04-30T14:05:18.021500Z

i have tried the based example of simple topology but i didn't get any results printed

(ns hello.text
  (:require [jackdaw.streams :as js]
            [jackdaw.client :as jc]
            [jackdaw.client.log :as jcl]
            [jackdaw.admin :as ja]
            [jackdaw.serdes :refer [string-serde]]
            [jackdaw.serdes.json :refer [serde]]
            [jackdaw.serdes.resolver :as resolver]))

(def kafka-config
  {"application.id" "hello
   "bootstrap.servers" kafka-url
   "default.key.serde" "jackdaw.serdes.EdnSerde"
   "default.value.serde" "jackdaw.serdes.EdnSerde"
   "cache.max.bytes.buffering" "0"})

;; Serdes tell Kafka how to serialize/deserialize messages
;; We'll just keep them as JSON
(def serdes
  {:key-serde (string-serde)
   :value-serde (serde)})

;; Each topic needs a config. The important part to note is the :topic-name key.
(def source-topic
  (merge {:topic-name "source"
          :partition-count 1
          :replication-factor 1
          :topic-config {}}
         serdes))

(def dest-topic
  (merge {:topic-name "dest"
          :partition-count 1
          :replication-factor 1
          :topic-config {}}
         serdes))

(defn simple-topology [builder]
  (-> (js/kstream builder source-topic)
      (js/for-each! (fn [[key msg]] 
                      (prn msg)
                      ))
      ))

(defn view-messages [topic]
  "View the messages on the given topic"
  (with-open [consumer (jc/subscribed-consumer 
                        (assoc kafka-config "group.id" (str (java.util.UUID/randomUUID)))
                        [topic])]
    (jc/seek-to-beginning-eager consumer)
    (->> (jcl/log-until-inactivity consumer 100)
         (map :value)
         doall)))

(defn start! []
  "Starts the simple topology"
  (let [builder (js/streams-builder)]
    (simple-topology builder)
    (doto (js/kafka-streams builder kafka-config)
      (js/start))))

(defn stop! [kafka-streams-app]
  "Stops the given KafkaStreams application"
  (js/close kafka-streams-app))

(def hello (start!))

abdullahibra 2020-04-30T14:07:06.022Z

i'm executing that in cider emacs

abdullahibra 2020-04-30T14:07:21.022300Z

but i don't get any message printed

abdullahibra 2020-04-30T14:07:26.022600Z

is there something i did wrong ?

abdullahibra 2020-04-30T14:08:17.022800Z

also

abdullahibra 2020-04-30T14:09:00.023700Z

if i run that separately

(def builder (js/streams-builder))
(simple-topology builder) ;; that gives nil, is it impure function which start something in the background ?

2020-04-30T14:23:23.024Z

topologies start threads

2020-04-30T14:23:40.024500Z

I don't see anything in your example that writes messages the topology would consume?