Receiving Data

2022/01/27 kernel

1、Interrupt handler

1.1、Register an interrupt handler

在函数ixgbe_open() -> ixgbe_request_irq()中会尝试注册interrupt handler:

/**
 * ixgbe_request_irq - initialize interrupts
 * @adapter: board private structure
 *
 * Attempts to configure interrupts using the best available
 * capabilities of the hardware and kernel.
 **/
static int ixgbe_request_irq(struct ixgbe_adapter *adapter)
{
    struct net_device *netdev = adapter->netdev;
    int err;

    if (adapter->flags & IXGBE_FLAG_MSIX_ENABLED)
        /* MSI-X mode: per-vector handlers are registered inside
         * ixgbe_request_msix_irqs()
         */
        err = ixgbe_request_msix_irqs(adapter);
    else if (adapter->flags & IXGBE_FLAG_MSI_ENABLED)
        /* MSI mode: single interrupt line, not shared (flags == 0) */
        err = request_irq(adapter->pdev->irq, ixgbe_intr, 0,
                  netdev->name, adapter);
    else
        /* legacy INTx mode: the line may be shared with other devices */
        err = request_irq(adapter->pdev->irq, ixgbe_intr, IRQF_SHARED,
                  netdev->name, adapter);

    if (err)
        e_err(probe, "request_irq failed, Error %d\n", err);

    return err;
}

目前可以注册三种类型的interrupt:

  • MSI-X interrupt mode;
  • MSI interrupt mode;
  • legacy interrupt mode;

MSI-X interrupt handler最终会执行函数ixgbe_msix_clean_rings。

1.2、Interrupt handler

当网卡接收到frame时,通过DMA将frame搬运到对应的rx ring,然后产生一个硬中断,调用函数:

/* MSI-X hard-interrupt handler: defer all real work to NAPI */
static irqreturn_t ixgbe_msix_clean_rings(int irq, void *data)
{
    struct ixgbe_q_vector *q_vector = data;

    /* EIAM disabled interrupts (on this vector) for us */

    /* only schedule polling if this vector actually owns a ring */
    if (q_vector->rx.ring || q_vector->tx.ring)
        napi_schedule(&q_vector->napi);

    return IRQ_HANDLED;
}

之后再经过一系列函数调用napi_schedule() -> __napi_schedule() -> ____napi_schedule():

/**
 *  napi_schedule - schedule NAPI poll
 *  @n: napi context
 *
 * Schedule NAPI poll routine to be called if it is not already
 * running.
 */
static inline void napi_schedule(struct napi_struct *n)
{
    if (napi_schedule_prep(n))
        __napi_schedule(n);
}

/**
 * __napi_schedule - schedule for receive
 * @n: entry to schedule
 *
 * The entry's receive function will be scheduled to run
 */
void __napi_schedule(struct napi_struct *n)
{
    unsigned long flags;

    /* irqs off: softnet_data is per-CPU state also touched from irq context */
    local_irq_save(flags);
    ____napi_schedule(&__get_cpu_var(softnet_data), n);
    local_irq_restore(flags);
}

/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
                     struct napi_struct *napi)
{
    /* queue this NAPI instance on the current CPU's poll list ... */
    list_add_tail(&napi->poll_list, &sd->poll_list);
    /* ... and mark NET_RX_SOFTIRQ pending so net_rx_action() will run */
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

函数____napi_schedule完成两件事:

