the-one-code-3

[TOC]

ONE程序代码简析


Routing

所有的路由的实现都基于了MessageRouter这个基本的抽象类,所以我们首先关注这个抽象类

MessageRouter

这是一个抽象类,使用了public abstract class 来进行声明。其主要定义了如下变量,主要包括了buffer size , TTL 和传输队列,包括传输和接收的信息反馈等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//message buffer size 
public static final String B_SIZE_S = "bufferSize";
//TTL 即最多能够传递的次数 (time to live)
public static final String MSG_TTL_S = "msgTtl";
//如果路由协议没有定义任何特殊的传输message 的序列,那么这将影响传输的顺序;也就是信息传输队列的类型。
public static final String SEND_QUEUE_MODE_S = "sendQueue";
/** Setting value for random queue mode */
public static final int Q_MODE_RANDOM = 1;
/** Setting value for FIFO queue mode */
public static final int Q_MODE_FIFO = 2;
/** Setting string for random queue mode */
public static final String STR_Q_MODE_RANDOM = "RANDOM";
/** Setting string for FIFO queue mode */
public static final String STR_Q_MODE_FIFO = "FIFO";

/** Receive return value for OK */
//当开始传输的时候,返回的结果
public static final int RCV_OK = 0;
/** Receive return value for busy receiver */
public static final int TRY_LATER_BUSY = 1;
/** Receive return value for an old (already received) message */
public static final int DENIED_OLD = -1;
/** Receive return value for not enough space in the buffer for the msg */
public static final int DENIED_NO_SPACE = -2;
/** Receive return value for messages whose TTL has expired */
public static final int DENIED_TTL = -3;
/** Receive return value for a node low on some resource(s) */
public static final int DENIED_LOW_RESOURCES = -4;
/** Receive return value for a node low on some resource(s) */
public static final int DENIED_POLICY = -5;
/** Receive return value for unspecified reason */
public static final int DENIED_UNSPECIFIED = -99;
/** Maximum Ttl value */
public static final int MAX_TTL_VALUE = 35791394;

private List<MessageListener> mListeners;
/** The messages being transferred with msgID_hostName keys */
private HashMap<String, Message> incomingMessages;
/** The messages this router is carrying */
private HashMap<String, Message> messages;
/** The messages this router has received as the final recipient */
private HashMap<String, Message> deliveredMessages;
/** The messages that Applications on this router have blacklisted */
private HashMap<String, Object> blacklistedMessages;
/** Host where this router belongs to */
private DTNHost host;
/** size of the buffer */
private long bufferSize;
/** TTL for all messages */
protected int msgTtl;
/** Queue mode for sending messages */
private int sendQueueMode;

/** applications attached to the host */
private HashMap<String, Collection<Application>> applications = null;

然后定义了如下的一些主要函数(方法):

MessageRouter(Setting s)

根据配置文件default_setting.txt进行读取并创建相应的mesage router。

init(DTNHost,List)

初始化一个router , 设定这个router 在具体的哪个host 当中 和当该router 发送消息时所有需要被通知的message listener 。

update()

当进行更新的时候,其实际上是向上层传递(应用层),然后让相应需要更新的应用层进行更新。

sendMessage(String , DTNHost)

在这里比较需要进行关注的点在于,要发送的信息不能是当前信息,而需要对其进行拷贝:m2 = m.replicate(),然后再进行发送。包括在receiveMessage中也会进行相应的拷贝,要注意这一含义,否则则有可能进行了深拷贝,那么message进行修改的时候就会导致相应的错误。

messageTransferred()

通过判断当前接收到的message是否是到达目的地,来决定是否要进行转发,同时会调用到上层的application 来决定其是否需要drop掉它。

