the-one-pitt

[TOC]

Bubble Rap论文和the-one-pitt简析


Bubble Rap 论文分析

Bubble Rap 是由Pan et.al 所提出的一种DTN算法,其主要包括了两个重要的部分,community 和 centrality,其思想来源于生态学。其主要思想是将网络转化为类似人类社区的模式,而将整个map 划分为相应的社区,对于社区中的人而言(结点),其受欢迎的程度是不同的,因而具有不同的中心度。这里主要分析论文中所提到的community和 centrality 两个具体内容,而对于实现结果分析的部分则暂时不加以考虑。

community

在文章中,主要介绍了两种集中式社区检测算法(centralized community detection algorithms): K-CLIQUE 和 WNA。 这两种算法各有优劣,因此在最终的过程中,结合了这两种算法,进行相互的补充。

K-CLIQUE

Palla et. al. 将k-clique 社区定义为一个大小为k 的完整子图的合集,可以通过一系列相邻的k-clique到达,如果两个k-clique是相邻的,意味着他们共享k-1个结点

WAN(Weighted Network Analysis)

使用WAN来进行相应的数据分析,其将一个未加权模块扩展到一个加权模块,具体计算方式如下:

其中$A_{vw}$是两个点之间的边的权重值(如果存在,否则为0),$\delta(i,j)$是1如果$i=j$否则为0。$m=\frac{1}{2}\sum_{vm}A_{vm}$,$k_v$是顶点v的度,定义为:$\sum_wA_{vw}$,然后$c_i$代表顶点i所属的community。模块化定义为该分数之间的差,如果边缘是随机分配的,则期望落入社区内的边缘的分数之间的差,但我们保持顶点的度不变。算法本质上是一种遗传算法,使用模块化作为适应性的度量。我们没有选择和改变当前的最佳解决方案,而是枚举了当前解决方案中任何两个社区的所有可能合并,并评估了结果合并的相对适合度,并选择了最佳解决方案作为下一次迭代的种子。

这些集中式社区检测算法为我们提供了有关人类社会聚类的丰富信息,可用于对收集的流动性痕迹进行离线数据分析。

centrality

中心度计算方法:

1.使用HaggleSimemulator创建的大量不同的,均匀分布的流量模式进行无限洪泛仿真。

2.计算在所有最短延迟传递中一个节点充当其他节点的中继的次数。这里最短的延迟传递是指同一条消息通过不同的路径传递到目的地的情况,在这里我们仅以最短的延迟对传递进行计数。

the-one-pitt 源码分析

the-one-pitt是一个实现了Bubble Rap 的相应源码,这里首先对其进行相应的分析,结合one源码与实现方式,判断其关于community 和 centrality的相关实现。

Social Routing

routing.community.CommunityDetectionEngine(Interface)

接口(英文:Interface),在JAVA编程语言中是一个抽象类型,是抽象方法的集合,接口通常以interface来声明。一个类通过继承接口的方式,从而来继承接口的抽象方法。

1
2
3
4
5
6
7
8
9
public interface CommunityDetectionEngine
{
/**
* Returns the set of nodes in the local Community.
*
* @return Set of hosts in the local community
*/
public Set<DTNHost> getLocalCommunity();
}

获得localCommunity 的所有hosts

routing.community.DistributedBubbleRap

实现分布式的Bubble Rap 算法,其中有很多很重要的代码,下面逐一进行分析;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public DistributedBubbleRap(Settings s)//构造函数
{
//COMMUNITY_ALG_SETTING = "communityDetectAlg"
if(s.contains(COMMUNITY_ALG_SETTING))
this.community = (CommunityDetection)
s.createIntializedObject(s.getSetting(COMMUNITY_ALG_SETTING));
else
this.community = new SimpleCommunityDetection(s);//最简单的计算community 的算法
//CENTRALITY_ALG_SETTING = "centralityAlg"
if(s.contains(CENTRALITY_ALG_SETTING))
this.centrality = (Centrality)
s.createIntializedObject(s.getSetting(CENTRALITY_ALG_SETTING));
else
this.centrality = new SWindowCentrality(s);//最简单的计算centrality的算法
}

可以看到,如果没有在seeting的文件中设置相应算法,那么采用的是SimpleCommunityDetection()SWindowCentrality来对community 和 centrality 来进行相应计算,否则那么则构造相应的类来计算该两个部分。