  • napi_struct{}结构挂载到与当前CPU相关的softnet_data{}->poll_list链表中;
  • 触发NET_RX_SOFTIRQ softirq。

 

2、Softirq handler

2.1、net_rx_action

NET_RX_SOFTIRQ对应的软中断处理函数是net_rx_action:

/* NET_RX_SOFTIRQ handler: drain this CPU's NAPI poll list within a
 * global packet budget (netdev_budget) and a 2-jiffy time limit
 */
static void net_rx_action(struct softirq_action *h)
{
    struct softnet_data *sd = &__get_cpu_var(softnet_data);
    unsigned long time_limit = jiffies + 2;
    int budget = netdev_budget;
    void *have;

    local_irq_disable();

    while (!list_empty(&sd->poll_list)) {
        struct napi_struct *n;
        int work, weight;

        /* If softirq window is exhausted then punt.
         * Allow this to run for 2 jiffies since which will allow
         * an average latency of 1.5/HZ.
         */
        if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit)))
            goto softnet_break;

        local_irq_enable();

        /* Even though interrupts have been re-enabled, this
         * access is safe because interrupts can only add new
         * entries to the tail of this list, and only ->poll()
         * calls can remove this head entry from the list.
         */
        n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);

        have = netpoll_poll_lock(n);

        weight = n->weight;

        /* This NAPI_STATE_SCHED test is for avoiding a race
         * with netpoll's poll_napi().  Only the entity which
         * obtains the lock and sees NAPI_STATE_SCHED set will
         * actually make the ->poll() call.  Therefore we avoid
         * accidentally calling ->poll() when NAPI is not scheduled.
         */
        work = 0;
        if (test_bit(NAPI_STATE_SCHED, &n->state)) {
            /* driver callback (e.g. ixgbe_poll), bounded by weight */
            work = n->poll(n, weight);
            trace_napi_poll(n);
        }

        WARN_ON_ONCE(work > weight);

        budget -= work;

        local_irq_disable();

        /* Drivers must not modify the NAPI state if they
         * consume the entire weight.  In such cases this code
         * still "owns" the NAPI instance and therefore can
         * move the instance around on the list at-will.
         */
        if (unlikely(work == weight)) {
            if (unlikely(napi_disable_pending(n))) {
                local_irq_enable();
                napi_complete(n);
                local_irq_disable();
            } else {
                if (n->gro_list) {
                    /* flush too old packets
                     * If HZ < 1000, flush all packets.
                     */
                    local_irq_enable();
                    napi_gro_flush(n, HZ >= 1000);
                    local_irq_disable();
                }
                /* still has work pending: rotate to tail for fairness */
                list_move_tail(&n->poll_list, &sd->poll_list);
            }
        }

        netpoll_poll_unlock(have);
    }
out:
    net_rps_action_and_irq_enable(sd);

    return;

softnet_break:
    /* ran out of budget or time: account the squeeze and re-raise
     * the softirq so the remaining work is handled later
     */
    sd->time_squeeze++;
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);
    goto out;
}

while循环退出的条件有:

  1. 当前CPU所属的poll list中不再挂载有napi_struct{}结构;
  2. 剩下的budget变为小于等于0了;
  3. 2 jiffies的时限已经到了。

2.2、ixgbe_poll

加载ixgbe驱动的时候,按照ixgbe_probe() -> ixgbe_init_interrupt_scheme() -> ixgbe_alloc_q_vectors() -> ixgbe_alloc_q_vector()的调用关系,在函数ixgbe_alloc_q_vector()中注册函数指针napi_struct{}->poll:

    /* initialize NAPI: register ixgbe_poll as the poll callback,
     * with the NAPI weight fixed at 64
     */
    netif_napi_add(adapter->netdev, &q_vector->napi,
               ixgbe_poll, 64);

这里可以看到,napi_struct{}->weight被固定设置为64。

/**
 * ixgbe_poll - NAPI Rx polling callback
 * @napi: structure for representing this polling device
 * @budget: how many packets driver is allowed to clean
 *
 * This function is used for legacy and MSI, NAPI mode
 **/
