技术开发 频道

二层(链路层)数据包发送过程分析

  【IT168 技术】当上层准备好一个包之后,交给链路层,链路层数据包发送主要通过dev_queue_xmit函数处理。数据包的发送可分为两种,一种是正常的传输流程,即通过网卡驱动,另一种是通过软中断(见注3)。为了理解方便,首先看一下dev_queue_xmi函数的整体调用关系图。(说明:本系列所涉及内核版本为2.6.32)

二层(链路层)数据包发送过程分析

  l dev_queue_xmit

  本函数用来将带发送的skb加入一个dev的队列(Queue),调用这个函数前必须设置好skb的device和priority,本函数可以在中断上下文中被调用。

  返回值:

  返回非0(正数或负数)表示函数出错,返回0表示成功,但是并不表示数据包被成功发送出去,因为数据包可能因为限速等原因被丢掉。

  函数执行后传入的skb将被释放,所以如果想控制数据包,实现对skb的重传时需要增加skb的引用计数。

  当调用此函数时中断必须是打开的,因为BH enable必须要求IRQ enable,否则会造成死锁。

    int dev_queue_xmit(struct sk_buff *skb)
    {
        struct net_device *dev = skb->dev;
        struct netdev_queue *txq;
        struct Qdisc *q;
        int rc = -ENOMEM;
        /* GSO will handle the following emulations directly. */
        if (netif_needs_gso(dev, skb))
            goto gso;
        if (skb_has_frags(skb) &&
            !(dev->features & NETIF_F_FRAGLIST) &&
            __skb_linearize(skb))
            goto out_kfree_skb;
    //如果skb有分片但是发送设备不支持分片,或分片中有分片在高端内存但发送设备不支持DMA,需要将所有段重新组合成一个段 ,这里__skb_linearize其实就是__pskb_pull_tail(skb, skb->data_len),这个函数基本上等同于pskb_may_pull ,pskb_may_pull的作用就是检测skb对应的主buf中是否有足够的空间来pull出len长度,如果不够就重新分配skb并将frags中的数据拷贝入新分配的主buff中,而这里将参数len设置为skb->datalen, 也就是会将所有的数据全部拷贝到主buff中,以这种方式完成skb的线性化。
        if (skb_shinfo(skb)->nr_frags &&
            (!(dev->features & NETIF_F_SG) || illegal_highdma(dev, skb)) &&
            __skb_linearize(skb))
            goto out_kfree_skb;
    //如果数据包没有被计算校验和并且发送设备不支持这个协议的校验,则在此进行校验和的计算(注1)。如果上面已经线性化了一次,这里的__skb_linearize就会直接返回,注意区别frags和frag_list,前者是将多的数据放到单独分配的页面中,sk_buff只有一个。而后者则是连接多个sk_buff
        if (skb->ip_summed == CHECKSUM_PARTIAL) {
            skb_set_transport_header(skb, skb->csum_start -
                              skb_headroom(skb));
            if (!dev_can_checksum(dev, skb) && skb_checksum_help(skb))
                goto out_kfree_skb;
        }
    gso:
    //关闭软中断,禁止cpu抢占
        rcu_read_lock_bh();
    //选择一个发送队列,如果设备提供了select_queue回调函数就使用它,否则由内核选择一个队列,这里只是Linux内核多队列的实现,但是要真正的使用都队列,需要网卡支持多队列才可以,一般的网卡都只有一个队列。在调用alloc_etherdev分配net_device是,设置队列的个数
    txq = dev_pick_tx(dev, skb);
    // 从netdev_queue结构上获取设备的qdisc
        q = rcu_dereference(txq->qdisc);
    //如果该设备有队列可用,就调用__dev_xmit_skb
        if (q->enqueue) {
            rc = __dev_xmit_skb(skb, q, dev, txq);
            goto out;
        }
    //下面的处理是在没有发送队列的情况,软设备一般没有发送队列:如lo、tunnle;我们所要做的就是直接调用驱动的hard_start_xmit将它发送出去 如果发送失败就直接丢弃,因为没有队列可以保存它
        if (dev->flags & IFF_UP) { //确定设备是否开启
            int cpu = smp_processor_id(); /* ok because BHs are off */
            if (txq->xmit_lock_owner != cpu) {//是否在同一个cpu上
                HARD_TX_LOCK(dev, txq, cpu);
                if (!netif_tx_queue_stopped(txq)) {//确定队列是运行状态
                    rc = NET_XMIT_SUCCESS;
                    if (!dev_hard_start_xmit(skb, dev, txq)) {
                        HARD_TX_UNLOCK(dev, txq);
                        goto out;
                    }
                }
                HARD_TX_UNLOCK(dev, txq);
                if (net_ratelimit())
                    printk(KERN_CRIT "Virtual device %s asks to "
                           "queue packet!\n", dev->name);
            } else {// txq->xmit_lock_owner == cpu的情况,说明发生递归
                if (net_ratelimit())
                    printk(KERN_CRIT "Dead loop on virtual device "
                           "%s, fix it urgently!\n", dev->name);
            }
        }
        rc = -ENETDOWN;
        rcu_read_unlock_bh();
    out_kfree_skb:
        kfree_skb(skb);
        return rc;
    out:
        rcu_read_unlock_bh();
        return rc;
    }

  l __dev_xmit_skb

  __dev_xmit_skb函数主要做两件事情:

  (1)如果流控对象为空的,试图直接发送数据包。

  (2)如果流控对象不空,将数据包加入流控对象,并运行流控对象。

    static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
                     struct net_device *dev,
                     struct netdev_queue *txq)
    {
        spinlock_t *root_lock = qdisc_lock(q);//见注2
        int rc;
        spin_lock(root_lock); //锁qdisc
        if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {//判断队列是否失效
            kfree_skb(skb);
            rc = NET_XMIT_DROP;
        } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
               !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
            /*
             * This is a work-conserving queue; there are no old skbs
             * waiting to be sent out; and the qdisc is not running -
             * xmit the skb directly.
             */
            __qdisc_update_bstats(q, skb->len);
            if (sch_direct_xmit(skb, q, dev, txq, root_lock))
                __qdisc_run(q);
            else
                clear_bit(__QDISC_STATE_RUNNING, &q->state);
            rc = NET_XMIT_SUCCESS;
        } else {
            rc = qdisc_enqueue_root(skb, q);
            qdisc_run(q);
        }
        spin_unlock(root_lock);
        return rc;
    }

  l qdisc_run

  有两个时机将会调用qdisc_run():

  1.__dev_xmit_skb()

  2. 软中断服务线程NET_TX_SOFTIRQ

    static inline void qdisc_run(struct Qdisc *q)
    {
        if (!test_and_set_bit(__QDISC_STATE_RUNNING, &q->state))//将队列设置为运行状态
            __qdisc_run(q);
    }

  l __qdisc_run

    void __qdisc_run(struct Qdisc *q)
    {
        unsigned long start_time = jiffies;
        while (qdisc_restart(q)) { //返回值大于0,说明流控对象非空
            /*如果发现本队列运行的时间太长了,将会停止队列的运行,并将队列加入output_queue链表头
             * Postpone processing if (延迟处理)
             * 1. another process needs the CPU;
             * 2. we've been doing it for too long.
             */
            if (need_resched() || jiffies != start_time) { //已经不允许继续运行本流控对象
                __netif_schedule(q); //将本qdisc加入每cpu变量softnet_data的output_queue链表中
                break;
            }
        }
      //清除队列的运行标识
        clear_bit(__QDISC_STATE_RUNNING, &q->state);
    }

  循环调用qdisc_restart发送数据,下面这个函数qdisc_restart是真正发送数据包的函数,它从队列上取下一个帧,然后尝试将它发送出去,若发送失败则一般是重新入队。

  此函数返回值为:发送成功时返回剩余队列长度,发送失败时返回0(若发送成功且剩余队列长度为0也返回0)

  l qdisc_restart

  __QDISC_STATE_RUNNING状态保证同一时刻只有一个cpu在处理这个qdisc,qdisc_lock(q)用来保证对这个队列的顺序访问。

  通常netif_tx_lock用来确保本设备驱动的顺序(独占)访问的,qdisc_lock(q)用来保证qdisc的顺序访问,这两个是互斥的,获得其中一个必须释放另一个。

    static inline int qdisc_restart(struct Qdisc *q)
    {
        struct netdev_queue *txq;
        struct net_device *dev;
        spinlock_t *root_lock;
        struct sk_buff *skb;
        /* Dequeue packet */
        skb = dequeue_skb(q); //一开始就调用dequeue函数
        if (unlikely(!skb))
            return 0; //返回0说明队列是空的或者被限制
        root_lock = qdisc_lock(q);
        dev = qdisc_dev(q);
        txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
        return sch_direct_xmit(skb, q, dev, txq, root_lock); //用于发送数据包
    }

  l sch_direct_xmit

  发送一个skb,将队列置为__QDISC_STATE_RUNNING状态,保证只有一个cpu运行这个函数,返回0表示队列为空或者发送受限,大于0表示队列非空。

    int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
                struct net_device *dev, struct netdev_queue *txq,
                spinlock_t *root_lock)
    {
        int ret = NETDEV_TX_BUSY;
        spin_unlock(root_lock);// release qdisc,因为后面要获取设备锁
       // 调用__netif_tx_lockà spin_lock(&txq->_xmit_lock,,保证设备驱动的独占访问
        HARD_TX_LOCK(dev, txq, smp_processor_id());
        if (!netif_tx_queue_stopped(txq) && //设备没有被停止,且发送队列没有被冻结
            !netif_tx_queue_frozen(txq))
            ret = dev_hard_start_xmit(skb, dev, txq); //发送数据包
        HARD_TX_UNLOCK(dev, txq); // 调用__netif_tx_unlock
        spin_lock(root_lock);
        switch (ret) {
        case NETDEV_TX_OK: //如果设备成功将数据包发送出去
            ret = qdisc_qlen(q); //返回剩余的队列长度
            break;
        case NETDEV_TX_LOCKED: //获取设备锁失败
            ret = handle_dev_cpu_collision(skb, txq, q);
            break;
        default: //设备繁忙,重新入队发送(利用softirq)
            if (unlikely (ret != NETDEV_TX_BUSY && net_ratelimit()))
                printk(KERN_WARNING "BUG %s code %d qlen %d\n",
                       dev->name, ret, q->q.qlen);
            ret = dev_requeue_skb(skb, q);
            break;
        }
        if (ret && (netif_tx_queue_stopped(txq) ||
                netif_tx_queue_frozen(txq)))
            ret = 0;
        return ret;
    }

  l dev_hard_start_xmit

    int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,
                struct netdev_queue *txq)
    {
        const struct net_device_ops *ops = dev->netdev_ops;
        int rc;
    if (likely(!skb->next)) {
    //从这里可以看出,对于每一个发送的包也会发给ptype_all一份, 而packet套接字创建时对于proto为ETH_P_ALL的会在ptype_all中注册一个成员,因此对于协议号为ETH_P_ALL的packet套接字来说,发送和接受的数据都能收到
            if (!list_empty(&ptype_all))
                dev_queue_xmit_nit(skb, dev);
            if (netif_needs_gso(dev, skb)) {
                if (unlikely(dev_gso_segment(skb)))
                    goto out_kfree_skb;
                if (skb->next)
                    goto gso;
            }
           //如果发送设备不需要skb->dst,则在此将其释放
            if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
                skb_dst_drop(skb);
           //调用设备注册的发送函数,即dev->netdev_ops-> ndo_start_xmit(skb, dev)
            rc = ops->ndo_start_xmit(skb, dev);
            if (rc == NETDEV_TX_OK)
    txq_trans_update(txq);
            return rc;
        }
    gso:
    ……
    }

  l dev_queue_xmit_nit

    static void dev_queue_xmit_nit(struct sk_buff *skb, struct net_device *dev)
    {
        struct packet_type *ptype;
    #ifdef CONFIG_NET_CLS_ACT
        if (!(skb->tstamp.tv64 && (G_TC_FROM(skb->tc_verd) & AT_INGRESS)))
            net_timestamp(skb); //记录该数据包输入的时间戳
    #else
        net_timestamp(skb);
    #endif
        rcu_read_lock();
        list_for_each_entry_rcu(ptype, &ptype_all, list) {
            /* Never send packets back to the socket they originated from */
           //遍历ptype_all链表,查找所有符合输入条件的原始套接口,并循环将数据包输入到满足条件的套接口
            if ((ptype->dev == dev || !ptype->dev) &&
                (ptype->af_packet_priv == NULL ||
    (struct sock *)ptype->af_packet_priv != skb->sk)) {
           //由于该数据包是额外输入到这个原始套接口的,因此需要克隆一个数据包
                struct sk_buff *skb2 = skb_clone(skb, GFP_ATOMIC);
                if (!skb2)
                    break;
                /* skb->nh should be correctly(确保头部偏移正确)
                   set by sender, so that the second statement is
                   just protection against buggy protocols.
                 */
                skb_reset_mac_header(skb2);
                if (skb_network_header(skb2) < skb2->data ||
                    skb2->network_header > skb2->tail) {
                    if (net_ratelimit())//net_ratelimit用来保证网络代码中printk的频率
                        printk(KERN_CRIT "protocol %04x is "
                               "buggy, dev %s\n",
                               skb2->protocol, dev->name);
                    skb_reset_network_header(skb2); //重新设置L3头部偏移
                }
                skb2->transport_header = skb2->network_header;
                skb2->pkt_type = PACKET_OUTGOING;
                ptype->func(skb2, skb->dev, ptype, skb->dev);//调用协议(ptype_all)接受函数
            }
        }
        rcu_read_unlock();
    }

  环回设备

  对于环回设备loopback,设备的ops->ndo_start_xmit被初始化为loopback_xmit函数。

    static const struct net_device_ops loopback_ops = {
        .ndo_init = loopback_dev_init,
        .ndo_start_xmit= loopback_xmit,
        .ndo_get_stats = loopback_get_stats,
    };

  drivers/net/loopback.c

    static netdev_tx_t loopback_xmit(struct sk_buff *skb,
                     struct net_device *dev)
    {
        struct pcpu_lstats *pcpu_lstats, *lb_stats;
        int len;
        skb_orphan(skb);
        skb->protocol = eth_type_trans(skb, dev);
        /* it's OK to use per_cpu_ptr() because BHs are off */
        pcpu_lstats = dev->ml_priv;
        lb_stats = per_cpu_ptr(pcpu_lstats, smp_processor_id());
        len = skb->len;
        if (likely(netif_rx(skb) == NET_RX_SUCCESS)) { //直接调用了netif_rx进行了接收处理
            lb_stats->bytes += len;
            lb_stats->packets++;
        } else
            lb_stats->drops++;
        return NETDEV_TX_OK;
    }

  注:

  1. CHECKSUM_PARTIAL表示使用硬件checksum ,L4层的伪头的校验已经完毕,并且已经加入uh->check字段中,此时只需要设备计算整个头4层头的校验值。

  2. 整个数据包发送逻辑中会涉及到三个用于互斥访问的代码:

  (1)spinlock_t *root_lock = qdisc_lock(q);

  (2)test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)

  (3)__netif_tx_lockà spin_lock(&txq->_xmit_lock)

  其中(1)(3)分别对应一个spinlock,(2)对应一个队列状态。在了解代码中如何使用这三个同步方法时,首先看一下相关数据结构的关系,如下。

