Linux内核中流量控制(13)
本文档的Copyleft归yfydz所有,使用GPL发布,可以自由拷贝,转载,转载时请保持文档的完整性,严禁用于任何商业用途。
msn: yfydz_no1@hotmail.com
来源:http://yfydz.cublog.cn
5.11.9 入队static int htb_enqueue(struct sk_buff *skb, struct Qdisc *sch){ int ret;// HTB私有数据结构 struct htb_sched *q = qdisc_priv(sch);// 对数据包进行分类 struct htb_class *cl = htb_classify(skb, sch, &ret); if (cl == HTB_DIRECT) {// 分类结果是直接发送 /* enqueue to helper queue */// 如果直接发送队列中的数据包长度小于队列限制值, 将数据包添加到队列末尾 if (q->direct_queue.qlen < q->direct_qlen) { __skb_queue_tail(&q->direct_queue, skb); q->direct_pkts++; } else {// 否则丢弃数据包 kfree_skb(skb); sch->qstats.drops++; return NET_XMIT_DROP; }#ifdef CONFIG_NET_CLS_ACT// 定义了NET_CLS_ACT的情况(支持分类动作) } else if (!cl) {// 分类没有结果, 丢包 if (ret == NET_XMIT_BYPASS) sch->qstats.drops++; kfree_skb(skb); return ret;#endif// 有分类结果, 进行分类相关的叶子节点流控结构的入队操作 } else if (cl->un.leaf.q->enqueue(skb, cl->un.leaf.q) != NET_XMIT_SUCCESS) {// 入队不成功的话丢包 sch->qstats.drops++; cl->qstats.drops++; return NET_XMIT_DROP; } else {// 入队成功, 分类结构的包数字节数的统计数增加 cl->bstats.packets++; cl->bstats.bytes += skb->len;// 激活HTB类别, 建立该类别的数据提供树, 这样dequeue时可以从中取数据包// 只有类别节点的模式是可发送和可租借的情况下才会激活, 如果节点是阻塞// 模式, 则不会被激活 htb_activate(q, cl); }// HTB流控结构统计数更新, 入队成功 sch->q.qlen++; sch->bstats.packets++; sch->bstats.bytes += skb->len; return NET_XMIT_SUCCESS;} 大部分情况下数据包都不会进入直接处理队列, 而是进入各类别叶子节点, 因此入队的成功与否就在于叶子节点使用何种流控算法, 大都应该可以入队成功的, 入队不涉及类别节点模式的调整。5.11.10 重入队/* TODO: requeuing packet charges it to policers again !! */static int htb_requeue(struct sk_buff *skb, struct Qdisc *sch){// HTB私有数据结构 struct htb_sched *q = qdisc_priv(sch); int ret = NET_XMIT_SUCCESS;// 对数据包进行HTB分类 struct htb_class *cl = htb_classify(skb, sch, &ret); struct sk_buff *tskb;// 是直接处理或无类别数据包 if (cl == HTB_DIRECT || !cl) {// 如果当前直接队列没满而且是直接处理包, 就添加在直接处理队列表头 /* enqueue to helper queue */ if (q->direct_queue.qlen < q->direct_qlen && cl) { __skb_queue_head(&q->direct_queue, skb); } else {// 队列慢或者无类别包// 添加到队列头 __skb_queue_head(&q->direct_queue, skb);// 从队列尾取一个数据包丢弃 tskb = __skb_dequeue_tail(&q->direct_queue); kfree_skb(tskb); sch->qstats.drops++; return NET_XMIT_CN; }// 分类成功, 使用类别结构对应叶子流控节点的重入队操作 } else if (cl->un.leaf.q->ops->requeue(skb, cl->un.leaf.q) != NET_XMIT_SUCCESS) {// 重入队失败, 丢包 sch->qstats.drops++; cl->qstats.drops++; return NET_XMIT_DROP; } else// 重入队成功, 激活类别结构, 构造数据包提供树 htb_activate(q, cl);// 队列统计信息更新 sch->q.qlen++; sch->qstats.requeues++; return NET_XMIT_SUCCESS;} 5.11.11 出队HTB的出队是个非常复杂的处理过程, 函数调用过程为:htb_dequeue -> __skb_dequeue -> htb_do_events -> htb_safe_rb_erase -> htb_change_class_mode -> htb_add_to_wait_tree -> htb_dequeue_tree -> htb_lookup_leaf -> htb_deactivate -> q->dequeue -> htb_next_rb_node -> htb_charge_class -> htb_change_class_mode -> htb_safe_rb_erase -> htb_add_to_wait_tree -> htb_delay_by static struct sk_buff *htb_dequeue(struct Qdisc *sch){ struct sk_buff *skb = NULL;// HTB私有数据结构 struct htb_sched *q = qdisc_priv(sch); int level; long min_delay;// 保存当前时间滴答数 q->jiffies = jiffies; /* try to dequeue direct packets as high prio (!) to minimize cpu work */// 先从当前直接发送队列取数据包, 直接发送队列中的数据有最高优先级, 可以说没有流量限制 skb = __skb_dequeue(&q->direct_queue); if (skb != NULL) {// 取到数据包, 更新参数, 非阻塞, 返回 sch->flags &= ~TCQ_F_THROTTLED; sch->q.qlen--; return skb; }// 如果HTB流控结构队列长度为0, 返回空 if (!sch->q.qlen) goto fin;// 获取当前有效时间值 PSCHED_GET_TIME(q->now);// 最小延迟值初始化为最大整数 min_delay = LONG_MAX; q->nwc_hit = 0;// 遍历树的所有层次, 从叶子节点开始 for (level = 0; level < TC_HTB_MAXDEPTH; level++) { /* common case optimization - skip event handler quickly */ int m; long delay;// 计算延迟值, 是取数据包失败的情况下更新HTB定时器的延迟时间// 比较ROW树中该层节点最近的事件定时时间是否已经到了 if (time_after_eq(q->jiffies, q->near_ev_cache[level])) {// 时间到了, 处理HTB事件, 返回值是下一个事件的延迟时间 delay = htb_do_events(q, level);// 更新本层最近定时时间 q->near_ev_cache[level] = q->jiffies + (delay ? delay : HZ); } else// 时间还没到, 计算两者时间差 delay = q->near_ev_cache[level] - q->jiffies;// 更新最小延迟值, 注意这是在循环里面进行更新的, 循环找出最小的延迟时间 if (delay && min_delay > delay) min_delay = delay;// 该层次的row_mask取反, 实际是为找到row_mask[level]中为1的位, 为1表示该树有数据包可用 m = ~q->row_mask[level]; while (m != (int)(-1)) {// m的数据位中第一个0位的位置作为优先级值, 从低位开始找, 也就是prio越小, 实际数据的// 优先权越大, 越先出队 int prio = ffz(m);// 将该0位设置为1, 也就是清除该位 m |= 1 << prio;// 从该优先权值的流控树中进行出队操作// HTB的流控就在该函数中体现 skb = htb_dequeue_tree(q, prio, level); if (likely(skb != NULL)) {// 数据包出队成功, 更新参数, 退出循环, 返回数据包// 取数据包成功就要去掉流控节点的阻塞标志 sch->q.qlen--; sch->flags &= ~TCQ_F_THROTTLED; goto fin; } } }// 循环结束也没取到数据包, 队列长度非0却不能取出数据包, 表示流控节点阻塞// 进行阻塞处理, 调整HTB定时器, 最大延迟5秒 htb_delay_by(sch, min_delay > 5 * HZ ? 5 * HZ : min_delay);fin: return skb;}/* dequeues packet at given priority and level; call only if you are sure that there is active class at prio/level */// 从指定的层次和优先权的RB树节点中取数据包static struct sk_buff *htb_dequeue_tree(struct htb_sched *q, int prio, int level){ struct sk_buff *skb = NULL; struct htb_class *cl, *start; /* look initial class up in the row */// 根据层次和优先权值查找起始类别节点 start = cl = htb_lookup_leaf(q->row[level] + prio, prio, q->ptr[level] + prio, q->last_ptr_id[level] + prio); do {next: BUG_TRAP(cl);// 如果类别为空, 返回数据包为空 if (!cl) return NULL; /* class can be empty - it is unlikely but can be true if leaf qdisc drops packets in enqueue routine or if someone used graft operation on the leaf since last dequeue; simply deactivate and skip such class */// 如果队列长度为0, 队列空的情况, 可能性较小 if (unlikely(cl->un.leaf.q->q.qlen == 0)) { struct htb_class *next;// 该类别队列中没数据包了, 停止该类别结构 htb_deactivate(q, cl); /* row/level might become empty */// 掩码该位为0, 表示该层该prio的rb树为空, 没有数据提供树, 返回数据包为空 if ((q->row_mask[level] & (1 << prio)) == 0) return NULL;// 否则重新查找该层该优先权的RB树 next = htb_lookup_leaf(q->row[level] + prio, prio, q->ptr[level] + prio, q->last_ptr_id[level] + prio);// 从新找到的这个类别结构cl开始循环, 找队列非空的节点 if (cl == start) /* fix start if we just deleted it */ start = next; cl = next;// 这个goto形成了大循环中的小循环, 找队列长度非空的类别节点 goto next; }// 以下是队列长度非空的情况, 运行该类别结构的内部流控节点的出队操作,// 这主要看该节点使用那种流控算法了, 如tbf之类就可以实现流量限制 skb = cl->un.leaf.q->dequeue(cl->un.leaf.q);// 取得数据包, 中断循环准备返回 if (likely(skb != NULL)) break;// 没取得数据包, 打印警告信息, 该信息在循环中只打印一次 if (!cl->warned) { printk(KERN_WARNING "htb: class %X isn't work conserving ?!\n", cl->classid);// 作为已经打印了警告信息的标志 cl->warned = 1; }// 取到空包计数增加, 表示从非工作类别中取数据包的异常情况次数 q->nwc_hit++;// 更新到下一个rb树节点 htb_next_rb_node((level ? cl->parent->un.inner.ptr : q-> ptr[0]) + prio);// 继续查找该层该优先权的RB树中找叶子类别节点, 循环 cl = htb_lookup_leaf(q->row[level] + prio, prio, q->ptr[level] + prio, q->last_ptr_id[level] + prio);// 当找到的新节点不是起始节点就进行循环直到取得数据包, 当遍历完后会又回到start节点// 而中断循环 } while (cl != start); if (likely(skb != NULL)) {// 找到数据包的情况, 可能性很大// 计算赤字deficit, 减数据包长度, 而deficit是初始化为0的 if ((cl->un.leaf.deficit[level] -= skb->len) < 0) {// 如果该类别节点的赤字为负, 增加一个定额量, 缺省是物理网卡的队列长度 cl->un.leaf.deficit[level] += cl->un.leaf.quantum;// 更新到下一个rb树节点, 如果是中间节点, 则更新父节点的内部结构中的指针, 否则// 从流控结构中更新, 实现同一类别树中不同类别类别节点的转换, 不会一直限制在一个// 节点 htb_next_rb_node((level ? cl->parent->un.inner.ptr : q-> ptr[0]) + prio); }// 如果赤字为正就不会进行RB数节点的更换 /* this used to be after charge_class but this constelation gives us slightly better performance */// 如果队列空了, 停止该类别 if (!cl->un.leaf.q->q.qlen) htb_deactivate(q, cl);// 处理该流控节点以及其所有父节点的令牌情况, 调整该类别的模式cmode htb_charge_class(q, cl, level, skb->len); } return skb;}/** * htb_lookup_leaf - returns next leaf class in DRR order * * Find leaf where current feed pointers points to. */// 查找叶子分类节点static struct htb_class *htb_lookup_leaf(struct rb_root *tree, int prio, struct rb_node **pptr, u32 * pid){ int i; struct {// 根节点 struct rb_node *root;// 父节点地址 struct rb_node **pptr;// 父节点ID u32 *pid;// 8个数组元素 } stk[TC_HTB_MAXDEPTH], *sp = stk; BUG_TRAP(tree->rb_node);// sp是stk[]数组第0号元素指针, 初始化 sp->root = tree->rb_node; sp->pptr = pptr; sp->pid = pid;// 64K次的循环, 为什么是64K呢 for (i = 0; i < 65535; i++) {// 父节点为空, 可父ID非0, 重新查找父节点 if (!*sp->pptr && *sp->pid) { /* ptr was invalidated but id is valid - try to recover the original or next ptr */ *sp->pptr = htb_id_find_next_upper(prio, sp->root, *sp->pid); }// 父ID清零 *sp->pid = 0; /* ptr is valid now so that remove this hint as it can become out of date quickly */// 如果父节点还是为空 if (!*sp->pptr) { /* we are at right end; rewind & go up */// 父节点设置为根节点 *sp->pptr = sp->root;// 父节点设置为最下层的左叶子节点 while ((*sp->pptr)->rb_left) *sp->pptr = (*sp->pptr)->rb_left;// 如果不再是数组的0元素, 这是下面代码执行过的情况 if (sp > stk) {// 移到前一元素 sp--; BUG_TRAP(*sp->pptr);// 如果该元素的父节点为空, 返回空, 这里是循环出口1 if (!*sp->pptr) return NULL;// pptr更新为下一个节点 htb_next_rb_node(sp->pptr); } } else { struct htb_class *cl;// 提取父节点中的第prio号节点对应的HTB类别结构 cl = rb_entry(*sp->pptr, struct htb_class, node[prio]);// 如果是叶子节点, 返回, 这里是循环出口2 if (!cl->level) return cl;// 移动到stk数组的下一项// 用该HTB类别结构参数来初始化该数组项的参数重新循环 (++sp)->root = cl->un.inner.feed[prio].rb_node; sp->pptr = cl->un.inner.ptr + prio; sp->pid = cl->un.inner.last_ptr_id + prio; } }// 循环结束也没找到合适节点, 返回空 BUG_TRAP(0); return NULL;} /** * htb_charge_class - charges amount "bytes" to leaf and ancestors * * Routine assumes that packet "bytes" long was dequeued from leaf cl * borrowing from "level". It accounts bytes to ceil leaky bucket for * leaf and all ancestors and to rate bucket for ancestors at levels * "level" and higher. It also handles possible change of mode resulting * from the update. Note that mode can also increase here (MAY_BORROW to * CAN_SEND) because we can use more precise clock that event queue here. * In such case we remove class from event queue first. */// 关于类别节点的令牌和缓冲区数据的更新计算, 调整类别节点的模式static void htb_charge_class(struct htb_sched *q, struct htb_class *cl, int level, int bytes){ long toks, diff; enum htb_cmode old_mode;// HTB统计宏, T: 令牌数; B: 缓存; R: 速率// 计算令牌数#define HTB_ACCNT(T,B,R) toks = diff + cl->T; \ if (toks > cl->B) toks = cl->B; \ toks -= L2T(cl, cl->R, bytes); \ if (toks <= -cl->mbuffer) toks = 1-cl->mbuffer; \ cl->T = toks// 循环向上到根节点 while (cl) {// 时间间隔 diff = PSCHED_TDIFF_SAFE(q->now, cl->t_c, (u32) cl->mbuffer); if (cl->level >= level) {// 类别层次高的// 借出增加 if (cl->level == level) cl->xstats.lends++;// 计算普通令牌 HTB_ACCNT(tokens, buffer, rate); } else {// 类别层次低// 借入增加 cl->xstats.borrows++;// 令牌增加 cl->tokens += diff; /* we moved t_c; update tokens */ }// 计算C令牌 HTB_ACCNT(ctokens, cbuffer, ceil); cl->t_c = q->now;// 保存类别节点原来的模式 old_mode = cl->cmode; diff = 0;// 根据新的令牌,缓冲区数来更新类别节点的模式, 因为前面diff数据已经在令牌中修改过了// 所以现在diff输入值设为0了, 函数结束, 类别模式不是可发送时, diff中保存当前令牌数// 的负值 htb_change_class_mode(q, cl, &diff);// 如果类别模式发生了变化 if (old_mode != cl->cmode) {// 如果老模式不是可以直接发送的模式(HTB_CAN_SEND), 说明在等待RB树中, 要从该RB树中删除 if (old_mode != HTB_CAN_SEND) htb_safe_rb_erase(&cl->pq_node, q->wait_pq + cl->level);// 如果当前新模式不是可以直接发送的模式(HTB_CAN_SEND), 挂接到合适的等待RB树 if (cl->cmode != HTB_CAN_SEND) htb_add_to_wait_tree(q, cl, diff); }#ifdef HTB_RATECM// 更新速率计数器: 包数, 字节数 /* update rate counters */ cl->sum_bytes += bytes; cl->sum_packets++;#endif /* update byte stats except for leaves which are already updated */// 如果是中间节点, 更新其统计值, 因为对于叶子节点已经在数据包出队时处理过了 if (cl->level) { cl->bstats.bytes += bytes; cl->bstats.packets++; } cl = cl->parent; }} /** * htb_change_class_mode - changes classe's mode * * This should be the only way how to change classe's mode under normal * cirsumstances. Routine will update feed lists linkage, change mode * and add class to the wait event queue if appropriate. New mode should * be different from old one and cl->pq_key has to be valid if changing * to mode other than HTB_CAN_SEND (see htb_add_to_wait_tree). */// 调整类别节点的发送模式static voidhtb_change_class_mode(struct htb_sched *q, struct htb_class *cl, long *diff){// 根据变化值计算新模式 enum htb_cmode new_mode = htb_class_mode(cl, diff);// 模式没变, 返回 if (new_mode == cl->cmode) return;// cl->prio_activity非0表示是活动的节点, 需要停止后再更新模式 if (cl->prio_activity) { /* not necessary: speed optimization */// 如原来的模式可发送数据, 先停该节点 if (cl->cmode != HTB_CANT_SEND) htb_deactivate_prios(q, cl);// 更新模式 cl->cmode = new_mode;// 如果新模式不是禁止发送, 重新激活节点 if (new_mode != HTB_CANT_SEND) htb_activate_prios(q, cl); } else// 非活动类别节点, 直接更新模式值 cl->cmode = new_mode;}/** * htb_class_mode - computes and returns current class mode * * It computes cl's mode at time cl->t_c+diff and returns it. If mode * is not HTB_CAN_SEND then cl->pq_key is updated to time difference * from now to time when cl will change its state. * Also it is worth to note that class mode doesn't change simply * at cl->{c,}tokens == 0 but there can rather be hysteresis of * 0 .. -cl->{c,}buffer range. It is meant to limit number of * mode transitions per time unit. The speed gain is about 1/6. */// 计算类别节点的模式static inline enum htb_cmodehtb_class_mode(struct htb_class *cl, long *diff){ long toks;// 计算类别的Ceil令牌 if ((toks = (cl->ctokens + *diff)) < htb_lowater(cl)) {// 如果令牌小于低限, 模式为不可发送 *diff = -toks; return HTB_CANT_SEND; }// 计算类别的普通令牌// 如果令牌大于高限, 模式为可发送 if ((toks = (cl->tokens + *diff)) >= htb_hiwater(cl)) return HTB_CAN_SEND; *diff = -toks;// 否则模式为可借 return HTB_MAY_BORROW;}/** * htb_add_to_wait_tree - adds class to the event queue with delay * * The class is added to priority event queue to indicate that class will * change its mode in cl->pq_key microseconds. Make sure that class is not * already in the queue. */// 将类别节点添加到等待树static void htb_add_to_wait_tree(struct htb_sched *q, struct htb_class *cl, long delay){// p初始化为该层等待数的根节点 struct rb_node **p = &q->wait_pq[cl->level].rb_node, *parent = NULL;// 事件延迟时间, 增加延迟时间delay转换的jiffie数 cl->pq_key = q->jiffies + PSCHED_US2JIFFIE(delay);// 如果delay为0, 至少增加一个jiffie时间 if (cl->pq_key == q->jiffies) cl->pq_key++; /* update the nearest event cache */// near_ev_cache[]中保存最近一个事件的发生时间, 如果新计算出的pq_key比near_ev_cache[]// 时间更近, 则near_ev_cache[]更新位pq_key指示的事件时间 if (time_after(q->near_ev_cache[cl->level], cl->pq_key)) q->near_ev_cache[cl->level] = cl->pq_key;// 根据延迟时间将类别节点插入到延迟RB树的合适的位置, 该树根据时间发生时间进行排序// 早发生的走左节点, 晚发生的走右节点 while (*p) {// 循环查找合适的插入点 struct htb_class *c; parent = *p; c = rb_entry(parent, struct htb_class, pq_node); if (time_after_eq(cl->pq_key, c->pq_key)) p = &parent->rb_right; else p = &parent->rb_left; }// 将类别cl插入等待树 rb_link_node(&cl->pq_node, parent, p); rb_insert_color(&cl->pq_node, &q->wait_pq[cl->level]);} /** * htb_do_events - make mode changes to classes at the level * * Scans event queue for pending events and applies them. Returns jiffies to * next pending event (0 for no event in pq). * Note: Aplied are events whose have cl->pq_key <= jiffies. */// 对第level号等待树的类别节点进行模式调整static long htb_do_events(struct htb_sched *q, int level){ int i;// 循环500次, 为什么是500? 表示树里最多有500个节点? 对应500个事件 for (i = 0; i < 500; i++) { struct htb_class *cl; long diff;// 取等待rb树第一个节点, 每次循环都从树中删除一个节点 struct rb_node *p = rb_first(&q->wait_pq[level]);// 没节点, 事件空, 返回0表示没延迟 if (!p) return 0;// 获取该节点对应的HTB类别 cl = rb_entry(p, struct htb_class, pq_node);// 该类别延迟处理时间是在当前时间后面, 返回时间差作为延迟值 if (time_after(cl->pq_key, q->jiffies)) { return cl->pq_key - q->jiffies; }// 时间小于当前时间了, 已经超时了// 安全地将该节点从等待RB树中断开 htb_safe_rb_erase(p, q->wait_pq + level);// 当前时间和检查点时间的差值 diff = PSCHED_TDIFF_SAFE(q->now, cl->t_c, (u32) cl->mbuffer);// 根据时间差值更改该类别模式 htb_change_class_mode(q, cl, &diff);// 如果不是可发送模式, 重新插入回等待树 if (cl->cmode != HTB_CAN_SEND) htb_add_to_wait_tree(q, cl, diff); }// 超过500个事件, if (net_ratelimit()) printk(KERN_WARNING "htb: too many events !\n");// 返回0.1秒的延迟 return HZ / 10;}// HTB延迟处理static void htb_delay_by(struct Qdisc *sch, long delay){ struct htb_sched *q = qdisc_priv(sch);// 延迟至少是1个时间片, 1/HZ秒 if (delay <= 0) delay = 1;// 延迟最大是5秒 if (unlikely(delay > 5 * HZ)) { if (net_ratelimit()) printk(KERN_INFO "HTB delay %ld > 5sec\n", delay); delay = 5 * HZ; }// 修改定时器 /* why don't use jiffies here ? because expires can be in past */ mod_timer(&q->timer, q->jiffies + delay);// 设置阻塞标志 sch->flags |= TCQ_F_THROTTLED;// 流量溢出统计增加 sch->qstats.overlimits++;}HTB的流控就体现在通过令牌变化情况计算类别节点的模式, 如果是CAN_SEND就能继续发送数据, 该节点可以留在数据包提供树中; 如果阻塞CANT_SEND该类别节点就会从数据包提供树中拆除而不能发包; 如果模式是CAN_BORROW的则根据其他节点的带宽情况来确定是否能留在数据包提供树而继续发包, 不在提供树中, 即使有数据包也只能阻塞着不能发送, 这样就实现了流控管理。为理解分类型流控算法的各种参数的含义, 最好去看一下RFC 3290。...... 待续 ......