int ixgbe_poll(struct napi_struct *napi, int budget)
{
    ...

    /* clean each Rx ring attached to this q_vector, sharing the budget */
    ixgbe_for_each_ring(ring, q_vector->rx)
        clean_complete &= ixgbe_clean_rx_irq(q_vector, ring,
                             per_ring_budget);

    /* If all work not completed, return budget and keep polling */
    if (!clean_complete)
        return budget;

    /* all work done, exit the polling mode */
    napi_complete(napi);
    /* update the interrupt throttle rate if rx_itr_setting requests it */
    if (adapter->rx_itr_setting & 1)
        ixgbe_set_itr(q_vector);
    /* re-enable interrupts for this vector unless the device is down */
    if (!test_bit(__IXGBE_DOWN, &adapter->state))
        ixgbe_irq_enable_queues(adapter, ((u64)1 << q_vector->v_idx));

    return 0;
}

2.3、ixgbe_clean_rx_irq

/**
 * ixgbe_clean_rx_irq - Clean completed descriptors from Rx ring - bounce buf
 * @q_vector: structure containing interrupt and ring information
 * @rx_ring: rx descriptor ring to transact packets on
 * @budget: Total limit on number of packets to process
 *
 * This function provides a "bounce buffer" approach to Rx interrupt
 * processing.  The advantage to this is that on systems that have
 * expensive overhead for IOMMU access this provides a means of avoiding
 * it by maintaining the mapping of the page to the system.
 *
 * Returns true if all work is completed without reaching budget
 **/
static bool ixgbe_clean_rx_irq(struct ixgbe_q_vector *q_vector,
                   struct ixgbe_ring *rx_ring,
                   const int budget)
{
    unsigned int total_rx_bytes = 0, total_rx_packets = 0;
    u16 cleaned_count = ixgbe_desc_unused(rx_ring);

    do {
        union ixgbe_adv_rx_desc *rx_desc;
        struct sk_buff *skb;

        /* return some buffers to hardware, one at a time is too slow */
        if (cleaned_count >= IXGBE_RX_BUFFER_WRITE) {
            ixgbe_alloc_rx_buffers(rx_ring, cleaned_count);
            cleaned_count = 0;
        }

        rx_desc = IXGBE_RX_DESC(rx_ring, rx_ring->next_to_clean);

        /* DD (descriptor done) clear: hardware has nothing more for us */
        if (!ixgbe_test_staterr(rx_desc, IXGBE_RXD_STAT_DD))
            break;

        /*
         * This memory barrier is needed to keep us from reading
         * any other fields out of the rx_desc until we know the
         * RXD_STAT_DD bit is set
         */
        rmb();

        /* retrieve a buffer from the ring */
        skb = ixgbe_fetch_rx_buffer(rx_ring, rx_desc);

        /* exit if we failed to retrieve a buffer */
        if (!skb)
            break;

        cleaned_count++;

        /* place incomplete frames back on ring for completion */
        if (ixgbe_is_non_eop(rx_ring, rx_desc, skb))
            continue;

        /* verify the packet layout is correct */
        if (ixgbe_cleanup_headers(rx_ring, rx_desc, skb))
            continue;

        /* probably a little skewed due to removing CRC */
        total_rx_bytes += skb->len;

        /* populate checksum, timestamp, VLAN, and protocol */
        ixgbe_process_skb_fields(rx_ring, rx_desc, skb);

        /* hand the skb up the stack (normally via napi_gro_receive) */
        ixgbe_rx_skb(q_vector, skb);

        /* update budget accounting */
        total_rx_packets++;
    } while (likely(total_rx_packets < budget));

    u64_stats_update_begin(&rx_ring->syncp);
    rx_ring->stats.packets += total_rx_packets;
    rx_ring->stats.bytes += total_rx_bytes;
    u64_stats_update_end(&rx_ring->syncp);
    q_vector->rx.total_packets += total_rx_packets;
    q_vector->rx.total_bytes += total_rx_bytes;

    if (cleaned_count)
        ixgbe_alloc_rx_buffers(rx_ring, cleaned_count);

    return (total_rx_packets < budget);
}

