Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: migrate to streamx #893

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 53 additions & 63 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
const debug = require('debug')('simple-peer')
const getBrowserRTC = require('get-browser-rtc')
const randombytes = require('randombytes')
const stream = require('readable-stream')
const { Duplex } = require('streamx')
const queueMicrotask = require('queue-microtask') // TODO: remove when Node 10 is not supported
const errCode = require('err-code')
const { Buffer } = require('buffer')
Expand All @@ -25,14 +25,16 @@ function warn (message) {
* Duplex stream.
* @param {Object} opts
*/
class Peer extends stream.Duplex {
class Peer extends Duplex {
constructor (opts) {
opts = Object.assign({
allowHalfOpen: false
}, opts)

super(opts)

this.__objectMode = !!opts.objectMode // streamx is objectMode by default, so implement readable's fuctionality

this._id = randombytes(4).toString('hex').slice(0, 7)
this._debug('new peer %o', opts)

Expand All @@ -52,8 +54,7 @@ class Peer extends stream.Duplex {
this.allowHalfTrickle = opts.allowHalfTrickle !== undefined ? opts.allowHalfTrickle : false
this.iceCompleteTimeout = opts.iceCompleteTimeout || ICECOMPLETE_TIMEOUT

this.destroyed = false
this.destroying = false
this._destroying = false
this._connected = false

this.remoteAddress = undefined
Expand Down Expand Up @@ -100,7 +101,7 @@ class Peer extends stream.Duplex {
try {
this._pc = new (this._wrtc.RTCPeerConnection)(this.config)
} catch (err) {
this.destroy(errCode(err, 'ERR_PC_CONSTRUCTOR'))
this.__destroy(errCode(err, 'ERR_PC_CONSTRUCTOR'))
return
}

Expand All @@ -127,7 +128,7 @@ class Peer extends stream.Duplex {
// HACK: Fix for odd Firefox behavior, see: https://github.com/feross/simple-peer/pull/783
if (typeof this._pc.peerIdentity === 'object') {
this._pc.peerIdentity.catch(err => {
this.destroy(errCode(err, 'ERR_PC_PEER_IDENTITY'))
this.__destroy(errCode(err, 'ERR_PC_PEER_IDENTITY'))
})
}

Expand Down Expand Up @@ -180,7 +181,7 @@ class Peer extends stream.Duplex {
}

signal (data) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot signal after peer is destroyed'), 'ERR_DESTROYED')
if (typeof data === 'string') {
try {
Expand Down Expand Up @@ -219,11 +220,11 @@ class Peer extends stream.Duplex {
if (this._pc.remoteDescription.type === 'offer') this._createAnswer()
})
.catch(err => {
this.destroy(errCode(err, 'ERR_SET_REMOTE_DESCRIPTION'))
this.__destroy(errCode(err, 'ERR_SET_REMOTE_DESCRIPTION'))
})
}
if (!data.sdp && !data.candidate && !data.renegotiate && !data.transceiverRequest) {
this.destroy(errCode(new Error('signal() called with invalid signal data'), 'ERR_SIGNALING'))
this.__destroy(errCode(new Error('signal() called with invalid signal data'), 'ERR_SIGNALING'))
}
}

Expand All @@ -234,7 +235,7 @@ class Peer extends stream.Duplex {
if (!iceCandidateObj.address || iceCandidateObj.address.endsWith('.local')) {
warn('Ignoring unsupported ICE candidate.')
} else {
this.destroy(errCode(err, 'ERR_ADD_ICE_CANDIDATE'))
this.__destroy(errCode(err, 'ERR_ADD_ICE_CANDIDATE'))
}
})
}
Expand All @@ -244,7 +245,7 @@ class Peer extends stream.Duplex {
* @param {ArrayBufferView|ArrayBuffer|Buffer|string|Blob} chunk
*/
send (chunk) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot send after peer is destroyed'), 'ERR_DESTROYED')
this._channel.send(chunk)
}
Expand All @@ -255,7 +256,7 @@ class Peer extends stream.Duplex {
* @param {Object} init
*/
addTransceiver (kind, init) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot addTransceiver after peer is destroyed'), 'ERR_DESTROYED')
this._debug('addTransceiver()')

Expand All @@ -264,7 +265,7 @@ class Peer extends stream.Duplex {
this._pc.addTransceiver(kind, init)
this._needsNegotiation()
} catch (err) {
this.destroy(errCode(err, 'ERR_ADD_TRANSCEIVER'))
this.__destroy(errCode(err, 'ERR_ADD_TRANSCEIVER'))
}
} else {
this.emit('signal', { // request initiator to renegotiate
Expand All @@ -279,7 +280,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
addStream (stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot addStream after peer is destroyed'), 'ERR_DESTROYED')
this._debug('addStream()')

Expand All @@ -294,7 +295,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
addTrack (track, stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot addTrack after peer is destroyed'), 'ERR_DESTROYED')
this._debug('addTrack()')

Expand All @@ -319,7 +320,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
replaceTrack (oldTrack, newTrack, stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot replaceTrack after peer is destroyed'), 'ERR_DESTROYED')
this._debug('replaceTrack()')

Expand All @@ -333,7 +334,7 @@ class Peer extends stream.Duplex {
if (sender.replaceTrack != null) {
sender.replaceTrack(newTrack)
} else {
this.destroy(errCode(new Error('replaceTrack is not supported in this browser'), 'ERR_UNSUPPORTED_REPLACETRACK'))
this.__destroy(errCode(new Error('replaceTrack is not supported in this browser'), 'ERR_UNSUPPORTED_REPLACETRACK'))
}
}

Expand All @@ -343,7 +344,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
removeTrack (track, stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot removeTrack after peer is destroyed'), 'ERR_DESTROYED')
this._debug('removeSender()')

Expand All @@ -359,7 +360,7 @@ class Peer extends stream.Duplex {
if (err.name === 'NS_ERROR_UNEXPECTED') {
this._sendersAwaitingStable.push(sender) // HACK: Firefox must wait until (signalingState === stable) https://bugzilla.mozilla.org/show_bug.cgi?id=1133874
} else {
this.destroy(errCode(err, 'ERR_REMOVE_TRACK'))
this.__destroy(errCode(err, 'ERR_REMOVE_TRACK'))
}
}
this._needsNegotiation()
Expand All @@ -370,7 +371,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
removeStream (stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot removeStream after peer is destroyed'), 'ERR_DESTROYED')
this._debug('removeSenders()')

Expand All @@ -396,7 +397,7 @@ class Peer extends stream.Duplex {
}

negotiate () {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot negotiate after peer is destroyed'), 'ERR_DESTROYED')

if (this.initiator) {
Expand Down Expand Up @@ -424,30 +425,23 @@ class Peer extends stream.Duplex {
this._isNegotiating = true
}

// TODO: Delete this method once readable-stream is updated to contain a default
// implementation of destroy() that automatically calls _destroy()
// See: https://github.com/nodejs/readable-stream/issues/283
destroy (err) {
this._destroy(err, () => {})
_final (cb) {
if (!this._readableState.ended) this.push(null)
cb(null)
}

_destroy (err, cb) {
if (this.destroyed || this.destroying) return
this.destroying = true

this._debug('destroying (error: %s)', err && (err.message || err))

queueMicrotask(() => { // allow events concurrent with the call to _destroy() to fire (see #692)
this.destroyed = true
this.destroying = false

this._debug('destroy (error: %s)', err && (err.message || err))
__destroy (err) {
this.end()
this._destroy(() => {}, err)
}

this.readable = this.writable = false
_destroy (cb, err) {
if (this.destroyed || this._destroying) return
this._destroying = true

if (!this._readableState.ended) this.push(null)
if (!this._writableState.finished) this.end()
this._debug('destroying (error: %s)', err && (err.message || err))

setTimeout(() => { // allow events concurrent with the call to _destroy() to fire (see #692)
this._connected = false
this._pcReady = false
this._channelReady = false
Expand Down Expand Up @@ -492,19 +486,17 @@ class Peer extends stream.Duplex {
}
this._pc = null
this._channel = null

if (err) this.emit('error', err)
this.emit('close')
cb()
})
}, 0)
}

_setupData (event) {
if (!event.channel) {
// In some situations `pc.createDataChannel()` returns `undefined` (in wrtc),
// which is invalid behavior. Handle it gracefully.
// See: https://github.com/feross/simple-peer/issues/163
return this.destroy(errCode(new Error('Data channel event is missing `channel` property'), 'ERR_DATA_CHANNEL'))
return this.__destroy(errCode(new Error('Data channel event is missing `channel` property'), 'ERR_DATA_CHANNEL'))
}

this._channel = event.channel
Expand Down Expand Up @@ -532,7 +524,7 @@ class Peer extends stream.Duplex {
const err = event.error instanceof Error
? event.error
: new Error(`Datachannel error: ${event.message} ${event.filename}:${event.lineno}:${event.colno}`)
this.destroy(errCode(err, 'ERR_DATA_CHANNEL'))
this.__destroy(errCode(err, 'ERR_DATA_CHANNEL'))
}

// HACK: Chrome will sometimes get stuck in readyState "closing", let's check for this condition
Expand All @@ -548,16 +540,14 @@ class Peer extends stream.Duplex {
}, CHANNEL_CLOSING_TIMEOUT)
}

_read () {}

_write (chunk, encoding, cb) {
_write (chunk, cb) {
if (this.destroyed) return cb(errCode(new Error('cannot write after peer is destroyed'), 'ERR_DATA_CHANNEL'))

if (this._connected) {
try {
this.send(chunk)
} catch (err) {
return this.destroy(errCode(err, 'ERR_DATA_CHANNEL'))
return this.__destroy(errCode(err, 'ERR_DATA_CHANNEL'))
}
if (this._channel.bufferedAmount > MAX_BUFFERED_AMOUNT) {
this._debug('start backpressure: bufferedAmount %d', this._channel.bufferedAmount)
Expand All @@ -580,7 +570,7 @@ class Peer extends stream.Duplex {
// Wait a bit before destroying so the socket flushes.
// TODO: is there a more reliable way to accomplish this?
const destroySoon = () => {
setTimeout(() => this.destroy(), 1000)
setTimeout(() => this.__destroy(), 1000)
}

if (this._connected) {
Expand Down Expand Up @@ -631,15 +621,15 @@ class Peer extends stream.Duplex {
}

const onError = err => {
this.destroy(errCode(err, 'ERR_SET_LOCAL_DESCRIPTION'))
this.__destroy(errCode(err, 'ERR_SET_LOCAL_DESCRIPTION'))
}

this._pc.setLocalDescription(offer)
.then(onSuccess)
.catch(onError)
})
.catch(err => {
this.destroy(errCode(err, 'ERR_CREATE_OFFER'))
this.__destroy(errCode(err, 'ERR_CREATE_OFFER'))
})
}

Expand Down Expand Up @@ -681,22 +671,22 @@ class Peer extends stream.Duplex {
}

const onError = err => {
this.destroy(errCode(err, 'ERR_SET_LOCAL_DESCRIPTION'))
this.__destroy(errCode(err, 'ERR_SET_LOCAL_DESCRIPTION'))
}

this._pc.setLocalDescription(answer)
.then(onSuccess)
.catch(onError)
})
.catch(err => {
this.destroy(errCode(err, 'ERR_CREATE_ANSWER'))
this.__destroy(errCode(err, 'ERR_CREATE_ANSWER'))
})
}

_onConnectionStateChange () {
if (this.destroyed) return
if (this.destroyed || this._destroying) return
if (this._pc.connectionState === 'failed') {
this.destroy(errCode(new Error('Connection failed.'), 'ERR_CONNECTION_FAILURE'))
this.__destroy(errCode(new Error('Connection failed.'), 'ERR_CONNECTION_FAILURE'))
}
}

Expand All @@ -717,10 +707,10 @@ class Peer extends stream.Duplex {
this._maybeReady()
}
if (iceConnectionState === 'failed') {
this.destroy(errCode(new Error('Ice connection failed.'), 'ERR_ICE_CONNECTION_FAILURE'))
this.__destroy(errCode(new Error('Ice connection failed.'), 'ERR_ICE_CONNECTION_FAILURE'))
}
if (iceConnectionState === 'closed') {
this.destroy(errCode(new Error('Ice connection closed.'), 'ERR_ICE_CONNECTION_CLOSED'))
this.__destroy(errCode(new Error('Ice connection closed.'), 'ERR_ICE_CONNECTION_CLOSED'))
}
}

Expand Down Expand Up @@ -781,10 +771,10 @@ class Peer extends stream.Duplex {

// HACK: We can't rely on order here, for details see https://github.com/js-platform/node-webrtc/issues/339
const findCandidatePair = () => {
if (this.destroyed) return
if (this.destroyed || this._destroying) return

this.getStats((err, items) => {
if (this.destroyed) return
if (this.destroyed || this._destroying) return

// Treat getStats error as non-fatal. It's not essential.
if (err) items = []
Expand Down Expand Up @@ -889,7 +879,7 @@ class Peer extends stream.Duplex {
try {
this.send(this._chunk)
} catch (err) {
return this.destroy(errCode(err, 'ERR_DATA_CHANNEL'))
return this.__destroy(errCode(err, 'ERR_DATA_CHANNEL'))
}
this._chunk = null
this._debug('sent chunk from "write before connect"')
Expand Down Expand Up @@ -972,7 +962,7 @@ class Peer extends stream.Duplex {
_onChannelMessage (event) {
if (this.destroyed) return
let data = event.data
if (data instanceof ArrayBuffer) data = Buffer.from(data)
if (data instanceof ArrayBuffer || this.__objectMode === false) data = Buffer.from(data)
this.push(data)
}

Expand All @@ -994,7 +984,7 @@ class Peer extends stream.Duplex {
_onChannelClose () {
if (this.destroyed) return
this._debug('on channel close')
this.destroy()
this.__destroy()
}

_onTrack (event) {
Expand Down
Loading