WebSocketSubject.js 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. import { Subject, AnonymousSubject } from '../../Subject';
  2. import { Subscriber } from '../../Subscriber';
  3. import { Observable } from '../../Observable';
  4. import { Subscription } from '../../Subscription';
  5. import { root } from '../../util/root';
  6. import { ReplaySubject } from '../../ReplaySubject';
  7. import { tryCatch } from '../../util/tryCatch';
  8. import { errorObject } from '../../util/errorObject';
  9. import { assign } from '../../util/assign';
  /**
   * Subject backed by a WebSocket connection.
   *
   * Values pushed with `next()` are sent over the socket (they are buffered in
   * a ReplaySubject until the socket has opened); messages received from the
   * socket are passed through `resultSelector` and emitted to subscribers.
   * The socket is created lazily on the first subscription and closed again
   * when the last subscriber leaves.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @extends {Ignored}
   * @hide true
   */
  export class WebSocketSubject extends AnonymousSubject {
    /**
     * @param {string | Object | Observable} urlConfigOrSource Either the
     * websocket URL, a config object whose properties are copied onto this
     * instance (members read by this class: `url`, `protocol`, `binaryType`,
     * `WebSocketCtor`, `resultSelector`, `openObserver`, `closeObserver`,
     * `closingObserver`), or a source Observable (used by `lift`).
     * @param {Object} [destination] Destination handed to the parent
     * constructor; only used when `urlConfigOrSource` is an Observable.
     * @throws {Error} If no WebSocket constructor is available, neither on
     * `root` nor through the config object.
     */
    constructor(urlConfigOrSource, destination) {
      if (urlConfigOrSource instanceof Observable) {
        super(destination, urlConfigOrSource);
      }
      else {
        super();
        this.WebSocketCtor = root.WebSocket;
        this._output = new Subject();
        if (typeof urlConfigOrSource === 'string') {
          this.url = urlConfigOrSource;
        }
        else {
          // WARNING: config object could override important members here.
          assign(this, urlConfigOrSource);
        }
        if (!this.WebSocketCtor) {
          throw new Error('no WebSocket constructor can be found');
        }
        // Buffers outgoing messages until the socket has opened.
        this.destination = new ReplaySubject();
      }
    }

    /**
     * Default conversion of an incoming message event into an emitted value:
     * parses `e.data` as JSON. A config object may replace this member.
     *
     * @param {Object} e Message event received from the socket.
     * @return {*} The parsed payload.
     */
    resultSelector(e) {
      return JSON.parse(e.data);
    }

    /**
     * Wrapper around the w3c-compatible WebSocket object provided by the browser.
     *
     * @example <caption>Wraps browser WebSocket</caption>
     *
     * let socket$ = Observable.webSocket('ws://localhost:8081');
     *
     * socket$.subscribe(
     *   (msg) => console.log('message received: ' + msg),
     *   (err) => console.log(err),
     *   () => console.log('complete')
     * );
     *
     * socket$.next(JSON.stringify({ op: 'hello' }));
     *
     * @example <caption>Wraps WebSocket from nodejs-websocket (using node.js)</caption>
     *
     * import { w3cwebsocket } from 'websocket';
     *
     * let socket$ = Observable.webSocket({
     *   url: 'ws://localhost:8081',
     *   WebSocketCtor: w3cwebsocket
     * });
     *
     * socket$.subscribe(
     *   (msg) => console.log('message received: ' + msg),
     *   (err) => console.log(err),
     *   () => console.log('complete')
     * );
     *
     * socket$.next(JSON.stringify({ op: 'hello' }));
     *
     * @param {string | WebSocketSubjectConfig} urlConfigOrSource the source of the websocket as an url or a structure defining the websocket object
     * @return {WebSocketSubject}
     * @static true
     * @name webSocket
     * @owner Observable
     */
    static create(urlConfigOrSource) {
      return new WebSocketSubject(urlConfigOrSource);
    }

    /**
     * Creates a WebSocketSubject that uses this subject as its source, shares
     * this subject's current destination and applies `operator`.
     *
     * @param {Object} operator Operator to attach to the new subject.
     * @return {WebSocketSubject} The derived subject.
     */
    lift(operator) {
      const sock = new WebSocketSubject(this, this.destination);
      sock.operator = operator;
      return sock;
    }

    /**
     * Drops the reference to the current socket and installs a fresh output
     * Subject. When this subject has no source, the outgoing queue is also
     * replaced with a fresh ReplaySubject.
     */
    _resetState() {
      this.socket = null;
      if (!this.source) {
        this.destination = new ReplaySubject();
      }
      this._output = new Subject();
    }

    // TODO: factor this out to be a proper Operator/Subscriber implementation and eliminate closures
    /**
     * Builds an Observable that shares this socket with other consumers.
     *
     * On subscription the value returned by `subMsg()` is sent through this
     * subject, then messages for which `messageFilter(msg)` is truthy are
     * forwarded to the subscriber. On teardown the value returned by
     * `unsubMsg()` is sent and the inner subscription is released.
     *
     * A throw from any of the three callbacks is forwarded to the subscriber
     * as an error. Note that after a throwing `subMsg` the code still goes on
     * to subscribe to this subject.
     *
     * @param {function(): *} subMsg Produces the message sent on subscribe.
     * @param {function(): *} unsubMsg Produces the message sent on unsubscribe.
     * @param {function(*): boolean} messageFilter Selects the messages to forward.
     * @return {Observable} The multiplexed stream.
     */
    multiplex(subMsg, unsubMsg, messageFilter) {
      const self = this;
      return new Observable((observer) => {
        // tryCatch(fn)() yields the errorObject sentinel when fn throws;
        // the thrown value is then read from errorObject.e.
        const result = tryCatch(subMsg)();
        if (result === errorObject) {
          observer.error(errorObject.e);
        }
        else {
          self.next(result);
        }
        const subscription = self.subscribe(x => {
          const result = tryCatch(messageFilter)(x);
          if (result === errorObject) {
            observer.error(errorObject.e);
          }
          else if (result) {
            observer.next(x);
          }
        }, err => observer.error(err), () => observer.complete());
        return () => {
          const result = tryCatch(unsubMsg)();
          if (result === errorObject) {
            observer.error(errorObject.e);
          }
          else {
            self.next(result);
          }
          subscription.unsubscribe();
        };
      });
    }

    /**
     * Creates the socket and wires its open/error/close/message events to this
     * subject. A throw from the WebSocket constructor is reported through the
     * output subject rather than rethrown.
     *
     * Every event handler checks that its socket is still `this.socket` before
     * touching shared state: socket events are asynchronous, so a handler may
     * fire after `_resetState()` has run or after a newer socket has been
     * connected, and must not clobber the state belonging to that newer
     * connection.
     */
    _connectSocket() {
      const { WebSocketCtor } = this;
      // Captured so this socket keeps reporting to the output subject that was
      // current when it was created, even after _resetState() replaces it.
      const observer = this._output;
      let socket = null;
      try {
        socket = this.protocol ?
          new WebSocketCtor(this.url, this.protocol) :
          new WebSocketCtor(this.url);
        this.socket = socket;
        if (this.binaryType) {
          this.socket.binaryType = this.binaryType;
        }
      }
      catch (e) {
        observer.error(e);
        return;
      }
      // NOTE(review): this Subscription only collects the queue subscription
      // made in onopen; nothing in this class unsubscribes it.
      const subscription = new Subscription(() => {
        this.socket = null;
        if (socket && socket.readyState === 1) {
          socket.close();
        }
      });
      socket.onopen = (e) => {
        if (this.socket !== socket) {
          // The subject was reset (or reconnected) while this socket was still
          // connecting. Close the orphan instead of binding the current
          // destination to it.
          socket.close();
          return;
        }
        const openObserver = this.openObserver;
        if (openObserver) {
          openObserver.next(e);
        }
        const queue = this.destination;
        // From now on next/error/complete on this subject act on the socket.
        this.destination = Subscriber.create((x) => socket.readyState === 1 && socket.send(x), (err) => {
          const closingObserver = this.closingObserver;
          if (closingObserver) {
            closingObserver.next(undefined);
          }
          if (err && err.code) {
            socket.close(err.code, err.reason);
          }
          else {
            observer.error(new TypeError('WebSocketSubject.error must be called with an object with an error code, ' +
              'and an optional reason: { code: number, reason: string }'));
          }
          this._resetState();
        }, () => {
          const closingObserver = this.closingObserver;
          if (closingObserver) {
            closingObserver.next(undefined);
          }
          socket.close();
          this._resetState();
        });
        // Flush the messages that were queued before the socket opened.
        if (queue && queue instanceof ReplaySubject) {
          subscription.add(queue.subscribe(this.destination));
        }
      };
      socket.onerror = (e) => {
        // Only reset when this is still the active socket; a late error from a
        // replaced socket must not discard the new connection's state.
        if (socket === this.socket) {
          this._resetState();
        }
        observer.error(e);
      };
      socket.onclose = (e) => {
        // close() is asynchronous: by the time this fires the subject may have
        // been reset already or be using a newer socket.
        if (socket === this.socket) {
          this._resetState();
        }
        const closeObserver = this.closeObserver;
        if (closeObserver) {
          closeObserver.next(e);
        }
        if (e.wasClean) {
          observer.complete();
        }
        else {
          observer.error(e);
        }
      };
      socket.onmessage = (e) => {
        const result = tryCatch(this.resultSelector)(e);
        if (result === errorObject) {
          observer.error(errorObject.e);
        }
        else {
          observer.next(result);
        }
      };
    }

    /**
     * Subscribes `subscriber` to the socket output, connecting the socket
     * first when there is none. When the last output subscriber leaves, the
     * socket is closed and the state is reset.
     *
     * @param {Subscriber} subscriber The subscriber to attach.
     * @return {Subscription} Subscription whose teardown performs the cleanup.
     * @deprecated internal use only
     */
    _subscribe(subscriber) {
      const { source } = this;
      if (source) {
        return source.subscribe(subscriber);
      }
      if (!this.socket) {
        this._connectSocket();
      }
      const subscription = new Subscription();
      subscription.add(this._output.subscribe(subscriber));
      subscription.add(() => {
        const { socket } = this;
        if (this._output.observers.length === 0) {
          // readyState 0 = CONNECTING, 1 = OPEN. A socket that is still
          // connecting has to be closed too, otherwise it would open later
          // with nobody left to close it.
          if (socket && (socket.readyState === 1 || socket.readyState === 0)) {
            socket.close();
          }
          this._resetState();
        }
      });
      return subscription;
    }

    /**
     * Closes the socket if it is open or still connecting, resets the state
     * and unsubscribes this subject. When this subject has no source, a fresh
     * outgoing queue is installed afterwards.
     */
    unsubscribe() {
      const { source, socket } = this;
      // readyState 0 = CONNECTING, 1 = OPEN (see _subscribe).
      if (socket && (socket.readyState === 1 || socket.readyState === 0)) {
        socket.close();
        this._resetState();
      }
      super.unsubscribe();
      if (!source) {
        this.destination = new ReplaySubject();
      }
    }
  }
  239. //# sourceMappingURL=WebSocketSubject.js.map