public DistributedBubbleRap(DistributedBubbleRap proto)*// copy*函数使用来进行相应的copy的,也就是说,不用每个routing类都需要从setting中进行读取。

1
2
3
4
5
6
7
8
9
10
11
public void doExchangeForNewConnection(Connection con, DTNHost peer)
{
// 开始计时这个新连接的持续时间,并通知社区检测对象新连接已经形成
DTNHost myHost = con.getOtherNode(peer);
DistributedBubbleRap de = this.getOtherDecisionEngine(peer);//获得con的另一个router

this.startTimestamps.put(peer, SimClock.getTime());//记录开始时间
de.startTimestamps.put(myHost, SimClock.getTime());

this.community.newConnection(myHost, peer, de.community);//调用newConnection来通知连接已经形成
}

connectionDown 的作用是当连接断开的时候,记录相应的history,最后通知连接已经断开

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void connectionDown(DTNHost thisHost, DTNHost peer)
{
double time = startTimestamps.get(peer);
double etime = SimClock.getTime();

// Find or create the connection history list
List<Duration> history;
if(!connHistory.containsKey(peer))//如果不包含该节点的历史
{
history = new LinkedList<Duration>();
connHistory.put(peer, history);
}
else
history = connHistory.get(peer);

// add this connection to the list
if(etime - time > 0)
history.add(new Duration(time, etime));

CommunityDetection peerCD = this.getOtherDecisionEngine(peer).community;

// inform the community detection object that a connection was lost.
// The object might need the whole connection history at this point.
community.connectionLost(thisHost, peer, peerCD, history);

startTimestamps.remove(peer);
}
转发的条件

注意关于该函数的实现,包括了这样几个部分(具体见代码注释)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean shouldSendMessageToHost(Message m, DTNHost otherHost)//转发
{
if(m.getTo() == otherHost) return true; //如果不是当前结点,而是连接到的节点,那么转发

DTNHost dest = m.getTo();//获取目的host
DistributedBubbleRap de = getOtherDecisionEngine(otherHost);//得到邻接的router

// 判断这两个community中哪个有该目的host
boolean peerInCommunity = de.commumesWithHost(dest);
boolean meInCommunity = this.commumesWithHost(dest);

if(peerInCommunity && !meInCommunity) // 如果在peer中,那么转发
return true;
else if(!peerInCommunity && meInCommunity) // 只在当前community中,则不转发
return false;
else if(peerInCommunity) // 如果都在
{
if(de.getLocalCentrality() > this.getLocalCentrality())//根据中心度的大小进行判断(local)
return true;
else
return false;
}
// Neither in local community, forward to more globally central node
else if(de.getGlobalCentrality() > this.getGlobalCentrality())//如果都不在,则根据全局中心度大小判断
return true;

return false;
}

routing.RoutingDecisionEngine

这个接口,主要是为了判断在信息接收,转发,删除过程中的一些条件:

  • connectionUp(DTNHost thisHost, DTNHost peer): 当此host和peer之间的连接up时调用。
  • connectionDown(DTNHost thisHost, DTNHost peer): 当此host 和 peer之间的连接down时调用
  • doExchangeForNewConnection(Connection con, DTNHost peer): 为出现的每次连接调用一次,其作用是进行信息的同步交换和更新信息的机会。
  • newMessage(Message m): 判断所给的这个message需要被转发还是被丢弃(bubble rap 始终保持的是传递)
  • isFinalDest(Message m, DTNHost aHost): 确定给定主机是否为给定消息的预期接收方。
  • shouldSaveReceivedMessage(Message m, DTNHost thisHost):判断是否要保存接收到的信息并进行传递
  • shouldSendMessageToHost(Message m, DTNHost otherHost):调用此函数以决定是否这个信息需要被转发到给定的host中
  • shouldDeleteSentMessage(Message m, DTNHost otherHost): 调用此函数来判断是否需要删除一个已经发送了的信息
  • shouldDeleteOldMessage(Message m, DTNHost hostReportingOld): 判断是否需要删除一个old message(无法成功发送,表示已经发送了或者old)
  • replicate(): copy

routing.community.LABELDecisionEngine

LABELDecisionEngine 其中只有关于community 的计算,没有关于centrality的计算,其大部分的实现与DistributeBubbleRap相同,这里不再过多进行解释

Community Detection

routing.community.CommunityDetection(Interface)