函数ixgbe_clean_rx_irq中的循环,一次处理一个接收报文,直至budget用完或者没有接收报文需要处理,循环所完成的工作有:

  1. 调用函数ixgbe_alloc_rx_buffers分配额外的接收缓冲区,接收IXGBE_RX_BUFFER_WRITE(16)个报文才会执行一次(首次除外),每次分配的数量为IXGBE_RX_BUFFER_WRITE(16);
  2. 获取rx descriptor,进而获取rx buffer,赋值给skb;
  3. 检查当前是否是个“End of Packet” buffer,如果是,就继续往下处理;如果不是,把它重新放回rx queue的末尾;
  4. 调用函数ixgbe_cleanup_headers检查layout;
  5. 调用函数ixgbe_process_skb_fields,设置hash、checksum、timestamp、vlan、protocol等信息;
    • 如果硬件校验checksum成功,并且接收报文是TCP或者UDP,sk_buff{}->ip_summed会被设置为CHECKSUM_UNNECESSARY;
    • 调用函数eth_type_trans会获取L2 header中的protocol信息,然后记录到sk_buff{}->protocol中。
  6. 调用函数ixgbe_rx_skb,将skb上送给协议栈继续处理。
    • 一般情况下,函数ixgbe_rx_skb会直接调用函数napi_gro_receive。

2.4、napi_gro_receive

/* GRO entry point: try to coalesce the skb, then finish delivery */
gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
    /* reset GRO offsets before attempting to merge this skb */
    skb_gro_reset_offset(skb);

    /* dev_gro_receive() attempts coalescing; napi_skb_finish() then
     * either passes the skb up the stack or frees it
     */
    return napi_skb_finish(dev_gro_receive(napi, skb), skb);
}

函数napi_gro_receive主要执行了两个函数:

  1. dev_gro_receive:处理GRO;

    Generic Receive Offloading (GRO) is a software implementation of a hardware optimization that is known as Large Receive Offloading (LRO).

  2. napi_skb_finish:调用函数netif_receive_skb继续处理,或者释放报文skb。

 

3、netif_receive_skb

如果不考虑RPS,函数netif_receive_skb只是简单地调用了函数__netif_receive_skb_core。

函数netif_receive_skb和之后调用的其它函数仍然是在softirq上下文中被处理的。

3.1、Packet tap delivery

这部分代码涉及捕包的实现:

static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
{
    ...
    /* deliver to every ETH_P_ALL tap (e.g. tcpdump) that is bound to
     * no particular device or to this skb's device; delivery is
     * deferred one iteration through pt_prev
     */
    list_for_each_entry_rcu(ptype, &ptype_all, list) {
        if (!ptype->dev || ptype->dev == skb->dev) {
            if (pt_prev)
                ret = deliver_skb(skb, pt_prev, orig_dev);
            pt_prev = ptype;
        }
    }
    ...
}

如果执行tcpdump,可以观察到如下的系统调用:

# strace tcpdump -i eth3
...
socket(PF_PACKET, SOCK_RAW, 768)        = 3
ioctl(3, SIOCGIFINDEX, {ifr_name="lo", ifr_index=1}) = 0
ioctl(3, SIOCGIFHWADDR, {ifr_name="eth3", ifr_hwaddr=0c:42:a1:10:65:cd}) = 0
ioctl(3, SIOCGIFINDEX, {ifr_name="eth3", ifr_index=11}) = 0
bind(3, {sa_family=AF_PACKET, proto=0x03, if11, pkttype=PACKET_HOST, addr(0)={0, }}, 20) = 0
...

这里可以看到,创建socket时,family指定为PF_PACKET,type指定为SOCK_RAW,protocol指定为768(实际就是htons(ETH_P_ALL))。

追踪socket()系统调用过程,针对family指定为PF_PACKET时,会调用函数packet_create来完成特定的初始化工作:

/*
 *  Create a packet of type SOCK_PACKET.
 */

