noq_udp/
unix.rs

1#[cfg(not(any(apple, target_os = "openbsd", solarish)))]
2use std::ptr;
3use std::{
4    io::{self, IoSliceMut},
5    mem::{self, MaybeUninit},
6    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
7    num::NonZeroUsize,
8    os::unix::io::AsRawFd,
9    sync::{
10        Mutex,
11        atomic::{AtomicBool, AtomicUsize, Ordering},
12    },
13    time::Instant,
14};
15
16use socket2::SockRef;
17
18use super::{
19    EcnCodepoint, IO_ERROR_LOG_INTERVAL, RecvMeta, Transmit, UdpSockRef, cmsg, log_sendmsg_error,
20};
21
// Adapted from https://github.com/apple-oss-distributions/xnu/blob/8d741a5de7ff4191bf97d57b9f54c2f6d4a15585/bsd/sys/socket_private.h
/// Rust mirror of xnu's private `struct msghdr_x`, the per-message header used by the batched
/// `sendmsg_x` / `recvmsg_x` calls.
///
/// It has the same fields as a `msghdr` plus a trailing `msg_datalen`. Because it is passed
/// straight to C (`#[repr(C)]`), the field order and types must stay identical to the C
/// declaration.
#[cfg(apple_fast)]
#[repr(C)]
#[allow(non_camel_case_types)]
pub(crate) struct msghdr_x {
    /// Address buffer: the destination when sending, the source when receiving.
    pub msg_name: *mut libc::c_void,
    /// Size of the `msg_name` buffer in bytes.
    pub msg_namelen: libc::socklen_t,
    /// Scatter/gather array describing the payload.
    pub msg_iov: *mut libc::iovec,
    /// Number of entries in `msg_iov`.
    pub msg_iovlen: libc::c_int,
    /// Ancillary (control message) buffer.
    pub msg_control: *mut libc::c_void,
    /// Size of `msg_control` in bytes.
    pub msg_controllen: libc::socklen_t,
    /// Message flags.
    pub msg_flags: libc::c_int,
    /// Datagram length: set to the chunk/buffer length before `sendmsg_x` / `recvmsg_x` in
    /// this file, and read back as the received length after `recvmsg_x`.
    pub msg_datalen: usize,
}
36
// Batched socket entry points from Apple's libc, declared by hand here (presumably because the
// `libc` crate does not expose them — NOTE(review): confirm). Only compiled with `apple_fast`.
#[cfg(apple_fast)]
unsafe extern "C" {
    /// Receives up to `cnt` datagrams into the `msghdr_x` array at `msgp`.
    ///
    /// Callers in this file treat a non-negative result as the number of messages received
    /// and a negative result as failure, with the cause available via `errno`.
    fn recvmsg_x(
        s: libc::c_int,
        msgp: *const msghdr_x,
        cnt: libc::c_uint,
        flags: libc::c_int,
    ) -> isize;

    /// Sends up to `cnt` datagrams described by the `msghdr_x` array at `msgp`.
    ///
    /// Callers in this file treat a non-negative result as success and a negative result as
    /// failure, with the cause available via `errno`.
    fn sendmsg_x(
        s: libc::c_int,
        msgp: *const msghdr_x,
        cnt: libc::c_uint,
        flags: libc::c_int,
    ) -> isize;
}
53
54#[cfg(target_os = "freebsd")]
55type IpTosTy = libc::c_uchar;
56#[cfg(not(any(target_os = "freebsd", target_os = "netbsd")))]
57type IpTosTy = libc::c_int;
58
/// Tokio-compatible UDP socket with some useful specializations.
///
/// Unlike a standard tokio UDP socket, this allows ECN bits to be read and written on some
/// platforms.
///
/// This value holds per-socket state only; the socket itself is passed to each method as a
/// `UdpSockRef`.
#[derive(Debug)]
pub struct UdpSocketState {
    /// Time of the last logged send error; handed to `log_sendmsg_error`, presumably to
    /// rate-limit error logging (TODO confirm against `log_sendmsg_error`).
    last_send_error: Mutex<Instant>,
    /// Current GSO segment limit. Lowered to 1 by `send` on Linux/Android when `sendmsg`
    /// fails with `EIO` or `EINVAL`.
    max_gso_segments: AtomicUsize,
    /// GRO segment count reported by `gro::gro_segments()` at construction time.
    gro_segments: NonZeroUsize,
    /// Whether sent datagrams may be fragmented, i.e. a "don't fragment" style socket
    /// option could not be applied.
    may_fragment: bool,

