Can you please help me understand how I can send an event with an attached Avro schema? I do the following:
(def test-schema (slurp (io/resource "resources/test.avsc")))

(def my-producer
  (jc/producer
    {"bootstrap.servers" "localhost:9092"
     "key.serializer" "org.apache.kafka.common.serialization.StringSerializer"
     "value.serializer" "io.confluent.kafka.serializers.KafkaAvroSerializer"
     "schema.registry.url" "http://localhost:8081"}))

(def value-serde
  (serdes.avro.confluent/serde
    "http://localhost:8081"
    schema-v1
    false))

(with-open [producer' my-producer]
  @(jc/produce! producer'
                {:topic-name "my_topic"}
                "2"
                {:firstName "Hello"
                 :lastName "World"}))
But it doesn't work. I need to create a single producer and send events with a specified Avro schema to topics.
So my question is: how can I implement the following code using Jackdaw?
private fun createProducer(brokers: String, schemaRegistryUrl: String): Producer<String, GenericRecord> {
    val props = Properties()
    props["bootstrap.servers"] = brokers
    props["key.serializer"] = StringSerializer::class.java
    props["value.serializer"] = KafkaAvroSerializer::class.java
    props["schema.registry.url"] = schemaRegistryUrl
    return KafkaProducer<String, GenericRecord>(props)
}

val schema = Schema.Parser().parse(File("src/main/resources/person.avsc"))
val avroPerson = GenericRecordBuilder(schema).apply {
    set("firstName", fakePerson.firstName)
    set("lastName", fakePerson.lastName)
    set("birthDate", fakePerson.birthDate.time)
}.build()
val futureResult = producer.send(ProducerRecord(personsAvroTopic, avroPerson))
Or, if you know a better way, please let me know.
OK, I got it working, but only by using Java interop:
(:require
  [clojure.java.io :as io]
  [jackdaw.client :as jc])
(:import
  [org.apache.avro Schema$Parser]
  [org.apache.avro.generic GenericData$Record])
(def test-schema
  (.parse (Schema$Parser.)
          (slurp (io/resource "resources/test.avsc"))))

(def rec (GenericData$Record. test-schema))
(.put rec "firstName" "Hello")
(.put rec "lastName" "World")

(def p
  (jc/producer {"bootstrap.servers" "localhost:9092"
                "key.serializer" "org.apache.kafka.common.serialization.StringSerializer"
                "value.serializer" "io.confluent.kafka.serializers.KafkaAvroSerializer"
                "schema.registry.url" "http://localhost:8081"}))

(with-open [producer' p]
  @(jc/produce! producer'
                {:topic-name "messages_by_sources"}
                "1"
                rec))
How can I do the same thing using Jackdaw itself?