Coverage

100%
39
39
0

bagpipe.js

100%
39
39
0
LineHitsSource
11var util = require("util");
21var events = require("events");
3
4/**
5 * 构造器,传入限流值,设置异步调用最大并发数
6 * Examples:
7 * ```
8 * var bagpipe = new Bagpipe(100);
9 * bagpipe.push(fs.readFile, ['path', 'utf-8'], function (err, data) {
10 * // TODO
11 * });
12 * ```
13 * @param {Number} limit 并发数限制值
14 * @param {Boolean} disabled 是否禁用并发限制,多用于测试
15 */
161var Bagpipe = function (limit, disabled) {
1710 events.EventEmitter.call(this);
1810 this.limit = limit;
1910 this.active = 0;
2010 this.queue = [];
2110 this.disabled = !!disabled;
22};
231util.inherits(Bagpipe, events.EventEmitter);
24
25/**
26 * 推入方法,参数。最后一个参数为回调函数
27 * @param {Function} method 异步方法
28 * @param {Mix} args 参数列表,最后一个参数为回调函数。
29 */
301Bagpipe.prototype.push = function (method) {
31125 var args = [].slice.call(arguments, 1);
32125 var callback = args[args.length - 1];
33125 if (typeof callback !== 'function') {
341 args.push(function () {});
35 }
36125 if (this.disabled || this.limit < 1) {
3713 method.apply(null, args);
3813 return this;
39 }
40112 this.queue.push({
41 method: method,
42 args: args
43 });
44
45112 this.next();
46
47112 var upper = Math.min(this.limit * 2, 100);
48112 if (this.queue.length > upper) {
4985 this.emit('full', this.queue.length);
50 }
51
52112 return this;
53};
54
55/*!
56 * 继续执行队列中的后续动作
57 */
581Bagpipe.prototype.next = function () {
59123 var that = this;
60123 if (that.active < that.limit && that.queue.length) {
6117 var req = that.queue.shift();
6217 that.run(req.method, req.args);
63 }
64};
65
66/*!
67 * 执行队列中的方法
68 */
691Bagpipe.prototype.run = function (method, args) {
7017 var that = this;
7117 that.active++;
7217 var callback = args[args.length - 1];
73 // 注入回调函数
7417 args[args.length - 1] = function () {
7511 that.active--;
7611 if (that.active < that.limit) {
7711 that.next();
78 }
7911 callback.apply(null, arguments);
80 };
8117 method.apply(null, args);
82};
83
841module.exports = Bagpipe;
85