jackdaw

https://github.com/FundingCircle/jackdaw
abdullahibra 2020-07-21T11:59:44.091900Z

Hi everyone

abdullahibra 2020-07-21T12:00:03.092300Z

is there a way to get the Kafka message timestamp, for example in map or for-each?

abdullahibra 2020-07-21T12:00:17.092700Z

i see map and for-each only take 2 args which are key and value

abdullahibra 2020-07-21T12:00:24.093Z

but what about the additional keys

abdullahibra 2020-07-21T12:00:24.093200Z

?

Daniel Stephens 2020-07-21T12:07:22.094700Z

I believe you have to drop down to the slightly lower-level functions, process or transform, to get the other info. Last I checked, that's what you needed to do in pure Kafka Streams as well.

Daniel Stephens 2020-07-21T12:08:12.095200Z

Then you are given the ProcessorContext object, which has the timestamp among other things.

abdullahibra 2020-07-21T12:11:02.095600Z

that's really good advice, thank you 🙂

abdullahibra 2020-07-21T14:07:26.095900Z

@dstephens hey

abdullahibra 2020-07-21T14:07:38.096300Z

(reify Transformer
   (init [_ _])
   (close [_])
   (transform [ctx k v]
     (key-value [k [v (.timestamp ctx)]])))

abdullahibra 2020-07-21T14:08:13.096800Z

i have got an error: java.lang.IllegalArgumentException: No matching field found: timestamp for class

abdullahibra 2020-07-21T14:08:23.097Z

can you help here?

Daniel Stephens 2020-07-21T14:13:27.100800Z

Ahh sure, it's a bit confusing because the Java API expects some mutable state. The important bit that's wrong in your code is that the first argument passed to the transform method you've reified is "this" in Java terms, so the field .timestamp doesn't exist: you are calling it on your reified object. The ProcessorContext actually gets passed in as the second arg to init, and Kafka then mutates it as it reads records. I have this in my code:

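(require '[jackdaw.streams.lambdas :as jl])
(import '(org.apache.kafka.streams.kstream Transformer))

(defn transformer
  "Wraps a 3-arg fn (f ctx k v) as a Kafka Streams Transformer."
  [f]
  ;; Kafka hands the ProcessorContext to init, not to transform,
  ;; so keep a reference to it for the later transform calls
  (let [ctx (atom nil)]
    (reify Transformer
      (init [_ context]
        (reset! ctx context))
      (close [_])
      (transform [_ k v]
        ;; f returns a [k v] vector; nil means nothing is emitted for this record
        (some-> (f @ctx k v)
                jl/key-value)))))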

and that way I can provide a Clojure function that accepts the (imo) useful 3 args (fn [ctx k v] ...)
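To see why (.timestamp ctx) failed in the first snippet, here is a minimal sketch using only the JDK: java.util.function.Function stands in for Transformer so no Kafka is needed, and describe-self is a hypothetical name. In reify, the first parameter of every method is the reified object itself, not an argument supplied by the caller.

```clojure
;; java.util.function.Function stands in for Transformer here, so no Kafka is needed.
;; In reify, the first parameter of each method is the reified object itself ("this").
(def describe-self
  (reify java.util.function.Function
    (apply [this x]
      ;; `this` is the object being defined here; `x` is the caller's only argument
      [(identical? this describe-self) x])))

(.apply describe-self 5)
```

Calling (.timestamp this) on such an object fails with the same "No matching field found: timestamp" error, because the reified object has neither a timestamp method nor a timestamp field.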

abdullahibra 2020-07-21T14:30:32.101700Z

that's good, so (f @ctx k v) should return what i need, [k [v timestamp]], right?

abdullahibra 2020-07-21T14:30:55.102100Z

@dstephens is there a better way you'd prefer to do that?

abdullahibra 2020-07-21T14:32:34.102600Z

i mean returning the timestamp so it's available to later steps like map or for-each

Daniel Stephens 2020-07-21T14:34:58.104900Z

yeah, if you give back [k [v timestamp]] then your value is now a vector of your old value and the Kafka timestamp (whichever one you configured). The rest depends on your design. Personally, if my business logic needs the time, I'd probably put the timestamp on v manually when it gets created, but I have no idea if that's 'good practice'.

Daniel Stephens 2020-07-21T14:36:35.106600Z

I'd also rather not rely on position in a vector, so at the very least I'd give back [k {:value v :metadata {:timestamp timestamp}}], which is easier to extend later.
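A minimal sketch of a 3-arg function for the transformer helper above that returns that map shape. with-timestamp-metadata is a hypothetical name, and ctx is assumed to be the ProcessorContext captured in init, whose timestamp() gives the timestamp of the record currently being processed.

```clojure
(defn with-timestamp-metadata
  "Hypothetical 3-arg fn for `transformer`: keeps the key, wraps the value and
  attaches the current record's timestamp under :metadata."
  [ctx k v]
  ;; ctx is assumed to be the ProcessorContext captured in init;
  ;; .timestamp is resolved via reflection when the fn is called
  [k {:value    v
      :metadata {:timestamp (.timestamp ctx)}}])
```

It would be used as (transformer with-timestamp-metadata), so every downstream step sees a value of the form {:value v :metadata {:timestamp ...}}.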

Daniel Stephens 2020-07-21T14:37:43.107600Z

and then in your map you can read it with (get-in v [:metadata :timestamp]), e.g. (fn [[k v]] [k (get-in v [:metadata :timestamp])]), or whatever you want
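Reading the value back out might look like the sketch below, assuming (as with jackdaw's map) the mapper receives a [k v] pair and must return one. record-timestamp and unwrap-with-instant are hypothetical names, and the timestamp is assumed to be epoch milliseconds, which is how Kafka record timestamps are expressed.

```clojure
(defn record-timestamp
  "Pulls the timestamp out of a [k {:value ... :metadata {:timestamp ...}}] record."
  [[_k v]]
  (get-in v [:metadata :timestamp]))

(defn unwrap-with-instant
  "Hypothetical mapper: keeps the key and returns a new [k v] pair whose value
  holds the original value plus the timestamp as a java.time.Instant."
  [[k {:keys [value metadata]}]]
  [k {:value value
      :time  (java.time.Instant/ofEpochMilli (:timestamp metadata))}])
```

Keeping the mapper's return value a [k v] pair means it can sit in a map step without changing the record's key.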

Daniel Stephens 2020-07-21T14:38:11.108100Z

Depends on what serialisation you use as well, lots of variables I'm afraid!

abdullahibra 2020-07-21T14:42:15.108300Z

great advice

abdullahibra 2020-07-21T14:42:20.108600Z

thank you so much

Daniel Stephens 2020-07-21T14:47:44.110700Z

Glad I could help 😊 I should say, as a disclaimer, that I'm not one of the jackdaw maintainers or anything, so while I think my technical advice is correct, take my design advice with a pinch of salt and read lots of the Confluent docs on that stuff!

abdullahibra 2020-07-21T14:50:12.110900Z

🙂