二层(链路层)数据包发送过程分析

  图中绿色部分表示(1)(3)两处spinlock。首先看(1)处对应的代码:

    static inline spinlock_t *qdisc_lock(struct Qdisc *qdisc)
    {
        return &qdisc->q.lock;
    }

  所以root_lock是用于控制qdisc中skb队列访问的锁,当需要对skb队列进行enqueue、dequeue、requeue时,就需要加锁。

  __QDISC_STATE_RUNNING标志用于保证一个流控对象(qdisc)不会同时被多个cpu访问。

  而(3)处的spinlock,即struct netdev_queue中的_xmit_lock,则用于保证dev的注册函数的互斥访问,即deriver的同步。

  另外,内核代码注释中写到,(1)和(3)是互斥的,获得(1)处的锁时必须先保证释放(3)处的锁,反之亦然,为什么要这样还没有想明白。。。。哪位大神知道还望指点

  3. 已经有了dev_queue_xmit函数,为什么还需要软中断来发送呢?

  我们可以看到在dev_queue_xmit中将skb进行了一些处理(比如合并成一个包,计算校验和等),处理完的skb是可以直接发送的了,这时dev_queue_xmit也会先将skb入队(skb一般都是在这个函数中入队的),并且调用qdisc_run尝试发送,但是有可能发送失败,这时就将skb重新入队,调度软中断,并且自己直接返回。

  软中断只是发送队列中的skb以及释放已经发送的skb,它无需再对skb进行线性化或者校验和处理。另外在队列被停止的情况下,dev_queue_xmit仍然可以把包加入队列,但是不能发送,这样在队列被唤醒的时候就需要通过软中断来发送停止期间积压的包。简而言之,dev_queue_xmit是对skb做些最后的处理并且第一次尝试发送,软中断是将前者发送失败或者没发完的包发送出去。(其实发送软中断还有一个作用,就是释放已经发送的包,因为某些情况下发送是在硬件中断中完成的,为了提高硬件中断处理效率,内核提供一种方式将释放skb放到软中断中进行,这时只要调用dev_kfree_skb_irq,它将skb加入softnet_data的completion_queue中,然后开启发送软中断,net_tx_action会在软中断中将completion_queue中的skb全部释放掉)

原文地址:http://blog.chinaunix.net/uid-28541347-id-5613919.html

0
相关文章