From 64b22885807a114911397cfb176429ff4c2fceb5 Mon Sep 17 00:00:00 2001 From: ThaUnknown <6506529+ThaUnknown@users.noreply.github.com> Date: Fri, 26 May 2023 22:53:03 +0200 Subject: [PATCH] fix: migrate to streamx --- index.js | 116 ++++++++++++++++++++++--------------------------- package.json | 4 +- test/common.js | 2 +- test/stream.js | 8 ++-- 4 files changed, 60 insertions(+), 70 deletions(-) diff --git a/index.js b/index.js index 0ee8051a..55acd5be 100644 --- a/index.js +++ b/index.js @@ -2,7 +2,7 @@ const debug = require('debug')('simple-peer') const getBrowserRTC = require('get-browser-rtc') const randombytes = require('randombytes') -const stream = require('readable-stream') +const { Duplex } = require('streamx') const queueMicrotask = require('queue-microtask') // TODO: remove when Node 10 is not supported const errCode = require('err-code') const { Buffer } = require('buffer') @@ -25,7 +25,7 @@ function warn (message) { * Duplex stream. * @param {Object} opts */ -class Peer extends stream.Duplex { +class Peer extends Duplex { constructor (opts) { opts = Object.assign({ allowHalfOpen: false @@ -33,6 +33,8 @@ class Peer extends stream.Duplex { super(opts) + this.__objectMode = !!opts.objectMode // streamx is objectMode by default, so implement readable's fuctionality + this._id = randombytes(4).toString('hex').slice(0, 7) this._debug('new peer %o', opts) @@ -52,8 +54,7 @@ class Peer extends stream.Duplex { this.allowHalfTrickle = opts.allowHalfTrickle !== undefined ? opts.allowHalfTrickle : false this.iceCompleteTimeout = opts.iceCompleteTimeout || ICECOMPLETE_TIMEOUT - this.destroyed = false - this.destroying = false + this._destroying = false this._connected = false this.remoteAddress = undefined @@ -100,7 +101,7 @@ class Peer extends stream.Duplex { try { this._pc = new (this._wrtc.RTCPeerConnection)(this.config) } catch (err) { - this.destroy(errCode(err, 'ERR_PC_CONSTRUCTOR')) + this.__destroy(errCode(err, 'ERR_PC_CONSTRUCTOR')) return } @@ -127,7 +128,7 @@ class Peer extends stream.Duplex { // HACK: Fix for odd Firefox behavior, see: https://github.com/feross/simple-peer/pull/783 if (typeof this._pc.peerIdentity === 'object') { this._pc.peerIdentity.catch(err => { - this.destroy(errCode(err, 'ERR_PC_PEER_IDENTITY')) + this.__destroy(errCode(err, 'ERR_PC_PEER_IDENTITY')) }) } @@ -180,7 +181,7 @@ class Peer extends stream.Duplex { } signal (data) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot signal after peer is destroyed'), 'ERR_DESTROYED') if (typeof data === 'string') { try { @@ -219,11 +220,11 @@ class Peer extends stream.Duplex { if (this._pc.remoteDescription.type === 'offer') this._createAnswer() }) .catch(err => { - this.destroy(errCode(err, 'ERR_SET_REMOTE_DESCRIPTION')) + this.__destroy(errCode(err, 'ERR_SET_REMOTE_DESCRIPTION')) }) } if (!data.sdp && !data.candidate && !data.renegotiate && !data.transceiverRequest) { - this.destroy(errCode(new Error('signal() called with invalid signal data'), 'ERR_SIGNALING')) + this.__destroy(errCode(new Error('signal() called with invalid signal data'), 'ERR_SIGNALING')) } } @@ -234,7 +235,7 @@ class Peer extends stream.Duplex { if (!iceCandidateObj.address || iceCandidateObj.address.endsWith('.local')) { warn('Ignoring unsupported ICE candidate.') } else { - this.destroy(errCode(err, 'ERR_ADD_ICE_CANDIDATE')) + this.__destroy(errCode(err, 'ERR_ADD_ICE_CANDIDATE')) } }) } @@ -244,7 +245,7 @@ class Peer extends stream.Duplex { * @param {ArrayBufferView|ArrayBuffer|Buffer|string|Blob} chunk */ send (chunk) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot send after peer is destroyed'), 'ERR_DESTROYED') this._channel.send(chunk) } @@ -255,7 +256,7 @@ class Peer extends stream.Duplex { * @param {Object} init */ addTransceiver (kind, init) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot addTransceiver after peer is destroyed'), 'ERR_DESTROYED') this._debug('addTransceiver()') @@ -264,7 +265,7 @@ class Peer extends stream.Duplex { this._pc.addTransceiver(kind, init) this._needsNegotiation() } catch (err) { - this.destroy(errCode(err, 'ERR_ADD_TRANSCEIVER')) + this.__destroy(errCode(err, 'ERR_ADD_TRANSCEIVER')) } } else { this.emit('signal', { // request initiator to renegotiate @@ -279,7 +280,7 @@ class Peer extends stream.Duplex { * @param {MediaStream} stream */ addStream (stream) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot addStream after peer is destroyed'), 'ERR_DESTROYED') this._debug('addStream()') @@ -294,7 +295,7 @@ class Peer extends stream.Duplex { * @param {MediaStream} stream */ addTrack (track, stream) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot addTrack after peer is destroyed'), 'ERR_DESTROYED') this._debug('addTrack()') @@ -319,7 +320,7 @@ class Peer extends stream.Duplex { * @param {MediaStream} stream */ replaceTrack (oldTrack, newTrack, stream) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot replaceTrack after peer is destroyed'), 'ERR_DESTROYED') this._debug('replaceTrack()') @@ -333,7 +334,7 @@ class Peer extends stream.Duplex { if (sender.replaceTrack != null) { sender.replaceTrack(newTrack) } else { - this.destroy(errCode(new Error('replaceTrack is not supported in this browser'), 'ERR_UNSUPPORTED_REPLACETRACK')) + this.__destroy(errCode(new Error('replaceTrack is not supported in this browser'), 'ERR_UNSUPPORTED_REPLACETRACK')) } } @@ -343,7 +344,7 @@ class Peer extends stream.Duplex { * @param {MediaStream} stream */ removeTrack (track, stream) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot removeTrack after peer is destroyed'), 'ERR_DESTROYED') this._debug('removeSender()') @@ -359,7 +360,7 @@ class Peer extends stream.Duplex { if (err.name === 'NS_ERROR_UNEXPECTED') { this._sendersAwaitingStable.push(sender) // HACK: Firefox must wait until (signalingState === stable) https://bugzilla.mozilla.org/show_bug.cgi?id=1133874 } else { - this.destroy(errCode(err, 'ERR_REMOVE_TRACK')) + this.__destroy(errCode(err, 'ERR_REMOVE_TRACK')) } } this._needsNegotiation() @@ -370,7 +371,7 @@ class Peer extends stream.Duplex { * @param {MediaStream} stream */ removeStream (stream) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot removeStream after peer is destroyed'), 'ERR_DESTROYED') this._debug('removeSenders()') @@ -396,7 +397,7 @@ class Peer extends stream.Duplex { } negotiate () { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot negotiate after peer is destroyed'), 'ERR_DESTROYED') if (this.initiator) { @@ -424,30 +425,23 @@ class Peer extends stream.Duplex { this._isNegotiating = true } - // TODO: Delete this method once readable-stream is updated to contain a default - // implementation of destroy() that automatically calls _destroy() - // See: https://github.com/nodejs/readable-stream/issues/283 - destroy (err) { - this._destroy(err, () => {}) + _final (cb) { + if (!this._readableState.ended) this.push(null) + cb(null) } - _destroy (err, cb) { - if (this.destroyed || this.destroying) return - this.destroying = true - - this._debug('destroying (error: %s)', err && (err.message || err)) - - queueMicrotask(() => { // allow events concurrent with the call to _destroy() to fire (see #692) - this.destroyed = true - this.destroying = false - - this._debug('destroy (error: %s)', err && (err.message || err)) + __destroy (err) { + this.end() + this._destroy(() => {}, err) + } - this.readable = this.writable = false + _destroy (cb, err) { + if (this.destroyed || this._destroying) return + this._destroying = true - if (!this._readableState.ended) this.push(null) - if (!this._writableState.finished) this.end() + this._debug('destroying (error: %s)', err && (err.message || err)) + setTimeout(() => { // allow events concurrent with the call to _destroy() to fire (see #692) this._connected = false this._pcReady = false this._channelReady = false @@ -492,11 +486,9 @@ class Peer extends stream.Duplex { } this._pc = null this._channel = null - if (err) this.emit('error', err) - this.emit('close') cb() - }) + }, 0) } _setupData (event) { @@ -504,7 +496,7 @@ class Peer extends stream.Duplex { // In some situations `pc.createDataChannel()` returns `undefined` (in wrtc), // which is invalid behavior. Handle it gracefully. // See: https://github.com/feross/simple-peer/issues/163 - return this.destroy(errCode(new Error('Data channel event is missing `channel` property'), 'ERR_DATA_CHANNEL')) + return this.__destroy(errCode(new Error('Data channel event is missing `channel` property'), 'ERR_DATA_CHANNEL')) } this._channel = event.channel @@ -532,7 +524,7 @@ class Peer extends stream.Duplex { const err = event.error instanceof Error ? event.error : new Error(`Datachannel error: ${event.message} ${event.filename}:${event.lineno}:${event.colno}`) - this.destroy(errCode(err, 'ERR_DATA_CHANNEL')) + this.__destroy(errCode(err, 'ERR_DATA_CHANNEL')) } // HACK: Chrome will sometimes get stuck in readyState "closing", let's check for this condition @@ -548,16 +540,14 @@ class Peer extends stream.Duplex { }, CHANNEL_CLOSING_TIMEOUT) } - _read () {} - - _write (chunk, encoding, cb) { + _write (chunk, cb) { if (this.destroyed) return cb(errCode(new Error('cannot write after peer is destroyed'), 'ERR_DATA_CHANNEL')) if (this._connected) { try { this.send(chunk) } catch (err) { - return this.destroy(errCode(err, 'ERR_DATA_CHANNEL')) + return this.__destroy(errCode(err, 'ERR_DATA_CHANNEL')) } if (this._channel.bufferedAmount > MAX_BUFFERED_AMOUNT) { this._debug('start backpressure: bufferedAmount %d', this._channel.bufferedAmount) @@ -580,7 +570,7 @@ class Peer extends stream.Duplex { // Wait a bit before destroying so the socket flushes. // TODO: is there a more reliable way to accomplish this? const destroySoon = () => { - setTimeout(() => this.destroy(), 1000) + setTimeout(() => this.__destroy(), 1000) } if (this._connected) { @@ -631,7 +621,7 @@ class Peer extends stream.Duplex { } const onError = err => { - this.destroy(errCode(err, 'ERR_SET_LOCAL_DESCRIPTION')) + this.__destroy(errCode(err, 'ERR_SET_LOCAL_DESCRIPTION')) } this._pc.setLocalDescription(offer) @@ -639,7 +629,7 @@ class Peer extends stream.Duplex { .catch(onError) }) .catch(err => { - this.destroy(errCode(err, 'ERR_CREATE_OFFER')) + this.__destroy(errCode(err, 'ERR_CREATE_OFFER')) }) } @@ -681,7 +671,7 @@ class Peer extends stream.Duplex { } const onError = err => { - this.destroy(errCode(err, 'ERR_SET_LOCAL_DESCRIPTION')) + this.__destroy(errCode(err, 'ERR_SET_LOCAL_DESCRIPTION')) } this._pc.setLocalDescription(answer) @@ -689,14 +679,14 @@ class Peer extends stream.Duplex { .catch(onError) }) .catch(err => { - this.destroy(errCode(err, 'ERR_CREATE_ANSWER')) + this.__destroy(errCode(err, 'ERR_CREATE_ANSWER')) }) } _onConnectionStateChange () { - if (this.destroyed) return + if (this.destroyed || this._destroying) return if (this._pc.connectionState === 'failed') { - this.destroy(errCode(new Error('Connection failed.'), 'ERR_CONNECTION_FAILURE')) + this.__destroy(errCode(new Error('Connection failed.'), 'ERR_CONNECTION_FAILURE')) } } @@ -717,10 +707,10 @@ class Peer extends stream.Duplex { this._maybeReady() } if (iceConnectionState === 'failed') { - this.destroy(errCode(new Error('Ice connection failed.'), 'ERR_ICE_CONNECTION_FAILURE')) + this.__destroy(errCode(new Error('Ice connection failed.'), 'ERR_ICE_CONNECTION_FAILURE')) } if (iceConnectionState === 'closed') { - this.destroy(errCode(new Error('Ice connection closed.'), 'ERR_ICE_CONNECTION_CLOSED')) + this.__destroy(errCode(new Error('Ice connection closed.'), 'ERR_ICE_CONNECTION_CLOSED')) } } @@ -781,10 +771,10 @@ class Peer extends stream.Duplex { // HACK: We can't rely on order here, for details see https://github.com/js-platform/node-webrtc/issues/339 const findCandidatePair = () => { - if (this.destroyed) return + if (this.destroyed || this._destroying) return this.getStats((err, items) => { - if (this.destroyed) return + if (this.destroyed || this._destroying) return // Treat getStats error as non-fatal. It's not essential. if (err) items = [] @@ -889,7 +879,7 @@ class Peer extends stream.Duplex { try { this.send(this._chunk) } catch (err) { - return this.destroy(errCode(err, 'ERR_DATA_CHANNEL')) + return this.__destroy(errCode(err, 'ERR_DATA_CHANNEL')) } this._chunk = null this._debug('sent chunk from "write before connect"') @@ -972,7 +962,7 @@ class Peer extends stream.Duplex { _onChannelMessage (event) { if (this.destroyed) return let data = event.data - if (data instanceof ArrayBuffer) data = Buffer.from(data) + if (data instanceof ArrayBuffer || this.__objectMode === false) data = Buffer.from(data) this.push(data) } @@ -994,7 +984,7 @@ class Peer extends stream.Duplex { _onChannelClose () { if (this.destroyed) return this._debug('on channel close') - this.destroy() + this.__destroy() } _onTrack (event) { diff --git a/package.json b/package.json index aa4635cb..12b8aaed 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,7 @@ "get-browser-rtc": "^1.1.0", "queue-microtask": "^1.2.3", "randombytes": "^2.1.0", - "readable-stream": "^3.6.0" + "streamx": "^2.13.2" }, "devDependencies": { "airtap": "^4.0.3", @@ -66,7 +66,7 @@ "test": "standard && npm run test-browser", "test-browser": "airtap --coverage --concurrency 1 -- test/*.js", "test-browser-local": "airtap --coverage --preset local -- test/*.js", - "test-node": "WRTC=wrtc tape test/*.js", + "test-node": "set WRTC=wrtc && tape test/*.js", "coverage": "nyc report --reporter=text-lcov | coveralls" }, "funding": [ diff --git a/test/common.js b/test/common.js index 204c3024..e0440446 100644 --- a/test/common.js +++ b/test/common.js @@ -20,7 +20,7 @@ exports.getConfig = thunky(function (cb) { }) // For testing on node, we must provide a WebRTC implementation -if (process.env.WRTC === 'wrtc') { +if (process.env.WRTC?.startsWith('wrtc')) { exports.wrtc = require('wrtc') } diff --git a/test/stream.js b/test/stream.js index 55993987..36600f27 100644 --- a/test/stream.js +++ b/test/stream.js @@ -28,7 +28,7 @@ test('duplex stream: send data before "connect" event', function (t) { }) peer1.on('finish', function () { t.pass('got peer1 "finish"') - t.ok(peer1._writableState.finished) + t.ok(peer1._writableState.ended) }) peer1.on('end', function () { t.pass('got peer1 "end"') @@ -40,7 +40,7 @@ test('duplex stream: send data before "connect" event', function (t) { }) peer2.on('finish', function () { t.pass('got peer2 "finish"') - t.ok(peer2._writableState.finished) + t.ok(peer2._writableState.ended) }) peer2.on('end', function () { t.pass('got peer2 "end"') @@ -67,7 +67,7 @@ test('duplex stream: send data one-way', function (t) { }) peer1.on('finish', function () { t.pass('got peer1 "finish"') - t.ok(peer1._writableState.finished) + t.ok(peer1._writableState.ended) }) peer1.on('end', function () { t.pass('got peer1 "end"') @@ -79,7 +79,7 @@ test('duplex stream: send data one-way', function (t) { }) peer2.on('finish', function () { t.pass('got peer2 "finish"') - t.ok(peer2._writableState.finished) + t.ok(peer2._writableState.ended) }) peer2.on('end', function () { t.pass('got peer2 "end"')