// eventsource.js (8.7 KB)
  1. var original = require('original')
  2. , parse = require('url').parse
  3. , events = require('events')
  4. , https = require('https')
  5. , http = require('http')
  6. , util = require('util');
  /**
   * Tells whether `obj` is a plain object, i.e. one whose direct prototype is
   * `Object.prototype` (an object literal or `new Object()`).
   *
   * `null` and `undefined` yield `false` instead of making
   * `Object.getPrototypeOf` throw a TypeError.
   *
   * @param {*} obj the value to test
   * @return {Boolean} true only when the prototype of `obj` is Object.prototype
   * @api private
   */
  function isPlainObject(obj) {
    if (obj === null || obj === undefined) {
      return false;
    }
    return Object.getPrototypeOf(obj) === Object.prototype;
  }
  /**
   * Creates a new EventSource object
   *
   * The request is issued immediately from the constructor. When an open
   * connection ends or the request errors, an 'error' event is emitted and a
   * new attempt is scheduled after `reconnectInterval` milliseconds, unless
   * `close()` has been called in the meantime.
   *
   * @param {String} url the URL to which to connect
   * @param {Object} eventSourceInitDict extra init params. See README for details.
   *   Read here: `headers` (only honoured when it is a plain object; a
   *   'Last-Event-ID' entry seeds the last event id and is removed from the
   *   object) and `rejectUnauthorized`.
   * @api public
   **/
  function EventSource(url, eventSourceInitDict) {
    var self = this;
    var readyState = EventSource.CONNECTING;
    // True while a reconnect timer is outstanding, so that the 'close', 'end'
    // and 'error' notifications of one connection schedule a single retry.
    var connectPending = false;
    // Holds the pre-redirect URL while a temporary (307) redirect is in effect.
    var reconnectUrl = null;
    var req;
    var lastEventId = '';
    // Event-stream parser state.
    var discardTrailingNewline = false;
    var data = '';
    var eventName = '';

    Object.defineProperty(this, 'readyState', {
      get: function () {
        return readyState;
      }
    });

    Object.defineProperty(this, 'url', {
      get: function () {
        return url;
      }
    });

    // Delay before reconnecting, in milliseconds; a 'retry' field overrides it.
    self.reconnectInterval = 1000;

    var initialHeaders = customHeaders();
    if (initialHeaders && initialHeaders['Last-Event-ID']) {
      lastEventId = initialHeaders['Last-Event-ID'];
      delete initialHeaders['Last-Event-ID'];
    }

    /**
     * Returns the caller supplied headers, or null when none were given or
     * when they are not a plain object.
     */
    function customHeaders() {
      var headers = eventSourceInitDict && eventSourceInitDict.headers;
      return headers && isPlainObject(headers) ? headers : null;
    }

    /**
     * Emits only when somebody listens for the event, which keeps
     * EventEmitter from throwing on an unhandled 'error' event.
     */
    function _emit() {
      if (self.listeners(arguments[0]).length > 0) {
        self.emit.apply(self, arguments);
      }
    }

    /**
     * Reports the lost connection and schedules a reconnect.
     */
    function onConnectionClosed() {
      if (connectPending || readyState === EventSource.CLOSED) return;
      connectPending = true;
      readyState = EventSource.CONNECTING;
      _emit('error', new Event('error'));

      // The url may have been changed by a temporary
      // redirect. If that's the case, revert it now.
      if (reconnectUrl) {
        url = reconnectUrl;
        reconnectUrl = null;
      }

      setTimeout(function () {
        // close() may have been called while the timer was running.
        if (readyState !== EventSource.CONNECTING) {
          return;
        }
        connect();
      }, self.reconnectInterval);
    }

    /**
     * Handles one parsed line of the event stream.
     *
     * @param {String} buf buffer that contains the line
     * @param {Number} pos offset of the first character of the line
     * @param {Number} fieldLength offset of the first ':' relative to `pos`,
     *   or a negative number when the line has no colon
     * @param {Number} lineLength length of the line without its terminator
     */
    function parseEventStreamLine(buf, pos, fieldLength, lineLength) {
      if (lineLength === 0) {
        // A blank line dispatches whatever has been accumulated.
        if (data.length > 0) {
          var type = eventName || 'message';
          _emit(type, new MessageEvent(type, {
            data: data.slice(0, -1), // remove trailing newline
            lastEventId: lastEventId,
            origin: original(url)
          }));
          data = '';
        }
        eventName = '';
        return;
      }

      // A line that starts with a colon is a comment.
      if (fieldLength === 0) return;

      // Without a colon the whole line is the field name and the value is
      // the empty string.
      var noValue = fieldLength < 0;
      var field = buf.slice(pos, pos + (noValue ? lineLength : fieldLength));
      var step;

      if (noValue) {
        step = lineLength;
      } else if (buf[pos + fieldLength + 1] !== ' ') {
        step = fieldLength + 1;
      } else {
        // Skip the single optional space that follows the colon.
        step = fieldLength + 2;
      }

      var value = buf.slice(pos + step, pos + lineLength);

      if (field === 'data') {
        data += value + '\n';
      } else if (field === 'event') {
        eventName = value;
      } else if (field === 'id') {
        lastEventId = value;
      } else if (field === 'retry') {
        var retry = parseInt(value, 10);
        if (!Number.isNaN(retry)) {
          self.reconnectInterval = retry;
        }
      }
    }

    /**
     * Handles the response headers: follows 301/307 redirects, fails on any
     * status other than 200, otherwise starts parsing the stream.
     */
    function onResponse(res) {
      // Handle HTTP redirects
      if (res.statusCode == 301 || res.statusCode == 307) {
        if (!res.headers.location) {
          // Server sent redirect response without Location header.
          _emit('error', new Event('error', {status: res.statusCode}));
          return;
        }
        if (res.statusCode == 307) reconnectUrl = url;
        url = res.headers.location;
        process.nextTick(connect);
        return;
      }

      if (res.statusCode !== 200) {
        _emit('error', new Event('error', {status: res.statusCode}));
        return self.close();
      }

      // Start every connection with a clean parser so that a partially
      // received event of a dropped connection is not merged into the
      // first event of this one.
      discardTrailingNewline = false;
      data = '';
      eventName = '';

      // Decode inside the stream so that a multi-byte UTF-8 character that
      // is split across two chunks is not corrupted.
      res.setEncoding('utf8');

      readyState = EventSource.OPEN;
      res.on('close', onConnectionClosed);
      res.on('end', onConnectionClosed);
      _emit('open', new Event('open'));

      // text/event-stream parser adapted from webkit's
      // Source/WebCore/page/EventSource.cpp
      var buf = '';
      res.on('data', function (chunk) {
        buf += chunk;

        var pos = 0;
        var length = buf.length;

        while (pos < length) {
          if (discardTrailingNewline) {
            // The previous line ended in '\r'; swallow the '\n' of a CRLF.
            if (buf[pos] === '\n') {
              ++pos;
            }
            discardTrailingNewline = false;
          }

          var lineLength = -1;
          var fieldLength = -1;

          for (var i = pos; lineLength < 0 && i < length; ++i) {
            var c = buf[i];
            if (c === ':') {
              // Only the first colon separates field name and value.
              if (fieldLength < 0) {
                fieldLength = i - pos;
              }
            } else if (c === '\r') {
              discardTrailingNewline = true;
              lineLength = i - pos;
            } else if (c === '\n') {
              lineLength = i - pos;
            }
          }

          if (lineLength < 0) {
            // Incomplete line: wait for the next chunk.
            break;
          }

          parseEventStreamLine(buf, pos, fieldLength, lineLength);
          pos += lineLength + 1;
        }

        // Keep only the unparsed tail for the next chunk.
        if (pos === length) {
          buf = '';
        } else if (pos > 0) {
          buf = buf.slice(pos);
        }
      });
    }

    /**
     * Issues the request for the current url.
     */
    function connect() {
      connectPending = false;

      var options = parse(url);
      var isSecure = options.protocol == 'https:';
      options.headers = { 'Cache-Control': 'no-cache', 'Accept': 'text/event-stream' };
      if (lastEventId) options.headers['Last-Event-ID'] = lastEventId;

      var headers = customHeaders();
      if (headers) {
        for (var name in headers) {
          var header = headers[name];
          if (header) {
            options.headers[name] = header;
          }
        }
      }

      // Certificates are verified unless the caller opted out.
      options.rejectUnauthorized = !(eventSourceInitDict && eventSourceInitDict.rejectUnauthorized == false);

      req = (isSecure ? https : http).request(options, onResponse);
      req.on('error', onConnectionClosed);
      req.setNoDelay(true);
      req.end();
    }

    connect();

    /**
     * Closes the connection for good; no reconnect is attempted afterwards.
     *
     * @api public
     */
    this.close = function () {
      if (readyState == EventSource.CLOSED) return;
      readyState = EventSource.CLOSED;
      req.abort();
    };
  }
  module.exports = EventSource;

  util.inherits(EventSource, events.EventEmitter);
  EventSource.prototype.constructor = EventSource; // make stacktraces readable

  /**
   * Installs the `on<method>` accessor (e.g. `onmessage`) on the prototype.
   *
   * @param {String} method name of the event the accessor is bound to
   * @api private
   */
  function defineHandlerAccessor(method) {
    Object.defineProperty(EventSource.prototype, 'on' + method, {
      /**
       * Returns the current listener
       *
       * @return {Mixed} the first registered function or undefined
       * @api private
       */
      get: function get() {
        var first = this.listeners(method)[0];
        if (!first) {
          return undefined;
        }
        return first._listener || first;
      },

      /**
       * Start listening for events; every listener previously registered
       * for this event is removed first.
       *
       * @param {Function} listener the listener
       * @api private
       */
      set: function set(listener) {
        this.removeAllListeners(method);
        this.addEventListener(method, listener);
      }
    });
  }

  ['open', 'error', 'message'].forEach(function (method) {
    defineHandlerAccessor(method);
  });
  /**
   * Ready states, exposed as enumerable, read-only constants on the
   * constructor.
   */
  [['CONNECTING', 0], ['OPEN', 1], ['CLOSED', 2]].forEach(function (state) {
    Object.defineProperty(EventSource, state[0], { enumerable: true, value: state[1] });
  });
  /**
   * Browser style `addEventListener` on top of the emitter's `on`.
   * Anything that is not a function is silently ignored.
   *
   * @param {String} method name of the event to listen for
   * @param {Function} listener callback
   * @see https://developer.mozilla.org/en/DOM/element.addEventListener
   * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
   * @api public
   */
  EventSource.prototype.addEventListener = function addEventListener(method, listener) {
    if (typeof listener !== 'function') {
      return;
    }

    // keep a reference on the function itself so the on<event> getters can
    // hand the original function back
    listener._listener = listener;
    this.on(method, listener);
  };
  /**
   * W3C Event
   *
   * Exposes `type` and every own enumerable property of
   * `optionalProperties` as read-only, enumerable properties.
   *
   * @param {String} type the event type
   * @param {Object} optionalProperties extra properties to copy onto the event
   * @see http://www.w3.org/TR/DOM-Level-3-Events/#interface-Event
   * @api private
   */
  function Event(type, optionalProperties) {
    var event = this;
    Object.defineProperty(event, 'type', { writable: false, value: type, enumerable: true });
    if (!optionalProperties) {
      return;
    }
    // Object.keys only lists own enumerable keys and, unlike calling
    // optionalProperties.hasOwnProperty, also works for dictionaries
    // without a prototype or with a shadowed hasOwnProperty.
    Object.keys(optionalProperties).forEach(function (key) {
      Object.defineProperty(event, key, { writable: false, value: optionalProperties[key], enumerable: true });
    });
  }
  /**
   * W3C MessageEvent
   *
   * Exposes `type` and every own enumerable property of `eventInitDict`
   * as read-only, enumerable properties. A missing init dict is allowed.
   *
   * @param {String} type the event type
   * @param {Object} eventInitDict properties to copy onto the event
   * @see http://www.w3.org/TR/webmessaging/#event-definitions
   * @api private
   */
  function MessageEvent(type, eventInitDict) {
    var event = this;
    Object.defineProperty(event, 'type', { writable: false, value: type, enumerable: true });
    // Object.keys only lists own enumerable keys and, unlike calling
    // eventInitDict.hasOwnProperty, also works for dictionaries without a
    // prototype or with a shadowed hasOwnProperty.
    Object.keys(eventInitDict || {}).forEach(function (key) {
      Object.defineProperty(event, key, { writable: false, value: eventInitDict[key], enumerable: true });
    });
  }