| 1 | 1 | var util = require("util"); |
| 2 | 1 | var events = require("events"); |
| 3 | | |
/**
 * Constructor. Creates a queue that caps how many asynchronous calls
 * may be in flight at the same time.
 * Examples:
 * ```
 * var bagpipe = new Bagpipe(100);
 * bagpipe.push(fs.readFile, 'path', 'utf-8', function (err, data) {
 *   // TODO
 * });
 * ```
 * Notes on `limit`:
 * - a value below 1 makes `push` invoke every method immediately;
 * - an omitted or NaN value makes every comparison against it false, so
 *   tasks are queued but never started. Always pass a number.
 * @param {Number} limit Maximum number of concurrently running calls
 * @param {Boolean} disabled Turns the limit off entirely (mostly for tests)
 */
var Bagpipe = function (limit, disabled) {
  events.EventEmitter.call(this);
  this.limit = limit;
  // Number of calls that have been started and have not called back yet.
  this.active = 0;
  // Pending `{method, args}` records, run in FIFO order.
  this.queue = [];
  this.disabled = !!disabled;
};
util.inherits(Bagpipe, events.EventEmitter);

/**
 * Pushes an asynchronous method and its arguments into the pipe.
 * The last argument is treated as the completion callback; when it is not
 * a function, a no-op callback is appended so that completion can still be
 * tracked.
 * Emits `'full'` with the current queue length whenever, after the push,
 * more than `Math.min(limit * 2, 100)` calls are waiting.
 * @param {Function} method Asynchronous method to call
 * @param {Mix} args Arguments for `method`; the last one is the callback
 * @return {Bagpipe} This instance, for chaining
 */
Bagpipe.prototype.push = function (method) {
  var args = Array.prototype.slice.call(arguments, 1);
  if (typeof args[args.length - 1] !== 'function') {
    args.push(function () {});
  }

  // No limiting requested: call straight through, bypassing the queue.
  if (this.disabled || this.limit < 1) {
    method.apply(null, args);
    return this;
  }

  this.queue.push({
    method: method,
    args: args
  });

  this.next();

  var upper = Math.min(this.limit * 2, 100);
  if (this.queue.length > upper) {
    this.emit('full', this.queue.length);
  }

  return this;
};

/*!
 * Starts the next queued call, if there is one and a slot is free.
 */
Bagpipe.prototype.next = function () {
  var that = this;
  if (that.active < that.limit && that.queue.length) {
    var req = that.queue.shift();
    that.run(req.method, req.args);
  }
};

/*!
 * Runs one call and occupies a slot until it completes.
 * The slot is given back exactly once per call: on the first invocation of
 * the callback, or when `method` throws synchronously. In the latter case
 * the exception is rethrown unchanged and the queue resumes with the next
 * `push` or the next completing call.
 */
Bagpipe.prototype.run = function (method, args) {
  var that = this;
  var callback = args[args.length - 1];
  var released = false;

  // Gives the slot back; returns true only the first time it is called.
  var release = function () {
    if (released) {
      return false;
    }
    released = true;
    that.active--;
    return true;
  };

  that.active++;
  // Inject a wrapper callback so completion of the call can be observed.
  args[args.length - 1] = function () {
    // Only the first invocation frees the slot and pulls the next task;
    // repeated invocations must not push `active` below its true value.
    if (release()) {
      that.next();
    }
    callback.apply(null, arguments);
  };

  try {
    method.apply(null, args);
  } catch (err) {
    // A synchronous throw means the callback may never fire; without this
    // the slot would stay occupied forever.
    release();
    throw err;
  }
};
| 83 | | |
| 84 | 1 | module.exports = Bagpipe; |
| 85 | | |