In my Streams app I have an aggregate,
but the keys are UUIDs, and maybe only 3-4 events share the same UUID. I see that the default changelog topic has its cleanup policy set to compact,
which I guess is fine if you want a table, but I only want to aggregate over a window of maybe 1 or 2 days.
After running for only a week, my changelog topic is already >50 GB and the RocksDB state store is 90 GB+.
Is there a way to set the cleanup policy of the changelog topic to compact,delete,
and also to keep the RocksDB state on disk from growing indefinitely?
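In case someone reads this and has a similar problem:
the solution is to use windowing. Windowing the aggregate turns its key into a time-windowed key, and Kafka Streams then creates the changelog topic with cleanup policy compact,delete,
as I wanted. The windowed state store also drops windows once they are older than its retention period, so the RocksDB state no longer grows indefinitely.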
So instead of:

    (-> kstream
        ;; ...
        (j/group-by-key)
        (j/aggregate ,,,,,,)
        (j/to-kstream)
        (j/to topic))

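I now have:

    ;; assuming j is jackdaw.streams
    (require '[jackdaw.streams :as j])
    (import '(java.time Duration) '(org.apache.kafka.streams.kstream TimeWindows))

    (-> kstream
        ;; ...
        (j/group-by-key)
        ;; tumbling 2-day windows
        (j/window-by-time (TimeWindows/of (Duration/ofDays 2)))
        (j/aggregate ,,,,,,)
        (j/to-kstream)
        ;; keys are now Windowed<K>; unwrap the original key before writing
        (j/map (fn [[k v]] [(.key k) v]))
        (j/to topic))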
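To see why windowing bounds the state, here is a minimal pure-Clojure sketch of how tumbling windows assign a record to a window. It is not Kafka Streams' actual code; it assumes epoch-aligned windows and non-negative millisecond timestamps, and window-start and windowed-key are hypothetical helpers. Each aggregate is effectively keyed by (UUID, window start), so an entry stops receiving updates once its window has passed and can later be expired.

```clojure
;; Hypothetical helpers, not part of jackdaw or Kafka Streams.
(def two-days-ms (* 2 24 60 60 1000)) ; 172800000 ms

(defn window-start
  "Start (epoch millis) of the tumbling window containing ts.
  Assumes epoch-aligned windows and ts >= 0."
  [ts size-ms]
  (* size-ms (quot ts size-ms)))

(defn windowed-key
  "Pairs the original key with the bounds of its window,
  roughly the information a Windowed key carries."
  [k ts size-ms]
  (let [start (window-start ts size-ms)]
    {:key k :start start :end (+ start size-ms)}))

;; Two events for the same UUID, three days apart, land in different
;; windows, so they are aggregated separately.
(windowed-key "uuid-1" 0 two-days-ms)
;; => {:key "uuid-1", :start 0, :end 172800000}
(windowed-key "uuid-1" (* 3 24 60 60 1000) two-days-ms)
;; => {:key "uuid-1", :start 172800000, :end 345600000}
```

Note that with tumbling windows the boundaries are fixed multiples of the window size since the epoch, not 2 days counted from each key's first event.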