aleph

Empperi 2019-02-05T12:00:03.053200Z

Has anyone ever implemented a ManyToManyChannel to ByteBuf conversion? Especially regarding to byte-streams/def-conversion?

Empperi 2019-02-05T12:00:23.053600Z

I have never implemented these conversions and have already spent way too much time into this

kachayev 2019-02-05T13:54:47.054200Z

@niklas.collin Do you mean “to read until closed and convert to ByteBuf whatever was there”?

Empperi 2019-02-05T13:55:10.054400Z

Yeah

Empperi 2019-02-05T13:55:54.054800Z

I have actually another problem too but even though that is related it is not strictly the same

kachayev 2019-02-05T13:57:30.055600Z

You can try implement this by extending ByteSource protocol, let me find you an example

kachayev 2019-02-05T13:59:05.057100Z

I’m not sure if it’s a good thing to do in general… such reader would be blocking

kachayev 2019-02-05T13:59:49.057800Z

Maybe there’s another approach for tackling the same problem? Or you’re okay with blocking version?

Empperi 2019-02-05T14:16:33.058100Z

Maybe. I'll think about this. Cheers for the example 👍

kachayev 2019-02-05T14:17:13.058300Z

Sure!

2019-02-05T14:32:49.058800Z

@niklas.collin as a matter of fact, i've done exactly this, but it's much simpler

Empperi 2019-02-05T14:33:07.059Z

Sounds promising

2019-02-05T14:33:37.059700Z

it's pretty evil, though

2019-02-05T14:33:37.059900Z

but

2019-02-05T14:34:03.060500Z

i basically use manifold.stream/stream->seq, and then reduce the whole sequence into a single bytebuf 🙂

2019-02-05T14:34:32.060700Z

(defn ^ByteBuffer concat-bytebufs
  "Takes a collection of bytebuffers and returns a single large concatenated bytebuffer."

  [xs]

  (let [size (->> xs
                  (map #(.remaining %))
                  (reduce +))
        bb (ByteBuffer/allocate size)

        ;; Puts a bytebuffer on the large bytebuffer
        put-on-bb (fn [^ByteBuffer big ^ByteBuffer small]
                    (assert (>= (.remaining big) (.remaining small)))
                    (.put big (.slice small)))]

    (run! (partial put-on-bb bb) xs)
    (.flip bb)))

2019-02-05T14:37:33.061200Z

it's probably not the best performing way to do it

kachayev 2019-02-05T14:38:41.062700Z

@lmergen bytes-streams actually has conversions defined for (seq-of ByteBuffer). Meaning… concat-bytebuf should work out of the box, no?

kachayev 2019-02-05T14:39:04.063300Z

@niklas.collin Are we talking about ByteBuf from Netty or ByteBuffer from NIO?

2019-02-05T14:39:05.063400Z

so that makes me an idiot for writing this code 🙂

Empperi 2019-02-05T14:39:13.063900Z

Netty

Empperi 2019-02-05T14:39:29.064500Z

So ByteBuf

2019-02-05T14:39:29.064600Z

interesting, i did not know about that function

kachayev 2019-02-05T14:39:34.064800Z

So, it’s a bit different and I’ve got the question correct

2019-02-05T14:40:20.065600Z

that's awesome, thanks @kachayev

kachayev 2019-02-05T14:40:33.066Z

stream-of ByteBuffer to InputStream means that everything else should go automatically

2019-02-05T14:40:42.066200Z

yes

kachayev 2019-02-05T14:40:49.066400Z

😎

kachayev 2019-02-05T14:42:02.067Z

The only one conversion defined for netty’s ByteBuf is https://github.com/ztellman/aleph/blob/d35d5bbbb090c31f3f53997ee1e0d68fc7b03044/src/aleph/netty.clj#L138-L142

kachayev 2019-02-05T14:42:56.068Z

So, that requires some work, but @niklas.collin you can use the same conversion as an example: https://github.com/ztellman/byte-streams/blob/3e0822b58172b8d351b366213eda6090892e32b3/src/byte_streams.clj#L352

Empperi 2019-02-05T14:43:22.069Z

But anyway. Basically what I would like to do is return a core async channel, then let it be automatically serialized as data comes in an async manner

kachayev 2019-02-05T14:43:25.069100Z

core.async channel -> manifold.stream is simple

kachayev 2019-02-05T14:44:28.069700Z

serialized and… what should happen next?

kachayev 2019-02-05T14:44:39.070Z

written to network?

Empperi 2019-02-05T14:44:44.070200Z

Naturally it should be written to the socket

Empperi 2019-02-05T14:45:13.070400Z

And yes, websockets

kachayev 2019-02-05T14:48:25.071500Z

If you use aleph for handling websocket connection, everything you need to do is to “connect” your core.async channel to manifold.stream that represent the connection

kachayev 2019-02-05T14:49:12.072200Z

Aleph handles serialization to ByteBuf from pretty much anything byte-streams can understand

kachayev 2019-02-05T14:51:44.074200Z

Basically it means “take everything that comes from here and put it there”. If you need to apply custom serialization, just use manifold.stream/map. You don’t need to worry about netty’s ByteBuf - something that’s a text or binary (like byte array) would work