'use strict' /** * Copyright (c) 2010-2017 Brian Carlson (brian.m.carlson@gmail.com) * All rights reserved. * * This source code is licensed under the MIT license found in the * README.md file in the root directory of this source tree. */ // eslint-disable-next-line var Native = require('pg-native') var TypeOverrides = require('../type-overrides') var semver = require('semver') var pkg = require('../../package.json') var assert = require('assert') var EventEmitter = require('events').EventEmitter var util = require('util') var ConnectionParameters = require('../connection-parameters') var msg = 'Version >= ' + pkg.minNativeVersion + ' of pg-native required.' assert(semver.gte(Native.version, pkg.minNativeVersion), msg) var NativeQuery = require('./query') var Client = (module.exports = function (config) { EventEmitter.call(this) config = config || {} this._Promise = config.Promise || global.Promise this._types = new TypeOverrides(config.types) this.native = new Native({ types: this._types, }) this._queryQueue = [] this._ending = false this._connecting = false this._connected = false this._queryable = true // keep these on the object for legacy reasons // for the time being. TODO: deprecate all this jazz var cp = (this.connectionParameters = new ConnectionParameters(config)) this.user = cp.user // "hiding" the password so it doesn't show up in stack traces // or if the client is console.logged Object.defineProperty(this, 'password', { configurable: true, enumerable: false, writable: true, value: cp.password, }) this.database = cp.database this.host = cp.host this.port = cp.port // a hash to hold named queries this.namedQueries = {} }) Client.Query = NativeQuery util.inherits(Client, EventEmitter) Client.prototype._errorAllQueries = function (err) { const enqueueError = (query) => { process.nextTick(() => { query.native = this.native query.handleError(err) }) } if (this._hasActiveQuery()) { enqueueError(this._activeQuery) this._activeQuery = null } this._queryQueue.forEach(enqueueError) this._queryQueue.length = 0 } // connect to the backend // pass an optional callback to be called once connected // or with an error if there was a connection error Client.prototype._connect = function (cb) { var self = this if (this._connecting) { process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.'))) return } this._connecting = true this.connectionParameters.getLibpqConnectionString(function (err, conString) { if (err) return cb(err) self.native.connect(conString, function (err) { if (err) { self.native.end() return cb(err) } // set internal states to connected self._connected = true // handle connection errors from the native layer self.native.on('error', function (err) { self._queryable = false self._errorAllQueries(err) self.emit('error', err) }) self.native.on('notification', function (msg) { self.emit('notification', { channel: msg.relname, payload: msg.extra, }) }) // signal we are connected now self.emit('connect') self._pulseQueryQueue(true) cb() }) }) } Client.prototype.connect = function (callback) { if (callback) { this._connect(callback) return } return new this._Promise((resolve, reject) => { this._connect((error) => { if (error) { reject(error) } else { resolve() } }) }) } // send a query to the server // this method is highly overloaded to take // 1) string query, optional array of parameters, optional function callback // 2) object query with { // string query // optional array values, // optional function callback instead of as a separate parameter // optional string name to name & cache the query plan // optional string rowMode = 'array' for an array of results // } Client.prototype.query = function (config, values, callback) { var query var result var readTimeout var readTimeoutTimer var queryCallback if (config === null || config === undefined) { throw new TypeError('Client was passed a null or undefined query') } else if (typeof config.submit === 'function') { readTimeout = config.query_timeout || this.connectionParameters.query_timeout result = query = config // accept query(new Query(...), (err, res) => { }) style if (typeof values === 'function') { config.callback = values } } else { readTimeout = this.connectionParameters.query_timeout query = new NativeQuery(config, values, callback) if (!query.callback) { let resolveOut, rejectOut result = new this._Promise((resolve, reject) => { resolveOut = resolve rejectOut = reject }) query.callback = (err, res) => (err ? rejectOut(err) : resolveOut(res)) } } if (readTimeout) { queryCallback = query.callback readTimeoutTimer = setTimeout(() => { var error = new Error('Query read timeout') process.nextTick(() => { query.handleError(error, this.connection) }) queryCallback(error) // we already returned an error, // just do nothing if query completes query.callback = () => {} // Remove from queue var index = this._queryQueue.indexOf(query) if (index > -1) { this._queryQueue.splice(index, 1) } this._pulseQueryQueue() }, readTimeout) query.callback = (err, res) => { clearTimeout(readTimeoutTimer) queryCallback(err, res) } } if (!this._queryable) { query.native = this.native process.nextTick(() => { query.handleError(new Error('Client has encountered a connection error and is not queryable')) }) return result } if (this._ending) { query.native = this.native process.nextTick(() => { query.handleError(new Error('Client was closed and is not queryable')) }) return result } this._queryQueue.push(query) this._pulseQueryQueue() return result } // disconnect from the backend server Client.prototype.end = function (cb) { var self = this this._ending = true if (!this._connected) { this.once('connect', this.end.bind(this, cb)) } var result if (!cb) { result = new this._Promise(function (resolve, reject) { cb = (err) => (err ? reject(err) : resolve()) }) } this.native.end(function () { self._errorAllQueries(new Error('Connection terminated')) process.nextTick(() => { self.emit('end') if (cb) cb() }) }) return result } Client.prototype._hasActiveQuery = function () { return this._activeQuery && this._activeQuery.state !== 'error' && this._activeQuery.state !== 'end' } Client.prototype._pulseQueryQueue = function (initialConnection) { if (!this._connected) { return } if (this._hasActiveQuery()) { return } var query = this._queryQueue.shift() if (!query) { if (!initialConnection) { this.emit('drain') } return } this._activeQuery = query query.submit(this) var self = this query.once('_done', function () { self._pulseQueryQueue() }) } // attempt to cancel an in-progress query Client.prototype.cancel = function (query) { if (this._activeQuery === query) { this.native.cancel(function () {}) } else if (this._queryQueue.indexOf(query) !== -1) { this._queryQueue.splice(this._queryQueue.indexOf(query), 1) } } Client.prototype.setTypeParser = function (oid, format, parseFn) { return this._types.setTypeParser(oid, format, parseFn) } Client.prototype.getTypeParser = function (oid, format) { return this._types.getTypeParser(oid, format) }