/**
 * Sequential task queue.
 *
 * Tasks pushed onto a queue are executed one at a time, in insertion order.
 * Each task is guarded by a timeout so a task that never reports completion
 * cannot block the tasks queued behind it.
 */
var EventEmitter = require('events').EventEmitter;
var util = require('util');
/** Fallback task timeout in milliseconds, used when no positive timeout is supplied. */
var DEFAULT_TIMEOUT = 3000;

/** Starting value of a queue's task id counter. */
var INIT_ID = 0;

/** Name of the event emitted when a queue is closed gracefully. */
var EVENT_CLOSED = 'closed';

/** Name of the event emitted when a queue reaches the drained state. */
var EVENT_DRAINED = 'drained';
/**
 * Queue that runs its tasks strictly one after another.
 *
 * Inherits from EventEmitter; the instance starts idle with an empty task
 * list and its id counter at INIT_ID.
 *
 * @constructor
 * @param {number} [timeout] - default per-task timeout in milliseconds.
 *   Any value that is not greater than zero (including undefined) falls back
 *   to DEFAULT_TIMEOUT.
 */
var SeqQueue = function(timeout) {
  EventEmitter.call(this);

  // `undefined > 0`, `null > 0` and `NaN > 0` are all false, so a single
  // comparison covers both the "missing" and the "non-positive" cases.
  this.timeout = timeout > 0 ? timeout : DEFAULT_TIMEOUT;

  this.status = SeqQueueManager.STATUS_IDLE;
  this.curId = INIT_ID;
  this.queue = [];
  // Handle of the timeout timer guarding the running task, if any.
  this.timerId = undefined;
};
util.inherits(SeqQueue, EventEmitter);
/**
 * Append a task to the queue.
 *
 * If the queue is idle it becomes busy and execution starts on the next tick.
 *
 * @param {function(Object)} fn - task body; it is invoked with a handle
 *   object whose done() method signals that the task has finished.
 * @param {function()} [ontimeout] - called if the task times out.
 * @param {number} [timeout] - timeout for this task in milliseconds; when
 *   not positive, the queue's default timeout is used.
 * @returns {boolean} true if the task was accepted, false if the queue is
 *   no longer accepting tasks (closed or drained).
 * @throws {Error} if the queue accepts tasks but fn is not a function.
 */
SeqQueue.prototype.push = function(fn, ontimeout, timeout) {
  var wasIdle = this.status === SeqQueueManager.STATUS_IDLE;
  if(!wasIdle && this.status !== SeqQueueManager.STATUS_BUSY) {
    // Closed or drained queues reject new tasks.
    return false;
  }

  if(typeof fn !== 'function') {
    throw new Error('fn should be a function.');
  }

  this.queue.push({fn: fn, ontimeout: ontimeout, timeout: timeout});

  if(wasIdle) {
    // Flip to busy right away so further pushes in this tick do not
    // schedule a second runner.
    this.status = SeqQueueManager.STATUS_BUSY;
    var self = this;
    process.nextTick(function() {
      self._next(self.curId);
    });
  }
  return true;
};
/**
 * Stop the queue from accepting new tasks.
 *
 * Has no effect unless the queue is currently idle or busy.
 *
 * @param {boolean} [force] - when truthy, the queue goes straight to the
 *   drained state: the pending task timer is cancelled and the drained
 *   event is emitted immediately. Otherwise the queue enters the closed
 *   state and emits the closed event; tasks already queued still run, and
 *   the drained event follows once none are left.
 */
