/** * @method stream.read * @description * Consumes and processes data from a $[Readable] stream. * * It reads the entire stream, using either **paused mode** (default), or in chunks (see `options.readChunks`) * with support for both synchronous and asynchronous data processing. * * **NOTE:** Once the method has finished, the onus is on the caller to release the stream * according to its protocol. * * @param {Object} stream * $[Readable] stream object. * * Passing in anything else will throw `Readable stream is required.` * * @param {Function|generator} receiver * Data processing callback (or generator). * * Passing in anything else will throw `Invalid stream receiver.` * * Parameters: * - `index` = index of the call made to the function * - `data` = array of all data reads from the stream's buffer * - `delay` = number of milliseconds since the last call (`undefined` when `index=0`) * * The function is called with the same `this` context as the calling method. * * It can optionally return a promise object, if data processing is asynchronous. * And if a promise is returned, the method will not read data from the stream again, * until the promise has been resolved. * * If the function throws an error or returns a rejected promise, the method rejects * with the same error / rejection reason. * * @param {Object} [options] * Optional Parameters. * * @param {Boolean} [options.closable=false] * Instructs the method to resolve on event `close` supported by the stream, as opposed to event * `end` that's used by default. * * @param {Boolean} [options.readChunks=false] * By default, the method handles event `readable` of the stream to consume data in a simplified form, * item by item. If you enable this option, the method will instead handle event `data` of the stream, * to consume chunks of data. * * @param {Number} [options.readSize] * When the value is greater than 0, it sets the read size from the stream's buffer * when the next data is available. By default, the method uses as few reads as possible * to get all the data currently available in the buffer. * * NOTE: This option is ignored when option `readChunks` is enabled. * * @returns {external:Promise} * * When finished successfully, resolves with object `{calls, reads, length, duration}`: * - `calls` = number of calls made into the `receiver` * - `reads` = number of successful reads from the stream * - `length` = total length for all the data reads from the stream * - `duration` = number of milliseconds consumed by the method * * When it fails, the method rejects with the error/reject specified, * which can happen as a result of: * - event `error` emitted by the stream * - receiver throws an error or returns a rejected promise */ function read(stream, receiver, options, config) { const $p = config.promise, utils = config.utils; if (!utils.isReadableStream(stream)) { return $p.reject(new TypeError('Readable stream is required.')); } if (typeof receiver !== 'function') { return $p.reject(new TypeError('Invalid stream receiver.')); } receiver = utils.wrap(receiver); options = options || {}; const readSize = (options.readSize > 0) ? parseInt(options.readSize) : null, self = this, start = Date.now(), receiveEvent = options.readChunks ? 'data' : 'readable'; let cbTime, ready, waiting, stop, reads = 0, length = 0, index = 0; return $p((resolve, reject) => { function onReceive(data) { ready = true; process(data); } function onEnd() { if (!options.closable) { success(); } } function onClose() { success(); } function onError(error) { fail(error); } stream.on(receiveEvent, onReceive); stream.on('end', onEnd); stream.on('close', onClose); stream.on('error', onError); function process(data) { if (!ready || stop || waiting) { return; } ready = false; let cache; if (options.readChunks) { cache = data; // istanbul ignore else; // we cannot test the else condition, as it requires a special broken stream interface. if (!Array.isArray(cache)) { cache = [cache]; } length += cache.length; reads++; } else { cache = []; waiting = true; let page; do { page = stream.read(readSize); if (page) { cache.push(page); // istanbul ignore next: requires a unique stream that // creates objects without property `length` defined. length += page.length || 0; reads++; } } while (page); if (!cache.length) { waiting = false; return; } } const cbNow = Date.now(), cbDelay = index ? (cbNow - cbTime) : undefined; let result; cbTime = cbNow; try { result = receiver.call(self, index++, cache, cbDelay); } catch (e) { fail(e); return; } if (utils.isPromise(result)) { result .then(() => { waiting = false; process(); return null; // this dummy return is just to prevent Bluebird warnings; }) .catch(error => { fail(error); }); } else { waiting = false; process(); } } function success() { cleanup(); resolve({ calls: index, reads: reads, length: length, duration: Date.now() - start }); } function fail(error) { stop = true; cleanup(); reject(error); } function cleanup() { stream.removeListener(receiveEvent, onReceive); stream.removeListener('close', onClose); stream.removeListener('error', onError); stream.removeListener('end', onEnd); } }); } module.exports = function (config) { return function (stream, receiver, options) { return read.call(this, stream, receiver, options, config); }; };