    /// True if we have received EINVAL error from `sendmsg` system call at least once.
    ///
    /// If enabled, we assume that old kernel is used and switch to fallback mode.
    /// In particular, we do not use IP_TOS cmsg_type in this case,
    /// which is not supported on Linux <3.13 and results in not sending the UDP packet at all.
    sendmsg_einval: AtomicBool,
}
77
impl UdpSocketState {
    /// Prepares `sock` for use with this crate and returns its associated state.
    ///
    /// The socket is switched to non-blocking mode and, depending on the target and on whether
    /// it is bound to an IPv4 or IPv6 address, the socket options needed to receive ECN bits,
    /// the local destination address and (on Linux/Android) GRO information are enabled.
    /// "Don't fragment" style options are also attempted; if unsupported, `may_fragment`
    /// reports `true`.
    ///
    /// # Errors
    ///
    /// Returns any error from `set_nonblocking`, `local_addr`, `only_v6`, or from the socket
    /// options that are not treated as optional below.
    ///
    /// # Panics
    ///
    /// Panics if `CMSG_LEN` is too small for the control messages used on this platform, or if
    /// the control-message buffers would be misaligned for `cmsghdr`.
    pub fn new(sock: UdpSockRef<'_>) -> io::Result<Self> {
        let io = sock.0;
        // Room for the largest platform-specific control message (an `in6_pktinfo`).
        let mut cmsg_platform_space = 0;
        if cfg!(target_os = "linux")
            || cfg!(bsd)
            || cfg!(apple)
            || cfg!(target_os = "android")
            || cfg!(solarish)
        {
            cmsg_platform_space +=
                unsafe { libc::CMSG_SPACE(mem::size_of::<libc::in6_pktinfo>() as _) as usize };
        }

        // The fixed-size control buffers must fit an `int`-sized cmsg (ECN) plus the
        // platform-specific one above.
        assert!(
            CMSG_LEN
                >= unsafe { libc::CMSG_SPACE(mem::size_of::<libc::c_int>() as _) as usize }
                    + cmsg_platform_space
        );
        assert!(
            mem::align_of::<libc::cmsghdr>() <= mem::align_of::<cmsg::Aligned<[u8; 0]>>(),
            "control message buffers will be misaligned"
        );

        io.set_nonblocking(true)?;

        let addr = io.local_addr()?;
        let is_ipv4 = addr.family() == libc::AF_INET as libc::sa_family_t;

        // mac and ios do not support IP_RECVTOS on dual-stack sockets :(
        // older macos versions also don't have the flag and will error out if we don't ignore it
        #[cfg(not(any(
            target_os = "openbsd",
            target_os = "netbsd",
            target_os = "dragonfly",
            solarish
        )))]
        if (is_ipv4 || !io.only_v6()?)
            && let Err(_err) =
                set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_RECVTOS, OPTION_ON)
        {
            crate::log::debug!("Ignoring error setting IP_RECVTOS on socket: {_err:?}");
        }

        let mut may_fragment = false;
        #[cfg(any(target_os = "linux", target_os = "android"))]
        {
            // opportunistically try to enable GRO. See gro::gro_segments().
            let _ = set_socket_option(&*io, libc::SOL_UDP, libc::UDP_GRO, OPTION_ON);

            // Forbid IPv4 fragmentation. Set even for IPv6 to account for IPv6 mapped IPv4 addresses.
            // Set `may_fragment` to `true` if this option is not supported on the platform.
            may_fragment |= !set_socket_option_supported(
                &*io,
                libc::IPPROTO_IP,
                libc::IP_MTU_DISCOVER,
                libc::IP_PMTUDISC_PROBE,
            )?;

            if is_ipv4 {
                set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_PKTINFO, OPTION_ON)?;
            } else {
                // Set `may_fragment` to `true` if this option is not supported on the platform.
                may_fragment |= !set_socket_option_supported(
                    &*io,
                    libc::IPPROTO_IPV6,
                    libc::IPV6_MTU_DISCOVER,
                    libc::IPV6_PMTUDISC_PROBE,
                )?;
            }
        }
        #[cfg(any(target_os = "freebsd", apple))]
        {
            if is_ipv4 {
                // Set `may_fragment` to `true` if this option is not supported on the platform.
                may_fragment |= !set_socket_option_supported(
                    &*io,
                    libc::IPPROTO_IP,
                    libc::IP_DONTFRAG,
                    OPTION_ON,
                )?;
            }
        }
        #[cfg(any(bsd, apple, solarish))]
        // IP_RECVDSTADDR == IP_SENDSRCADDR on FreeBSD
        // macOS uses only IP_RECVDSTADDR, no IP_SENDSRCADDR on macOS (the same on Solaris)
        // macOS also supports IP_PKTINFO
        {
            if is_ipv4 {
                set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_RECVDSTADDR, OPTION_ON)?;
            }
        }

        // Options standardized in RFC 3542
        if !is_ipv4 {
            set_socket_option(&*io, libc::IPPROTO_IPV6, libc::IPV6_RECVPKTINFO, OPTION_ON)?;
            set_socket_option(&*io, libc::IPPROTO_IPV6, libc::IPV6_RECVTCLASS, OPTION_ON)?;
            // Linux's IP_PMTUDISC_PROBE allows us to operate under interface MTU rather than the
            // kernel's path MTU guess, but actually disabling fragmentation requires this too. See
            // __ip6_append_data in ip6_output.c.
            // Set `may_fragment` to `true` if this option is not supported on the platform.
            may_fragment |= !set_socket_option_supported(
                &*io,
                libc::IPPROTO_IPV6,
                libc::IPV6_DONTFRAG,
                OPTION_ON,
            )?;
        }

        // `last_send_error` starts two log intervals in the past (or at `now` if that would
        // underflow), presumably so the first send error is logged immediately.
        let now = Instant::now();
        Ok(Self {
            last_send_error: Mutex::new(now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now)),
            max_gso_segments: AtomicUsize::new(gso::max_gso_segments()),
            gro_segments: gro::gro_segments(),
            may_fragment,
            sendmsg_einval: AtomicBool::new(false),
        })
    }

    /// Sends a [`Transmit`] on the given socket.
    ///
    /// This function will only ever return errors of kind [`io::ErrorKind::WouldBlock`].
    /// All other errors will be logged and converted to `Ok`.
    ///
    /// UDP transmission errors are considered non-fatal because higher-level protocols must
    /// employ retransmits and timeouts anyway in order to deal with UDP's unreliable nature.
    /// Thus, logging is most likely the only thing you can do with these errors.
    ///
    /// If you would like to handle these errors yourself, use [`UdpSocketState::try_send`]
    /// instead.
    pub fn send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
        match send(self, socket.0, transmit) {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => Err(e),
            // - EMSGSIZE is expected for MTU probes. Future work might be able to avoid
            //   these by automatically clamping the MTUD upper bound to the interface MTU.
            Err(e) if e.raw_os_error() == Some(libc::EMSGSIZE) => Ok(()),
            Err(e) => {
                log_sendmsg_error(&self.last_send_error, e, transmit);

                Ok(())
            }
        }
    }

    /// Sends a [`Transmit`] on the given socket without any additional error handling.
    pub fn try_send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
        send(self, socket.0, transmit)
    }

    /// Receives datagrams from `socket`, one per entry of `bufs`, writing the metadata of
    /// datagram `i` into `meta[i]`.
    ///
    /// Returns the number of datagrams received. At most the platform batch size is received
    /// per call; a single datagram on targets without a batched receive call.
    pub fn recv(
        &self,
        socket: UdpSockRef<'_>,
        bufs: &mut [IoSliceMut<'_>],
        meta: &mut [RecvMeta],
    ) -> io::Result<usize> {
        recv(socket.0, bufs, meta)
    }

    /// The maximum amount of segments which can be transmitted if a platform
    /// supports Generic Segmentation Offload (GSO).
    ///
    /// This is 1 if the platform doesn't support GSO. Subject to change if errors are detected
    /// while using GSO.
    #[inline]
    pub fn max_gso_segments(&self) -> NonZeroUsize {
        self.max_gso_segments
            .load(Ordering::Relaxed)
            .try_into()
            .expect("must have non zero GSO segments")
    }

    /// The number of segments to read when GRO is enabled. Used as a factor to
    /// compute the receive buffer size.
    ///
    /// Returns 1 if the platform doesn't support GRO.
    #[inline]
    pub fn gro_segments(&self) -> NonZeroUsize {
        self.gro_segments
    }

    /// Resize the send buffer of `socket` to `bytes`
    #[inline]
    pub fn set_send_buffer_size(&self, socket: UdpSockRef<'_>, bytes: usize) -> io::Result<()> {
        socket.0.set_send_buffer_size(bytes)
    }

    /// Resize the receive buffer of `socket` to `bytes`
    #[inline]
    pub fn set_recv_buffer_size(&self, socket: UdpSockRef<'_>, bytes: usize) -> io::Result<()> {
        socket.0.set_recv_buffer_size(bytes)
    }

    /// Get the size of the `socket` send buffer
    #[inline]
    pub fn send_buffer_size(&self, socket: UdpSockRef<'_>) -> io::Result<usize> {
        socket.0.send_buffer_size()
    }

    /// Get the size of the `socket` receive buffer
    #[inline]
    pub fn recv_buffer_size(&self, socket: UdpSockRef<'_>) -> io::Result<usize> {
        socket.0.recv_buffer_size()
    }

    /// Whether transmitted datagrams might get fragmented by the IP layer
    ///
    /// Returns `false` on targets which employ e.g. the `IPV6_DONTFRAG` socket option.
    #[inline]
    pub fn may_fragment(&self) -> bool {
        self.may_fragment
    }

    /// Returns true if we previously got an EINVAL error from `sendmsg` syscall.
    fn sendmsg_einval(&self) -> bool {
        self.sendmsg_einval.load(Ordering::Relaxed)
    }

    /// Sets the flag indicating we got EINVAL error from `sendmsg` syscall.
    #[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))]
    fn set_sendmsg_einval(&self) {
        self.sendmsg_einval.store(true, Ordering::Relaxed)
    }
}
302
303#[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))]
304fn send(
305    #[allow(unused_variables)] // only used on Linux
306    state: &UdpSocketState,
307    io: SockRef<'_>,
308    transmit: &Transmit<'_>,
309) -> io::Result<()> {
310    #[allow(unused_mut)] // only mutable on FreeBSD
311    let mut encode_src_ip = true;
312    #[cfg(target_os = "freebsd")]
313    {
314        let addr = io.local_addr()?;
315        let is_ipv4 = addr.family() == libc::AF_INET as libc::sa_family_t;
316        if is_ipv4 {
317            if let Some(socket) = addr.as_socket_ipv4() {
318                encode_src_ip = socket.ip() == &Ipv4Addr::UNSPECIFIED;
319            }
320        }
321    }
322    let mut msg_hdr: libc::msghdr = unsafe { mem::zeroed() };
323    let mut iovec: libc::iovec = unsafe { mem::zeroed() };
324    let mut cmsgs = cmsg::Aligned([0u8; CMSG_LEN]);
325    let dst_addr = socket2::SockAddr::from(transmit.destination);
326    prepare_msg(
327        transmit,
328        &dst_addr,
329        &mut msg_hdr,
330        &mut iovec,
331        &mut cmsgs,
332        encode_src_ip,
333        state.sendmsg_einval(),
334    );
335
336    loop {
337        let n = unsafe { libc::sendmsg(io.as_raw_fd(), &msg_hdr, 0) };
338
339        if n >= 0 {
340            return Ok(());
341        }
342
343        let e = io::Error::last_os_error();
344        match e.kind() {
345            // Retry the transmission
346            io::ErrorKind::Interrupted => continue,
347            io::ErrorKind::WouldBlock => return Err(e),
348            _ => {
349                // Some network adapters and drivers do not support GSO. Unfortunately, Linux
350                // offers no easy way for us to detect this short of an EIO or sometimes EINVAL
351                // when we try to actually send datagrams using it.
352                #[cfg(any(target_os = "linux", target_os = "android"))]
353                if let Some(libc::EIO) | Some(libc::EINVAL) = e.raw_os_error() {
354                    // Prevent new transmits from being scheduled using GSO. Existing GSO transmits
355                    // may already be in the pipeline, so we need to tolerate additional failures.
356                    if state.max_gso_segments().get() > 1 {
357                        crate::log::info!(
358                            "`libc::sendmsg` failed with {e}; halting segmentation offload"
359                        );
360                        state
361                            .max_gso_segments
362                            .store(1, std::sync::atomic::Ordering::Relaxed);
363                    }
364                }
365
366                // Some arguments to `sendmsg` are not supported. Switch to
367                // fallback mode and retry if we haven't already.
368                if e.raw_os_error() == Some(libc::EINVAL) && !state.sendmsg_einval() {
369                    state.set_sendmsg_einval();
370                    prepare_msg(
371                        transmit,
372                        &dst_addr,
373                        &mut msg_hdr,
374                        &mut iovec,
375                        &mut cmsgs,
376                        encode_src_ip,
377                        state.sendmsg_einval(),
378                    );
379                    continue;
380                }
381
382                return Err(e);
383            }
384        }
385    }
386}
387
#[cfg(apple_fast)]
/// Sends `transmit` with one `sendmsg_x` call, splitting its contents into up to `BATCH_SIZE`
/// datagrams of `segment_size` bytes each (the whole payload if no segment size is set).
///
/// `EINTR` is retried; every other error is returned to the caller.
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
    let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
    let mut iovs = unsafe { mem::zeroed::<[libc::iovec; BATCH_SIZE]>() };
    let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
    let addr = socket2::SockAddr::from(transmit.destination);
    let segment_size = match transmit.segment_size.unwrap_or(transmit.contents.len()) {
        // `slice::chunks` (and `div_ceil`) panic on zero, which an empty payload without an
        // explicit segment size used to produce. Fall back to the whole payload (min. 1).
        0 => transmit.contents.len().max(1),
        size => size,
    };
    debug_assert!(transmit.contents.len().div_ceil(segment_size) <= BATCH_SIZE);
    // `chunks` yields nothing for an empty payload; send a single empty datagram in that case,
    // matching the behavior of the plain `sendmsg` code paths.
    let chunks = std::iter::once(transmit.contents)
        .filter(|contents| contents.is_empty())
        .chain(transmit.contents.chunks(segment_size));
    let mut cnt = 0;
    for (i, chunk) in chunks.enumerate().take(BATCH_SIZE) {
        prepare_msg(
            &Transmit {
                destination: transmit.destination,
                ecn: transmit.ecn,
                contents: chunk,
                segment_size: Some(chunk.len()),
                src_ip: transmit.src_ip,
            },
            &addr,
            &mut hdrs[i],
            &mut iovs[i],
            &mut ctrls[i],
            true,
            state.sendmsg_einval(),
        );
        hdrs[i].msg_datalen = chunk.len();
        cnt += 1;
    }
    loop {
        // SAFETY: the first `cnt` headers point into `addr`, `iovs`, `ctrls` and the transmit
        // contents, all of which outlive this call.
        let n = unsafe { sendmsg_x(io.as_raw_fd(), hdrs.as_ptr(), cnt as u32, 0) };

        if n >= 0 {
            return Ok(());
        }

        let e = io::Error::last_os_error();
        match e.kind() {
            // Retry the transmission
            io::ErrorKind::Interrupted => continue,
            _ => return Err(e),
        }
    }
}
436
#[cfg(any(target_os = "openbsd", target_os = "netbsd", apple_slow))]
/// Sends `transmit` as one datagram with a single `sendmsg` call, retrying on `EINTR` and
/// returning every other error to the caller.
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
    // True on Apple, OpenBSD and NetBSD targets.
    let encode_src_ip = cfg!(apple) || cfg!(target_os = "openbsd") || cfg!(target_os = "netbsd");
    let dst_addr = socket2::SockAddr::from(transmit.destination);
    let mut ctrl = cmsg::Aligned([0u8; CMSG_LEN]);
    let mut iov: libc::iovec = unsafe { mem::zeroed() };
    let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
    prepare_msg(
        transmit,
        &dst_addr,
        &mut hdr,
        &mut iov,
        &mut ctrl,
        encode_src_ip,
        state.sendmsg_einval(),
    );

    loop {
        // SAFETY: `hdr` only points at `dst_addr`, `iov`, `ctrl` and the transmit contents,
        // all of which outlive this call.
        if unsafe { libc::sendmsg(io.as_raw_fd(), &hdr, 0) } >= 0 {
            break Ok(());
        }
        let err = io::Error::last_os_error();
        // Retry the transmission only when interrupted by a signal.
        if err.kind() != io::ErrorKind::Interrupted {
            break Err(err);
        }
    }
}
467
468#[cfg(not(any(
469    apple,
470    target_os = "openbsd",
471    target_os = "netbsd",
472    target_os = "dragonfly",
473    solarish
474)))]
475fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
476    let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];
477    let mut ctrls = [cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); BATCH_SIZE];
478    let mut hdrs = unsafe { mem::zeroed::<[libc::mmsghdr; BATCH_SIZE]>() };
479    let max_msg_count = bufs.len().min(BATCH_SIZE);
480    for i in 0..max_msg_count {
481        prepare_recv(
482            &mut bufs[i],
483            &mut names[i],
484            &mut ctrls[i],
485            &mut hdrs[i].msg_hdr,
486        );
487    }
488    let msg_count = loop {
489        let n = unsafe {
490            libc::recvmmsg(
491                io.as_raw_fd(),
492                hdrs.as_mut_ptr(),
493                bufs.len().min(BATCH_SIZE) as _,
494                0,
495                ptr::null_mut::<libc::timespec>(),
496            )
497        };
498
499        if n >= 0 {
500            break n;
501        }
502
503        let e = io::Error::last_os_error();
504        match e.kind() {
505            // Retry receiving
506            io::ErrorKind::Interrupted => continue,
507            _ => return Err(e),
508        }
509    };
510    for i in 0..(msg_count as usize) {
511        meta[i] = decode_recv(&names[i], &hdrs[i].msg_hdr, hdrs[i].msg_len as usize)?;
512    }
513    Ok(msg_count as usize)
514}
515
#[cfg(apple_fast)]
/// Receives up to `BATCH_SIZE` datagrams with one `recvmsg_x` call.
///
/// Datagram `i` is written into `bufs[i]` and its metadata into `meta[i]`. Returns the
/// number of datagrams received. `EINTR` is retried; other errors are returned.
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
    let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];
    // MacOS 10.15 `recvmsg_x` does not override the `msghdr_x`
    // `msg_controllen`. Thus, after the call to `recvmsg_x`, one does not know
    // which control messages have been written to. To prevent reading
    // uninitialized memory, do not use `MaybeUninit` for `ctrls`, instead
    // initialize `ctrls` with `0`s. A control message of all `0`s is
    // automatically skipped by `libc::CMSG_NXTHDR`.
    let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
    let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
    // Never request more datagrams than there are buffers *and* metadata slots. Otherwise a
    // short `meta` would panic below after the kernel had already consumed the datagrams.
    let max_msg_count = bufs.len().min(meta.len()).min(BATCH_SIZE);
    for i in 0..max_msg_count {
        prepare_recv(&mut bufs[i], &mut names[i], &mut ctrls[i], &mut hdrs[i]);
    }
    let msg_count = loop {
        // SAFETY: the first `max_msg_count` headers were prepared above and point into
        // `bufs`, `names` and `ctrls`, all of which outlive this call.
        let n = unsafe { recvmsg_x(io.as_raw_fd(), hdrs.as_mut_ptr(), max_msg_count as _, 0) };

        if n >= 0 {
            break n as usize;
        }

        let e = io::Error::last_os_error();
        match e.kind() {
            // Retry receiving
            io::ErrorKind::Interrupted => continue,
            _ => return Err(e),
        }
    };
    for (i, slot) in meta.iter_mut().take(msg_count).enumerate() {
        *slot = decode_recv(&names[i], &hdrs[i], hdrs[i].msg_datalen as usize)?;
    }
    Ok(msg_count)
}
550
#[cfg(any(
    target_os = "openbsd",
    target_os = "netbsd",
    target_os = "dragonfly",
    solarish,
    apple_slow
))]
/// Receives a single datagram into `bufs[0]` and its metadata into `meta[0]`, returning 1.
///
/// Datagrams that were truncated because they did not fit into `bufs[0]` are discarded and
/// the next one is read instead. `EINTR` is retried; other errors (including `WouldBlock`)
/// are returned.
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
    let mut name = MaybeUninit::<libc::sockaddr_storage>::uninit();
    let mut ctrl = cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit());
    let mut hdr = unsafe { mem::zeroed::<libc::msghdr>() };
    let n = loop {
        // Re-initialise the header on every attempt. A successful `recvmsg` shrinks
        // `msg_namelen`/`msg_controllen` and sets `msg_flags`, while a failed one leaves them
        // untouched. Without the reset, a stale `MSG_TRUNC` followed by `EAGAIN` would spin
        // forever, and retries would run with a too-small control buffer.
        prepare_recv(&mut bufs[0], &mut name, &mut ctrl, &mut hdr);
        // SAFETY: `hdr` points into `bufs[0]`, `name` and `ctrl`, which outlive this call.
        let n = unsafe { libc::recvmsg(io.as_raw_fd(), &mut hdr, 0) };

        if n < 0 {
            let e = io::Error::last_os_error();
            match e.kind() {
                // Retry receiving
                io::ErrorKind::Interrupted => continue,
                _ => return Err(e),
            }
        }

        // The datagram did not fit into the buffer: drop it and read the next one.
        if hdr.msg_flags & libc::MSG_TRUNC != 0 {
            continue;
        }

        break n;
    };
    meta[0] = decode_recv(&name, &hdr, n as usize)?;
    Ok(1)
}
584
/// Size in bytes of every control-message buffer used for sending and receiving.
///
/// `UdpSocketState::new` asserts at runtime that this fits an `int`-sized control message plus,
/// on the platforms that use it, an `in6_pktinfo` control message.
const CMSG_LEN: usize = 88;
586
587fn prepare_msg(
588    transmit: &Transmit<'_>,
589    dst_addr: &socket2::SockAddr,
590    #[cfg(not(apple_fast))] hdr: &mut libc::msghdr,
591    #[cfg(apple_fast)] hdr: &mut msghdr_x,
592    iov: &mut libc::iovec,
593    ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
594    #[allow(unused_variables)] // only used on FreeBSD & macOS
595    encode_src_ip: bool,
596    sendmsg_einval: bool,
597) {
598    iov.iov_base = transmit.contents.as_ptr() as *const _ as *mut _;
599    iov.iov_len = transmit.contents.len();
600
601    // SAFETY: Casting the pointer to a mutable one is legal,
602    // as sendmsg is guaranteed to not alter the mutable pointer
603    // as per the POSIX spec. See the section on the sys/socket.h
604    // header for details. The type is only mutable in the first
605    // place because it is reused by recvmsg as well.
606    let name = dst_addr.as_ptr() as *mut libc::c_void;
607    let namelen = dst_addr.len();
608    hdr.msg_name = name as *mut _;
609    hdr.msg_namelen = namelen;
610    hdr.msg_iov = iov;
611    hdr.msg_iovlen = 1;
612
613    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
614    hdr.msg_controllen = CMSG_LEN as _;
615    let mut encoder = unsafe { cmsg::Encoder::new(hdr) };
616    let ecn = transmit.ecn.map_or(0, |x| x as libc::c_int);
617    // True for IPv4 or IPv4-Mapped IPv6
618    let is_ipv4 = transmit.destination.is_ipv4()
619        || matches!(transmit.destination.ip(), IpAddr::V6(addr) if addr.to_ipv4_mapped().is_some());
620    if is_ipv4 {
621        if !sendmsg_einval {
622            #[cfg(not(target_os = "netbsd"))]
623            {
624                encoder.push(libc::IPPROTO_IP, libc::IP_TOS, ecn as IpTosTy);
625            }
626        }
627    } else {
628        encoder.push(libc::IPPROTO_IPV6, libc::IPV6_TCLASS, ecn);
629    }
630
631    if let Some(segment_size) = transmit.effective_segment_size() {
632        gso::set_segment_size(&mut encoder, segment_size as u16);
633    }
634
635    if let Some(ip) = &transmit.src_ip {
636        match ip {
637            IpAddr::V4(v4) => {
638                #[cfg(any(target_os = "linux", target_os = "android"))]
639                {
640                    let pktinfo = libc::in_pktinfo {
641                        ipi_ifindex: 0,
642                        ipi_spec_dst: libc::in_addr {
643                            s_addr: u32::from_ne_bytes(v4.octets()),
644                        },
645                        ipi_addr: libc::in_addr { s_addr: 0 },
646                    };
647                    encoder.push(libc::IPPROTO_IP, libc::IP_PKTINFO, pktinfo);
648                }
649                #[cfg(any(bsd, apple, solarish))]
650                {
651                    if encode_src_ip {
652                        let addr = libc::in_addr {
653                            s_addr: u32::from_ne_bytes(v4.octets()),
654                        };
655                        encoder.push(libc::IPPROTO_IP, libc::IP_RECVDSTADDR, addr);
656                    }
657                }
658            }
659            IpAddr::V6(v6) => {
660                let pktinfo = libc::in6_pktinfo {
661                    ipi6_ifindex: 0,
662                    ipi6_addr: libc::in6_addr {
663                        s6_addr: v6.octets(),
664                    },
665                };
666                encoder.push(libc::IPPROTO_IPV6, libc::IPV6_PKTINFO, pktinfo);
667            }
668        }
669    }
670
671    encoder.finish();
672}
673
674#[cfg(not(apple_fast))]
675fn prepare_recv(
676    buf: &mut IoSliceMut<'_>,
677    name: &mut MaybeUninit<libc::sockaddr_storage>,
678    ctrl: &mut cmsg::Aligned<MaybeUninit<[u8; CMSG_LEN]>>,
679    hdr: &mut libc::msghdr,
680) {
681    hdr.msg_name = name.as_mut_ptr() as _;
682    hdr.msg_namelen = mem::size_of::<libc::sockaddr_storage>() as _;
683    hdr.msg_iov = buf as *mut IoSliceMut<'_> as *mut libc::iovec;
684    hdr.msg_iovlen = 1;
685    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
686    hdr.msg_controllen = CMSG_LEN as _;
687    hdr.msg_flags = 0;
688}
689
#[cfg(apple_fast)]
/// Points the `msghdr_x` at `buf` (payload), `name` (source address) and `ctrl` (control
/// messages) for one `recvmsg_x` slot, clears `msg_flags` and sets `msg_datalen` to the
/// buffer length.
fn prepare_recv(
    buf: &mut IoSliceMut<'_>,
    name: &mut MaybeUninit<libc::sockaddr_storage>,
    ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
    hdr: &mut msghdr_x,
) {
    let capacity = buf.len();
    hdr.msg_datalen = capacity;
    hdr.msg_flags = 0;

    // Control-message buffer, reported at its full capacity.
    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
    hdr.msg_controllen = CMSG_LEN as _;

    // `IoSliceMut` is ABI-compatible with `iovec` on Unix, so the slice is used directly.
    hdr.msg_iov = (buf as *mut IoSliceMut<'_>).cast::<libc::iovec>();
    hdr.msg_iovlen = 1;

    // Source-address buffer, large enough for any address family.
    hdr.msg_name = name.as_mut_ptr() as _;
    hdr.msg_namelen = mem::size_of::<libc::sockaddr_storage>() as _;
}
706
707fn decode_recv<M: cmsg::MsgHdr<ControlMessage = libc::cmsghdr>>(
708    name: &MaybeUninit<libc::sockaddr_storage>,
709    hdr: &M,
710    len: usize,
711) -> io::Result<RecvMeta> {
712    let name = unsafe { name.assume_init() };
713    let mut ecn_bits = 0;
714    let mut dst_ip = None;
715    let mut interface_index = None;
716    #[allow(unused_mut)] // only mutable on Linux
717    let mut stride = len;
718
719    let cmsg_iter = unsafe { cmsg::Iter::new(hdr) };
720    for cmsg in cmsg_iter {
721        match (cmsg.cmsg_level, cmsg.cmsg_type) {
722            (libc::IPPROTO_IP, libc::IP_TOS) => unsafe {
723                ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
724            },
725            // FreeBSD uses IP_RECVTOS here, and we can be liberal because cmsgs are opt-in.
726            #[cfg(not(any(
727                target_os = "openbsd",
728                target_os = "netbsd",
729                target_os = "dragonfly",
730                solarish
731            )))]
732            (libc::IPPROTO_IP, libc::IP_RECVTOS) => unsafe {
733                ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
734            },
735            (libc::IPPROTO_IPV6, libc::IPV6_TCLASS) => unsafe {
736                // Temporary hack around broken macos ABI. Remove once upstream fixes it.
737                // https://bugreport.apple.com/web/?problemID=48761855
738                #[allow(clippy::unnecessary_cast)] // cmsg.cmsg_len defined as size_t
739                if cfg!(apple)
740                    && cmsg.cmsg_len as usize == libc::CMSG_LEN(mem::size_of::<u8>() as _) as usize
741                {
742                    ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
743                } else {
744                    ecn_bits = cmsg::decode::<libc::c_int, libc::cmsghdr>(cmsg) as u8;
745                }
746            },
747            #[cfg(any(target_os = "linux", target_os = "android"))]
748            (libc::IPPROTO_IP, libc::IP_PKTINFO) => {
749                let pktinfo = unsafe { cmsg::decode::<libc::in_pktinfo, libc::cmsghdr>(cmsg) };
750                dst_ip = Some(IpAddr::V4(Ipv4Addr::from(
751                    pktinfo.ipi_addr.s_addr.to_ne_bytes(),
752                )));
753                interface_index = Some(pktinfo.ipi_ifindex as u32);
754            }
755            #[cfg(any(bsd, apple))]
756            (libc::IPPROTO_IP, libc::IP_RECVDSTADDR) => {
757                let in_addr = unsafe { cmsg::decode::<libc::in_addr, libc::cmsghdr>(cmsg) };
758                dst_ip = Some(IpAddr::V4(Ipv4Addr::from(in_addr.s_addr.to_ne_bytes())));
759            }
760            (libc::IPPROTO_IPV6, libc::IPV6_PKTINFO) => {
761                let pktinfo = unsafe { cmsg::decode::<libc::in6_pktinfo, libc::cmsghdr>(cmsg) };
762                dst_ip = Some(IpAddr::V6(Ipv6Addr::from(pktinfo.ipi6_addr.s6_addr)));
763                interface_index = Some(pktinfo.ipi6_ifindex as u32);
764            }
765            #[cfg(any(target_os = "linux", target_os = "android"))]
766            (libc::SOL_UDP, libc::UDP_GRO) => unsafe {
767                stride = cmsg::decode::<libc::c_int, libc::cmsghdr>(cmsg) as usize;
768            },
769            _ => {}
770        }
771    }
772
773    let addr = match libc::c_int::from(name.ss_family) {
774        libc::AF_INET => {
775            // Safety: if the ss_family field is AF_INET then storage must be a sockaddr_in.
776            let addr: &libc::sockaddr_in =
777                unsafe { &*(&name as *const _ as *const libc::sockaddr_in) };
778            SocketAddr::V4(SocketAddrV4::new(
779                Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()),
780                u16::from_be(addr.sin_port),
781            ))
782        }
783        libc::AF_INET6 => {
784            // Safety: if the ss_family field is AF_INET6 then storage must be a sockaddr_in6.
785            let addr: &libc::sockaddr_in6 =
786                unsafe { &*(&name as *const _ as *const libc::sockaddr_in6) };
787            SocketAddr::V6(SocketAddrV6::new(
788                Ipv6Addr::from(addr.sin6_addr.s6_addr),
789                u16::from_be(addr.sin6_port),
790                addr.sin6_flowinfo,
791                addr.sin6_scope_id,
792            ))
793        }
794        f => {
795            return Err(io::Error::other(format!(
796                "expected AF_INET or AF_INET6, got {f} in decode_recv"
797            )));
798        }
799    };
800
801    Ok(RecvMeta {
802        len,
803        stride,
804        addr,
805        ecn: EcnCodepoint::from_bits(ecn_bits),
806        dst_ip,
807        interface_index,
808    })
809}
810
/// Maximum number of datagrams handled by one send or receive call.
///
/// The `apple_slow` code paths send and receive a single datagram per system call.
#[cfg(apple_slow)]
pub(crate) const BATCH_SIZE: usize = 1;

