@dcj where would the metadata come from?
you would write it explicitly in Kafka?
I might have a related question, are the ConsumerWrappers accessible from jackdaw? I want to log committed offsets
Interesting question @jgerman. Do you mean that you have some app implemented using the kafka streams DSL, and that you basically want to track the progress of the underlying consumers by logging the committed offsets as they are committed? Not sure how I'd do that. I think again it might be something for which you'd need to bust out the processor api.
yeah, I've only been looking at Streams and Jackdaw for a day or so, but in the monitoring section of the Manning book on streams there's an example of implementing interceptors (in java) to get that sort of information
public class StockTransactionConsumerInterceptor implements
ConsumerInterceptor<Object, Object> {
// some details left out for clarity
private static final Logger LOG =
LoggerFactory.getLogger(StockTransactionConsumerInterceptor.class);
public StockTransactionConsumerInterceptor() {
<http://LOG.info|LOG.info>("Built StockTransactionConsumerInterceptor");
}
@Override
public ConsumerRecords<Object, Object>
(ConsumerRecords<Object, Object> consumerRecords) {
<http://LOG.info|LOG.info>("Intercepted ConsumerRecords {}",
buildMessage(consumerRecords.iterator())); 1
return consumerRecords;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
<http://LOG.info|LOG.info>("Commit information {}", map); 2
}
if I'm going to jump into the ProcessorAPI with jackdaw I'm off in custom interop land correct?
my intention here is to be able to report consumer lag to datadog, we've had trouble getting jmx metrics out of kafka
Ah yeah interceptors is probably a good tool to use for that purpose. You can also do them in clojure. You just need to make sure they are AOT compiled so that kafka. Hopefully jackdaw neither helps nor hinders in this area. It should just get out of your way
cool, I'll poke around and see if I can figure out how to handle it, thanks!