SeqQueue.prototype.close = function(force) {
  var wasIdle = this.status === SeqQueueManager.STATUS_IDLE;
  if(!wasIdle && this.status !== SeqQueueManager.STATUS_BUSY) {
    // Already closed or drained: nothing to do.
    return;
  }

  if(force) {
    this.status = SeqQueueManager.STATUS_DRAINED;
    if(this.timerId) {
      clearTimeout(this.timerId);
      this.timerId = undefined;
    }
    this.emit(EVENT_DRAINED);
    return;
  }

  this.status = SeqQueueManager.STATUS_CLOSED;
  if(wasIdle) {
    // An idle queue has no runner in flight, so nothing would ever move it
    // from closed to drained. Schedule one pass so the drained event fires.
    // A busy queue must NOT get this: its running task will trigger the
    // next pass itself, and an extra one would skip past that task.
    var self = this;
    process.nextTick(function() {
      self._next(self.curId);
    });
  }
  this.emit(EVENT_CLOSED);
};
/**
 * Run the next queued task, if the caller is still entitled to advance the
 * queue.
 *
 * Every started task gets a fresh id (curId is incremented). A call is only
 * honoured when tid equals the current id, so a late done() from a task that
 * already timed out cannot advance the queue a second time.
 *
 * Emits 'timeout' (with the task) when a task exceeds its timeout, 'error'
 * (with the error and the task) when a task throws synchronously, and the
 * drained event when a closed queue runs out of tasks.
 *
 * @param {number} tid - id the caller believes is current.
 */
SeqQueue.prototype._next = function(tid) {
  var runnable = this.status === SeqQueueManager.STATUS_BUSY ||
    this.status === SeqQueueManager.STATUS_CLOSED;
  if(tid !== this.curId || !runnable) {
    // Stale caller, or the queue is idle/drained.
    return;
  }

  // The previous task is finished (or timed out): drop its timer.
  if(this.timerId) {
    clearTimeout(this.timerId);
    this.timerId = undefined;
  }

  var task = this.queue.shift();
  if(!task) {
    if(this.status === SeqQueueManager.STATUS_BUSY) {
      this.status = SeqQueueManager.STATUS_IDLE;
      // Invalidate any callback still holding the old id.
      this.curId++;
    } else {
      this.status = SeqQueueManager.STATUS_DRAINED;
      this.emit(EVENT_DRAINED);
    }
    return;
  }

  var self = this;
  task.id = ++this.curId;

  // Always advance asynchronously so task bodies never nest on the stack.
  var scheduleNext = function() {
    process.nextTick(function() {
      self._next(task.id);
    });
  };

  var timeout = task.timeout > 0 ? task.timeout : this.timeout;
  timeout = timeout > 0 ? timeout : DEFAULT_TIMEOUT;

  this.timerId = setTimeout(function() {
    scheduleNext();
    self.emit('timeout', task);
    // Only call ontimeout when it is callable; a truthy non-function would
    // otherwise throw from inside the timer callback.
    if(typeof task.ontimeout === 'function') {
      task.ontimeout();
    }
  }, timeout);

  try {
    task.fn({
      // Returns true if this task was still the current one when it
      // finished, false if it had already been superseded.
      done: function() {
        var res = task.id === self.curId;
        scheduleNext();
        return res;
      }
    });
  } catch(err) {
    // Schedule first: emit('error') itself throws when no 'error' listener
    // is registered, and the queue should still move on in that case.
    scheduleNext();
    self.emit('error', err, task);
  }
};
/**
 * Public interface of the module: the queue status constants and the queue
 * factory are attached directly to module.exports.
 */
var SeqQueueManager = module.exports;

/** Queue is accepting tasks and nothing is running. */
SeqQueueManager.STATUS_IDLE = 0;
/** Queue is accepting tasks and a task is running or about to run. */
SeqQueueManager.STATUS_BUSY = 1;
/** Queue rejects new tasks but still runs the ones already queued. */
SeqQueueManager.STATUS_CLOSED = 2;
/** Queue rejects new tasks and runs nothing further. */
SeqQueueManager.STATUS_DRAINED = 3;

/**
 * Create a new sequential queue.
 *
 * @param {number} [timeout] - default per-task timeout in milliseconds,
 *   passed through to the SeqQueue constructor.
 * @returns {SeqQueue} the new queue.
 */
SeqQueueManager.createQueue = function(timeout) {
  return new SeqQueue(timeout);
};