You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
307 lines
7.7 KiB
307 lines
7.7 KiB
5 years ago
|
'use strict'
|
||
|
/**
|
||
|
* Copyright (c) 2010-2017 Brian Carlson (brian.m.carlson@gmail.com)
|
||
|
* All rights reserved.
|
||
|
*
|
||
|
* This source code is licensed under the MIT license found in the
|
||
|
* README.md file in the root directory of this source tree.
|
||
|
*/
|
||
|
|
||
|
// eslint-disable-next-line
|
||
|
var Native = require('pg-native')
|
||
|
var TypeOverrides = require('../type-overrides')
|
||
|
var semver = require('semver')
|
||
|
var pkg = require('../../package.json')
|
||
|
var assert = require('assert')
|
||
|
var EventEmitter = require('events').EventEmitter
|
||
|
var util = require('util')
|
||
|
var ConnectionParameters = require('../connection-parameters')
|
||
|
|
||
|
var msg = 'Version >= ' + pkg.minNativeVersion + ' of pg-native required.'
|
||
|
assert(semver.gte(Native.version, pkg.minNativeVersion), msg)
|
||
|
|
||
|
var NativeQuery = require('./query')
|
||
|
|
||
|
var Client = (module.exports = function (config) {
|
||
|
EventEmitter.call(this)
|
||
|
config = config || {}
|
||
|
|
||
|
this._Promise = config.Promise || global.Promise
|
||
|
this._types = new TypeOverrides(config.types)
|
||
|
|
||
|
this.native = new Native({
|
||
|
types: this._types,
|
||
|
})
|
||
|
|
||
|
this._queryQueue = []
|
||
|
this._ending = false
|
||
|
this._connecting = false
|
||
|
this._connected = false
|
||
|
this._queryable = true
|
||
|
|
||
|
// keep these on the object for legacy reasons
|
||
|
// for the time being. TODO: deprecate all this jazz
|
||
|
var cp = (this.connectionParameters = new ConnectionParameters(config))
|
||
|
this.user = cp.user
|
||
|
|
||
|
// "hiding" the password so it doesn't show up in stack traces
|
||
|
// or if the client is console.logged
|
||
|
Object.defineProperty(this, 'password', {
|
||
|
configurable: true,
|
||
|
enumerable: false,
|
||
|
writable: true,
|
||
|
value: cp.password,
|
||
|
})
|
||
|
this.database = cp.database
|
||
|
this.host = cp.host
|
||
|
this.port = cp.port
|
||
|
|
||
|
// a hash to hold named queries
|
||
|
this.namedQueries = {}
|
||
|
})
|
||
|
|
||
|
Client.Query = NativeQuery
|
||
|
|
||
|
util.inherits(Client, EventEmitter)
|
||
|
|
||
|
Client.prototype._errorAllQueries = function (err) {
|
||
|
const enqueueError = (query) => {
|
||
|
process.nextTick(() => {
|
||
|
query.native = this.native
|
||
|
query.handleError(err)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
if (this._hasActiveQuery()) {
|
||
|
enqueueError(this._activeQuery)
|
||
|
this._activeQuery = null
|
||
|
}
|
||
|
|
||
|
this._queryQueue.forEach(enqueueError)
|
||
|
this._queryQueue.length = 0
|
||
|
}
|
||
|
|
||
|
// connect to the backend
|
||
|
// pass an optional callback to be called once connected
|
||
|
// or with an error if there was a connection error
|
||
|
Client.prototype._connect = function (cb) {
|
||
|
var self = this
|
||
|
|
||
|
if (this._connecting) {
|
||
|
process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.')))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
this._connecting = true
|
||
|
|
||
|
this.connectionParameters.getLibpqConnectionString(function (err, conString) {
|
||
|
if (err) return cb(err)
|
||
|
self.native.connect(conString, function (err) {
|
||
|
if (err) {
|
||
|
self.native.end()
|
||
|
return cb(err)
|
||
|
}
|
||
|
|
||
|
// set internal states to connected
|
||
|
self._connected = true
|
||
|
|
||
|
// handle connection errors from the native layer
|
||
|
self.native.on('error', function (err) {
|
||
|
self._queryable = false
|
||
|
self._errorAllQueries(err)
|
||
|
self.emit('error', err)
|
||
|
})
|
||
|
|
||
|
self.native.on('notification', function (msg) {
|
||
|
self.emit('notification', {
|
||
|
channel: msg.relname,
|
||
|
payload: msg.extra,
|
||
|
})
|
||
|
})
|
||
|
|
||
|
// signal we are connected now
|
||
|
self.emit('connect')
|
||
|
self._pulseQueryQueue(true)
|
||
|
|
||
|
cb()
|
||
|
})
|
||
|
})
|
||
|
}
|
||
|
|
||
|
Client.prototype.connect = function (callback) {
|
||
|
if (callback) {
|
||
|
this._connect(callback)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
return new this._Promise((resolve, reject) => {
|
||
|
this._connect((error) => {
|
||
|
if (error) {
|
||
|
reject(error)
|
||
|
} else {
|
||
|
resolve()
|
||
|
}
|
||
|
})
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// send a query to the server
|
||
|
// this method is highly overloaded to take
|
||
|
// 1) string query, optional array of parameters, optional function callback
|
||
|
// 2) object query with {
|
||
|
// string query
|
||
|
// optional array values,
|
||
|
// optional function callback instead of as a separate parameter
|
||
|
// optional string name to name & cache the query plan
|
||
|
// optional string rowMode = 'array' for an array of results
|
||
|
// }
|
||
|
Client.prototype.query = function (config, values, callback) {
|
||
|
var query
|
||
|
var result
|
||
|
var readTimeout
|
||
|
var readTimeoutTimer
|
||
|
var queryCallback
|
||
|
|
||
|
if (config === null || config === undefined) {
|
||
|
throw new TypeError('Client was passed a null or undefined query')
|
||
|
} else if (typeof config.submit === 'function') {
|
||
|
readTimeout = config.query_timeout || this.connectionParameters.query_timeout
|
||
|
result = query = config
|
||
|
// accept query(new Query(...), (err, res) => { }) style
|
||
|
if (typeof values === 'function') {
|
||
|
config.callback = values
|
||
|
}
|
||
|
} else {
|
||
|
readTimeout = this.connectionParameters.query_timeout
|
||
|
query = new NativeQuery(config, values, callback)
|
||
|
if (!query.callback) {
|
||
|
let resolveOut, rejectOut
|
||
|
result = new this._Promise((resolve, reject) => {
|
||
|
resolveOut = resolve
|
||
|
rejectOut = reject
|
||
|
})
|
||
|
query.callback = (err, res) => (err ? rejectOut(err) : resolveOut(res))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (readTimeout) {
|
||
|
queryCallback = query.callback
|
||
|
|
||
|
readTimeoutTimer = setTimeout(() => {
|
||
|
var error = new Error('Query read timeout')
|
||
|
|
||
|
process.nextTick(() => {
|
||
|
query.handleError(error, this.connection)
|
||
|
})
|
||
|
|
||
|
queryCallback(error)
|
||
|
|
||
|
// we already returned an error,
|
||
|
// just do nothing if query completes
|
||
|
query.callback = () => {}
|
||
|
|
||
|
// Remove from queue
|
||
|
var index = this._queryQueue.indexOf(query)
|
||
|
if (index > -1) {
|
||
|
this._queryQueue.splice(index, 1)
|
||
|
}
|
||
|
|
||
|
this._pulseQueryQueue()
|
||
|
}, readTimeout)
|
||
|
|
||
|
query.callback = (err, res) => {
|
||
|
clearTimeout(readTimeoutTimer)
|
||
|
queryCallback(err, res)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (!this._queryable) {
|
||
|
query.native = this.native
|
||
|
process.nextTick(() => {
|
||
|
query.handleError(new Error('Client has encountered a connection error and is not queryable'))
|
||
|
})
|
||
|
return result
|
||
|
}
|
||
|
|
||
|
if (this._ending) {
|
||
|
query.native = this.native
|
||
|
process.nextTick(() => {
|
||
|
query.handleError(new Error('Client was closed and is not queryable'))
|
||
|
})
|
||
|
return result
|
||
|
}
|
||
|
|
||
|
this._queryQueue.push(query)
|
||
|
this._pulseQueryQueue()
|
||
|
return result
|
||
|
}
|
||
|
|
||
|
// disconnect from the backend server
|
||
|
Client.prototype.end = function (cb) {
|
||
|
var self = this
|
||
|
|
||
|
this._ending = true
|
||
|
|
||
|
if (!this._connected) {
|
||
|
this.once('connect', this.end.bind(this, cb))
|
||
|
}
|
||
|
var result
|
||
|
if (!cb) {
|
||
|
result = new this._Promise(function (resolve, reject) {
|
||
|
cb = (err) => (err ? reject(err) : resolve())
|
||
|
})
|
||
|
}
|
||
|
this.native.end(function () {
|
||
|
self._errorAllQueries(new Error('Connection terminated'))
|
||
|
|
||
|
process.nextTick(() => {
|
||
|
self.emit('end')
|
||
|
if (cb) cb()
|
||
|
})
|
||
|
})
|
||
|
return result
|
||
|
}
|
||
|
|
||
|
Client.prototype._hasActiveQuery = function () {
|
||
|
return this._activeQuery && this._activeQuery.state !== 'error' && this._activeQuery.state !== 'end'
|
||
|
}
|
||
|
|
||
|
Client.prototype._pulseQueryQueue = function (initialConnection) {
|
||
|
if (!this._connected) {
|
||
|
return
|
||
|
}
|
||
|
if (this._hasActiveQuery()) {
|
||
|
return
|
||
|
}
|
||
|
var query = this._queryQueue.shift()
|
||
|
if (!query) {
|
||
|
if (!initialConnection) {
|
||
|
this.emit('drain')
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
this._activeQuery = query
|
||
|
query.submit(this)
|
||
|
var self = this
|
||
|
query.once('_done', function () {
|
||
|
self._pulseQueryQueue()
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// attempt to cancel an in-progress query
|
||
|
Client.prototype.cancel = function (query) {
|
||
|
if (this._activeQuery === query) {
|
||
|
this.native.cancel(function () {})
|
||
|
} else if (this._queryQueue.indexOf(query) !== -1) {
|
||
|
this._queryQueue.splice(this._queryQueue.indexOf(query), 1)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
Client.prototype.setTypeParser = function (oid, format, parseFn) {
|
||
|
return this._types.setTypeParser(oid, format, parseFn)
|
||
|
}
|
||
|
|
||
|
Client.prototype.getTypeParser = function (oid, format) {
|
||
|
return this._types.getTypeParser(oid, format)
|
||
|
}
|