All files / utils asyncLimit.ts

94.44% Statements 34/36
88.89% Branches 16/18
100% Functions 8/8
94.44% Lines 34/36

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97                                                    25x                       12x   12x   12x 20x 20x 11x 11x   20x     12x 20x 20x     20x 20x 20x       12x 40x 40x       40x 20x 20x 20x 20x 20x 18x 2x         12x 20x 20x       20x 20x   20x 11x       12x    
/**
 * Options accepted by `asyncLimit`.
 *
 * `T` is the type of the async function being wrapped; it is used to type
 * the arguments handed to `groupBy`.
 */
export interface AsyncLimitOptions<T extends (...args: any[]) => Promise<any>> {
  /**
   * Maximum number of calls allowed to be in flight at the same time.
   * When `groupBy` is set, the limit is applied to each group separately.
   *
   * @default 1
   */
  concurrency?: number

  /**
   * Groups calls by their arguments: the returned string is the group key.
   * Different groups run in parallel with each other, while calls inside
   * one group are started in call order (at most `concurrency` at a time).
   *
   * @default no grouping (every call shares a single group)
   */
  groupBy?: (...args: Parameters<T>) => string
}
 
/** One pending invocation waiting for a free slot in its group. */
interface QueueItem {
  /** Arguments to forward to the wrapped function. */
  args: unknown[]
  /** Resolver of the promise that was handed back to the caller. */
  resolve: (res: unknown) => void
}

/** Scheduling state of a single group. */
interface GroupState {
  /** Invocations that have not been started yet, oldest first. */
  queue: QueueItem[]
  /** Number of invocations currently in flight. */
  activeCount: number
}

/** Group key shared by every call when no `groupBy` option is given. */
const defaultGroupKey = '__DEFAULT__'

/**
 * Limits how many invocations of an async function run in parallel.
 *
 * The returned function has the same signature as `asyncFn`. Each call is
 * appended to a FIFO queue and started as soon as its group has fewer than
 * `concurrency` invocations in flight; the promise returned to the caller
 * settles exactly like the promise of the underlying invocation. When a slot
 * is free, `asyncFn` is invoked synchronously within the call.
 *
 * A slot is released whether the invocation fulfils, rejects, or throws
 * synchronously, so one failing call never blocks the calls queued behind it.
 * A synchronous throw is surfaced to the caller as a rejected promise.
 *
 * Note: a `concurrency` below 1 means no invocation is ever started.
 *
 * @param asyncFn The async function to wrap.
 * @param options Concurrency limit and optional grouping, see `AsyncLimitOptions`.
 * @returns A wrapper around `asyncFn` that enforces the limit.
 */
export function asyncLimit<T extends (...args: any[]) => Promise<any>>(
  asyncFn: T,
  options: AsyncLimitOptions<T> = {},
): T {
  const { concurrency = 1, groupBy } = options

  // Only groups that have queued or running work are kept in this map.
  const groups = new Map<string, GroupState>()

  /** Returns the state for `groupKey`, creating it on first use. */
  const getGroupState = (groupKey: string): GroupState => {
    let groupState = groups.get(groupKey)
    if (!groupState) {
      groupState = { queue: [], activeCount: 0 }
      groups.set(groupKey, groupState)
    }
    return groupState
  }

  /**
   * Calls `asyncFn` and always returns a promise: a synchronous throw becomes
   * a rejection and a non-promise return value is wrapped, so the caller can
   * rely on `.then` to release the slot.
   */
  const invoke = (args: unknown[]): Promise<unknown> => {
    try {
      return Promise.resolve(asyncFn(...args))
    } catch (err) {
      return Promise.reject(err)
    }
  }

  /** Starts the oldest queued invocation of the group if a slot is free. */
  const run = (groupKey: string): void => {
    const groupState = groups.get(groupKey)
    if (!groupState) {
      return
    }
    if (groupState.activeCount >= concurrency) {
      return
    }
    const item = groupState.queue.shift()
    if (!item) {
      return
    }

    groupState.activeCount++
    const res = invoke(item.args)
    // Hand the outcome (fulfilment or rejection) over to the caller's promise.
    item.resolve(res)
    // Free the slot on either outcome; this also marks a rejection of `res`
    // itself as handled.
    const release = (): void => next(groupKey)
    res.then(release, release)
  }

  /** Releases one slot of the group, starts the next item, drops idle groups. */
  const next = (groupKey: string): void => {
    const groupState = groups.get(groupKey)
    if (!groupState) {
      return
    }

    groupState.activeCount--
    run(groupKey)

    // Remove idle groups so the map does not grow with every distinct key.
    if (groupState.activeCount === 0 && groupState.queue.length === 0) {
      groups.delete(groupKey)
    }
  }

  const call = (...args: any[]): Promise<unknown> => {
    return new Promise(resolve => {
      const groupKey = groupBy
        ? groupBy(...(args as Parameters<T>))
        : defaultGroupKey
      getGroupState(groupKey).queue.push({ args, resolve })
      run(groupKey)
    })
  }

  return call as T
}