You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
127 lines
3.5 KiB
127 lines
3.5 KiB
/*
|
|
* Copyright (c) 2015-present, Vitaly Tomilov
|
|
*
|
|
* See the LICENSE file at the top-level directory of this distribution
|
|
* for licensing information.
|
|
*
|
|
* Removal or modification of this copyright notice is prohibited.
|
|
*/
|
|
|
|
const {Events} = require(`./events`);
|
|
|
|
const npm = {
|
|
utils: require(`./utils`),
|
|
text: require(`./text`)
|
|
};
|
|
|
|
////////////////////////////////////////////
// Streams query data into any destination,
// with the help of pg-query-stream library.
/**
 * Executes a stream-based query and reports the outcome through a promise.
 *
 * @param {object} ctx
 * Query context. This function reads `ctx.options` (event options, including
 * the `pgNative` flag), `ctx.db.client` (the client that runs the query),
 * `ctx.dc` and `ctx.ctx` (both are passed through into the event context).
 *
 * @param {object} qs
 * Stream object whose constructor is named `QueryStream`; its `cursor.text`
 * and `cursor.values` are reported to the events as `query` and `params`.
 *
 * @param {function} initCB
 * Initialization callback, invoked synchronously as `initCB.call(this, stream)`
 * with the object returned by `client.query(qs)`.
 *
 * @param {object} config
 * Must provide `promise` - a function that takes an executor `(resolve, reject)`
 * and also exposes a static `reject(reason)` method.
 *
 * @returns
 * Promise created via `config.promise`. It resolves with
 * `{processed, duration}` when the stream emits `end`, and rejects when:
 *  - Native Bindings are in use (`ctx.options.pgNative`);
 *  - `qs` is not a `QueryStream`, or is already reading / closed;
 *  - `initCB` is not a function;
 *  - `Events.query` or `Events.receive` returns an error;
 *  - `ctx.db` is missing (reported with `npm.text.looseQuery`);
 *  - `initCB` throws;
 *  - the stream emits `error`.
 */
function $stream(ctx, qs, initCB, config) {

    const $p = config.promise;

    // istanbul ignore next:
    // we do not provide code coverage for the Native Bindings specifics
    if (ctx.options.pgNative) {
        return $p.reject(new Error(npm.text.nativeStreaming));
    }
    // Stream class was renamed again, see the following issue:
    // https://github.com/brianc/node-postgres/issues/2412
    if (!qs || !qs.constructor || qs.constructor.name !== `QueryStream`) {
        // invalid or missing stream object;
        return $p.reject(new TypeError(npm.text.invalidStream));
    }
    if (qs._reading || qs._closed) {
        // stream object is in the wrong state;
        return $p.reject(new Error(npm.text.invalidStreamState));
    }
    if (typeof initCB !== `function`) {
        // parameter `initCB` must be passed as the initialization callback;
        return $p.reject(new TypeError(npm.text.invalidStreamCB));
    }

    // Without `ctx.db` there is no client to run the query on. The error is
    // prepared here, as a plain value, because it cannot be assigned from inside
    // getContext(): that function is first called within the initializer of
    // `error` below, where the variable is not yet accessible.
    const looseError = ctx.db ? undefined : new Error(npm.text.looseQuery);

    // The query event is always notified; an error it returns takes precedence.
    let error = Events.query(ctx.options, getContext()) || looseError;

    if (error) {
        error = getError(error);
        Events.error(ctx.options, error, getContext());
        return $p.reject(error);
    }

    const stream = ctx.db.client.query(qs);

    stream.on(`data`, onData);
    stream.on(`error`, onError);
    stream.on(`end`, onEnd);

    try {
        initCB.call(this, stream); // the stream must be initialized during the call;
    } catch (e) {
        release();
        error = getError(e);
        Events.error(ctx.options, error, getContext());
        return $p.reject(error);
    }

    const start = Date.now();
    let resolve, reject, nRows = 0;

    // Counts each received row and notifies the receive event with it;
    // an error returned by the event fails the whole operation.
    function onData(data) {
        nRows++;
        const receiveError = Events.receive(ctx.options, [data], undefined, getContext());
        if (receiveError) {
            onError(receiveError);
        }
    }

    // Detaches the handlers, destroys the stream, reports the error and rejects.
    function onError(e) {
        release();
        stream.destroy();
        e = getError(e);
        Events.error(ctx.options, e, getContext());
        reject(e);
    }

    // Detaches the handlers and resolves with the streaming statistics.
    function onEnd() {
        release();
        resolve({
            processed: nRows, // total number of rows processed;
            duration: Date.now() - start // duration, in milliseconds;
        });
    }

    // Removes the three handlers that were attached to the stream above.
    function release() {
        stream.removeListener(`data`, onData);
        stream.removeListener(`error`, onError);
        stream.removeListener(`end`, onEnd);
    }

    // Unwraps an InternalError into the error it carries; returns anything else as is.
    function getError(e) {
        return e instanceof npm.utils.InternalError ? e.error : e;
    }

    // Builds the context object that is passed into every event notification;
    // `client` is undefined when `ctx.db` is not set.
    function getContext() {
        return {
            client: ctx.db ? ctx.db.client : undefined,
            dc: ctx.dc,
            query: qs.cursor.text,
            params: qs.cursor.values,
            ctx: ctx.ctx
        };
    }

    // The executor is expected to run synchronously, so that `resolve` and `reject`
    // are captured before any stream event can reach the handlers above.
    return $p((res, rej) => {
        resolve = res;
        reject = rej;
    });

}
|
|
|
|
// The module's export is the $stream function itself.
module.exports = $stream;
|
|
|