Hi everyone,
I have tried the basic example of a simple topology, but I didn't get any results printed:
(ns hello.text
(:require [jackdaw.streams :as js]
[jackdaw.client :as jc]
[jackdaw.client.log :as jcl]
[jackdaw.admin :as ja]
[jackdaw.serdes :refer [string-serde]]
[jackdaw.serdes.json :refer [serde]]
[jackdaw.serdes.resolver :as resolver]))
(def kafka-config
  "Configuration map passed to the Kafka Streams application in `start!`
  and reused as the base of the consumer config in `view-messages`.

  NOTE(review): `kafka-url` is not defined anywhere in this snippet; it
  must be bound to the broker address (e.g. \"localhost:9092\") before
  this form is evaluated -- confirm where it comes from."
  {"application.id" "hello" ; closing quote was missing, which broke the whole map
   "bootstrap.servers" kafka-url
   "default.key.serde" "jackdaw.serdes.EdnSerde"
   "default.value.serde" "jackdaw.serdes.EdnSerde"
   "cache.max.bytes.buffering" "0"})
;; Serdes tell Kafka how to serialize/deserialize messages
;; We'll just keep them as JSON
(def serdes
  "Key/value serdes shared by the topic configs below: the key serde comes
  from `string-serde`, the value serde from `jackdaw.serdes.json/serde`."
  (let [key-serde   (string-serde)
        value-serde (serde)]
    {:key-serde   key-serde
     :value-serde value-serde}))
;; Each topic needs a config. The important part to note is the :topic-name key.
(def source-topic
  "Topic config for the \"source\" topic, combined with the shared `serdes`."
  (let [topic-settings {:topic-name         "source"
                        :partition-count    1
                        :replication-factor 1
                        :topic-config       {}}]
    ;; Entries from `serdes` are added last, so they win on key clashes.
    (into topic-settings serdes)))
(def dest-topic
  "Topic config for the \"dest\" topic, combined with the shared `serdes`."
  (let [topic-settings {:topic-name         "dest"
                        :partition-count    1
                        :replication-factor 1
                        :topic-config       {}}]
    ;; Entries from `serdes` are added last, so they win on key clashes.
    (into topic-settings serdes)))
(defn simple-topology
  "Registers a stream over `source-topic` on `builder` and attaches a
  terminal step that prints the value of every record with `prn`.
  Returns whatever `js/for-each!` returns."
  [builder]
  (let [source-stream (js/kstream builder source-topic)]
    (js/for-each! source-stream
                  ;; Each record arrives as a [key value] pair; only the
                  ;; value is printed.
                  (fn [[_ msg]]
                    (prn msg)))))
(defn view-messages
  "View the messages on the given topic.

  Subscribes a consumer to `topic` under a fresh random `group.id`, seeks
  to the beginning, and returns a fully realized seq of the `:value` of
  each record produced by `jcl/log-until-inactivity`.

  `topic` is handed to `jc/subscribed-consumer` inside a vector --
  presumably a topic config map such as `source-topic`; confirm against
  the Jackdaw docs."
  ;; The docstring must precede the parameter vector; placed after it (as
  ;; it was originally) it is just a discarded expression in the body.
  [topic]
  (with-open [consumer (jc/subscribed-consumer
                        (assoc kafka-config "group.id" (str (java.util.UUID/randomUUID)))
                        [topic])]
    (jc/seek-to-beginning-eager consumer)
    (->> (jcl/log-until-inactivity consumer 100)
         (map :value)
         ;; Realize the lazy seq before `with-open` closes the consumer.
         doall)))
(defn start!
  "Starts the simple topology.

  Creates a new streams builder, registers `simple-topology` on it, builds
  a Kafka Streams application from `kafka-config`, starts it, and returns
  the application object so it can later be passed to `stop!`."
  ;; The docstring must precede the parameter vector; placed after it (as
  ;; it was originally) it is just a discarded expression in the body.
  []
  (let [builder (js/streams-builder)]
    (simple-topology builder)
    ;; `doto` returns the application itself rather than the result of
    ;; `js/start`.
    (doto (js/kafka-streams builder kafka-config)
      (js/start))))
(defn stop!
  "Stops the given KafkaStreams application by calling `js/close` on it.

  `kafka-streams-app` is the object returned by `start!`."
  ;; The docstring must precede the parameter vector; placed after it (as
  ;; it was originally) it is just a discarded expression in the body.
  [kafka-streams-app]
  (js/close kafka-streams-app))
;; Evaluating this form calls `start!` immediately and keeps its return
;; value in `hello`, so the running application can be stopped later with
;; (stop! hello).
(def hello (start!))
I'm executing that in CIDER (Emacs),
but I don't get any messages printed.
Is there something I did wrong?
Also,
if I run this separately:
(def builder (js/streams-builder))
(simple-topology builder) ;; that gives nil -- is it an impure function that starts something in the background?
topologies start threads
I don't see anything in your example that writes messages the topology would consume?