Hello fellow Clojurians! I am new to Clojure and next.jdbc (so far loving both) and was hoping to get some help understanding how to use next.jdbc. I am currently trying to take ids from one database and insert them into another. I would like to do this in portions, as the table contains potentially millions of ids. As far as I have been able to read, I should be able to use the option `{:fetch-size portion}` to take the data in smaller portions at a time. My issue is that I don't understand how I can perform the following on each portion:
1. Get a portion of ids from database-1.
2. Pick out the relevant columns and perform transformations.
3. Put the transformed columns into database-2.
4. Take the next portion and do steps 1 through 3 again.
5. Keep doing step 4 until there are no more rows in the table.
Here is my (simplified) attempt:
(def db-1-spec
  {:dbtype "sqlserver"
   :read-only false
   :auto-commit false
   :sslmode "require"
   :dbname "database"
   :user "username"
   :password "secret-password"
   :host "host"})

(def db-2-spec
  {:dbtype "postgres"
   ;; the rest is the same as above...
   })

(defonce db-1-ds (jdbc/get-datasource db-1-spec))
(defonce db-2-ds (jdbc/get-datasource db-2-spec))

(def ids
  (delay
    (plan/select! db-1-ds [:id]
                  ["SELECT * FROM table-ids"]
                  {:fetch-size 10000})))

(defn insert-ids
  [ids]
  (sql/insert-multi! db-2-ds :target-table
                     [:id :other-info]
                     (transform @ids)))
Won't `ids` simply take everything in the table, instead of only fetching 10000 rows at a time? How can I be sure that it only fetches 10000 ids at a time and then inserts those 10000 ids into db-2?
Hope someone can help me understand this better, and that you are having a lovely weekend.
You need to `reduce` over a call to `plan` -- which will essentially stream results from the first database and eagerly store them into the other database.
`plan/select!` is going to produce all the IDs in one go. No streaming.
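A rough sketch of what reducing over `plan` can look like for the example above. The table/column names are taken from the question, and `transform-row` is a hypothetical helper that builds the map to insert; this is a sketch, not the only way to do it:

(require '[next.jdbc :as jdbc]
         '[next.jdbc.sql :as sql])

(defn transform-row
  "Hypothetical transformation: build the map to insert into db-2
  from the columns we care about in a db-1 row."
  [row]
  {:id (:id row)
   :other-info (str "derived-from-" (:id row))})

;; Reduce over plan: rows are streamed from db-1 and never realized
;; as a full result set. (:id row) reads the column straight off the
;; underlying ResultSet without building a Clojure hash map.
(reduce
  (fn [n row]
    (sql/insert! db-2-ds :target-table (transform-row row))
    (inc n))
  0
  (jdbc/plan db-1-ds ["SELECT id FROM table-ids"] {:fetch-size 10000}))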
Hi Sean, thank you for your response and for the JDBC Clojure wrappers.
Is it correctly understood that the function I give to `reduce` will be applied to each row in the result set? And if so, is it the result of `reduce` that I should insert? I am having a hard time understanding exactly how to use `plan` and `reduce` in this case.
Yes, essentially.
And it's more efficient if you can avoid realizing a row as Clojure data -- `plan` lets you select specific columns from the row (in the reducing function) without turning the `ResultSet` data into a Clojure hash map.
The "simple" way is to insert a row into db-2 for each call of the reducing function, but that is potentially slow. You want to do the inserts in batches (see `execute-batch!`). But the catch there is that you would need a stateful transducer, and there are some subtle issues around using a stateful transducer over a reducible collection that manages resources (which is what the result of `plan` is).
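One way to get batching without a stateful transducer is to carry the in-progress batch in the reduce accumulator and flush it with `insert-multi!` whenever it fills up. A rough sketch, reusing the names from the question (`db-1-ds`, `db-2-ds`, `:target-table`) and a hypothetical `transform-row` that produces an `[id other-info]` vector for each row:

(require '[next.jdbc :as jdbc]
         '[next.jdbc.sql :as sql])

(defn copy-ids!
  "Sketch: stream rows from db-1 via plan and insert them into db-2
  in batches of batch-size. The current batch lives in the reduce
  accumulator, so no stateful transducer is involved."
  [batch-size]
  (let [flush! (fn [batch]
                 (when (seq batch)
                   (sql/insert-multi! db-2-ds :target-table
                                      [:id :other-info]
                                      batch)))
        leftover (reduce
                   (fn [batch row]
                     (let [batch (conj batch (transform-row row))]
                       (if (= batch-size (count batch))
                         (do (flush! batch) [])
                         batch)))
                   []
                   (jdbc/plan db-1-ds ["SELECT id FROM table-ids"]
                              {:fetch-size batch-size}))]
    ;; don't forget the final partial batch
    (flush! leftover)))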
Hmm, this caught my eye as I’ve implemented something like this.
The basic scenario is that I want to query (“stream”) data from a table and insert those results into another table.
I’m using a prepared insert statement with `execute-batch!` and `plan` (this happens inside `with-transaction`):
(with-open [prepared (next/prepare db [(insert-statement target-table columns)])]
  (next/execute-batch!
    prepared
    (next/plan db [query-str (into-array String ids)] opts)))
Are there some fundamental problems with this approach? I started feeling uncomfortable when you mentioned “subtle issues” 😄 But I don’t think I have any stateful transducers in play here. If you have any resources you could point me to so I can read more about this, I would be grateful! I’ve read the next.jdbc cljdoc with care and I think I understand this more or less.
When using `execute-batch!`, the `plan` is eventually reduced by `run!` when adding parameters to the batch, right?
@snurppa My first reaction is: I’m surprised that even works at all! I’ll have to dig into the code to see what it is actually doing…
`execute-batch!` expects “parameter groups”, so that should be a vector-of-vectors — are you using an array builder on `plan`? Or have you omitted some critical reducing function over `plan`?
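For reference, “parameter groups” means one vector of parameter values per row, matching the `?` placeholders in the prepared statement. A tiny illustration (the table and column names are made up, and `db-2-ds` is the datasource from earlier):

(require '[next.jdbc :as jdbc]
         '[next.jdbc.prepare :as p])

;; One inner vector per row; each value lines up with a ? in the SQL.
(with-open [ps (jdbc/prepare db-2-ds
                             ["INSERT INTO target_table (id, other_info) VALUES (?, ?)"])]
  (p/execute-batch! ps [[1 "alpha"]
                        [2 "beta"]
                        [3 "gamma"]]))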
Hi Sean, yes, I’m using the array builder `as-unqualified-arrays` — sorry, I forgot to include that in the example. The opts for `plan` are:
{:fetch-size 1000
 ;; https://cljdoc.org/d/seancorfield/next.jdbc/1.0.462/doc/getting-started/tips-tricks#streaming-result-sets-1
 :auto-commit false
 :concurrency :read-only
 :cursors :close
 :result-type :forward-only
 :builder-fn rs/as-unqualified-arrays}
OK. I’ll have to give this some thought and experiment with it a bit. Like I say, I’m a bit surprised it works at all… and it may be just fine (there’s no obvious “state” that I can think of in consuming an array-based `plan`)… and if it’s safe then it might well be something I should put in the docs, because it’s not an approach I’d even thought of for this…
It looks like that code is “safe”. It is working because each row of the result set is being realized as `set-parameters` calls `seq` on it (and you’re right that it is being reduced via the `run!` call — but it needs each row to be realized as well for `plan` to be consumed correctly).
So it is safe by virtue of the implementation of `set-parameters` rather than being inherently “safe”. I think I’m going to add notes to `execute-batch!` and `set-parameters` so that they preserve this behavior — I think it’s actually rather nice to be able to just `execute-batch!` over `plan` to insert the results of a query, so I’ll document that as well.
Hi Sean. OK great, I am relieved 😄 Yes, I remember the construct being pretty handy when I came up with it about a year ago. I don’t remember the details of how it ended up like that; maybe I just found that it “seemed to work”, left it like that, and continued elsewhere 🙂
> It is working because each row of the result set is being realized as `set-parameters` calls `seq` on it (and, you’re right that it is being reduced via the `run!` call — but it needs each row to be realized as well for `plan` to be consumed correctly).
Thanks, good information, I will check this myself as well so I have a better understanding!
> So it is safe by virtue of the implementation of…
Yeah, indeed, I suspected this might be the case when you said you were surprised it even works — and it has been working in production for almost a year, haha! Thanks again for this and all your efforts for the community 🙂
I noticed your GH comment. I appreciate your testing of the approach, and I agree with you that in the end this approach seems to be discouraged :thumbsup: Thanks.
So you might want to think about reducing over `plan` and using core.async as a buffer between that ingest process and your transform process, and perhaps also between that and your output process (inserting batches into the second DB).
This stuff is definitely non-trivial if you're new to Clojure. There are no "magic bullets" when dealing with extremely large data sets.
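To make the core.async suggestion above concrete, here is a rough sketch (not from the thread) of using a channel as the buffer between a `plan`-based ingest from db-1 and batched inserts into db-2. The channel size, batch size, and the hypothetical `transform-row` (which should return the `[id other-info]` vector to insert) are all assumptions:

(require '[clojure.core.async :as async]
         '[next.jdbc :as jdbc]
         '[next.jdbc.sql :as sql])

(defn copy-via-channel! []
  (let [rows (async/chan 10000)]
    ;; Producer: reduce over plan, transform each row, and put it on
    ;; the channel; close the channel when the result set is exhausted.
    (async/thread
      (run! #(async/>!! rows (transform-row %))
            (jdbc/plan db-1-ds ["SELECT id FROM table-ids"] {:fetch-size 10000}))
      (async/close! rows))
    ;; Consumer: drain the channel, inserting into db-2 in batches of 1000,
    ;; then flush whatever is left when the channel closes.
    (async/<!!
      (async/thread
        (loop [batch []]
          (if-some [row (async/<!! rows)]
            (let [batch (conj batch row)]
              (if (= 1000 (count batch))
                (do (sql/insert-multi! db-2-ds :target-table [:id :other-info] batch)
                    (recur []))
                (recur batch)))
            (when (seq batch)
              (sql/insert-multi! db-2-ds :target-table [:id :other-info] batch))))))))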
It definitely isn't, but you have given me a direction that I can now start to study. Thanks a lot for your help