Pregel: A System for Large-Scale Graph Processing
引言
Google的这篇关于图计算的Paper也是一篇重量级的论文。Pregel几个基本的特点:分布式、容错、可拓展性强、以定点为核心和BSP并发模型等,
This vertex-centric approach is flexible enough to express a broad set of algorithms. The model has been designed for efficient, scalable and fault-tolerant implementation on clusters of thousands of commodity computers, and its implied synchronicity makes reasoning about programs easier. Distribution-related details are hidden behind an abstract API. The result is a framework for processing large graphs that is expressive and easy to program.
编程模型
Pregel处理的输入是一个有向图,每一个节点都会有一个唯一的ID标识(string类型)。在输入初始化之后,计算的工程是若干的supertsteps组成的过程,supersteps以一个全局的同步点作为分割。这个也就是Pregel主要体现BSP编程模型的地方。在supersteps都完成之后,输出计算结构这个计算任务也就完成了。每一个superstep中节点的计算时并发的,都执行一个用户定义的函数。Pregel是以顶点为核心的,一个顶点可以计算的时候可以改变它自从的状态,也可以改变它相关的边的状态,接受来自前一superstep的信息,也可以发送消息给其它的节点,还可以修改图的拓扑。几个Pregel模型特点的总结:
- 几个重要的概念: 节点、消息、superstep,active和inactive等。
- 节点的状态有active和inactive之分,在第0步,每一个节点都是active的,一个节点可以通过voting to halt操作使自己进入inactive的状态。节点接受到消息会使其重新进入active状态,如果它还需要进入inactive状态,需要重新执行voting to halt操作。当所有的节点处于inactive状态有没有消息发送是,计算也就结束了;
- Pregel程序最终的输出是每一个节点输出的集合;
- Pregel的模型是一个纯粹的基于消息通信的模型,Pregel认为这个很适合这样的处理方式,有着不少的缺点(当然Pregel后面的一些系统提出了不同的看法);
- 在Pregel实现图算法和Mapreduce的程序有很多类型的地方,也存在不少的区别。
API(C++)
Google很多的系统都是使用C++写出的,Pregel也不例外。Pregel在Paper中这一节介绍了它的API的设计。由于Pregel是以节点为核心的,这里先来看看它节点的API
template <typename VertexValue,
typename EdgeValue,
typename MessageValue>
class Vertex {
public:
virtual void Compute(MessageIterator* msgs) = 0;
const string& vertex_id() const;
int64 superstep() const;
const VertexValue& GetValue();
VertexValue* MutableValue();
OutEdgeIterator GetOutEdgeIterator();
void SendMessageTo(const string& dest_vertex,
const MessageValue& message);
void VoteToHalt();
};
编写Pregel的编程以编写Vertex的子类为核心。主要的API就是计算、值操作、边、消息发送和VoteToHalt的操作,对应了上面介绍的计算模型。另外几个要点:
- Message Passing,消息是节点间通信的方式,每个消息包含接受的目的节点和一个值。
- Combiners,通过Combiner可以实现聚合操作,大大减少消息的通信量;
- Aggregators,这个是一种所有节点在一个superstep向一个Aggregator提供一个信息,然后有其处理之后。处理的结果可以提供下一步的所有节点;
- Topology Mutations,如果计算的过程需要对图的拓扑进行修改,由于Pregel是分布式的等原因,Pregel是通过前面提到的消息通信的方式实现的;
- 输入输出,持久化的数据一般保存在GFS或者是Bigtable之类的分布式存储系统中,而临时性的数据一般就保存在本地的磁盘上面。
实现
基本架构
Pregel也是常见的主从架构。为了将节点数据分发到各个结点上面,Pregel使用了基于hash分区的方式。执行的步骤:
- 一台机器作为Master,不处理图,负责协调其它的Workers结点。
- Master处理图的分区;
- Master分配用户输入给Workers;
- Master协调Worker执行supersteps的计算;
- 计算完成之后,Master可以要求Workers保存它们处理的部分的图;
容错
Pregel容错的机制主要就是两种:
- Checkpointing,在一个superstep开始的时候,Master指示Workers保存目前的状态作为checkpointing,作为失败后恢复重启的基础,这种方式在Worker输入出现了失败都是仍然可用是很有用;
- Confined recovery,当结点失败变得完全不可用的时候,它的checkpoint也变得不可用,这个时候需要重新输入来自上一步的输入。为了避免不必要的重复计算,这里使用了log来保存一个节点在一个superstep计算时发出的消息。
几个例子
PageRank
class PageRankVertex
: public Vertex<double, void, double> {
public:
virtual void Compute(MessageIterator* msgs) {
if (superstep() >= 1) {
double sum = 0;
for (; !msgs->Done(); msgs->Next())
sum += msgs->Value();
*MutableValue() = 0.15 / NumVertices() + 0.85 * sum;
}
if (superstep() < 30) {
const int64 n = GetOutEdgeIterator().size();
SendMessageToAllNeighbors(GetValue() / n);
} else {
VoteToHalt();
} }
};
最短路径
class ShortestPathVertex
: public Vertex<int, int, int> {
void Compute(MessageIterator* msgs) {
int mindist = IsSource(vertex_id()) ? 0 : INF;
for (; !msgs->Done(); msgs->Next())
mindist = min(mindist, msgs->Value());
if (mindist < GetValue()) {
*MutableValue() = mindist;
OutEdgeIterator iter = GetOutEdgeIterator();
for (; !iter.Done(); iter.Next())
SendMessageTo(iter.Target(),
mindist + iter.GetValue());
VoteToHalt();
}
};
评估
具体参考[1].
GraphX(补充)
GraphLab(补充)
参考
- Pregel: A System for Large-Scale Graph Processing, SIGMOD’10.