static int packet_create(struct net *net, struct socket *sock, int protocol,
             int kern)
{
    struct sock *sk;
    struct packet_sock *po;
    __be16 proto = (__force __be16)protocol; /* weird, but documented */
    int err;
    ...
    po = pkt_sk(sk);
    sk->sk_family = PF_PACKET;
    ...
    /* packet_rcv() will be invoked for every frame matching the hook */
    po->prot_hook.func = packet_rcv;
    ...
    /* a non-zero protocol (e.g. htons(ETH_P_ALL)) installs the hook now */
    if (proto) {
        po->prot_hook.type = proto;
        register_prot_hook(sk);
    }
    ...
}

继续追踪函数调用:

/* Install the socket's prot_hook so it starts receiving packets */
static void register_prot_hook(struct sock *sk)
{
    struct packet_sock *po = pkt_sk(sk);
    if (!po->running) {
        /* fanout members share one hook; otherwise register our own */
        if (po->fanout)
            __fanout_link(sk, po);
        else
            dev_add_pack(&po->prot_hook);
        /* hold a socket reference for as long as the hook is installed */
        sock_hold(sk);
        po->running = 1;
    }
}

/* Register a packet handler; ptype_head() picks the target list */
void dev_add_pack(struct packet_type *pt)
{
    struct list_head *head = ptype_head(pt);

    spin_lock(&ptype_lock);
    list_add_rcu(&pt->list, head);
    spin_unlock(&ptype_lock);
}

static inline struct list_head *ptype_head(const struct packet_type *pt)
{
    /* ETH_P_ALL taps see every packet via ptype_all; other protocols
     * go on a hashed per-protocol list (ptype_base)
     */
    if (pt->type == htons(ETH_P_ALL))
        return &ptype_all;
    else
        return &ptype_base[ntohs(pt->type) & PTYPE_HASH_MASK];
}

最终会在以ptype_all为头部的链表中添加一个packet_type{}结构。

回到函数__netif_receive_skb_core的执行路径上,函数deliver_skb会执行之前注册的回调函数packet_type{}->func:

/* Invoke one registered packet_type handler on the skb */
static inline int deliver_skb(struct sk_buff *skb,
                  struct packet_type *pt_prev,
                  struct net_device *orig_dev)
{
    if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
        return -ENOMEM;
    /* take a reference: the skb is shared with later handlers */
    atomic_inc(&skb->users);
    return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
}

对于捕包的场景,最终会执行函数packet_rcv。

3.2、Protocol layer delivery

这部分代码涉及一般报文的处理,由接收报文的协议(例如IPv4 / ARP)来决定后续的处理路径:

static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
{
    ...
    /* look up handlers registered for this skb's protocol
     * (e.g. ip_rcv for ETH_P_IP) on the hashed ptype_base list
     */
    type = skb->protocol;
    list_for_each_entry_rcu(ptype,
            &ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) {
        if (ptype->type == type &&
            (ptype->dev == null_or_dev || ptype->dev == skb->dev ||
             ptype->dev == orig_dev)) {
            if (pt_prev)
                ret = deliver_skb(skb, pt_prev, orig_dev);
            pt_prev = ptype;
        }
    }

    /* hand the skb to the last matching handler directly (no extra
     * reference); drop it if no handler claimed the protocol
     */
    if (pt_prev) {
        if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
            goto drop;
        else
            ret = pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
    } else {
drop:
        atomic_long_inc(&skb->dev->rx_dropped);
        kfree_skb(skb);
        /* Jamal, now you will not able to escape explaining
         * me how you were going to use this. :-)
         */
        ret = NET_RX_DROP;
    }
    ...
}

和捕包类似,在函数inet_init的末尾会注册IPv4报文的处理路径:

static int __init inet_init(void)
{
    ...
    /* register ip_rcv (via ip_packet_type) as the ETH_P_IP handler */
    dev_add_pack(&ip_packet_type);
    ...
}

/* IPv4 L3 handler entry: __netif_receive_skb_core matches
 * skb->protocol against .type and then calls .func (ip_rcv)
 */
