Eviction should happen immediately after emit. Do the messages arrive somewhat in order? If they arrive at completely random times, then watermarks plus a delay won't help
if no more segments for that window will arrive once you start receiving segments that occur 30 minutes later, then it should be working fine, and there's definitely an issue somewhere else though
could it be the presence of the timer trigger in your composite trigger?
Now that you’re evicting, those windows may just be getting flushed by the timer trigger
Is the delay wall time or watermark time? I'm simulating a month in terms of internal timestamps, but the whole thing runs in about ten minutes.
Watermark time
Vs time of the given segment
It seems to be working if I set the delay equal to the window range. I think I must be misunderstanding the purpose of the delay, so I'm not clear whether the delay is really required here or whether I just managed to hack around something else I did wrong. Anyway, it seems to be solid now.
Except when I get this exception: "Messenger subscription image was lost, and rejoined. This is generally due to timeout conditions e.g. garbage collection, or other causes of connection flapping. Rebooting peer." In that case I seem to lose segments, though maybe that's because I managed to botch the checkpointing of my custom input.
Most likely bad checkpointing
Is it expected that a delay would be required for eviction to work properly, in the sense of ensuring that the window has captured all segments in the time interval? The input is just generating segments equally spaced in time and in order, and the aggregation is counting segments and doing some simple reduction of other values in the segment.
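(For reference, a minimal sketch of the kind of window and watermark trigger being discussed, using Onyx's built-in count aggregation; the task, key, and fn names here are hypothetical, not the actual job:)
```clojure
;; Fixed one-day window over the results task, keyed on the segment's
;; event timestamp.
(def windows
  [{:window/id          :segment-counts
    :window/task        :results
    :window/type        :fixed
    :window/aggregation :onyx.windowing.aggregation/count
    :window/window-key  :event-time
    :window/range       [1 :day]}])

;; Watermark trigger that calls a sync fn when the watermark passes the
;; end of a window extent, then evicts that extent's state.
(def triggers
  [{:trigger/id           :emit-on-watermark
    :trigger/window-id    :segment-counts
    :trigger/on           :onyx.triggers/watermark
    :trigger/sync         ::final-emit
    :trigger/post-evictor [:all]}])
```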
A little more info: I had accidentally removed the delay in the case where segments appeared to be lost. I added some debug logging to the function called to emit the final state, triggered by the watermark with no delay. Under normal conditions, the final-emit function is called once, and the state indicates all expected segments were aggregated in the window. But after receiving a "Messenger subscription image was lost", final-emit gets called twice, with each call having a different part of the window. So if I expect 1000 segments in a window, the first call might show 800, and the second 200. Putting in a trigger delay equal to the window range appears to alleviate this.
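(The final-emit function here is a trigger :trigger/sync fn; a rough sketch of one with that kind of debug output, assuming the usual five-argument sync shape and hypothetical names:)
```clojure
;; Sketch: log which extent fired and how much state it carried.
(defn final-emit
  [event window trigger {:keys [lower-bound upper-bound] :as state-event} extent-state]
  (println "final emit for" (:window/id window)
           "extent" lower-bound "->" upper-bound
           "aggregated state" extent-state)
  ;; ... emit extent-state downstream here ...
  )
```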
So, generally the way it should work is that the watermark + delay puts an upper bound on the time before a window is emitted, with the delay there to handle stragglers that arrive after later-timestamped segments have already been seen. But when you're completing the job you need to flush the remaining messages (which I believe you have handled with punctuation messages). It's strange that the final emit is called with different sets of data. I'd be worried that the delay is just papering over it, unless the data really is arriving out of order. I think you should print out the messages as they are added to your window; it'd be helpful to know whether your watermark + delay is correctly waiting for stragglers to arrive
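(One way to get that print-as-added logging is a thin custom aggregation that wraps count; a sketch assuming the window key is :event-time and all names are made up:)
```clojure
;; Counting aggregation that also logs every segment as it is added to
;; a window extent, to check where stragglers end up.
(defn init-fn [window] 0)

(defn create-state-update [window state segment]
  (println "adding to" (:window/id window)
           "segment timestamp" (:event-time segment))
  1)

(defn apply-state-update [window state delta]
  (+ state delta))

(def logging-count
  {:aggregation/init                init-fn
   :aggregation/create-state-update create-state-update
   :aggregation/apply-state-update  apply-state-update})

;; referenced from the window map as :window/aggregation ::logging-count
```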
What would cause "stragglers"? The input is generating the segments in strict time order, and the watermark is just the ms of the generated timestamp. The only time I see this issue is when recovering from an exception, the "Rebooting peer" scenario. I'm pretty sure the checkpointing/recovery is working right, and the input is pretty simple, essentially the collection plugin with a little extra state.
ok, if everything is in order, that can’t be it.
if everything is in order, then I believe it should work with delay 0. I assume the peer is rebooting after the subscription image is lost?
Correct.
Hmm. That’s pretty odd. What does the parallelism look like on the tasks prior to the window task?
This is the workflow. Input goes to several results tasks, which run in parallel, each with a different window range, and those emit to the output. All tasks have :max-peers 1, so each path through the DAG should be input -> results -> output.
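(Roughly this shape, with hypothetical task names; every catalog entry carries :onyx/max-peers 1:)
```clojure
;; Each results task has its own edge from the input and to the output,
;; so every path through the DAG is input -> results-N -> output.
(def workflow
  [[:in :results-daily]
   [:in :results-weekly]
   [:results-daily :out]
   [:results-weekly :out]])

;; Example shape of one results entry; the window/trigger maps above
;; would point at this task via :window/task.
(def results-daily-entry
  {:onyx/name       :results-daily
   :onyx/fn         :clojure.core/identity
   :onyx/type       :function
   :onyx/max-peers  1
   :onyx/batch-size 20})
```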
Max peers was what I was looking for. Since it’s 1 it can’t be related to what I was thinking.
Are the timestamps unique? Or can some segments have the same timestamp?
Should be unique, generated via range, as in the code above.
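(Something along these lines, presumably; a sketch of equally spaced, strictly increasing, unique timestamps generated from a single range, with a made-up step size:)
```clojure
;; One segment per simulated minute, in order, each with a unique
;; :event-time in milliseconds.
(def start-ms 0)
(def step-ms (* 60 1000))

(def segments
  (map (fn [i]
         {:id         i
          :event-time (+ start-ms (* i step-ms))
          :value      (rand-int 100)})
       (range (* 30 24 60))))   ;; ~30 simulated days of segments
```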
k, can’t be that then.
I've captured the segments seen by one of the results aggregator tasks and looked for places where they are not in time order (shown above). These are as expected: three exceptions and corresponding restarts. What is interesting is that the double-watermark issue and "lost" segments only occurred when the recovery crossed windows, as in the last two cases (window range is one day). I'll run a few more and see if that pattern persists.
Never mind about that last part, saw a case where the recovery spanned windows with no issue.
Is it possible you’re setting a watermark somewhere in your input plugin but not resetting it when recover is called?
thus passing down a later watermark than the point you recovered from would indicate?
If you're using assign-watermark-fn rather than implementing the protocol, I'll look at Onyx's code instead
Using assign-watermark-fn.
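(For reference, assign-watermark-fn is set on the input task's catalog entry and returns the segment's event time in epoch milliseconds; a sketch with hypothetical names:)
```clojure
;; Watermark fn: called per segment read by the input task.
(defn watermark-fn [segment]
  (:event-time segment))

(def input-entry
  {:onyx/name                :in
   :onyx/plugin              ::input-plugin   ;; hypothetical custom input
   :onyx/type                :input
   :onyx/assign-watermark-fn ::watermark-fn
   :onyx/max-peers           1
   :onyx/batch-size          20})
```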
Ok. We may have a recovery bug there. I’ll let you know when I’ve had a chance to look at it.
Some more data points: the issue does not always occur on recovery, only sometimes. I also forced some random exceptions in the job, and they show the same behavior, so it isn't isolated to the messenger exception.
K. Seems reasonably likely to be a timing-dependent recovery bug then
Seems like we may be handling recovery badly for watermarks. Let me run the tests and cut you a snapshot to try
Cool, thanks.
Give this guy a try: 0.13.1-20180715.232706-16
No issues on the first run. I'll give it some more punishment and keep you posted.
:thumbsup: