|
- var original = require('original')
- , parse = require('url').parse
- , events = require('events')
- , https = require('https')
- , http = require('http')
- , util = require('util');
/**
 * Tells whether a value is a plain object, i.e. one whose direct prototype
 * is `Object.prototype` (object literals, `new Object()`).
 *
 * @param {Mixed} obj the value to inspect
 * @return {Boolean} true only when the prototype of `obj` is exactly
 *   `Object.prototype`; false for null, undefined, arrays, class instances
 *   and null-prototype objects
 * @api private
 */
function isPlainObject(obj) {
  // Object.getPrototypeOf throws a TypeError for null and undefined, so
  // rule those out first instead of relying on every caller to do it.
  return obj != null && Object.getPrototypeOf(obj) === Object.prototype;
}
/**
 * Creates a new EventSource object
 *
 * The constructor opens an HTTP(S) request to `url` right away, parses the
 * response body as a text/event-stream and emits the resulting events on the
 * instance:
 *
 *  - `open` once a 200 response has been received,
 *  - `error` when the connection is lost (a reconnect is then scheduled after
 *    `reconnectInterval` milliseconds) or when a non-200 status is returned
 *    (the event then carries a `status` property and the source is closed),
 *  - `message`, or the name given by an `event:` field, for every dispatched
 *    event; the payload exposes `data`, `lastEventId` and `origin`.
 *
 * Options read from `eventSourceInitDict` by this function:
 *
 *  - `headers`: plain object of extra request headers. A `Last-Event-ID`
 *    entry is taken as the initial last event id and removed from the object.
 *  - `rejectUnauthorized`: passed to the request options; certificate
 *    verification stays on unless this is loosely equal to `false`.
 *
 * @param {String} url the URL to which to connect
 * @param {Object} eventSourceInitDict extra init params. See README for details.
 * @api public
 **/
function EventSource(url, eventSourceInitDict) {
  var readyState = EventSource.CONNECTING;

  // `readyState` and `url` are exposed through getters only, so they always
  // reflect the closure variables and cannot be assigned from the outside.
  Object.defineProperty(this, 'readyState', {
    get: function () {
      return readyState;
    }
  });
  Object.defineProperty(this, 'url', {
    get: function () {
      return url;
    }
  });

  var self = this;
  self.reconnectInterval = 1000;

  // True while a reconnect timer is armed; keeps the 'close', 'end' and
  // 'error' handlers from scheduling more than one reconnect per drop.
  var connectPending = false;

  /**
   * Reacts to a lost connection: reports an `error` event and schedules a
   * reconnect, unless one is already pending or the source has been closed.
   */
  function onConnectionClosed() {
    if (connectPending || readyState === EventSource.CLOSED) return;
    connectPending = true;
    readyState = EventSource.CONNECTING;
    _emit('error', new Event('error'));

    // The url may have been changed by a temporary
    // redirect. If that's the case, revert it now.
    if (reconnectUrl) {
      url = reconnectUrl;
      reconnectUrl = null;
    }
    setTimeout(function () {
      // close() may have been called while the timer was running.
      if (readyState !== EventSource.CONNECTING) {
        return;
      }
      connect();
    }, self.reconnectInterval);
  }

  var req;
  var lastEventId = '';
  if (eventSourceInitDict && eventSourceInitDict.headers && isPlainObject(eventSourceInitDict.headers) && eventSourceInitDict.headers['Last-Event-ID']) {
    // Track the id separately so it can be updated by `id:` fields and
    // re-sent on every reconnect instead of staying at its initial value.
    lastEventId = eventSourceInitDict.headers['Last-Event-ID'];
    delete eventSourceInitDict.headers['Last-Event-ID'];
  }

  // Parser state for the event currently being assembled.
  var discardTrailingNewline = false
    , data = ''
    , eventName = '';

  // Original url to go back to after a temporary (307) redirect.
  var reconnectUrl = null;

  /**
   * Issues the request for the current `url` and wires up response handling.
   */
  function connect() {
    connectPending = false;

    var options = parse(url);
    var isSecure = options.protocol === 'https:';
    options.headers = { 'Cache-Control': 'no-cache', 'Accept': 'text/event-stream' };
    if (lastEventId) options.headers['Last-Event-ID'] = lastEventId;
    if (eventSourceInitDict && eventSourceInitDict.headers && isPlainObject(eventSourceInitDict.headers)) {
      for (var i in eventSourceInitDict.headers) {
        var header = eventSourceInitDict.headers[i];
        // Falsy header values are skipped, leaving any default in place.
        if (header) {
          options.headers[i] = header;
        }
      }
    }

    // Loose comparison kept on purpose: any value that is == false disables
    // certificate verification, exactly as before.
    options.rejectUnauthorized = !(eventSourceInitDict && eventSourceInitDict.rejectUnauthorized == false);

    req = (isSecure ? https : http).request(options, function (res) {
      // Handle HTTP redirects
      if (res.statusCode === 301 || res.statusCode === 307) {
        if (!res.headers.location) {
          // Server sent redirect response without Location header.
          _emit('error', new Event('error', {status: res.statusCode}));
          return;
        }
        if (res.statusCode === 307) reconnectUrl = url;
        url = res.headers.location;
        // The body of the redirect response is of no interest, but it has to
        // be consumed so that the underlying socket can be released.
        res.resume();
        process.nextTick(connect);
        return;
      }

      if (res.statusCode !== 200) {
        _emit('error', new Event('error', {status: res.statusCode}));
        return self.close();
      }

      readyState = EventSource.OPEN;

      // Every connection is a fresh stream: drop whatever a previous,
      // interrupted stream left half-parsed so that it cannot leak into the
      // first event of this one.
      discardTrailingNewline = false;
      data = '';
      eventName = '';

      // Decode as UTF-8 at the stream level. Concatenating raw Buffer chunks
      // onto a string would corrupt any multi-byte character that happens to
      // be split across two chunks.
      res.setEncoding('utf8');

      res.on('close', onConnectionClosed);
      res.on('end', onConnectionClosed);
      _emit('open', new Event('open'));

      // text/event-stream parser adapted from webkit's
      // Source/WebCore/page/EventSource.cpp
      var buf = '';
      res.on('data', function (chunk) {
        buf += chunk;

        var pos = 0
          , length = buf.length;
        while (pos < length) {
          // A '\r' ended the previous line; swallow the '\n' of a "\r\n"
          // pair, which may only arrive with this chunk.
          if (discardTrailingNewline) {
            if (buf[pos] === '\n') {
              ++pos;
            }
            discardTrailingNewline = false;
          }

          // Scan one line: lineLength is the offset of its terminator,
          // fieldLength the offset of its first ':' (both relative to pos,
          // -1 when not found).
          var lineLength = -1
            , fieldLength = -1
            , c;
          for (var i = pos; lineLength < 0 && i < length; ++i) {
            c = buf[i];
            if (c === ':') {
              if (fieldLength < 0) {
                fieldLength = i - pos;
              }
            } else if (c === '\r') {
              discardTrailingNewline = true;
              lineLength = i - pos;
            } else if (c === '\n') {
              lineLength = i - pos;
            }
          }

          // No terminator yet: keep the partial line for the next chunk.
          if (lineLength < 0) {
            break;
          }

          parseEventStreamLine(buf, pos, fieldLength, lineLength);

          pos += lineLength + 1;
        }

        // Drop what has been consumed, keep the unfinished remainder.
        if (pos === length) {
          buf = '';
        } else if (pos > 0) {
          buf = buf.slice(pos);
        }
      });
    });

    req.on('error', onConnectionClosed);
    req.setNoDelay(true);
    req.end();
  }

  connect();

  /**
   * Emits only when somebody is listening. An 'error' event emitted on an
   * EventEmitter without listeners would otherwise be thrown.
   */
  function _emit() {
    if (self.listeners(arguments[0]).length > 0) {
      self.emit.apply(self, arguments);
    }
  }

  /**
   * Closes the connection and prevents any further reconnect.
   * Calling it on an already closed source is a no-op.
   *
   * @api public
   */
  this.close = function () {
    if (readyState == EventSource.CLOSED) return;
    readyState = EventSource.CLOSED;
    req.abort();
  };

  /**
   * Interprets one line of the event stream.
   *
   * @param {String} buf buffer holding the line
   * @param {Number} pos offset of the first character of the line in `buf`
   * @param {Number} fieldLength offset of the first ':' within the line, or -1
   * @param {Number} lineLength length of the line without its terminator
   */
  function parseEventStreamLine(buf, pos, fieldLength, lineLength) {
    if (lineLength === 0) {
      // Blank line: dispatch the event assembled so far, if it has data.
      if (data.length > 0) {
        var type = eventName || 'message';
        _emit(type, new MessageEvent(type, {
          data: data.slice(0, -1), // remove trailing newline
          lastEventId: lastEventId,
          origin: original(url)
        }));
        data = '';
      }
      eventName = void 0;
    } else if (fieldLength !== 0) {
      // fieldLength === 0 means the line starts with ':' and is a comment.
      // fieldLength < 0 means there is no ':' at all: the whole line is the
      // field name and its value is the empty string. The previous
      // `fieldLength > 0` guard silently dropped such lines, which left the
      // `noValue` handling below unreachable.
      var noValue = fieldLength < 0
        , step = 0
        , field = buf.slice(pos, pos + (noValue ? lineLength : fieldLength));

      if (noValue) {
        step = lineLength;
      } else if (buf[pos + fieldLength + 1] !== ' ') {
        step = fieldLength + 1;
      } else {
        // Skip the single optional space after the colon.
        step = fieldLength + 2;
      }
      pos += step;

      var valueLength = lineLength - step
        , value = buf.slice(pos, pos + valueLength);

      if (field === 'data') {
        data += value + '\n';
      } else if (field === 'event') {
        eventName = value;
      } else if (field === 'id') {
        lastEventId = value;
      } else if (field === 'retry') {
        var retry = parseInt(value, 10);
        if (!Number.isNaN(retry)) {
          self.reconnectInterval = retry;
        }
      }
    }
  }
}
module.exports = EventSource;

// EventSource instances are EventEmitters.
util.inherits(EventSource, events.EventEmitter);
EventSource.prototype.constructor = EventSource; // make stacktraces readable

// Install the DOM-style `onopen`, `onerror` and `onmessage` accessors on the
// prototype. Each one is a thin view over the emitter's listener list.
(function installHandlerAccessors(eventTypes) {
  for (var idx = 0; idx < eventTypes.length; idx++) {
    defineAccessor(eventTypes[idx]);
  }

  function defineAccessor(eventType) {
    Object.defineProperty(EventSource.prototype, 'on' + eventType, {
      /**
       * Returns the current listener
       *
       * @return {Mixed} the first registered listener (unwrapped through its
       *   `_listener` reference when present) or undefined
       * @api private
       */
      get: function get() {
        var first = this.listeners(eventType)[0];
        if (!first) {
          return undefined;
        }
        return first._listener || first;
      },

      /**
       * Start listening for events
       *
       * Every listener already registered for this event type is removed
       * before the new one is added.
       *
       * @param {Function} listener the listener
       * @api private
       */
      set: function set(listener) {
        this.removeAllListeners(eventType);
        this.addEventListener(eventType, listener);
      }
    });
  }
})(['open', 'error', 'message']);
/**
 * Ready states
 *
 * Exposed as enumerable, read-only properties of the constructor.
 */
Object.defineProperties(EventSource, {
  CONNECTING: { enumerable: true, value: 0 },
  OPEN: { enumerable: true, value: 1 },
  CLOSED: { enumerable: true, value: 2 }
});
/**
 * Browser-style `addEventListener`, implemented on top of the emitter's
 * `on`. Anything that is not a function is ignored.
 *
 * @param {String} method Listen for an event
 * @param {Function} listener callback
 * @see https://developer.mozilla.org/en/DOM/element.addEventListener
 * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
 * @api public
 */
EventSource.prototype.addEventListener = function addEventListener(method, listener) {
  if (typeof listener !== 'function') {
    return;
  }

  // Keep a self-reference on the function so that the `on<event>` getters
  // can hand the original function back.
  listener._listener = listener;
  this.on(method, listener);
};
/**
 * W3C Event
 *
 * Minimal event object: a read-only, enumerable `type` plus a read-only,
 * enumerable copy of every own enumerable property of `optionalProperties`.
 *
 * @param {String} type the event type
 * @param {Object} optionalProperties extra properties to copy onto the event (optional)
 * @see http://www.w3.org/TR/DOM-Level-3-Events/#interface-Event
 * @api private
 */
function Event(type, optionalProperties) {
  Object.defineProperty(this, 'type', { writable: false, value: type, enumerable: true });
  if (optionalProperties) {
    for (var f in optionalProperties) {
      // Go through Object.prototype: the dictionary may have no prototype
      // (Object.create(null)) or carry its own `hasOwnProperty` key, and
      // calling the method on the object itself would then throw.
      if (Object.prototype.hasOwnProperty.call(optionalProperties, f)) {
        Object.defineProperty(this, f, { writable: false, value: optionalProperties[f], enumerable: true });
      }
    }
  }
}
/**
 * W3C MessageEvent
 *
 * Minimal message event object: a read-only, enumerable `type` plus a
 * read-only, enumerable copy of every own enumerable property of
 * `eventInitDict`.
 *
 * @param {String} type the event type
 * @param {Object} eventInitDict properties to copy onto the event (optional)
 * @see http://www.w3.org/TR/webmessaging/#event-definitions
 * @api private
 */
function MessageEvent(type, eventInitDict) {
  Object.defineProperty(this, 'type', { writable: false, value: type, enumerable: true });
  for (var f in eventInitDict) {
    // Go through Object.prototype: the dictionary may have no prototype
    // (Object.create(null)) or carry its own `hasOwnProperty` key, and
    // calling the method on the object itself would then throw.
    if (Object.prototype.hasOwnProperty.call(eventInitDict, f)) {
      Object.defineProperty(this, f, { writable: false, value: eventInitDict[f], enumerable: true });
    }
  }
}
|