sortByQueueMode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@SuppressWarnings(value = "unchecked") /* ugly way to make this generic */
protected List sortByQueueMode(List list) {
switch (sendQueueMode) {
case Q_MODE_RANDOM:
Collections.shuffle(list, new Random(SimClock.getIntTime()));
break;
case Q_MODE_FIFO:
Collections.sort(list,
new Comparator() {
/** Compares two tuples by their messages' receiving time */
public int compare(Object o1, Object o2) {
double diff;
Message m1, m2;

if (o1 instanceof Tuple) {
m1 = ((Tuple<Message, Connection>)o1).getKey();
m2 = ((Tuple<Message, Connection>)o2).getKey();
}
else if (o1 instanceof Message) {
m1 = (Message)o1;
m2 = (Message)o2;
}
else {
throw new SimError("Invalid type of objects in " +
"the list");
}

diff = m1.getReceiveTime() - m2.getReceiveTime();
if (diff == 0) {
return 0;
}
return (diff < 0 ? -1 : 1);
}
});
break;
/* add more queue modes here */
default:
throw new SimError("Unknown queue mode " + sendQueueMode);
}

这里根据两种不同的方式 Q_MODE_RANDOM 和Q_MODE_FIFO 来进行的排序。同样的 compare 也是依据这两种不同模式来分别进行比较的。

ActiveRouter

主动路由的超类(实际上是被动路由),继承自MessageRouter。

update()

关于更新,主要分为了这样几步,首先是调用了MessageRouter 实现的update函数,然后查找所有的sendingConnections,然后检测该connection是否已经传输过信息,根据此决定是否是否已经传输完成(transferDone),如果当前的connection已经关闭,那么就abort掉,如果有需要进行remove的,那么根据相应的信息进行移除,最后将已经过期的信息从当前的buffer中移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Override
public void update() {
super.update();

/* in theory we can have multiple sending connections even though
currently all routers allow only one concurrent sending connection */
for (int i=0; i<this.sendingConnections.size(); ) {
boolean removeCurrent = false;
Connection con = sendingConnections.get(i);

/* finalize ready transfers */
if (con.isMessageTransferred()) {
if (con.getMessage() != null) {
transferDone(con);
con.finalizeTransfer();
} /* else: some other entity aborted transfer */
removeCurrent = true;
}
/* remove connections that have gone down */
else if (!con.isUp()) {
if (con.getMessage() != null) {
transferAborted(con);
con.abortTransfer();
}
removeCurrent = true;
}

if (removeCurrent) {
// if the message being sent was holding excess buffer, free it
if (this.getFreeBufferSize() < 0) {
this.makeRoomForMessage(0);
}
sendingConnections.remove(i);
}
else {
/* index increase needed only if nothing was removed */
i++;
}
}

/* time to do a TTL check and drop old messages? Only if not sending */
if (SimClock.getTime() - lastTtlCheck >= TTL_CHECK_INTERVAL &&
sendingConnections.size() == 0) {
dropExpiredMessages();
lastTtlCheck = SimClock.getTime();
}

if (energy != null) {
/* TODO: add support for other interfaces */
NetworkInterface iface = getHost().getInterface(1);
energy.update(iface, getHost().getComBus());
}
}

PassiveRouter

PassiveRouter , 实际上是我们所讲的主动路由,即不发送任何消息除非在需要的时候,即只有在需要发送message的时候才进行路由表的查找。这对于一些外部事件是比较有效的一种路由方式,同样地,其继承于MessageRouter。其没有增加任何相应的变量。

DirectDeliveryRouter

继承自ActiveRouter ,是一种具体的被动路由,路由器只会向最终的router 发送相应的message,而不是采用广播的方式进行。 这里只是对update进行了相应的补充,即如果正在传输或者当前路由无法传输,不需要更新;然后去检测是否能到最终的接收方,并根据此进行判断。

EpidemicOracleRouter

同样继承自ActiveRouter,这个router 利用相应的数据库保存了message是何时被传输的,并且何时被所有节点移除的。(所有使用该路由的节点)。所有信息都是立即发出。

其增加了一个新的变量List<EpidemicOracleRouter> allRouters,来记录所有使用该路由的节点。

注意此处

1
2
3
4
5
6
7
8
9
10
11
12
public EpidemicOracleRouter(Settings s) {
super(s);
}

/**
* Copy constructor.
* @param r The router prototype where setting values are copied from
*/
protected EpidemicOracleRouter(EpidemicOracleRouter r) {
super(r);
allRouters.add(this);
}

利用setting 进行初始化的时候没有调用allRouters.add(this),因为利用setting 进行初始化的时候实际上可以视作建立的是一个template,只用于后面的其它路由的建立。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Message messageTransferred(String id, DTNHost from) {
Message m = super.messageTransferred(id, from);

if (m.getTo() == this.getHost()) {
for (EpidemicOracleRouter r : allRouters) {
if (r != this && r != from.getRouter()) {
r.removeDeliveredMessage(id);
}
}
} else {
sendMessageToConnected(m);
}

return m;
}

这个函数的实现,主要是多加了这样一个部分,即当到达的时候-> (m.getTo() == this.getHost()),那么就删掉所有不是该路由自身或者传递过来的路由的其它路由的相应该信息,否则就传递给connection。

EpidemicRouter

每次只有单个的传输连接,并且会抛弃之前的buffer中的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void update() {
super.update();
if (isTransferring() || !canStartTransfer()) {
return; // transferring, don't try other connections yet
}

// Try first the messages that can be delivered to final recipient
if (exchangeDeliverableMessages() != null) {
return; // started a transfer, don't try others (yet)
}

// then try any/all message to any/all connection
this.tryAllMessagesToAllConnections();
}

可以看到,其首先会尝试传输给最终需要接收的节点,如果无法直接传输到,那么就会传给所有相应的connections,相当于是一个广播的方式。

FirstContactRouter

这个router 只用message 的一个copy 来进行传输,一直到第一个可用的contact。

1
2
3
4
5
6
7
8
9
10
11
12
protected int checkReceiving(Message m, DTNHost from) {
int recvCheck = super.checkReceiving(m, from);

if (recvCheck == RCV_OK) {
/* don't accept a message that has already traversed this node */
if (m.getHops().contains(getHost())) {
recvCheck = DENIED_OLD;
}
}

return recvCheck;
}

由于是first contact ,所以只接收一次相关的message,而不是每次都接收。

LifeRouter

Router module mimicking the game-of-life behavior, 也就是模拟game-of-life 的方式来进行route。

新增加了两个变量:

1
2
public static final String NM_COUNT_S = "nmcount";
private int countRange[];

nmcount指的是neighboring message count , count_range是一个最小值和最大值,并且用逗号分开。

实现了一个新的函数:getPeerMessageCount(Message m)

1
2
3
4
5
6
7
8
9
10
11
12
private int getPeerMessageCount(Message m) {
DTNHost me = getHost();
String id = m.getId();
int peerMsgCount = 0;

for (Connection c : getConnections()) {
if (c.getOtherNode(me).getRouter().hasMessage(id)) {
peerMsgCount++;
}
}
return peerMsgCount;
}

计算有多少个connected peers 拥有这个Message

对于update 函数而言,其为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void update() {
int peerMsgCount;
Vector<String> messagesToDelete = new Vector<String>();
super.update();

if (isTransferring() || !canStartTransfer()) {
return; /* transferring, don't try other connections yet */
}

/* Try first the messages that can be delivered to final recipient */
if (exchangeDeliverableMessages() != null) {
return;
}
this.tryAllMessagesToAllConnections();

/* see if need to drop some messages... */
for (Message m : getMessageCollection()) {
peerMsgCount = getPeerMessageCount(m);
if (peerMsgCount < this.countRange[0] ||
peerMsgCount > this.countRange[1]) {
messagesToDelete.add(m.getId());
}
}
for (String id : messagesToDelete) { /* ...and drop them */
this.deleteMessage(id, true);
}
}

跟前一个而言,其最大的不同在于需要检查是否需要drop 相应的message,即看当前的message 是否超过了设定的范围,如果超过了相应的范围,就会有:messagesToDelete

MaxPropRouter

这是一种基于车联网的路由方式,是一种能够disruption-tolerant 的网络。同样继承自ActiveRouter,这个协议的扩展是通过添加一个参数$\alpha$来进行实现的。对于一个新的connection, 它的传递的可能性为 $\frac{\alpha \times p}{1+\alpha}$。

新增变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
   // setting 文件中的名字空间 (name)
public static final String MAXPROP_NS = "MaxPropRouter";
// 传递可能性的最大值的设定
public static final String PROB_SET_MAX_SIZE_S = "probSetMaxSize";
// 一个default 的值
public static final int DEFAULT_PROB_SET_MAX_SIZE = 50;
private static int probSetMaxSize;
//MeetingProbabilitySet 声明的位置为 routing.maxprop 中,
//其主要是为了来保存和进行关于meeting probability 的操作
//最主要的,包括了一个Map<Integer,Double> 也就是相应下一个结点的连接可能性。
private MeetingProbabilitySet probs;
//进行相应的映射
private Map<Integer, MeetingProbabilitySet> allProbs;
/** the cost-to-node calculator Dijkstra 算法*/
private MaxPropDijkstra dijkstra;
/** IDs of the messages that are known to have reached the final dst */
private Set<String> ackedMessageIds;
/** message(key) -> cost(value) */
private Map<Integer, Double> costsForMessages;
/** From host of the last cost calculation */
private DTNHost lastCostFrom;

/**记录了向哪些host 发送了具体的哪些message */
private Map<DTNHost, Set<String>> sentMessages;

// 在每次传输机会,能够传输的平均bytes (采样)
public static int BYTES_TRANSFERRED_AVG_SAMPLES = 10;
private int[] avgSamples;
private int nextSampleIndex = 0;

// 当前的一个value 值
private int avgTransferredBytes = 0;

/** The alpha parameter string*/
public static final String ALPHA_S = "alpha";

/** The alpha variable, default = 1;*/
private double alpha;

/** The default value for alpha */
public static final double DEFAULT_ALPHA = 1.0;

新增的变量,主要是为了实现相应的传播可能性的计算,以及所传输的相应信息,包括传送到的具体位置,以及传输的平均bytes(每次传输机会)。

对于此,有一些相应的补充,即为了实现MaxPropRouter而实现的两个相应类,具体在routing.maxprop的package之中,具体为下:

MeetingProbabilitySet

这个类主要是为了存储和对meeting probabilities的进行操作。

其中一个最重要的函数为updateMeetingProbFor(),其具体作用是更新meeting probability 对于一个给定的node index。

可以看到,对于第一个,其可能性为1 ,如果某个结点node 进行了更新,那么其会增加,但是为了保证所有的可能性之和为1,所以会除以$\alpha + 1$,最后做了一个异常的处理,即超出相应的范围。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void updateMeetingProbFor(Integer index) {
Map.Entry<Integer, Double> smallestEntry = null;
double smallestValue = Double.MAX_VALUE;

this.lastUpdateTime = SimClock.getTime();

if (probs.size() == 0) { // first entry
probs.put(index, 1.0);
return;
}

double newValue = getProbFor(index) + alpha;
probs.put(index, newValue);
/* now the sum of all entries is 1+alpha;
* normalize to one by dividing all the entries by 1+alpha */
for (Map.Entry<Integer, Double> entry : probs.entrySet()) {
entry.setValue(entry.getValue() / (1+alpha));
if (entry.getValue() < smallestValue) {
smallestEntry = entry;
smallestValue = entry.getValue();
}
}
if (probs.size() >= maxSetSize) {
if (DEBUG) core.Debug.p("Probsize: " + probs.size() + " dropping " +
probs.remove(smallestEntry.getKey()));
}
}
MaxPropDijkstra

这个类,实际上就很简单,其实就是利用相应的信息,使用dijkstra算法去找到最短的路径。其中,一个重要的函数时relax(Integer)函数,其作用就是relax 一个结点的所有邻居,即更新最短路径:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void relax(Integer node) {
double nodeDist = distancesFromStart.get(node);
Collection<Integer> neighbors;

if (!this.probs.containsKey(node)) {
return; // node's neighbors are not known
}

neighbors = this.probs.get(node).getAllProbs().keySet();

for (Integer n : neighbors) {
if (visited.contains(n)) {
continue; // skip visited nodes
}

// n node's distance from path's source node
double nDist = nodeDist + getDistance(node, n);

if (distancesFromStart.get(n) > nDist) {
// stored distance > found dist -> update
prevNodes.put(n, node); // for debugging
setDistance(n, nDist);
}
}
}

可以看到,首先需要得到当前的neighbors,然后去检查是否其已经被访问过,否则进行相应的distance 的更新,然后去判断是否当前是最短路径,如果是,那么进行修改。实际上就是一个Dijkstra算法的具体情况下的实现。

MaxPropRouterWithEstimation

这个路由协议和MaxPropRouter最大的不同在于其通过参数估计(估计alpha)来找到一个合适的值,基于时间量程的定义(timescale definition),对于整个其它方面,其和MaxPropRouter具有相同的含义

其一个最重要的函数时updateParam()

updateParam()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
protected void updateParam()
{
double err = .01;
double ntarg = Math.ceil(timescale/meanIET);
double ee = 1;
double alphadiff = .1;
int ob = 0;
double fstable;
double fnzero;
double fnone;
double eezero;
double eeone;
double A;

/*
* the estimation algorith does not work for timescales
* shorter than the mean IET - so use defaults
*/
if (meanIET > (double)timescale) {
System.out.printf("meanIET %f > %d timescale\n",meanIET,timescale);
return;
}

if (meanIET == 0) {
System.out.printf("Mean IET == 0\n");
return;
}

if (meanENC == 0) {
System.out.printf("Mean ENC == 0\n");
return;
}

while (ee != err) {
A = Math.pow(1+alpha,meanENC+1);
fstable = alpha/(A-1);
fnzero = (alpha/A)*(1-Math.pow(A,-ntarg))/(1-1/A);
fnone = fnzero + 1/(Math.pow(A,ntarg));
eezero = Math.abs(fnzero-fstable);
eeone = Math.abs(fnone -fstable);
ee = Math.max(eezero,eeone);

if (ee > err ) {
if (ob == 2) {
alphadiff = alphadiff / 2.0;
}
ob = 1;
alpha = alpha+alphadiff;
} else {
if (ee < (err-err*0.001)) {
if (ob == 1) {
alphadiff = alphadiff / 2.0;
}
ob = 2;
alpha = alpha - alphadiff;

// double precision floating makes problems...
if ((alpha <= 0) | (((1 + alpha) - 1) == 0)) {
alpha = alphadiff;
alphadiff = alphadiff / 2.0;
ob = 0;
}
} else {
ee = err;
}
}
}
probs.setAlpha(alpha);
}

在这一段中,其主要的重点在于while循环中,即while(ee!=eer),总体而言,即通过对于meanENC 和 alphadiff 等参数来估计,最终设定所得到的alpha

ProphetRouter

实现的是PRoPHET 路由算法,同样基于的是ActiveRouter。

新增变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static final double P_INIT = 0.75;
public static final double DEFAULT_BETA = 0.25;
public static final double DEFAULT_GAMMA = 0.98;

public static final String PROPHET_NS = "ProphetRouter";
// 在time unit 中seconds 的数目,需要根据scenario进行相应的调整。
public static final String SECONDS_IN_UNIT_S ="secondsInTimeUnit";
//固定的传输比例
public static final String BETA_S = "beta";

// 相应的衰减
public static final String GAMMA_S = "gamma";

/** the value of nrof seconds in time unit -setting */
private int secondsInTimeUnit;
/** value of beta setting */
private double beta;
/** value of gamma setting */
private double gamma;

/** delivery predictabilities */
private Map<DTNHost, Double> preds;
/** last delivery predictability update (sim)time */
private double lastAgeUpdate;

总体而言,其主要增加了这样几个不同的量,即secondsInTimeUnit 以及 $\beta \quad \gamma$,以及最后增加的一个Map,和一个记录最后一次更新的时间。

updateDeliveryPredFor()

1
2
3
4
5
private void updateDeliveryPredFor(DTNHost host) {
double oldValue = getPredFor(host);
double newValue = oldValue + (1 - oldValue) * P_INIT;
preds.put(host, newValue);
}

可以看到,这是对于delivery preictions 的相应更新实际上是: $P(a,b) = P(a,b)_{old} + (1-P(a,b)_{old}) \times P_INT$

updateTransitivePreds()

关于transitive ,其具体是这样一种方式,即(A->B->C) 的传输,对于其相应的predictions的计算,具体为:$P(a,c) = P(a,c)_{old} + (1 - P(a,c)_{old}) P(a,b) P(b,c) * \beta$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void updateTransitivePreds(DTNHost host) {
MessageRouter otherRouter = host.getRouter();
assert otherRouter instanceof ProphetRouter : "PRoPHET only works " +
" with other routers of same type";

double pForHost = getPredFor(host); // P(a,b)
Map<DTNHost, Double> othersPreds =
((ProphetRouter)otherRouter).getDeliveryPreds();

for (Map.Entry<DTNHost, Double> e : othersPreds.entrySet()) {
if (e.getKey() == getHost()) {
continue; // don't add yourself
}

double pOld = getPredFor(e.getKey()); // P(a,c)_old
double pNew = pOld + ( 1 - pOld) * pForHost * e.getValue() * beta;
preds.put(e.getKey(), pNew);
}
}

可以看到,对于delivery 和 transitive的更新和相应的衰减(age),基于了两个相应的参数,即$\beta 和 \gamma$,以及$P_INT$

ProphetRouterWithEstimation

这个版本和上面所给出的ProphetRouter 的不同在于其尝试通过估计来得到一个好的值。 其具体实现函数为updateParam(),具体内容可以参考源码,可以通过其更新pinit,gamma

ProphetV2Router

关于这一个路由协议,可以具体参看:http://tools.ietf.org/html/draft-irtf-dtnrg-prophet-09

SprayAndWaitRouter

WaveRouter