WebSocketSubject.js 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. import { Subject, AnonymousSubject } from '../../Subject';
  2. import { Subscriber } from '../../Subscriber';
  3. import { Observable } from '../../Observable';
  4. import { Subscription } from '../../Subscription';
  5. import { ReplaySubject } from '../../ReplaySubject';
  /**
   * Baseline configuration that is copied into every new socket configuration
   * before caller-supplied settings are applied on top of it.
   *
   * Frozen because it is shared by every instance; consumers are expected to
   * copy it (e.g. with `Object.assign({}, DEFAULT_WEBSOCKET_CONFIG)`) rather
   * than mutate it.
   */
  const DEFAULT_WEBSOCKET_CONFIG = Object.freeze({
    // Empty placeholder; callers are expected to supply the real URL.
    url: '',
    // Turns an incoming message event into a value by JSON-parsing `e.data`.
    deserializer: (e) => JSON.parse(e.data),
    // Turns an outgoing value into the JSON string handed to the socket.
    serializer: (value) => JSON.stringify(value),
  });
  /**
   * Message of the TypeError reported when the subject's `error` is invoked
   * with a value that has no truthy `code` property.
   */
  const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
  /**
   * Subject backed by a WebSocket-like connection.
   *
   * Values pushed into the subject are serialized with `config.serializer` and
   * written with `socket.send`; message events received from the socket are
   * converted with `config.deserializer` and emitted to subscribers.
   *
   * Until the socket reports `onopen`, `destination` is a ReplaySubject; once the
   * socket is open that ReplaySubject is subscribed to the real socket-writing
   * destination so earlier values are forwarded.
   *
   * The socket is created on first subscription and closed when the last
   * subscriber of the internal output subject goes away.
   */
  export class WebSocketSubject extends AnonymousSubject {
    /**
     * @param {string|Object|Observable} urlConfigOrSource Either the socket URL,
     *   a configuration object whose own enumerable properties override the
     *   defaults (keys read by this class: `url`, `protocol`, `binaryType`,
     *   `WebSocketCtor`, `serializer`, `deserializer`, `openObserver`,
     *   `closeObserver`, `closingObserver`), or an Observable to use as `source`.
     * @param {Object} [destination] Used as `this.destination` only when
     *   `urlConfigOrSource` is an Observable.
     * @throws {Error} When no `WebSocketCtor` is configured and no global
     *   `WebSocket` exists.
     */
    constructor(urlConfigOrSource, destination) {
      super();
      if (urlConfigOrSource instanceof Observable) {
        this.destination = destination;
        this.source = urlConfigOrSource;
      } else {
        const config = this._config = Object.assign({}, DEFAULT_WEBSOCKET_CONFIG);
        this._output = new Subject();
        if (typeof urlConfigOrSource === 'string') {
          config.url = urlConfigOrSource;
        } else {
          for (const key in urlConfigOrSource) {
            // Borrow hasOwnProperty so config objects without Object.prototype
            // (or with a shadowing `hasOwnProperty` key) are handled too.
            if (Object.prototype.hasOwnProperty.call(urlConfigOrSource, key)) {
              config[key] = urlConfigOrSource[key];
            }
          }
        }
        if (!config.WebSocketCtor) {
          // `typeof` keeps this from throwing a ReferenceError in environments
          // that do not declare a global WebSocket, so the explicit error below
          // is what the caller actually sees.
          if (typeof WebSocket !== 'undefined' && WebSocket) {
            config.WebSocketCtor = WebSocket;
          } else {
            throw new Error('no WebSocket constructor can be found');
          }
        }
        this.destination = new ReplaySubject();
      }
    }

    /**
     * Creates a new WebSocketSubject that shares this subject's configuration
     * and destination, with `operator` attached and this subject as its source.
     *
     * @param {Object} operator Operator stored on the returned subject.
     * @returns {WebSocketSubject} The derived subject.
     */
    lift(operator) {
      const sock = new WebSocketSubject(this._config, this.destination);
      sock.operator = operator;
      sock.source = this;
      return sock;
    }

    /**
     * Forgets the current socket and installs a fresh output subject. When this
     * subject has no `source`, `destination` is also replaced by a new
     * ReplaySubject.
     */
    _resetState() {
      this._socket = null;
      if (!this.source) {
        this.destination = new ReplaySubject();
      }
      this._output = new Subject();
    }

    /**
     * Builds an Observable that, per subscription, pushes `subMsg()` into this
     * subject, relays only the messages accepted by `messageFilter`, and pushes
     * `unsubMsg()` into this subject on teardown.
     *
     * Exceptions thrown by any of the three callbacks are routed to the
     * observer's `error`.
     *
     * @param {Function} subMsg Produces the value sent when a consumer subscribes.
     * @param {Function} unsubMsg Produces the value sent when a consumer unsubscribes.
     * @param {Function} messageFilter Predicate applied to every received value.
     * @returns {Observable} Observable of the values accepted by `messageFilter`.
     */
    multiplex(subMsg, unsubMsg, messageFilter) {
      return new Observable((observer) => {
        try {
          this.next(subMsg());
        } catch (err) {
          observer.error(err);
        }
        const subscription = this.subscribe(
          (x) => {
            try {
              if (messageFilter(x)) {
                observer.next(x);
              }
            } catch (err) {
              observer.error(err);
            }
          },
          (err) => observer.error(err),
          () => observer.complete()
        );
        return () => {
          try {
            this.next(unsubMsg());
          } catch (err) {
            observer.error(err);
          }
          subscription.unsubscribe();
        };
      });
    }

    /**
     * Instantiates the socket from the current configuration and wires its
     * `onopen`, `onerror`, `onclose` and `onmessage` handlers to the output
     * subject. A failure while constructing the socket is reported through the
     * output subject's `error` and no handlers are installed.
     */
    _connectSocket() {
      const { WebSocketCtor, protocol, url, binaryType } = this._config;
      const observer = this._output;
      let socket = null;
      try {
        socket = protocol ?
          new WebSocketCtor(url, protocol) :
          new WebSocketCtor(url);
        this._socket = socket;
        if (binaryType) {
          this._socket.binaryType = binaryType;
        }
      } catch (e) {
        observer.error(e);
        return;
      }
      const subscription = new Subscription(() => {
        this._socket = null;
        // readyState 1 is the WebSocket OPEN state.
        if (socket && socket.readyState === 1) {
          socket.close();
        }
      });
      socket.onopen = (e) => {
        const { _socket } = this;
        if (!_socket) {
          // The socket reference was cleared before the connection finished
          // opening, so close the now-unwanted connection.
          socket.close();
          this._resetState();
          return;
        }
        const { openObserver } = this._config;
        if (openObserver) {
          openObserver.next(e);
        }
        // Keep the pre-open destination so it can be subscribed to the
        // socket-writing destination installed below.
        const queue = this.destination;
        this.destination = Subscriber.create((x) => {
          if (socket.readyState === 1) {
            try {
              const { serializer } = this._config;
              socket.send(serializer(x));
            } catch (err) {
              this.destination.error(err);
            }
          }
        }, (err) => {
          const { closingObserver } = this._config;
          if (closingObserver) {
            closingObserver.next(undefined);
          }
          if (err && err.code) {
            socket.close(err.code, err.reason);
          } else {
            observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
          }
          this._resetState();
        }, () => {
          const { closingObserver } = this._config;
          if (closingObserver) {
            closingObserver.next(undefined);
          }
          socket.close();
          this._resetState();
        });
        if (queue && queue instanceof ReplaySubject) {
          subscription.add(queue.subscribe(this.destination));
        }
      };
      socket.onerror = (e) => {
        this._resetState();
        observer.error(e);
      };
      socket.onclose = (e) => {
        this._resetState();
        const { closeObserver } = this._config;
        if (closeObserver) {
          closeObserver.next(e);
        }
        if (e.wasClean) {
          observer.complete();
        } else {
          observer.error(e);
        }
      };
      socket.onmessage = (e) => {
        try {
          const { deserializer } = this._config;
          observer.next(deserializer(e));
        } catch (err) {
          observer.error(err);
        }
      };
    }

    /**
     * Subscribes `subscriber`. With a `source`, the subscription is delegated to
     * it. Otherwise the socket is connected if there is none, the subscriber is
     * attached to the output subject, and a teardown is registered that closes
     * an open socket and resets state once the output subject has no observers.
     *
     * @param {Object} subscriber Subscriber to attach.
     * @returns {Object} The source's subscription when delegating, otherwise
     *   `subscriber` itself.
     */
    _subscribe(subscriber) {
      const { source } = this;
      if (source) {
        return source.subscribe(subscriber);
      }
      if (!this._socket) {
        this._connectSocket();
      }
      this._output.subscribe(subscriber);
      subscriber.add(() => {
        const { _socket } = this;
        if (this._output.observers.length === 0) {
          if (_socket && _socket.readyState === 1) {
            _socket.close();
          }
          this._resetState();
        }
      });
      return subscriber;
    }

    /**
     * Closes the socket if it is open (readyState 1), resets internal state and
     * then runs the parent class's `unsubscribe`.
     */
    unsubscribe() {
      const { _socket } = this;
      if (_socket && _socket.readyState === 1) {
        _socket.close();
      }
      this._resetState();
      super.unsubscribe();
    }
  }
  209. //# sourceMappingURL=WebSocketSubject.js.map