A suite to track Project Diva score statistics and ratings / D4DJ event data.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
projectdivar/server/node_modules/pg-promise/lib/stream.js

127 lines
3.5 KiB

/*
* Copyright (c) 2015-present, Vitaly Tomilov
*
* See the LICENSE file at the top-level directory of this distribution
* for licensing information.
*
* Removal or modification of this copyright notice is prohibited.
*/
const {Events} = require(`./events`);
const npm = {
utils: require(`./utils`),
text: require(`./text`)
};
////////////////////////////////////////////
// Streams query data into any destination,
// with the help of pg-query-stream library.
function $stream(ctx, qs, initCB, config) {
const $p = config.promise;
// istanbul ignore next:
// we do not provide code coverage for the Native Bindings specifics
if (ctx.options.pgNative) {
return $p.reject(new Error(npm.text.nativeStreaming));
}
// Stream class was renamed again, see the following issue:
// https://github.com/brianc/node-postgres/issues/2412
if (!qs || !qs.constructor || qs.constructor.name !== `QueryStream`) {
// invalid or missing stream object;
return $p.reject(new TypeError(npm.text.invalidStream));
}
if (qs._reading || qs._closed) {
// stream object is in the wrong state;
return $p.reject(new Error(npm.text.invalidStreamState));
}
if (typeof initCB !== `function`) {
// parameter `initCB` must be passed as the initialization callback;
return $p.reject(new TypeError(npm.text.invalidStreamCB));
}
let error = Events.query(ctx.options, getContext());
if (error) {
error = getError(error);
Events.error(ctx.options, error, getContext());
return $p.reject(error);
}
const stream = ctx.db.client.query(qs);
stream.on(`data`, onData);
stream.on(`error`, onError);
stream.on(`end`, onEnd);
try {
initCB.call(this, stream); // the stream must be initialized during the call;
} catch (e) {
release();
error = getError(e);
Events.error(ctx.options, error, getContext());
return $p.reject(error);
}
const start = Date.now();
let resolve, reject, nRows = 0;
function onData(data) {
nRows++;
error = Events.receive(ctx.options, [data], undefined, getContext());
if (error) {
onError(error);
}
}
function onError(e) {
release();
stream.destroy();
e = getError(e);
Events.error(ctx.options, e, getContext());
reject(e);
}
function onEnd() {
release();
resolve({
processed: nRows, // total number of rows processed;
duration: Date.now() - start // duration, in milliseconds;
});
}
function release() {
stream.removeListener(`data`, onData);
stream.removeListener(`error`, onError);
stream.removeListener(`end`, onEnd);
}
function getError(e) {
return e instanceof npm.utils.InternalError ? e.error : e;
}
function getContext() {
let client;
if (ctx.db) {
client = ctx.db.client;
} else {
error = new Error(npm.text.looseQuery);
}
return {
client,
dc: ctx.dc,
query: qs.cursor.text,
params: qs.cursor.values,
ctx: ctx.ctx
};
}
return $p((res, rej) => {
resolve = res;
reject = rej;
});
}
module.exports = $stream;