aleph

2016-10-05T08:13:08.000022Z

@dm3 [aleph "0.4.2-alpha8"] with [manifold "0.1.6-alpha1"]

2016-10-05T08:13:48.000023Z

btw, I haven’t fully deployed it, but I’m running it on one node

dm3 2016-10-05T08:14:00.000024Z

I looked at the code and can't see how this can happen

2016-10-05T08:14:03.000025Z

Also trying to write local tests to prove that this can happen

dm3 2016-10-05T08:14:07.000026Z

is this a basic (s/stream)

dm3 2016-10-05T08:14:08.000027Z

?

2016-10-05T08:14:12.000028Z

but no success so far

2016-10-05T08:14:26.000029Z

no it’s a subscription on an event bus with a complex transform

2016-10-05T08:14:34.000030Z

but it should be a stream in the end

dm3 2016-10-05T08:14:42.000031Z

yeah, there are different internal impls

dm3 2016-10-05T08:15:07.000032Z

how is the stream constructed? Do you specify an xform? How do you provide buffer size?
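
For reference, a stream with both a buffer and an xform can be constructed like this; the buffer size and transducer here are purely illustrative:

(require '[manifold.stream :as s])

;; a stream with a 16-message buffer and a transducer applied to
;; everything that flows through it
(def incoming (s/stream 16 (comp (map inc) (filter odd?))))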

2016-10-05T08:15:22.000033Z

I did already find that the closing of streams works slightly differently than I had expected

2016-10-05T08:15:50.000034Z

yeah I’m trying to extract those things now. It’s all over the place now

2016-10-05T08:16:29.000035Z

I’ll try to make an equivalent of what I have in production without all the clutter

dm3 2016-10-05T08:18:37.000036Z

I feel for you 🙂

2016-10-05T08:18:45.000037Z

haha thanks

2016-10-05T08:34:34.000039Z

The manifold.stream/transform doesn’t have a timeout on it

2016-10-05T08:35:45.000040Z

This is not the real issue i’m seeing in production, but this also doesn’t match my mental model of manifold

dm3 2016-10-05T08:46:30.000042Z

I think this is how this works

dm3 2016-10-05T08:46:47.000043Z

the first value always gets "taken" from the source stream at the point you connect

dm3 2016-10-05T08:47:16.000044Z

so, once connected, there will always be a pending-take on the source stream

dm3 2016-10-05T08:48:30.000046Z

and since you did put-all! on the source stream before connecting, it created a pending-put

dm3 2016-10-05T08:48:48.000047Z

so the value 1 is propagated to the sink stream once you connect

dm3 2016-10-05T08:49:25.000048Z

which also creates a pending-put for the value 2 on the source stream (so you can think of the value as already in the stream)

dm3 2016-10-05T08:50:18.000049Z

I guess the timeout should have happened for the value 2

2016-10-05T08:56:58.000050Z

Yes that’s what i think should happen. It seems the :timeout option via connect does some skipping too
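
A minimal sketch of the behaviour described above, assuming an unconsumed sink with a one-message buffer:

(require '[manifold.stream :as s])

(def source (s/stream))
(def sink (s/stream 1))

;; nothing is connected yet, so the first message becomes a pending put
(s/put-all! source [1 2])

;; connecting creates a pending take on the source: 1 is taken and
;; buffered in the sink, then 2 becomes the next pending put; since
;; nothing consumes the sink, that put should hit the 100ms timeout
;; and sever the connection
(s/connect source sink {:timeout 100})

(Thread/sleep 200)
(s/description source)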

2016-10-05T08:57:30.000051Z

Btw I think I’ve replicated the production issue. Maybe it’s because I’m using a s/buffered-stream and not s/stream

2016-10-05T09:01:07.000053Z

maybe i’m misusing the s/buffered-stream

2016-10-05T09:02:25.000054Z

What I really want to achieve is what Zach describes in his presentation here [1], a way to prevent one slow consumer from blocking the rest, but I haven’t found how yet [1] https://www.youtube.com/watch?v=1bNOO3xxMc0&feature=youtu.be&t=1887

dm3 2016-10-05T09:02:54.000056Z

event bus should achieve that

dm3 2016-10-05T09:03:05.000057Z

the subscription should drop due to a timeout

2016-10-05T09:03:50.000058Z

Yes I would have hoped that, but this never happened in production

2016-10-05T09:04:52.000059Z

And consequently new connections wouldn’t receive any messages, because, i think, the other subscribers were blocking the producer
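
A sketch of the pattern dm3 describes, assuming each subscription is connected to its consumer with an explicit :timeout so a slow consumer gets dropped instead of blocking the publisher (names are illustrative):

(require '[manifold.bus :as bus]
         '[manifold.stream :as s])

(def event-bus (bus/event-bus))

(defn add-consumer!
  "Subscribe `consumer` to `topic`; if it doesn't accept a message
   within 100ms the connection is severed, so it can't hold up the
   other subscribers."
  [topic consumer]
  (s/connect (bus/subscribe event-bus topic) consumer {:timeout 100}))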

2016-10-05T09:06:05.000060Z

The documentation on buffered-streams implies that the buffer-size shouldn't get this big i think:

(buffered-stream buffer-size)
(buffered-stream metric limit)
(buffered-stream metric limit description)
A stream which will buffer at most limit data, where the size of each message is defined by (metric message).

dm3 2016-10-05T09:07:43.000062Z

yep

dm3 2016-10-05T09:10:06.000063Z

seems like a bug with buffered-stream.. if you use (s/stream buffer-size), the buffer-size doesn't go over the capacity

2016-10-05T09:13:56.000064Z

yeah this one is interesting too. It seems to be a very specific issue with buffered-stream as it doesn’t happen here in the same way:

(fact "Buffered stream does have a limit +1"
      (let [s (s/buffered-stream 10)]

        @(d/timeout! (s/put-all! s (range 100)) 100 :timed-out-like-expected) => :timed-out-like-expected

        (s/description s) => {:buffer-capacity 10, :buffer-size 11, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}))

dm3 2016-10-05T09:16:53.000065Z

ah

dm3 2016-10-05T09:17:01.000066Z

ok, one issue is with publishing to the bus

dm3 2016-10-05T09:17:15.000067Z

you are not respecting the backpressure

dm3 2016-10-05T09:17:19.000068Z

on bus/publish!

2016-10-05T09:18:07.000069Z

What about this one?

(fact "Buffered stream goes wild in combination with another thread"
      (let [s (s/buffered-stream 10)]

        (future
          (dotimes [i 100]
            (s/put! s i)))

        (Thread/sleep 100)

        (s/description s) => {:buffer-capacity 10, :buffer-size 100, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}))

dm3 2016-10-05T09:18:38.000070Z

when calling put!, you have to respect the backpressure

2016-10-05T09:18:43.000071Z

ah ok

dm3 2016-10-05T09:18:55.000072Z

I'm not sure what it should be doing here though, if you don't respect it

dm3 2016-10-05T09:19:09.000073Z

it's probably undefined

dm3 2016-10-05T09:19:22.000074Z

accumulating values in the buffer doesn't seem like the best idea

2016-10-05T09:19:44.000075Z

yes I guess it has to queue somewhere. I thought too that buffered-stream would work like a sliding-buffer and drop values

2016-10-05T09:20:09.000076Z

Do you know how to achieve this with manifold? I had something like this with core.async

2016-10-05T09:20:41.000077Z

maybe i should use try-put! instead

dm3 2016-10-05T09:20:42.000078Z

stream/throttle?

dm3 2016-10-05T09:21:33.000079Z

you can also whip up something based on d/loop which I usually do

dm3 2016-10-05T09:21:43.000080Z

look at the throttle impl for example

2016-10-05T09:21:43.000081Z

if i don’t want to throttle on the rate I need to have a high rate and set a max-backlog I suppose

2016-10-05T09:22:05.000082Z

thanks, I’ll have a look
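
Two sketches of what's suggested above: a producer that respects backpressure by waiting on each put! (via d/loop), and a lossy put that simply drops a message the sink doesn't accept immediately (try-put! with a zero timeout); both function names are made up:

(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

(defn put-all-respecting-backpressure!
  "Put each message onto sink, only moving on once the previous put!
   has been accepted (or stop if the sink is closed)."
  [sink msgs]
  (d/loop [msgs msgs]
    (if-let [[m & more] (seq msgs)]
      (d/chain (s/put! sink m)
               (fn [accepted?]
                 (when accepted?
                   (d/recur more))))
      true)))

(defn put-or-drop!
  "Sliding-buffer-ish behaviour: try to put the message, and drop it
   if the sink doesn't accept it within 0ms."
  [sink msg]
  (s/try-put! sink msg 0 ::dropped))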

dm3 2016-10-05T09:22:44.000083Z

would also be great if you could create an issue with the test cases

dm3 2016-10-05T09:23:03.000084Z

to question the behaviour that we've seen

2016-10-05T09:23:11.000085Z

Would you mark them all as issues? Or should I just go ahead and let Zach decide?

dm3 2016-10-05T09:23:36.000086Z

I'd just have one issue

dm3 2016-10-05T09:23:55.000087Z

with the behaviour of buffered streams when puts don't respect the backpressure

2016-10-05T09:24:07.000088Z

ah yeah, ok I’ll do that

2016-10-05T09:24:26.000089Z

thanks for your support 🙂

dm3 2016-10-05T09:25:02.000090Z

np

2016-10-05T09:58:25.000091Z

here it is https://github.com/ztellman/manifold/issues/110

dm3 2016-10-05T09:59:58.000093Z

thanks 🙂

2016-10-05T10:37:28.000095Z

I think I’ll stay away from buffered-stream for now 🙂

dm3 2016-10-05T10:41:08.000096Z

it's the same issue

dm3 2016-10-05T10:41:25.000097Z

zip runs put! on every item

2016-10-05T10:54:05.000098Z

yes you are right so I guess i could listen to the backpressure via a deref of manifold.bus/publish!

2016-10-05T10:55:33.000099Z

I have replaced buffered-stream with a normal stream with a buffer and redeployed to see how it changes things
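
A sketch of listening to that backpressure, assuming a publish loop that waits for each bus/publish! to complete before sending the next message (names are illustrative):

(require '[manifold.bus :as bus]
         '[manifold.deferred :as d])

(defn publish-all!
  "Publish messages one at a time: wait until every active subscriber
   has accepted the previous message before publishing the next."
  [event-bus topic msgs]
  (d/loop [msgs msgs]
    (when-let [[m & more] (seq msgs)]
      (d/chain (bus/publish! event-bus topic m)
               (fn [_] (d/recur more))))))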

dm3 2016-10-05T10:55:59.000100Z

if the bus is configured with a timeout, the connection to the slow stream should be severed

dm3 2016-10-05T10:56:06.000101Z

but you have to manage the backpressure

2016-10-05T12:14:21.000102Z

Yeah I’ll have to think it through one more time

2016-10-05T12:14:35.000103Z

I have at least one other issue with manifold in combination with aleph. It might just be my misunderstanding. I’m sending a stream of server-sent events to an http client (in this case curl). I would expect that aleph would close this stream on disconnect, right? Yet this is not something I have observed

dm3 2016-10-05T12:16:53.000104Z

I don't have much experience with aleph

dm3 2016-10-05T12:17:12.000105Z

but I'd look for on-close in the sources

dm3 2016-10-05T12:17:23.000106Z

or something like that

dm3 2016-10-05T12:17:52.000107Z

in manifold you can only close a sink that you are writing to

2016-10-05T12:20:31.000108Z

ah thanks i’ll have a look

2016-10-05T12:28:07.000109Z

manifold streams will automatically close once all downstream sinks are closed

2016-10-05T12:28:22.000110Z

it could be that you created your streams in the wrong way, though

2016-10-05T12:28:40.000111Z

take a look at the upstream? or downstream? properties of your stream

2016-10-05T12:32:05.000112Z

ah @lmergen you are right. I think the problem is with an intermediate transform:

user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream)
([{:op "transducer"} << stream: {:pending-puts 2, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>])
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream first second s/downstream)
([{:op "map"} << sink: {:type "callback"} >>])
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream first second s/downstream first second s/downstream)
[<< stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>]
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream first second s/downstream first second s/downstream first s/downstream)
nil

2016-10-05T12:32:27.000113Z

i think i have to create a transform that has a :timeout

2016-10-05T12:32:59.000115Z

btw, the above says the final stream is closed, but the intermediate ones are not

2016-10-05T12:34:48.000116Z

that said, it might be a weird mix, as there are also pending puts on the first one

dm3 2016-10-05T12:35:15.000117Z

ah right, always forget about that

dm3 2016-10-05T12:35:22.000118Z

sneaky auto close

dm3 2016-10-05T12:35:53.000119Z

also close signal doesn't propagate unless you put another value

dm3 2016-10-05T12:36:00.000120Z

IIRC

2016-10-05T12:37:12.000121Z

yeah that was my first surprise indeed:

(fact "Transform doesn't close right away, but after a new put"
             (let [source (s/stream)
                   sink (s/transform (map identity) source)]
               (s/close! sink)
               (s/closed? source) => false

               (s/put! source :foo)
               (s/closed? source) => true))

2016-10-05T12:37:36.000123Z

yes

2016-10-05T12:37:46.000124Z

i don’t think this is the best behavior, tbh

dm3 2016-10-05T12:38:09.000125Z

it's an implementation issue - not very easy to solve

2016-10-05T12:40:40.000126Z

i guess it is also an optimization as you don’t have to go up the chain, but you do it lazily down the chain? Close whatever you can close along the way

2016-10-05T12:40:56.000127Z

but it is a gotcha though

2016-10-05T12:53:05.000128Z

btw, @lmergen i see you are in Amsterdam too 🙂 Haven’t been to the meetup lately. Do you do a lot with Aleph?

2016-10-05T12:53:41.000129Z

@jeroenvandijk we’re actually hosting the meetup nowadays 🙂

2016-10-05T12:53:50.000130Z

we do quite a lot with aleph

2016-10-05T12:54:10.000131Z

we’re heavy into the juxt ecosystem (most notably yada), which does everything with aleph

2016-10-05T12:54:26.000132Z

on top of that, all our microservices use it for http clients as well

2016-10-05T12:54:37.000133Z

and… it has its rough edges, like these

2016-10-05T12:54:47.000134Z

ah nice

2016-10-05T12:55:08.000135Z

i saw the meetup page indeed. I have to go to the meetups again 🙂

2016-10-05T12:55:39.000136Z

I’m trying to build a Hystrix clone. It works locally, but not so much under high load it seems

2016-10-05T12:56:19.000137Z

I’m positive Aleph/Manifold can do it though if i use it right

2016-10-05T13:01:08.000138Z

hmm locally the transform really doesn’t need a timeout to close. I think the high load really does something different somewhere. Maybe i should simplify the transform and see what happens

dm3 2016-10-05T13:08:56.000139Z

you might be interested in looking at https://github.com/tulos/manifail - tangentially related

2016-10-05T13:32:40.000141Z

ah thanks, definitely interesting!

2016-10-05T13:34:19.000142Z

The main thing I’m after in the beginning is convenient monitoring and latency control. short-circuiting would also be interesting. I’m not planning to build a one to one hystrix clone, but I would like to make something that is compatible with their dashboard

2016-10-05T13:36:13.000143Z

In what kind of application do you use this?

dm3 2016-10-05T13:36:51.000144Z

retrying remote requests mostly

dm3 2016-10-05T13:37:04.000145Z

in various places

2016-10-05T13:38:13.000146Z

it has a nice and compact code base

dm3 2016-10-05T13:38:45.000147Z

only handles retries though

2016-10-05T13:41:48.000148Z

yeah I’m working on a realtime bidding system. The remote services we use are normally reliable, but getting latency perfectly under control is harder. We have something that works, but could be better. I would like to apply semaphores and timeouts to get this better. A hystrix kind of abstraction in idiomatic clojure would be nice i think

dm3 2016-10-05T13:42:18.000149Z

hehe, aren't you scared of GC pauses all over the place?

2016-10-05T13:42:53.000150Z

you mean because of the Hystrix approach or just in general?

2016-10-05T13:43:20.000151Z

it is realtime bidding in the advertising domain so you still have 100ms or more per request, not too bad

2016-10-05T13:43:53.000153Z

it’s not bidding on a money exchange where it is submillisecond

dm3 2016-10-05T13:46:36.000154Z

yeah, you should be able to keep under that limit. However you are destined to have a fun time reducing allocations in Clojure code at some point 🙂

2016-10-05T13:48:05.000155Z

it has been live for a few years so nothing too bad 😎

2016-10-05T13:48:44.000156Z

i’m profiling my newly deployed code now, and i see clojure.lang.LockingTransaction$RetryEx coming up. Never seen that before. Does it ring a bell?

dm3 2016-10-05T13:48:47.000157Z

ah, ok then 🙂

dm3 2016-10-05T13:49:08.000158Z

STM?

dm3 2016-10-05T13:49:24.000159Z

conflicts in swap!?

2016-10-05T13:50:03.000160Z

yeah must be something like that, but never seen it before so it must have something to do with the new code. So i guess nothing manifold specific

2016-10-05T13:50:50.000161Z

I also noted that there are some reflection issues with Manifold. I get quite a few warnings

2016-10-05T13:51:06.000162Z

The reflector is also scoring high in the profiling list

2016-10-05T13:54:00.000163Z

Ah the downside of debugging/profiling in a live system… something else is causing the hiccup

dm3 2016-10-05T13:54:01.000164Z

I'd be surprised

2016-10-05T13:54:38.000165Z

I think it is clojure.pprint/pprint printing a lot of errors at the moment...

2016-10-05T13:58:14.000167Z

or rather the agent that is calling pprint, agents have a LockingTransaction apparently

2016-10-05T14:39:42.000169Z

I have something working. There was one other issue where I didn’t add the newlines and so the SSE endpoint wouldn’t flush the events
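
For reference, SSE events are delimited by a blank line, so each event needs a trailing "\n\n" before the client will parse it; a minimal sketch (the events stream is hypothetical):

(require '[manifold.stream :as s])

(defn ->sse [event]
  ;; the blank line ("\n\n") terminates the event; without it the
  ;; client appears to never receive anything
  (str "data: " event "\n\n"))

(def events (s/stream))
(def sse-body (s/map ->sse events))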

2016-10-05T14:39:48.000170Z

thanks for the help!

2016-10-05T15:26:23.000171Z

@jeroenvandijk interesting you’re working on a hystrix clone! i was actually thinking about doing some work in this area as well, but am more looking for a higher-level “microservice client”, like what you could imagine the client-side counterpart of Yada to be

2016-10-05T15:30:56.000172Z

Cool 🙂 I can imagine it doesn’t matter much if it is on the server or on a client. My first approach was with core.async and I actually thought of creating a browser demo. But core.async is harder to wrap than manifold due to its dependency on macros. I’m not sure what needs to happen to make something work on a browser client if that’s what you mean

2016-10-05T15:32:26.000173Z

true, but if you want to implement hystrix features like bulkheads, things start to become a bit more difficult with manifold i think

2016-10-05T15:35:31.000174Z

you mean bulkhead requests? or is that a term i don’t know?

2016-10-05T15:36:04.000175Z

never mind i see it is in hystrix

2016-10-05T15:36:57.000176Z

Yeah I’m not after complete thread isolation i guess

2016-10-05T15:37:14.000177Z

I would like to use semaphores to control the concurrency

2016-10-05T15:37:37.000178Z

that is actually pretty easy to implement with manifold
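
One way a semaphore-based limit might look with manifold, as a sketch (the limit and the fail-fast behaviour are illustrative choices):

(require '[manifold.deferred :as d])
(import 'java.util.concurrent.Semaphore)

(def sem (Semaphore. 10))   ;; allow at most 10 concurrent calls

(defn with-semaphore
  "Run (f) on another thread if a permit is available, otherwise fail
   fast instead of queueing behind a slow dependency."
  [f]
  (if (.tryAcquire sem)
    (-> (d/future (f))
        (d/finally #(.release sem)))
    (d/error-deferred (ex-info "too many concurrent calls" {}))))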

2016-10-05T15:38:21.000179Z

Maybe thread isolation would be possible by setting specific executors

2016-10-05T15:38:51.000180Z

I’m not a manifold expert either, nor hystrix for that matter. So maybe I’m underestimating the full problem

2016-10-05T15:39:08.000181Z

The first version i have in mind is simple enough i think

2016-10-05T15:39:20.000182Z

If i get this reporting to work properly that is 🙂

2016-10-05T15:41:33.000183Z

I can give you a preview if you are interested in the semaphore implementation

2016-10-05T15:43:31.000184Z

Maybe I’m missing something. Why do you think bulkheads would be hard to implement with manifold?

2016-10-05T15:48:43.000185Z

When I read this http://stackoverflow.com/questions/30391809/what-is-bulkhead-pattern-used-by-hystrix I’m guessing that a semaphore (controlling the number of concurrent requests) and an executor with a maximum number of threads would be equivalent
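
A sketch of that equivalence using a dedicated fixed-size pool per downstream dependency, so one stalled dependency can't starve the threads used by the others (the pool size, deadline and do-request argument are made up):

(require '[manifold.deferred :as d]
         '[manifold.executor :as ex])

;; one small pool per dependency, e.g. 4 threads for a "payments" service
(def payments-pool (ex/fixed-thread-executor 4))

(defn call-payments
  "Run the (possibly blocking) do-request on the dedicated pool and
   give it a hard 100ms deadline."
  [do-request req]
  (let [result (d/deferred)]
    (.execute ^java.util.concurrent.Executor payments-pool
              (fn []
                (try
                  (d/success! result (do-request req))
                  (catch Throwable t
                    (d/error! result t)))))
    (d/timeout! result 100)))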

2016-10-05T15:48:58.000187Z

it’s important to realise that the problem that’s being solved is that of thread starvation in one place

2016-10-05T15:49:36.000188Z

if you do not allocate at least one thread to each “component”, you still face the problem that all your threads might be hanging in another place

2016-10-05T15:50:51.000189Z

so this would not solve the deadlocking issue that is solved with bulkheads

2016-10-05T15:56:24.000190Z

Ok I think i’m just unfamiliar with the particular use case or I haven’t faced the issue yet. Do you have an example somewhere of an event like this?

2016-10-05T15:59:49.000192Z

For example, the stackoverflow answer says that a semaphore approach cannot time out requests and a thread approach can. I don’t think i understand why the semaphore approach wouldn’t allow that

2016-10-05T16:01:46.000193Z

In previous efforts I found that a thread also can’t be cleaned up cleanly in all cases. Which means that if your underlying code is faulty it doesn’t really matter how you wrap it. I hope to be proven wrong though

2016-10-05T16:29:31.000194Z

Consider the case where all your threads are blocked in some HTTP request. Indirectly, however, that request is issuing another request on your server, and is actually blocked on that

2016-10-05T16:29:59.000195Z

Then you have a deadlock purely because there is no thread available to handle that other request

2016-10-05T17:03:16.000196Z

hmm i think i see what you mean. So that extra thread would keep an eye on timeouts I guess? I mean without timeouts this situation would be unsolvable, right? I'll think about it a bit more. I guess my application is a lot simpler and doesn’t have these deadlock situations AFAIK

2016-10-05T17:14:22.000198Z

Well it's meant to solve a specific type of failure scenario. It is comparable to a deadlock

2016-10-05T17:14:32.000199Z

However, it is distributed

2016-10-05T17:16:07.000200Z

I do want to say that using a semaphore does not solve the issue we're describing here, unless your total number of threads is more than all your semaphore limits together

2016-10-05T17:19:13.000201Z

Hmm yeah i think that’s true. The thing i’m missing is how this would be 100% prevented with a thread based approach. If you have a threadpool you have the same issue, right? Only being able to create a new thread would circumvent this I guess

2016-10-05T17:21:19.000202Z

But I don’t think I have this deadlock scenario in my application. Maybe i’m ignorant or maybe I just work on a less complex system

2016-10-05T17:22:42.000203Z

I would love to chat about it some more later as you do seem to have this problem. Maybe you can also help validate the hystrix clone 🙂

2016-10-05T17:28:34.000205Z

Well you can always come by if you're in Amsterdam. These things are fascinating and difficult to get right.

2016-10-05T17:29:58.000208Z

that book, Release It! is where bulkheads, circuit breakers etc are first described, btw

2016-10-05T17:31:40.000209Z

Yeah I guess it’s time to start reading it and stop being ignorant 😬

2016-10-05T17:34:19.000210Z

I’ll visit the next Clojure meetup, I see it’s next wednesday. Maybe we can meet there too

2016-10-05T17:34:39.000211Z

Have to run. Will post new results/questions here later 🙂

2016-10-05T17:34:50.000212Z

Thanks for all the help

dm3 2016-10-05T17:36:28.000213Z

it seems in general bulkheads are hard to achieve on the JVM

dm3 2016-10-05T17:37:11.000214Z

if you have a runtime like Erlang's, your process just dies and restarts fresh

dm3 2016-10-05T17:37:20.000215Z

independent of others

2016-10-05T17:38:58.000216Z

@dm3: so you could pretty much say that bulkheads are necessary because of the JVM

2016-10-05T17:39:42.000217Z

basically, bulkheads are indirectly part of erlang's actor architecture, which in turn is stolen from CSP

2016-10-05T17:40:03.000218Z

in other words, people have once again already figured all this out in the 60s :)

dm3 2016-10-05T17:40:22.000219Z

I think it's a property of runtime, no?

2016-10-05T17:40:36.000220Z

@jeroenvandijk: would be awesome, I'll probably be there!

2016-10-05T17:40:56.000221Z

@dm3: it's a property of the concurrency model

dm3 2016-10-05T17:46:36.000222Z

hm, so you are saying core.async enables easy bulkheads? I guess I don't really understand what you mean by those

2016-10-05T17:49:59.000223Z

maybe, i'm not very familiar with core.async, but as far as I'm aware it shares a thread pool between actors, right?

2016-10-05T17:50:27.000224Z

anyway, you pretty much just want one threadpool (or thread) per actor

dm3 2016-10-05T17:52:19.000225Z

well, core.async is an implementation of CSP ideas

dm3 2016-10-05T17:53:01.000226Z

I guess a programming model + a proper runtime makes a robust system

dm3 2016-10-05T17:53:19.000228Z

you have natural bulkheads and the ability to fail fast

dm3 2016-10-05T17:53:37.000229Z

in erlang

2016-10-05T17:55:49.000230Z

yes, but it focuses mostly on abstractions of CSP

2016-10-05T17:56:28.000231Z

it's not pure CSP, probably because of the ease of use