/// Maximum number of datagrams handled by one batched send or receive call.
///
/// Chosen somewhat arbitrarily; might benefit from additional tuning.
#[cfg(not(apple_slow))]
pub(crate) const BATCH_SIZE: usize = 32;
817
818#[cfg(any(target_os = "linux", target_os = "android"))]
819mod gso {
820    use super::*;
821    use std::{ffi::CStr, mem, str::FromStr, sync::OnceLock};
822
823    // Support for UDP GSO has been added to linux kernel in version 4.18
824    // https://github.com/torvalds/linux/commit/cb586c63e3fc5b227c51fd8c4cb40b34d3750645
825    const SUPPORTED_SINCE: KernelVersion = KernelVersion {
826        version: 4,
827        major_revision: 18,
828    };
829
830    /// Checks whether GSO support is available by checking the kernel version followed by setting
831    /// the UDP_SEGMENT option on a socket
832    pub(crate) fn max_gso_segments() -> usize {
833        const GSO_SIZE: libc::c_int = 1500;
834
835        if !SUPPORTED_BY_CURRENT_KERNEL.get_or_init(supported_by_current_kernel) {
836            return 1;
837        }
838
839        let socket = match std::net::UdpSocket::bind("[::]:0")
840            .or_else(|_| std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
841        {
842            Ok(socket) => socket,
843            Err(_) => return 1,
844        };
845
846        // As defined in linux/udp.h
847        // #define UDP_MAX_SEGMENTS        (1 << 6UL)
848        match set_socket_option(&socket, libc::SOL_UDP, libc::UDP_SEGMENT, GSO_SIZE) {
849            Ok(()) => 64,
850            Err(_e) => {
851                crate::log::debug!(
852                    "failed to set `UDP_SEGMENT` socket option ({_e}); setting `max_gso_segments = 1`"
853                );
854
855                1
856            }
857        }
858    }
859
860    pub(crate) fn set_segment_size(
861        encoder: &mut cmsg::Encoder<'_, libc::msghdr>,
862        segment_size: u16,
863    ) {
864        encoder.push(libc::SOL_UDP, libc::UDP_SEGMENT, segment_size);
865    }
866
867    // Avoid calling `supported_by_current_kernel` for each socket by using `OnceLock`.
868    static SUPPORTED_BY_CURRENT_KERNEL: OnceLock<bool> = OnceLock::new();
869
870    fn supported_by_current_kernel() -> bool {
871        let kernel_version_string = match kernel_version_string() {
872            Ok(kernel_version_string) => kernel_version_string,
873            Err(_e) => {
874                crate::log::warn!("GSO disabled: uname returned {_e}");
875                return false;
876            }
877        };
878
879        let Some(kernel_version) = KernelVersion::from_str(&kernel_version_string) else {
880            crate::log::warn!(
881                "GSO disabled: failed to parse kernel version ({kernel_version_string})"
882            );
883            return false;
884        };
885
886        if kernel_version < SUPPORTED_SINCE {
887            crate::log::info!("GSO disabled: kernel too old ({kernel_version_string}); need 4.18+",);
888            return false;
889        }
890
891        true
892    }
893
894    fn kernel_version_string() -> io::Result<String> {
895        let mut n = unsafe { mem::zeroed() };
896        let r = unsafe { libc::uname(&mut n) };
897        if r != 0 {
898            return Err(io::Error::last_os_error());
899        }
900        Ok(unsafe {
901            CStr::from_ptr(n.release[..].as_ptr())
902                .to_string_lossy()
903                .into_owned()
904        })
905    }
906
907    // https://www.linfo.org/kernel_version_numbering.html
908    #[derive(Eq, PartialEq, Ord, PartialOrd, Debug)]
909    struct KernelVersion {
910        version: u8,
911        major_revision: u8,
912    }
913
914    impl KernelVersion {
915        fn from_str(release: &str) -> Option<Self> {
916            let mut split = release
917                .split_once('-')
918                .map(|pair| pair.0)
919                .unwrap_or(release)
920                .split('.');
921
922            let version = u8::from_str(split.next()?).ok()?;
923            let major_revision = u8::from_str(split.next()?).ok()?;
924
925            Some(Self {
926                version,
927                major_revision,
928            })
929        }
930    }
931
932    #[cfg(test)]
933    mod test {
934        use super::*;
935
936        #[test]
937        fn parse_current_kernel_version_release_string() {
938            let release = kernel_version_string().unwrap();
939            KernelVersion::from_str(&release).unwrap();
940        }
941
942        #[test]
943        fn parse_kernel_version_release_string() {
944            // These are made up for the test
945            assert_eq!(
946                KernelVersion::from_str("4.14"),
947                Some(KernelVersion {
948                    version: 4,
949                    major_revision: 14
950                })
951            );
952            assert_eq!(
953                KernelVersion::from_str("4.18"),
954                Some(KernelVersion {
955                    version: 4,
956                    major_revision: 18
957                })
958            );
959            // These were seen in the wild
960            assert_eq!(
961                KernelVersion::from_str("4.14.186-27095505"),
962                Some(KernelVersion {
963                    version: 4,
964                    major_revision: 14
965                })
966            );
967            assert_eq!(
968                KernelVersion::from_str("6.8.0-59-generic"),
969                Some(KernelVersion {
970                    version: 6,
971                    major_revision: 8
972                })
973            );
974        }
975    }
976}
977
978// On Apple platforms using the `sendmsg_x` call, UDP datagram segmentation is not
979// offloaded to the NIC or even the kernel, but instead done here in user space in
980// [`send`]) and then passed to the OS as individual `iovec`s (up to `BATCH_SIZE`).
981#[cfg(not(any(target_os = "linux", target_os = "android")))]
982mod gso {
983    use super::*;
984
985    pub(super) fn max_gso_segments() -> usize {
986        #[cfg(apple_fast)]
987        {
988            BATCH_SIZE
989        }
990        #[cfg(not(apple_fast))]
991        {
992            1
993        }
994    }
995
996    pub(super) fn set_segment_size(
997        #[cfg(not(apple_fast))] _encoder: &mut cmsg::Encoder<'_, libc::msghdr>,
998        #[cfg(apple_fast)] _encoder: &mut cmsg::Encoder<'_, msghdr_x>,
999        _segment_size: u16,
1000    ) {
1001    }
1002}
1003
1004#[cfg(any(target_os = "linux", target_os = "android"))]
1005mod gro {
1006    use super::*;
1007
1008    pub(crate) fn gro_segments() -> NonZeroUsize {
1009        let socket = match std::net::UdpSocket::bind("[::]:0")
1010            .or_else(|_| std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
1011        {
1012            Ok(socket) => socket,
1013            Err(_) => return NonZeroUsize::MIN,
1014        };
1015
1016        // As defined in net/ipv4/udp_offload.c
1017        // #define UDP_GRO_CNT_MAX 64
1018        //
1019        // NOTE: this MUST be set to UDP_GRO_CNT_MAX to ensure that the receive buffer size
1020        // (get_max_udp_payload_size() * gro_segments()) is large enough to hold the largest GRO
1021        // list the kernel might potentially produce. See
1022        // https://github.com/quinn-rs/quinn/pull/1354.
1023        match set_socket_option(&socket, libc::SOL_UDP, libc::UDP_GRO, OPTION_ON) {
1024            Ok(()) => NonZeroUsize::new(64).expect("known"),
1025            Err(_) => NonZeroUsize::MIN,
1026        }
1027    }
1028}
1029
1030/// Returns whether the given socket option is supported on the current platform
1031///
1032/// Yields `Ok(true)` if the option was set successfully, `Ok(false)` if setting
1033/// the option raised an `ENOPROTOOPT` or `EOPNOTSUPP` error, and `Err` for any other error.
1034fn set_socket_option_supported(
1035    socket: &impl AsRawFd,
1036    level: libc::c_int,
1037    name: libc::c_int,
1038    value: libc::c_int,
1039) -> io::Result<bool> {
1040    match set_socket_option(socket, level, name, value) {
1041        Ok(()) => Ok(true),
1042        Err(err) if err.raw_os_error() == Some(libc::ENOPROTOOPT) => Ok(false),
1043        Err(err) if err.raw_os_error() == Some(libc::EOPNOTSUPP) => Ok(false),
1044        Err(err) => Err(err),
1045    }
1046}
1047
1048fn set_socket_option(
1049    socket: &impl AsRawFd,
1050    level: libc::c_int,
1051    name: libc::c_int,
1052    value: libc::c_int,
1053) -> io::Result<()> {
1054    let rc = unsafe {
1055        libc::setsockopt(
1056            socket.as_raw_fd(),
1057            level,
1058            name,
1059            &value as *const _ as _,
1060            mem::size_of_val(&value) as _,
1061        )
1062    };
1063
1064    match rc == 0 {
1065        true => Ok(()),
1066        false => Err(io::Error::last_os_error()),
1067    }
1068}
1069
1070const OPTION_ON: libc::c_int = 1;
1071
1072#[cfg(not(any(target_os = "linux", target_os = "android")))]
1073mod gro {
1074    use std::num::NonZeroUsize;
1075
1076    pub(super) fn gro_segments() -> NonZeroUsize {
1077        NonZeroUsize::MIN
1078    }
1079}