Has anyone ever implemented a ManyToManyChannel
to ByteBuf
conversion? Especially regarding to byte-streams/def-conversion
?
I have never implemented these conversions and have already spent way too much time into this
@niklas.collin Do you mean “to read until closed and convert to ByteBuf whatever was there”?
Yeah
I have actually another problem too but even though that is related it is not strictly the same
You can try implement this by extending ByteSource
protocol, let me find you an example
https://github.com/ztellman/byte-streams/blob/master/src/byte_streams.clj#L676
I’m not sure if it’s a good thing to do in general… such reader would be blocking
Maybe there’s another approach for tackling the same problem? Or you’re okay with blocking version?
Maybe. I'll think about this. Cheers for the example 👍
Sure!
@niklas.collin as a matter of fact, i've done exactly this, but it's much simpler
Sounds promising
it's pretty evil, though
but
i basically use manifold.stream/stream->seq
, and then reduce the whole sequence into a single bytebuf 🙂
(defn ^ByteBuffer concat-bytebufs
"Takes a collection of bytebuffers and returns a single large concatenated bytebuffer."
[xs]
(let [size (->> xs
(map #(.remaining %))
(reduce +))
bb (ByteBuffer/allocate size)
;; Puts a bytebuffer on the large bytebuffer
put-on-bb (fn [^ByteBuffer big ^ByteBuffer small]
(assert (>= (.remaining big) (.remaining small)))
(.put big (.slice small)))]
(run! (partial put-on-bb bb) xs)
(.flip bb)))
it's probably not the best performing way to do it
@lmergen bytes-streams
actually has conversions defined for (seq-of ByteBuffer)
. Meaning… concat-bytebuf
should work out of the box, no?
@niklas.collin Are we talking about ByteBuf
from Netty or ByteBuffer
from NIO?
so that makes me an idiot for writing this code 🙂
Netty
So ByteBuf
interesting, i did not know about that function
So, it’s a bit different and I’ve got the question correct
@lmergen https://github.com/ztellman/byte-streams/blob/3e0822b58172b8d351b366213eda6090892e32b3/src/byte_streams.clj#L352
that's awesome, thanks @kachayev
stream-of ByteBuffer
to InputStream means that everything else should go automatically
yes
😎
The only one conversion defined for netty’s ByteBuf
is https://github.com/ztellman/aleph/blob/d35d5bbbb090c31f3f53997ee1e0d68fc7b03044/src/aleph/netty.clj#L138-L142
So, that requires some work, but @niklas.collin you can use the same conversion as an example: https://github.com/ztellman/byte-streams/blob/3e0822b58172b8d351b366213eda6090892e32b3/src/byte_streams.clj#L352
But anyway. Basically what I would like to do is return a core async channel, then let it be automatically serialized as data comes in an async manner
core.async channel -> manifold.stream is simple
serialized and… what should happen next?
written to network?
Naturally it should be written to the socket
And yes, websockets
If you use aleph
for handling websocket connection, everything you need to do is to “connect” your core.async channel to manifold.stream that represent the connection
Aleph handles serialization to ByteBuf
from pretty much anything byte-streams
can understand
Here’s the connect
function: https://github.com/ztellman/manifold/blob/5d986a5b8ceb46a69caf38192076230d03f6a280/src/manifold/stream.clj#L300
Basically it means “take everything that comes from here and put it there”. If you need to apply custom serialization, just use manifold.stream/map
. You don’t need to worry about netty’s ByteBuf
- something that’s a text or binary (like byte array) would work