这个接口主要用于social-based routing protocols,比如实现的DistributedBubbleRap 和 LABELDecisionEngine算法,在当前的接口中,主要有的函数为:

  • newConnection():调用此函数用于通知已经建立了一个新的connection
  • connectionLost(): 调用此函数以通知一个connection已经丢失
  • isHostInCommunity(DTNHost): 判断当前给定的host是否是这个对象的local community
  • getLocalCommunity():得到当前对象的所有的Local Community的hosts
  • replicate(): 复制当前的CommunityDetection 对象

routing.community.SimpleCommunityDetection

实现了CommunityDetection的接口,在其中有几个固定参数:lambdagammafamiliarThreshold,在后面的方法中,会涉及到关于这些参数参与的计算当中。除了这三个参数之外,还包括了Set<DTNHost> familiarSetSet<DTNHost> localCommunity

下面主要说明其中最重要的函数:

newConnection(DTNHost myHost, DTNHost peer,CommunityDetection peerCD)

关于该函数的基础功能,直接参考CommunityDetection接口中的函数说明即可知道,下面主要通过注释来说明函数中的一些具体功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public void newConnection(DTNHost myHost, DTNHost peer, 
CommunityDetection peerCD)
{
boolean addPeerToMyLocal=false, addMeToPeerLocal=false;
//初始化两个判断
SimpleCommunityDetection scd = (SimpleCommunityDetection)peerCD;

this.localCommunity.add(myHost);
scd.localCommunity.add(peer);
//在当前的community中加入当前的host ,而对于peerCD加入相应的peer

// 如果需要的话,将peer加入到this的localCommunity中
if(!this.localCommunity.contains(peer))
{
// 计算交集的个数 (scd 的 familiarSet 和 this.localCommunity)
int count=0, peerFsize = scd.familiarSet.size();
for(DTNHost h : scd.familiarSet)
if(this.localCommunity.contains(h))
count++;

// 当有很多相同的时候,则将peer加入到当前community中
if(addPeerToMyLocal = ((double)count)/peerFsize > this.lambda)
{
this.localCommunity.add(peer);
}
}

//同样,也判断是否需要将myHost 加入到scd的localCommunity中
if(!scd.localCommunity.contains(myHost))
{
int count = 0, myFsize = this.familiarSet.size();
for(DTNHost h : this.familiarSet)
if(scd.localCommunity.contains(h))
count++;

if(addMeToPeerLocal = ((double)count)/myFsize > scd.lambda)
{
scd.localCommunity.add(myHost);
}
}

//判断local communities是否需要进行合并
if(addPeerToMyLocal || addMeToPeerLocal)
{
// Compute set union
Set<DTNHost> commUnion = new HashSet<DTNHost>(this.localCommunity.size() +
scd.localCommunity.size() + 2);
commUnion.addAll(this.localCommunity);
commUnion.addAll(scd.localCommunity);

//计算两个localcommunity交集的个数(count)
int count = 0;
for(DTNHost h : this.localCommunity)
if(scd.localCommunity.contains(h))
count++;

// 如果交集的个数超过某个阈值,则进行合并
if(addPeerToMyLocal && count > this.gamma * commUnion.size())
{
this.localCommunity.addAll(scd.localCommunity);
}
if(addMeToPeerLocal && count > scd.gamma * commUnion.size())
{
scd.localCommunity.addAll(this.localCommunity);
}
}
}
connectionLost(DTNHost myHost, DTNHost peer, CommunityDetection peerCD, List\ history)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void connectionLost(DTNHost myHost, DTNHost peer, 
CommunityDetection peerCD, List<Duration> history)
{
if(this.familiarSet.contains(peer)) return;

// Compute total contact duration
Iterator<Duration> i = history.iterator();
double time = 0;
while(i.hasNext())
{
Duration d = i.next();
time += d.end - d.start;
}

// Add peer to familiar set if needed (and by extension to the local comm.)
if(time > this.familiarThreshold)
{
this.familiarSet.add(peer);
this.localCommunity.add(peer);
}
}

routing.community.KCliqueCommunityDetection

实现了CommunityDetection接口,同样,重点关注函数newConnectionconnectionLost的实现,下面进行具体分析:

newConnection

首先将这两个节点分别加到相应的localcommunity中,然后进行检测,如果当前的localCommunity包含了peer 节点,那么计算两个commmunity中的相应交叉规模,然后根据规模和K值大小的对比,来计算是否要将其加入到其它当中,实际上,Kclique和Simple不同的地方就是一个是通过$\alpha 和 \beta$等参数来进行判断的,而这里是用K来进行判断的。

