Pregel -- A System for Large-Scale Graph Processing


Pregel: A System for Large-Scale Graph Processing

引言

Google的这篇关于图计算的Paper也是一篇重量级的论文。Pregel几个基本的特点:分布式、容错、可拓展性强、以定点为核心和BSP并发模型等,

This vertex-centric approach is flexible enough to express a broad set of algorithms. The model has been designed for efficient, scalable and fault-tolerant implementation on clusters of thousands of commodity computers, and its implied synchronicity makes reasoning about programs easier. Distribution-related details are hidden behind an abstract API. The result is a framework for processing large graphs that is expressive and easy to program.

编程模型

Pregel处理的输入是一个有向图,每一个节点都会有一个唯一的ID标识(string类型)。在输入初始化之后,计算的工程是若干的supertsteps组成的过程,supersteps以一个全局的同步点作为分割。这个也就是Pregel主要体现BSP编程模型的地方。在supersteps都完成之后,输出计算结构这个计算任务也就完成了。每一个superstep中节点的计算时并发的,都执行一个用户定义的函数。Pregel是以顶点为核心的,一个顶点可以计算的时候可以改变它自从的状态,也可以改变它相关的边的状态,接受来自前一superstep的信息,也可以发送消息给其它的节点,还可以修改图的拓扑。几个Pregel模型特点的总结:

  • 几个重要的概念: 节点、消息、superstep,active和inactive等。
  • 节点的状态有active和inactive之分,在第0步,每一个节点都是active的,一个节点可以通过voting to halt操作使自己进入inactive的状态。节点接受到消息会使其重新进入active状态,如果它还需要进入inactive状态,需要重新执行voting to halt操作。当所有的节点处于inactive状态有没有消息发送是,计算也就结束了;
  • Pregel程序最终的输出是每一个节点输出的集合;
  • Pregel的模型是一个纯粹的基于消息通信的模型,Pregel认为这个很适合这样的处理方式,有着不少的缺点(当然Pregel后面的一些系统提出了不同的看法);
  • 在Pregel实现图算法和Mapreduce的程序有很多类型的地方,也存在不少的区别。

API(C++)

Google很多的系统都是使用C++写出的,Pregel也不例外。Pregel在Paper中这一节介绍了它的API的设计。由于Pregel是以节点为核心的,这里先来看看它节点的API

template <typename VertexValue,
          typename EdgeValue,
          typename MessageValue>
class Vertex {
 public:
  virtual void Compute(MessageIterator* msgs) = 0;
  const string& vertex_id() const;
  int64 superstep() const;
  const VertexValue& GetValue();
  VertexValue* MutableValue();
  OutEdgeIterator GetOutEdgeIterator();
  void SendMessageTo(const string& dest_vertex,
                     const MessageValue& message);
  void VoteToHalt();
};

编写Pregel的编程以编写Vertex的子类为核心。主要的API就是计算、值操作、边、消息发送和VoteToHalt的操作,对应了上面介绍的计算模型。另外几个要点:

  • Message Passing,消息是节点间通信的方式,每个消息包含接受的目的节点和一个值。
  • Combiners,通过Combiner可以实现聚合操作,大大减少消息的通信量;
  • Aggregators,这个是一种所有节点在一个superstep向一个Aggregator提供一个信息,然后有其处理之后。处理的结果可以提供下一步的所有节点;
  • Topology Mutations,如果计算的过程需要对图的拓扑进行修改,由于Pregel是分布式的等原因,Pregel是通过前面提到的消息通信的方式实现的;
  • 输入输出,持久化的数据一般保存在GFS或者是Bigtable之类的分布式存储系统中,而临时性的数据一般就保存在本地的磁盘上面。

实现

基本架构

Pregel也是常见的主从架构。为了将节点数据分发到各个结点上面,Pregel使用了基于hash分区的方式。执行的步骤:

  1. 一台机器作为Master,不处理图,负责协调其它的Workers结点。
  2. Master处理图的分区;
  3. Master分配用户输入给Workers;
  4. Master协调Worker执行supersteps的计算;
  5. 计算完成之后,Master可以要求Workers保存它们处理的部分的图;
容错

Pregel容错的机制主要就是两种:

  1. Checkpointing,在一个superstep开始的时候,Master指示Workers保存目前的状态作为checkpointing,作为失败后恢复重启的基础,这种方式在Worker输入出现了失败都是仍然可用是很有用;
  2. Confined recovery,当结点失败变得完全不可用的时候,它的checkpoint也变得不可用,这个时候需要重新输入来自上一步的输入。为了避免不必要的重复计算,这里使用了log来保存一个节点在一个superstep计算时发出的消息。

几个例子

PageRank
class PageRankVertex
    : public Vertex<double, void, double> {
 public:
  virtual void Compute(MessageIterator* msgs) {
    if (superstep() >= 1) {
      double sum = 0;
      for (; !msgs->Done(); msgs->Next())
        sum += msgs->Value();
      *MutableValue() = 0.15 / NumVertices() + 0.85 * sum;
}
    if (superstep() < 30) {
      const int64 n = GetOutEdgeIterator().size();
      SendMessageToAllNeighbors(GetValue() / n);
    } else {
      VoteToHalt();
} }
};

最短路径

class ShortestPathVertex
    : public Vertex<int, int, int> {
  void Compute(MessageIterator* msgs) {
    int mindist = IsSource(vertex_id()) ? 0 : INF;
    for (; !msgs->Done(); msgs->Next())
      mindist = min(mindist, msgs->Value());
    if (mindist < GetValue()) {
      *MutableValue() = mindist;
      OutEdgeIterator iter = GetOutEdgeIterator();
      for (; !iter.Done(); iter.Next())
        SendMessageTo(iter.Target(),
                      mindist + iter.GetValue());
    VoteToHalt();
  }
};

评估

具体参考[1].

GraphX(补充)

GraphLab(补充)

参考

  1. Pregel: A System for Large-Scale Graph Processing, SIGMOD’10.