@dm3 [aleph "0.4.2-alpha8"]
with [manifold "0.1.6-alpha1"]
btw, I haven’t fully deployed it, but I’m running it on one node
I looked at the code and can't see how this can happen
Also trying to write local tests to prove that this can happen
is this a basic (s/stream)?
but no success so far
no it’s a subscription on an event bus with a complex transform
but it should be a stream in the end
yeah, there are different internal impls
how is the stream constructed? Do you specify an xform? How do you provide buffer size?
I did already find that the closing of streams works slightly differently than I had expected
yeah I’m trying to extract those things now. It’s all over the place now
I’ll try to make an equivalent of what I have in production without all the clutter
I feel for you 🙂
haha thanks
The manifold.stream/transform
doesn’t have a timeout on it
This is not the real issue i’m seeing in production, but this also doesn’t match my mental model of manifold
I think this is how it works
the first value always gets "taken" from the source stream at the point you connect
so, once connected, there will always be a pending-take
on the source stream
after you did put-all!
on the source stream before that, it created a pending-put
so the value 1
is propagated to the sink stream once you connect
which also creates a pending-put
for the value 2
on the source stream (so you can think of the value as already in the stream)
I guess the timeout should have happened for the value 2
Yes that’s what i think should happen. It seems the :timeout
option via connect does some skipping too
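roughly this setup, I think (sketch with made-up values, mirroring the explanation above):
(def source (s/stream))
(def sink (s/stream))
;; no consumer yet, so put-all! creates a pending put on source
(s/put-all! source [1 2])
;; connecting takes the first value from source and tries to put it into sink,
;; so 1 ends up as a pending put on sink and 2 becomes the pending put on source
(s/connect source sink {:timeout 100})
(s/description source)
(s/description sink)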
Btw I think I’ve replicated the production issue. Maybe it’s because I’m using a s/buffered-stream
and not s/stream
maybe i’m misusing the s/buffered-stream
What I really want to achieve is what Zach describes in his presentation here [1], a way to prevent one slow consumer from blocking the rest, but I haven’t found how yet [1] https://www.youtube.com/watch?v=1bNOO3xxMc0&feature=youtu.be&t=1887
event bus should achieve that
the subscription should drop due to a timeout
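i.e. roughly (sketch, buffer size made up):
(require '[manifold.bus :as bus]
         '[manifold.stream :as s])
;; each subscriber gets its own buffered stream via the stream-generator,
;; so a slow subscriber only fills up its own buffer instead of the producer
(def event-bus (bus/event-bus #(s/stream 16)))
(def subscription (bus/subscribe event-bus :aggregates))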
Yes I would have hoped that, but this never happened in production
And consequently new connections wouldn’t receive any messages, because, i think, the other subscribers were blocking the producer
The documentation on buffered-streams
implies that the buffer-size
shouldn't get this big i think:
(buffered-stream buffer-size)
(buffered-stream metric limit)
(buffered-stream metric limit description)
A stream which will buffer at most limit data, where the size of each message is defined by (metric message).
yep
seems like a bug with buffered-stream.. if you use (s/stream buffer-size), the buffer-size doesn't go over the capacity
yeah this one is interesting too. It seems to be a very specific issue with buffered-stream as it doesn’t happen here in the same way:
(fact "Buffered stream does have a limit +1"
(let [s (s/buffered-stream 10)]
@(d/timeout! (s/put-all! s (range 100)) 100 :timed-out-like-expected) => :timed-out-like-expected
(s/description s) => {:buffer-capacity 10, :buffer-size 11, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}))
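for comparison, with a plain (s/stream 10) I'd expect something like this (untested sketch):
(fact "Plain stream buffer stays within its capacity"
  (let [s (s/stream 10)]
    @(d/timeout! (s/put-all! s (range 100)) 100 :timed-out-like-expected) => :timed-out-like-expected
    (:buffer-size (s/description s)) => 10))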
ah
ok, one issue is with publishing to the bus
you are not respecting the backpressure
on bus/publish!
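i.e. something like this (rough sketch, the events coll is made up), so the next publish! only happens once the previous message has been accepted by every subscriber — deref or chain on the returned deferred instead of firing and forgetting:
(doseq [event events]
  @(bus/publish! event-bus :aggregates event))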
What about this one?
(fact "Buffered stream goes wild in combination with another thread"
(let [s (s/buffered-stream 10)]
(future
(dotimes [i 100]
(s/put! s i)))
(Thread/sleep 100)
(s/description s) => {:buffer-capacity 10, :buffer-size 100, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}))
when calling put!, you have to respect the backpressure
ah ok
I'm not sure what it should be doing here though, if you don't respect it
it's probably undefined
accumulating values in the buffer doesn't seem like the best idea
yes I guess it has to queue somewhere. I also thought that buffered-stream would work like a sliding-buffer and drop values
Do you know how to achieve this with manifold? I had something like this with core.async
maybe i should use try-put! instead
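e.g. something like this (sketch, the 10 ms is made up):
;; give the consumer 10 ms, otherwise drop the value instead of blocking
@(s/try-put! sink event 10 ::dropped)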
stream/throttle?
you can also whip up something based on d/loop
which I usually do
look at the throttle
impl for example
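something along these lines, maybe (untested sketch, names made up), in the spirit of the throttle impl:
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])
;; put each value only after the previous put! has completed,
;; so the producer is driven by the consumer instead of piling up puts
(defn feed! [sink values]
  (d/loop [vs values]
    (if (empty? vs)
      true
      (d/chain (s/put! sink (first vs))
               (fn [ok?]
                 (if ok?
                   (d/recur (rest vs))
                   false))))))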
if i don’t want to throttle on the rate I need to have a high rate and set a max-backlog I suppose
thanks, I’ll have a look
would be also great if you could create an issue with the test cases
to question the behaviour that we've seen
Would you mark them all as issues? Or should I just go ahead and let Zach decide?
I'd just have one issue
with the behaviour of buffered streams when puts
don't respect the backpressure
ah yeah, ok I’ll do that
thanks for your support 🙂
np
thanks 🙂
I think I’ll stay away from buffered-stream for now 🙂
it's the same issue
zip runs put!
on every item
yes you are right so I guess i could listen to the backpressure via a deref of manifold.bus/publish!
I have replaced buffered-stream with a normal stream with a buffer and redeployed to see how it changes things
if the bus is configured with a timeout, the connection to the slow stream should be severed
but you have to manage the backpressure
Yeah I’ll have to think it through one more time
I have at least one other issue with manifold in combination with aleph. It might just be my misunderstanding. I’m sending a stream of server side events to an http client (in this case curl). I would suspect that aleph would close this stream on disconnect, right? Yet this is not something I have observed
I don't have much experience with aleph
but I'd look for on-close
in the sources
or something like that
in manifold you can only close a sink that you are writing to
ah thanks i’ll have a look
manifold streams will automatically close once all downstream sinks are closed
it could be that you created your streams in the wrong way, though
take a look at the upstream?
or downstream?
properties of your stream
ah @lmergen you are right. I think the problem is with an intermediate transform:
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream)
([{:op "transducer"} << stream: {:pending-puts 2, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>])
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream first second s/downstream)
([{:op "map"} << sink: {:type "callback"} >>])
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream first second s/downstream first second s/downstream)
[<< stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>]
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream first second s/downstream first second s/downstream first s/downstream)
nil
i think i have to create a transform that has a :timeout
btw, the above says the final stream is closed, but the intermediate ones are not
that said, it might be a weird mix, as there are also pending puts on the first one
ah right, always forget about that
sneaky auto close
also, the close signal doesn't propagate unless you put another value
IIRC
yeah that was my first surprise indeed:
(fact "Transform doesn't close right away, but after a new put"
(let [source (s/stream)
sink (s/transform (map identity) source)]
(s/close! sink)
(s/closed? source) => false
(s/put! source :foo)
(s/closed? source) => true))
yes
i don’t think this is the best behavior, tbh
it's an implementation issue - not very easy to solve
i guess it is also an optimization as you don’t have to go up the chain, but you do it lazily down the chain? Close whatever you can close along the way
but it is a gotcha though
btw, @lmergen i see you are in Amsterdam too 🙂 Haven’t been to the meetup lately. Do you do a lot with Aleph?
@jeroenvandijk we’re actually hosting the meetup nowadays 🙂
we do quite a lot with aleph
we’re heavy into the juxt ecosystem (most notably yada), which does everything with aleph
on top of that, all our microservices use it for http clients as well
and… it has its rough edges, like these
ah nice
i saw the meetup page indeed. I have to go to the meetups again 🙂
I’m trying to build a Hystrix clone. It works locally, but not so much under high load it seems
I’m positive Aleph/Manifold can do it though if i use it right
hmm locally the transform really doesn’t need a timeout to close. I think the high load really does something different somewhere. Maybe i should simplify the transform and see what happens
you might be interested in looking at https://github.com/tulos/manifail - tangentially related
ah thanks, definitely interesting!
The main thing I’m after in the beginning is convenient monitoring and latency control. short-circuiting would also be interesting. I’m not planning to build a one to one hystrix clone, but I would like to make something that is compatible with their dashboard
In what kind of application do you use this?
retrying remote requests mostly
in various places
it has a nice and compact code base
only handles retries though
yeah I’m working on a realtime bidding system. The remote services we use are normally reliable, but getting latency perfectly under control is harder. We have something that works, but could be better. I would like to apply semaphores and timeouts to get this better. A hystrix kind of abstraction in idiomatic clojure would be nice i think
hehe, aren't you scared of GC pauses all over the place?
you mean because of the Hystrix approach or just in general?
it is realtime bidding in the advertising domain so you still have 100ms or more per request, not too bad
it’s not bidding on a money exchange where it is submillisecond
yeah, you should be able to keep under that limit. However you are destined to have a fun time reducing allocations in Clojure code at some point 🙂
it has been live for a few years so nothing too bad 😎
i’m profiling my newly deployed code now, and i see clojure.lang.LockingTransaction$RetryEx coming up. Never seen that before. Does it ring a bell?
ah, ok then 🙂
STM?
conflicts in swap!?
yeah must be something like that, but never seen it before so must have something to do with the new code. So i guess nothing manifold specific
I also noted that there are some reflection issues with Manifold. I get quite a few warnings
The reflector is also scoring high in the profiling list
Ah the downside of debugging/profiling in a live system… something else is causing the hiccup
I'd be surprised
I think it is clojure.pprint/pprint
printing a lot of errors at the moment...
or rather the agent that is calling pprint, agents have a LockingTransaction apparently
I have something working. There was one other issue where I didn’t add the newlines and so the SSE endpoint wouldn’t flush the events
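for reference, it was just the SSE framing (sketch):
;; each SSE event has to end with a blank line ("\n\n"),
;; otherwise clients don't see a complete event and nothing appears to flush
(defn sse-event [data]
  (str "data: " data "\n\n"))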
thanks for the help!
@jeroenvandijk interesting you’re working on a hystrix clone! i was actually thinking about doing some work in this area as well, but am more looking for a higher-level “microservice client”, like what you could imagine the client-side counterpart of Yada to be
Cool 🙂 I can imagine it doesn’t matter much if it is on the server or on a client. My first approach was with core.async and I actually thought of creating a browser demo. But core.async is harder to wrap than manifold due to its dependency on macros. I’m not sure what needs to happen to make something work on a browser client if that’s what you mean
true, but if you want to implement hystrix features like bulkheads, things start to become a bit more difficult with manifold i think
you mean bulkhead requests? or is that a term i don’t know?
never mind i see it is in hystrix
Yeah I’m not after complete thread isolation i guess
I would like to use semaphores to control the concurrency
that is actually pretty easy to implement with manifold
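e.g. something like this (rough sketch, using a plain java.util.concurrent.Semaphore rather than anything manifold-specific, names made up):
(require '[manifold.deferred :as d])
(import 'java.util.concurrent.Semaphore)
;; allow at most n concurrent calls; fail fast when the limit is hit
(defn with-permit [^Semaphore sem f]
  (if (.tryAcquire sem)
    (-> (f)
        (d/finally #(.release sem)))
    (d/error-deferred (ex-info "concurrency limit reached" {}))))
(def remote-call-permits (Semaphore. 10))
;; (with-permit remote-call-permits #(aleph.http/get "http://example.com"))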
Maybe thread isolation would be possible with setting specific executors
I’m not a manifold expert either, nor hystrix for that matter. So maybe I’m underestimating the full problem
The first version i have in mind is simple enough i think
If i get this reporting to work properly that is 🙂
I can give you a preview if you are interested in the semaphore implementation
Maybe I’m missing something. Why do you think bulkheads would be hard to implement with manifold?
When I read this http://stackoverflow.com/questions/30391809/what-is-bulkhead-pattern-used-by-hystrix I’m guessing that a semaphore (control the number of concurrent requests) and an executor with a maximum of threads would be equivalent
it’s important to realise that the problem that’s being solved is that of thread starvation in one place
if you do not allocate at least one thread to each “component”, you still face the problem that all your threads might be hanging in another place
so this would not solve the deadlocking issue that is solved with bulkheads
Ok I think i’m just unfamiliar with the particular use case or I haven’t faced the issue yet. Do you have an example somewhere of an event like this?
For example, the stackoverflow says that a semaphore approach cannot timeout requests and a thread approach can. I don’t think i understand why the semaphore approach wouldn’t allow that
In previous efforts I found that a thread can also not be cleaned up cleanly in all cases. Which means that if your underlying code is faulty it doesn’t really matter how you wrap it. I hope to be proven wrong though
Consider the case where all your threads are blocked in some HTTP request. Indirectly, however, that request is issuing another request on your server, and is actually blocked on that
Then you have a deadlock purely because there is no thread available to handle that other request
hmm i think i see what you mean. So that extra thread would keep an eye on timeouts I guess? I mean without timeouts this situation would be unsolvable, right? I'll think about it a bit more. I guess my application is a lot simpler and doesn’t have these deadlock situations AFAIK
Well it's meant to solve a specific type of failure scenario. It is comparable to a deadlock
However, it is distributed
I do want to say that using a semaphore does not solve the issue we're describing here, unless your total number of threads is more than all your semaphore limits together
Hmm yeah i think that’s true. The thing i’m missing is how this would be 100% prevented with a thread based approach. If you have a threadpool you have the same issue, right? Only being able to create a new thread would circumvent this I guess
But I don’t think I have this deadlock scenario in my application. Maybe i’m ignorant or maybe I just work on a less complex system
I would love to chat about this some more later as you do seem to have this problem. Maybe you could also help validate the hystrix clone 🙂
Well you can always come by if you're in Amsterdam. These things are fascinating and difficult to get right.
https://johnragan.wordpress.com/2009/12/08/release-it-stability-patterns-and-best-practices/
that book, Release It! is where bulkheads, circuit breakers etc are first described, btw
Yeah I guess it’s time to start reading it and stop being ignorant 😬
I’ll visit the next Clojure meetup, I see it’s next wednesday. Maybe we can meet there too
Have to run. Will post new results/questions here later 🙂
Thanks for all the help
it seems in general bulkheads are hard to achieve on the JVM
if you have a runtime like Erlang's, your process just dies and restarts fresh
independent of others
@dm3: so you could pretty much say that bulkheads are necessary because of the JVM
basically, bulkheads are indirectly part of erlang's actor architecture, which in turn is stolen from CSP
in other words, people have once again already figured all this out in the 60s :)
I think it's a property of runtime, no?
@jeroenvandijk: would be awesome, I'll probably be there!
@dm3: it's a property of the concurrency model
hm, so you are saying core.async enables easy bulkheads? I guess I don't really understand what you mean by those
maybe, i'm not very familiar with core.async, but as far as I'm aware it shares a thread pool between actors, right?
anyway, you pretty much just want one threadpool (or thread) per actor
well, core.async is an implementation of CSP ideas
I guess a programming model + a proper runtime makes a robust system
you have natural bulkheads and the ability to fail fast
in erlang
yes, but it focuses mostly on abstractions of CSP
it's not pure CSP, probably because of the ease of use