static struct packet_type ip_packet_type __read_mostly = {
    .type = cpu_to_be16(ETH_P_IP),
    .func = ip_rcv,
};

对于IPv4报文,之后内核会执行函数ip_rcv继续处理。

 

4、IP Layer

/*
 *  Main IP Receive routine.
 */
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev)
{
    ...
    /* traverse the NF_INET_PRE_ROUTING netfilter hook; if the packet
     * is not dropped there, continue with ip_rcv_finish()
     */
    return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL,
               ip_rcv_finish);
    ...
}

这里会涉及netfilter的第一个hook点 — NF_INET_PRE_ROUTING,如果netfilter未丢弃报文,会继续执行函数ip_rcv_finish。

static int ip_rcv_finish(struct sk_buff *skb)
{
    ...
    /* invoke dst_entry{}->input, which the preceding route lookup set
     * (ip_local_deliver for packets addressed to this host)
     */
    return dst_input(skb);
    ...
}

最后会执行dst_entry{}->input所指的函数,这个函数指针是之前调用函数ip_route_input_noref执行路由查找时设置的。对于目标地址是本机的报文,它会被设置为函数ip_local_deliver。

/*
 *  Deliver IP Packets to the higher protocol layers.
 */
int ip_local_deliver(struct sk_buff *skb)
{
    /*
     *  Reassemble IP fragments.
     */

    if (ip_is_fragment(ip_hdr(skb))) {
        /* non-zero: fragment queued for reassembly, nothing more to do */
        if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER))
            return 0;
    }

    /* traverse the NF_INET_LOCAL_IN netfilter hook before handing the
     * packet to ip_local_deliver_finish()
     */
    return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN, skb, skb->dev, NULL,
               ip_local_deliver_finish);
}

这里会涉及netfilter的另一个hook点 — NF_INET_LOCAL_IN,如果netfilter未丢弃报文,会继续执行函数ip_local_deliver_finish。

static int ip_local_deliver_finish(struct sk_buff *skb)
{
    struct net *net = dev_net(skb->dev);

    /* strip the IP header so skb->data points at the L4 payload */
    __skb_pull(skb, ip_hdrlen(skb));

    /* Point into the IP datagram, just past the header. */
    skb_reset_transport_header(skb);

    rcu_read_lock();
    {
        int protocol = ip_hdr(skb)->protocol;
        const struct net_protocol *ipprot;
        int raw;

        ...
        /* look up the L4 handler registered via inet_add_protocol() */
        ipprot = rcu_dereference(inet_protos[protocol]);
        if (ipprot != NULL) {
            int ret;

            ...
            /* e.g. tcp_v4_rcv for TCP, udp_rcv for UDP */
            ret = ipprot->handler(skb);
            if (ret < 0) {
                /* negative return requests resubmission as protocol -ret */
                protocol = -ret;
                goto resubmit;
            }
            ...
        }
    }
 out:
    rcu_read_unlock();

    return 0;
}

最终会根据IP Header中所指定的L4 Layer协议,来执行net_protocol{}->handler所指的函数。

在函数inet_init中同样会注册几种L4 Layer协议报文的处理路径:

static int __init inet_init(void)
{
    ...
    /* register the L4 protocol handlers that ip_local_deliver_finish()
     * reaches through net_protocol{}->handler
     */
    if (inet_add_protocol(&icmp_protocol, IPPROTO_ICMP) < 0)
        pr_crit("%s: Cannot add ICMP protocol\n", __func__);
    if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0)
        pr_crit("%s: Cannot add UDP protocol\n", __func__);
    if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0)
        pr_crit("%s: Cannot add TCP protocol\n", __func__);
    ...
}

对于UDP报文,之后内核会执行函数udp_rcv继续处理;而对于TCP报文,之后内核会执行函数tcp_v4_rcv继续处理。

 

5、Links

  1. https://blog.packagecloud.io/monitoring-tuning-linux-networking-stack-receiving-data/

 

Search

    Table of Contents