connectionLost

与SimpleCommunityDetection是相同的。

Centrality Algorithms

routing.community.Centrality (Interface)

这个接口主要用于计算centrality,主要含有的方法为:

  • getGlobalCentrality(Map> connHistory): 基于连接历史来计算一个全局的centrality

  • getLocalCentrality(Map> connHistory,

    ​ CommunityDetection cd) 基于连接历史和一个CommunityDetection 来计算本地的centrality。

  • replicate(): 复制当前的Centrality 对象

routing.community.DegreeCentrality

显然的,继承了Centrality接口,实现非常简单。这里不再详细介绍。

routing.community.AvgDegreeCentrality

实现了Centrality的相应接口,这里具体介绍相关函数的实现

getGlobalCentrality()

首先需要计算时间,在COMPUTE_INTERVAL的范围内,那么直接返回,不需要重新计算。否则,则需要进行计算,下面给出相应部分的代码

1
2
3
4
5
6
7
8
9
// initialize
int epochCount = SimClock.getIntTime() / CENTRALITY_TIME_WINDOW;
int[] centralities = new int[epochCount];
int epoch, timeNow = SimClock.getIntTime();
Map<Integer, Set<DTNHost>> nodesCountedInEpoch =
new HashMap<Integer, Set<DTNHost>>();

for(int i = 0; i < epochCount; i++)
nodesCountedInEpoch.put(i, new HashSet<DTNHost>());

首先进行初始化操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
for(Map.Entry<DTNHost, List<Duration>> entry : connHistory.entrySet())
{
DTNHost h = entry.getKey();
for(Duration d : entry.getValue())
{
int timePassed = (int)(timeNow - d.end);

// if we reached the end of the last epoch, we're done with this node
if(timePassed > CENTRALITY_TIME_WINDOW * epochCount)
break;

// compute the epoch this contact belongs to
epoch = timePassed / CENTRALITY_TIME_WINDOW;

// Only consider each node once per epoch
Set<DTNHost> nodesAlreadyCounted = nodesCountedInEpoch.get(epoch);
if(nodesAlreadyCounted.contains(h))
continue;

// increment the degree for the given epoch
centralities[epoch]++;
nodesAlreadyCounted.add(h);
}
}

循环查看每一个在connHistory中的结点,直到检查完所有相应的结点,然后会在centralities中做上相应的记录,并将这些结点放入nodeAlreadyCount中。

1
2
3
4
5
6
7
8
int sum = 0;
for(int i = 0; i < epochCount; i++)
sum += centralities[i];
this.globalCentrality = ((double)sum) / epochCount;

this.lastGlobalComputationTime = SimClock.getIntTime();

return this.globalCentrality;

最后计算所有记录好的centralities,然后将其加到sum中,然后计算得到globalCentrality即可,并保存相应的lastGlobalComputationTime

getLocalCentrality()

对于本地中心度的计算,在整个计算上是大同小异的,唯一的差别是检查相应的节点是否在本地的community中,然后进行计算。

routing.community.SWindowCentrality

同样的,实现了Centrality的接口,主要的关键还是两个方法,下面主要进行介绍

getGlobalCentrality

对于SWindow的计算,其具体计算为检查每个节点,判断每个节点是否满足要求,如果满足,则centrality加1

getLocalCentrality

同样的,对于Local中心度的计算,那么也是需要判断是否在community当中。

routing.community.CWindowCentrality

同样的,实现了Centrality的接口,主要的关键还是两个方法,下面主要进行介绍

getGlobalCentrality

对于具体的代码不再分析,其实际上和AvgDegreeCentrality的计算几乎是相同的,唯一的不同在于AvgDegreeCentrality中的epochCount是通过计算得到的,而对于CWindowCentrality而言,那么实际上就是直接设定的EPOCH_COUNT

getLocalCentrality

其具体的不同也就是计算的是本地的community的中心度,而非总体的中心度。

Added Reports

report.CommunityDetectionReport

继承自Report中,实现done()方法,其具体就是打印了相应的内容,主要是计算当前的community。

report.DeliveryCentralityReport

report.SimpleCommunityDetectionReport

附录:

在学习ONE使用的过程中,从这个The ONE使用笔记中学习到了很多知识,故在最后强烈推荐。