## Sparrow: Distributed, Low Latency Scheduling

### 0x00 Introduction

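Sparrow's most distinctive feature is that it is a distributed cluster scheduler with a decentralized design, and its goal is millisecond-level scheduling latency. Its architecture differs substantially from centralized schedulers such as Borg: Sparrow's schedulers are spread across multiple nodes in the cluster, and each one makes scheduling decisions independently. For background on scheduler architectures, the author of Firmament has written a good overview article. A decentralized design can cause problems of its own, such as conflicting scheduling decisions. Sparrow addresses these mainly with sampling-based placement and late binding. A Sparrow prototype is also open source on GitHub, and overall that implementation is fairly simple.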

 We demonstrate that a decentralized, randomized sampling approach provides near-optimal performance while avoiding the throughput and availability limitations of a centralized design. We implement and deploy our scheduler, Sparrow, on a 110-machine cluster and demonstrate that Sparrow performs within 12% of an ideal scheduler.


### 0x01 Basic Architecture

To schedule using batch sampling, a scheduler randomly selects dm worker machines (for d ≥ 1). The scheduler sends a probe to each of the dm workers; as with per-task sampling, each worker replies with the number of queued tasks. The scheduler places one of the job’s m tasks on each of the m least loaded workers.
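The batch sampling step quoted above can be sketched roughly as follows. This is a minimal illustration, not code from the Sparrow repository: the class `BatchSamplingSketch`, the method `place`, and the `queueLengths` array (standing in for the probe replies) are all hypothetical, and the sketch assumes probes go to distinct workers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of batch sampling; not taken from the Sparrow code base. */
final class BatchSamplingSketch {

  /**
   * Picks workers for a job's m tasks by probing d*m randomly chosen workers.
   * queueLengths[i] stands in for the probe reply (queued tasks) of worker i.
   */
  static List<Integer> place(int[] queueLengths, int m, int d, Random rng) {
    List<Integer> workers = new ArrayList<>();
    for (int i = 0; i < queueLengths.length; i++) {
      workers.add(i);
    }
    // Assumption: sample d*m distinct workers, capped at the cluster size.
    Collections.shuffle(workers, rng);
    int probes = Math.min(d * m, workers.size());
    List<Integer> probed = new ArrayList<>(workers.subList(0, probes));
    // Keep the m probed workers that reported the shortest queues.
    probed.sort(Comparator.comparingInt(w -> queueLengths[w]));
    return new ArrayList<>(probed.subList(0, Math.min(m, probed.size())));
  }

  public static void main(String[] args) {
    int[] queueLengths = {4, 0, 7, 1, 3, 0, 9, 2};
    List<Integer> chosen = place(queueLengths, 2, 2, new Random(42));
    System.out.println("Tasks placed on workers: " + chosen);
  }
}
```

Compared with probing d workers separately for each task, sharing the d*m probes across the whole job means that a single unlucky sample is less likely to leave one task stuck behind a long queue.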


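Late binding goes one step further: workers do not answer a probe right away. Each probe is queued on the worker as a reservation, and the worker asks the scheduler for a task only when that reservation reaches the front of its queue. The scheduler hands the job's tasks to the first m workers that ask.

In this manner, the scheduler guarantees that the tasks will be placed on the m probed workers where they will be launched soonest. For exponentially-distributed task durations at 80% load, late binding provides response times 0.55× those with batch sampling, bringing response time to within 5% (4ms) of an omniscient scheduler (as shown in Figure 3).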
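A minimal sketch of the scheduler side of late binding, under simplifying assumptions: there is one job, workers are simulated by a loop instead of RPCs, and all names (`LateBindingSketch`, `JobState`, the worker ids) are hypothetical. Its `getTask` is a simplified stand-in and does not mirror the signature of any method in the Sparrow code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of late binding; names are illustrative only. */
final class LateBindingSketch {

  /** Scheduler-side state for one job: tasks not yet bound to any worker. */
  static final class JobState {
    private final Deque<String> unlaunchedTasks = new ArrayDeque<>();

    JobState(List<String> tasks) {
      unlaunchedTasks.addAll(tasks);
    }

    /** Called by a worker whose reservation reached the front of its queue. */
    synchronized Optional<String> getTask() {
      // The first workers to ask receive tasks; later askers get nothing.
      return Optional.ofNullable(unlaunchedTasks.poll());
    }
  }

  public static void main(String[] args) {
    JobState job = new JobState(List.of("task-0", "task-1"));
    // Reservations sit on d*m = 4 workers, which become free in this order.
    String[] workersInOrderOfAvailability = {"w3", "w1", "w4", "w2"};
    for (String worker : workersInOrderOfAvailability) {
      Optional<String> task = job.getTask();
      System.out.println(worker + " -> " + task.orElse("(no task left)"));
    }
  }
}
```

The point of the design is that a task is bound to a worker only when that worker is actually ready to run it, so placement no longer depends on queue length as a proxy for waiting time.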


#### Scheduling Policies & Constraints

  Per-job constraints (e.g., all tasks should be run on a worker with a GPU) are trivially handled at a Sparrow scheduler. Sparrow randomly selects the dm candidate workers from the subset of workers that satisfy the constraint.
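As a rough illustration of per-job constraints, the sketch below filters the cluster down to the workers that satisfy the constraint and only then draws the d*m candidates. The `Worker` type, its `hasGpu` field, and `sampleCandidates` are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.Predicate;

/** Hypothetical sketch of sampling under a per-job constraint. */
final class ConstrainedSamplingSketch {

  static final class Worker {
    final String id;
    final boolean hasGpu;

    Worker(String id, boolean hasGpu) {
      this.id = id;
      this.hasGpu = hasGpu;
    }
  }

  /** Draws up to d*m probe candidates from the workers that satisfy the constraint. */
  static List<Worker> sampleCandidates(
      List<Worker> cluster, Predicate<Worker> constraint, int m, int d, Random rng) {
    List<Worker> eligible = new ArrayList<>();
    for (Worker worker : cluster) {
      if (constraint.test(worker)) {
        eligible.add(worker);
      }
    }
    // Sample only inside the eligible subset, capped at its size.
    Collections.shuffle(eligible, rng);
    return new ArrayList<>(eligible.subList(0, Math.min(d * m, eligible.size())));
  }

  public static void main(String[] args) {
    List<Worker> cluster = new ArrayList<>();
    for (int i = 0; i < 6; i++) {
      cluster.add(new Worker("w" + i, i % 2 == 0));
    }
    for (Worker w : sampleCandidates(cluster, x -> x.hasGpu, 1, 2, new Random(7))) {
      System.out.println("probe candidate: " + w.id);
    }
  }
}
```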


Sparrow supports two types of policies: strict priorities and weighted fair sharing ... Sparrow supports all such policies by maintaining multiple queues on worker nodes. FIFO, earliest deadline first, and shortest job first all reduce to assigning a priority to each job, and running the highest priority jobs first. ...
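The idea of multiple queues on a worker can be sketched for the strict-priority case as below. This is an assumption-laden illustration: the class `PriorityQueuesSketch` is hypothetical, a lower number is taken to mean a higher priority, and weighted fair sharing is not covered.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

/** Hypothetical sketch of strict priorities using one FIFO queue per priority. */
final class PriorityQueuesSketch {

  // TreeMap iterates keys in ascending order; assumption: lower key = higher priority.
  private final TreeMap<Integer, Deque<String>> queues = new TreeMap<>();

  void enqueue(int priority, String reservation) {
    queues.computeIfAbsent(priority, p -> new ArrayDeque<>()).addLast(reservation);
  }

  /** Returns the next reservation to serve, or null if every queue is empty. */
  String next() {
    for (Deque<String> queue : queues.values()) {
      String head = queue.pollFirst();
      if (head != null) {
        return head;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    PriorityQueuesSketch worker = new PriorityQueuesSketch();
    worker.enqueue(2, "batch-a");
    worker.enqueue(1, "interactive-a");
    worker.enqueue(2, "batch-b");
    String reservation;
    while ((reservation = worker.next()) != null) {
      System.out.println("serving " + reservation);
    }
  }
}
```

Within one priority level the queue stays FIFO, so policies such as earliest deadline first would be expressed by how the priority number is assigned, not by reordering inside a queue.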


### 0x02 Implementation

Sparrow's implementation follows the flow shown below. The open-source code on GitHub is written in Java and uses Thrift for RPC.

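    public List<TTaskLaunchSpec> getTask(/* parameter list missing from this excerpt */) {
      /* TODO: Consider making this synchronized to avoid the need for synchronization in
       * the task placers (although then we'd lose the ability to parallelize over task placers). */
      // ... (excerpt gap: an early-exit branch opens here)
        return Lists.newArrayList();
      }

      // ... (excerpt gap: taskLaunchSpecs is obtained and an if-branch opens here)
        LOG.error("Received invalid task placement for request " + requestId + ": " +
            /* rest of the statement missing from this excerpt */);
        return Lists.newArrayList();
      } else if (taskLaunchSpecs.size() == 1) {
        // ... (body missing from this excerpt)
      } else {
        // ... (body missing from this excerpt)
      }

      // ... (excerpt gap: an if-branch opens here)
        LOG.debug("All tasks placed for request " + requestId);
        if (useCancellation) {
          Set<THostPort> outstandingNodeMonitors =
              /* right-hand side missing from this excerpt */;
          for (THostPort nodeMonitorToCancel : outstandingNodeMonitors) {
            // ... (body missing from this excerpt)
          }
        }
      }
      // ... (excerpt gap: remaining lines of the method)
    }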


    public List<TTaskLaunchSpec> assignTask(THostPort nodeMonitorAddress) {
      // ... (excerpt gap: numOutstandingReservations is obtained here)
      if (numOutstandingReservations == null) {
        LOG.error("Node monitor " + nodeMonitorAddress +
            " not in list of outstanding reservations");
        return Lists.newArrayList();
      }
      if (numOutstandingReservations == 1) {
        // ... (body missing from this excerpt)
      } else {
        // ... (body missing from this excerpt)
      }

      // ... (excerpt gap: an if-branch opens here)
        LOG.debug("Request " + requestId + ", node monitor " + nodeMonitorAddress.toString() +
            /* rest of the statement missing from this excerpt */);
        return Lists.newArrayList();
      } else {
        LOG.debug("Request " + requestId + ", node monitor " + nodeMonitorAddress.toString() +