iroh_quinn_udp/
unix.rs

1#[cfg(not(any(apple, target_os = "openbsd", solarish)))]
2use std::ptr;
3use std::{
4    io::{self, IoSliceMut},
5    mem::{self, MaybeUninit},
6    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
7    num::NonZeroUsize,
8    os::unix::io::AsRawFd,
9    sync::{
10        Mutex,
11        atomic::{AtomicBool, AtomicUsize, Ordering},
12    },
13    time::Instant,
14};
15
16use socket2::SockRef;
17
18use super::{
19    EcnCodepoint, IO_ERROR_LOG_INTERVAL, RecvMeta, Transmit, UdpSockRef, cmsg, log_sendmsg_error,
20};
21
// Adapted from https://github.com/apple-oss-distributions/xnu/blob/8d741a5de7ff4191bf97d57b9f54c2f6d4a15585/bsd/sys/socket_private.h
/// Per-message header handed to `recvmsg_x` / `sendmsg_x`.
///
/// The struct is `#[repr(C)]` because it crosses the FFI boundary; the field order and types
/// must therefore stay exactly as declared.
#[cfg(apple_fast)]
#[repr(C)]
#[allow(non_camel_case_types)]
pub(crate) struct msghdr_x {
    /// Pointer to the socket address (destination when sending, storage for the peer when
    /// receiving).
    pub msg_name: *mut libc::c_void,
    /// Size in bytes of the buffer behind `msg_name`.
    pub msg_namelen: libc::socklen_t,
    /// Pointer to the scatter/gather array describing the payload.
    pub msg_iov: *mut libc::iovec,
    /// Number of entries in `msg_iov`; this file always uses exactly one.
    pub msg_iovlen: libc::c_int,
    /// Pointer to the control message (ancillary data) buffer.
    pub msg_control: *mut libc::c_void,
    /// Size in bytes of the buffer behind `msg_control`.
    pub msg_controllen: libc::socklen_t,
    /// Message flags; reset to 0 before receiving.
    pub msg_flags: libc::c_int,
    /// Payload length: set to the chunk/buffer length before the call and read back as the
    /// received datagram length after `recvmsg_x`.
    pub msg_datalen: usize,
}
36
// Batched datagram syscalls; only declared when the `apple_fast` cfg is active.
#[cfg(apple_fast)]
unsafe extern "C" {
    /// Receives up to `cnt` datagrams on socket `s` into the headers at `msgp`.
    ///
    /// Callers in this file treat a non-negative return value as the number of messages
    /// received and a negative one as failure, with the cause read via
    /// `io::Error::last_os_error()`.
    fn recvmsg_x(
        s: libc::c_int,
        msgp: *const msghdr_x,
        cnt: libc::c_uint,
        flags: libc::c_int,
    ) -> isize;

    /// Sends the `cnt` datagrams described by the headers at `msgp` on socket `s`.
    ///
    /// Callers in this file treat a non-negative return value as success and a negative one
    /// as failure, with the cause read via `io::Error::last_os_error()`.
    fn sendmsg_x(
        s: libc::c_int,
        msgp: *const msghdr_x,
        cnt: libc::c_uint,
        flags: libc::c_int,
    ) -> isize;
}
53
// Defined in netinet6/in6.h on OpenBSD, this is not yet exported by the libc crate
// directly.  See https://github.com/rust-lang/libc/issues/3704 for when we might be able to
// rely on this from the libc crate.
// The same hard-coded value is also used when building for NetBSD.
#[cfg(any(target_os = "openbsd", target_os = "netbsd"))]
const IPV6_DONTFRAG: libc::c_int = 62;
// Every other target takes the constant straight from the libc crate.
#[cfg(not(any(target_os = "openbsd", target_os = "netbsd")))]
const IPV6_DONTFRAG: libc::c_int = libc::IPV6_DONTFRAG;
61
// Integer type used for the payload of the `IP_TOS` control message when sending.
// FreeBSD uses a single byte, the remaining targets a C `int`.
#[cfg(target_os = "freebsd")]
type IpTosTy = libc::c_uchar;
// No alias is defined for NetBSD: the only use of `IpTosTy` is compiled out there.
#[cfg(not(any(target_os = "freebsd", target_os = "netbsd")))]
type IpTosTy = libc::c_int;
66
/// Tokio-compatible UDP socket with some useful specializations.
///
/// Unlike a standard tokio UDP socket, this allows ECN bits to be read and written on some
/// platforms.
#[derive(Debug)]
pub struct UdpSocketState {
    /// Timestamp passed to `log_sendmsg_error` whenever a send fails.
    ///
    /// Initialised to `2 * IO_ERROR_LOG_INTERVAL` in the past when that is representable.
    /// NOTE(review): presumably used to rate-limit error logging — confirm against
    /// `log_sendmsg_error`.
    last_send_error: Mutex<Instant>,
    /// Maximum number of GSO segments per transmit; lowered to 1 at runtime when `sendmsg`
    /// reports `EIO` or `EINVAL` on Linux/Android.
    max_gso_segments: AtomicUsize,
    /// Number of segments to expect per receive when GRO is enabled.
    gro_segments: NonZeroUsize,
    /// Whether at least one "don't fragment" socket option turned out to be unsupported.
    may_fragment: bool,

    /// True if we have received EINVAL error from `sendmsg` system call at least once.
    ///
    /// If enabled, we assume that old kernel is used and switch to fallback mode.
    /// In particular, we do not use IP_TOS cmsg_type in this case,
    /// which is not supported on Linux <3.13 and results in not sending the UDP packet at all.
    sendmsg_einval: AtomicBool,
}
85
impl UdpSocketState {
    /// Configures `sock` for use with this module and returns the associated state.
    ///
    /// The socket is switched to non-blocking mode and the platform-specific socket options
    /// for ECN, packet info (destination address), GRO and "don't fragment" behaviour are
    /// applied.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if switching to non-blocking mode, querying the local
    /// address, or setting a mandatory socket option fails.
    ///
    /// # Panics
    ///
    /// Panics if `CMSG_LEN` is too small for the control messages used on this platform, or
    /// if the control message buffer type is insufficiently aligned for `libc::cmsghdr`.
    pub fn new(sock: UdpSockRef<'_>) -> io::Result<Self> {
        let io = sock.0;
        // Extra control-buffer space needed for the packet-info message on this platform.
        let mut cmsg_platform_space = 0;
        if cfg!(target_os = "linux")
            || cfg!(bsd)
            || cfg!(apple)
            || cfg!(target_os = "android")
            || cfg!(solarish)
        {
            cmsg_platform_space +=
                unsafe { libc::CMSG_SPACE(mem::size_of::<libc::in6_pktinfo>() as _) as usize };
        }

        // The fixed-size buffer must hold one `c_int` message plus the platform-specific part.
        assert!(
            CMSG_LEN
                >= unsafe { libc::CMSG_SPACE(mem::size_of::<libc::c_int>() as _) as usize }
                    + cmsg_platform_space
        );
        assert!(
            mem::align_of::<libc::cmsghdr>() <= mem::align_of::<cmsg::Aligned<[u8; 0]>>(),
            "control message buffers will be misaligned"
        );

        io.set_nonblocking(true)?;

        let addr = io.local_addr()?;
        let is_ipv4 = addr.family() == libc::AF_INET as libc::sa_family_t;

        // mac and ios do not support IP_RECVTOS on dual-stack sockets :(
        // older macos versions also don't have the flag and will error out if we don't ignore it
        #[cfg(not(any(
            target_os = "openbsd",
            target_os = "netbsd",
            target_os = "dragonfly",
            solarish
        )))]
        if is_ipv4 || !io.only_v6()? {
            if let Err(_err) =
                set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_RECVTOS, OPTION_ON)
            {
                crate::log::debug!("Ignoring error setting IP_RECVTOS on socket: {_err:?}");
            }
        }

        // Accumulates `true` as soon as any "don't fragment" option is reported unsupported.
        let mut may_fragment = false;
        #[cfg(any(target_os = "linux", target_os = "android"))]
        {
            // opportunistically try to enable GRO. See gro::gro_segments().
            let _ = set_socket_option(&*io, libc::SOL_UDP, gro::UDP_GRO, OPTION_ON);

            // Forbid IPv4 fragmentation. Set even for IPv6 to account for IPv6 mapped IPv4 addresses.
            // Set `may_fragment` to `true` if this option is not supported on the platform.
            may_fragment |= !set_socket_option_supported(
                &*io,
                libc::IPPROTO_IP,
                libc::IP_MTU_DISCOVER,
                libc::IP_PMTUDISC_PROBE,
            )?;

            if is_ipv4 {
                // Needed so the destination address of received IPv4 datagrams is reported.
                set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_PKTINFO, OPTION_ON)?;
            } else {
                // Set `may_fragment` to `true` if this option is not supported on the platform.
                may_fragment |= !set_socket_option_supported(
                    &*io,
                    libc::IPPROTO_IPV6,
                    libc::IPV6_MTU_DISCOVER,
                    libc::IPV6_PMTUDISC_PROBE,
                )?;
            }
        }
        #[cfg(any(target_os = "freebsd", apple))]
        {
            if is_ipv4 {
                // Set `may_fragment` to `true` if this option is not supported on the platform.
                may_fragment |= !set_socket_option_supported(
                    &*io,
                    libc::IPPROTO_IP,
                    libc::IP_DONTFRAG,
                    OPTION_ON,
                )?;
            }
        }
        #[cfg(any(bsd, apple, solarish))]
        // IP_RECVDSTADDR == IP_SENDSRCADDR on FreeBSD
        // macOS uses only IP_RECVDSTADDR, no IP_SENDSRCADDR on macOS (the same on Solaris)
        // macOS also supports IP_PKTINFO
        {
            if is_ipv4 {
                set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_RECVDSTADDR, OPTION_ON)?;
            }
        }

        // Options standardized in RFC 3542
        if !is_ipv4 {
            set_socket_option(&*io, libc::IPPROTO_IPV6, libc::IPV6_RECVPKTINFO, OPTION_ON)?;
            set_socket_option(&*io, libc::IPPROTO_IPV6, libc::IPV6_RECVTCLASS, OPTION_ON)?;
            // Linux's IP_PMTUDISC_PROBE allows us to operate under interface MTU rather than the
            // kernel's path MTU guess, but actually disabling fragmentation requires this too. See
            // __ip6_append_data in ip6_output.c.
            // Set `may_fragment` to `true` if this option is not supported on the platform.
            may_fragment |=
                !set_socket_option_supported(&*io, libc::IPPROTO_IPV6, IPV6_DONTFRAG, OPTION_ON)?;
        }

        let now = Instant::now();
        Ok(Self {
            // Back-date the timestamp by two log intervals; fall back to `now` if the
            // subtraction is not representable.
            last_send_error: Mutex::new(now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now)),
            max_gso_segments: AtomicUsize::new(gso::max_gso_segments()),
            gro_segments: gro::gro_segments(),
            may_fragment,
            sendmsg_einval: AtomicBool::new(false),
        })
    }

    /// Sends a [`Transmit`] on the given socket.
    ///
    /// This function will only ever return errors of kind [`io::ErrorKind::WouldBlock`].
    /// All other errors will be logged and converted to `Ok`.
    ///
    /// UDP transmission errors are considered non-fatal because higher-level protocols must
    /// employ retransmits and timeouts anyway in order to deal with UDP's unreliable nature.
    /// Thus, logging is most likely the only thing you can do with these errors.
    ///
    /// If you would like to handle these errors yourself, use [`UdpSocketState::try_send`]
    /// instead.
    pub fn send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
        match send(self, socket.0, transmit) {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => Err(e),
            // - EMSGSIZE is expected for MTU probes. Future work might be able to avoid
            //   these by automatically clamping the MTUD upper bound to the interface MTU.
            Err(e) if e.raw_os_error() == Some(libc::EMSGSIZE) => Ok(()),
            Err(e) => {
                log_sendmsg_error(&self.last_send_error, e, transmit);

                Ok(())
            }
        }
    }

    /// Sends a [`Transmit`] on the given socket without any additional error handling.
    ///
    /// Every error reported by the platform-specific `send` function is returned unchanged.
    pub fn try_send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
        send(self, socket.0, transmit)
    }

    /// Receives datagrams from `socket` into `bufs`, recording per-datagram metadata in `meta`.
    ///
    /// Delegates to the platform-specific `recv` function and returns the number of `meta`
    /// entries it filled in, or the I/O error it reported.
    pub fn recv(
        &self,
        socket: UdpSockRef<'_>,
        bufs: &mut [IoSliceMut<'_>],
        meta: &mut [RecvMeta],
    ) -> io::Result<usize> {
        recv(socket.0, bufs, meta)
    }

    /// The maximum amount of segments which can be transmitted if a platform
    /// supports Generic Send Offload (GSO).
    ///
    /// This is 1 if the platform doesn't support GSO. Subject to change if errors are detected
    /// while using GSO.
    ///
    /// # Panics
    ///
    /// Panics if the stored segment count is zero.
    #[inline]
    pub fn max_gso_segments(&self) -> NonZeroUsize {
        self.max_gso_segments
            .load(Ordering::Relaxed)
            .try_into()
            .expect("must have non zero GSO segments")
    }

    /// The number of segments to read when GRO is enabled. Used as a factor to
    /// compute the receive buffer size.
    ///
    /// Returns 1 if the platform doesn't support GRO.
    #[inline]
    pub fn gro_segments(&self) -> NonZeroUsize {
        self.gro_segments
    }

    /// Resize the send buffer of `socket` to `bytes`
    #[inline]
    pub fn set_send_buffer_size(&self, socket: UdpSockRef<'_>, bytes: usize) -> io::Result<()> {
        socket.0.set_send_buffer_size(bytes)
    }

    /// Resize the receive buffer of `socket` to `bytes`
    #[inline]
    pub fn set_recv_buffer_size(&self, socket: UdpSockRef<'_>, bytes: usize) -> io::Result<()> {
        socket.0.set_recv_buffer_size(bytes)
    }

    /// Get the size of the `socket` send buffer
    #[inline]
    pub fn send_buffer_size(&self, socket: UdpSockRef<'_>) -> io::Result<usize> {
        socket.0.send_buffer_size()
    }

    /// Get the size of the `socket` receive buffer
    #[inline]
    pub fn recv_buffer_size(&self, socket: UdpSockRef<'_>) -> io::Result<usize> {
        socket.0.recv_buffer_size()
    }

    /// Whether transmitted datagrams might get fragmented by the IP layer
    ///
    /// Returns `false` on targets which employ e.g. the `IPV6_DONTFRAG` socket option.
    #[inline]
    pub fn may_fragment(&self) -> bool {
        self.may_fragment
    }

    /// Returns true if we previously got an EINVAL error from `sendmsg` syscall.
    fn sendmsg_einval(&self) -> bool {
        self.sendmsg_einval.load(Ordering::Relaxed)
    }

    /// Sets the flag indicating we got EINVAL error from `sendmsg` syscall.
    ///
    /// Only compiled for the targets whose `send` implementation has the EINVAL fallback.
    #[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))]
    fn set_sendmsg_einval(&self) {
        self.sendmsg_einval.store(true, Ordering::Relaxed)
    }
}
307
308#[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))]
309fn send(
310    #[allow(unused_variables)] // only used on Linux
311    state: &UdpSocketState,
312    io: SockRef<'_>,
313    transmit: &Transmit<'_>,
314) -> io::Result<()> {
315    #[allow(unused_mut)] // only mutable on FreeBSD
316    let mut encode_src_ip = true;
317    #[cfg(target_os = "freebsd")]
318    {
319        let addr = io.local_addr()?;
320        let is_ipv4 = addr.family() == libc::AF_INET as libc::sa_family_t;
321        if is_ipv4 {
322            if let Some(socket) = addr.as_socket_ipv4() {
323                encode_src_ip = socket.ip() == &Ipv4Addr::UNSPECIFIED;
324            }
325        }
326    }
327    let mut msg_hdr: libc::msghdr = unsafe { mem::zeroed() };
328    let mut iovec: libc::iovec = unsafe { mem::zeroed() };
329    let mut cmsgs = cmsg::Aligned([0u8; CMSG_LEN]);
330    let dst_addr = socket2::SockAddr::from(transmit.destination);
331    prepare_msg(
332        transmit,
333        &dst_addr,
334        &mut msg_hdr,
335        &mut iovec,
336        &mut cmsgs,
337        encode_src_ip,
338        state.sendmsg_einval(),
339    );
340
341    loop {
342        let n = unsafe { libc::sendmsg(io.as_raw_fd(), &msg_hdr, 0) };
343
344        if n >= 0 {
345            return Ok(());
346        }
347
348        let e = io::Error::last_os_error();
349        match e.kind() {
350            // Retry the transmission
351            io::ErrorKind::Interrupted => continue,
352            io::ErrorKind::WouldBlock => return Err(e),
353            _ => {
354                // Some network adapters and drivers do not support GSO. Unfortunately, Linux
355                // offers no easy way for us to detect this short of an EIO or sometimes EINVAL
356                // when we try to actually send datagrams using it.
357                #[cfg(any(target_os = "linux", target_os = "android"))]
358                if let Some(libc::EIO) | Some(libc::EINVAL) = e.raw_os_error() {
359                    // Prevent new transmits from being scheduled using GSO. Existing GSO transmits
360                    // may already be in the pipeline, so we need to tolerate additional failures.
361                    if state.max_gso_segments().get() > 1 {
362                        crate::log::info!(
363                            "`libc::sendmsg` failed with {e}; halting segmentation offload"
364                        );
365                        state
366                            .max_gso_segments
367                            .store(1, std::sync::atomic::Ordering::Relaxed);
368                    }
369                }
370
371                // Some arguments to `sendmsg` are not supported. Switch to
372                // fallback mode and retry if we haven't already.
373                if e.raw_os_error() == Some(libc::EINVAL) && !state.sendmsg_einval() {
374                    state.set_sendmsg_einval();
375                    prepare_msg(
376                        transmit,
377                        &dst_addr,
378                        &mut msg_hdr,
379                        &mut iovec,
380                        &mut cmsgs,
381                        encode_src_ip,
382                        state.sendmsg_einval(),
383                    );
384                    continue;
385                }
386
387                return Err(e);
388            }
389        }
390    }
391}
392
/// Sends `transmit` as a batch of datagrams with a single `sendmsg_x` call.
///
/// The payload is split into chunks of `transmit.segment_size` bytes (or sent as one datagram
/// when no usable segment size is given); each chunk becomes one message in the batch. At
/// most `BATCH_SIZE` chunks are sent.
///
/// An empty payload is sent as a single zero-length datagram, and a `segment_size` of
/// `Some(0)` is treated like `None`. Previously both cases panicked on a zero chunk size.
///
/// The call is retried on `EINTR`; every other error is returned to the caller.
#[cfg(apple_fast)]
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
    let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
    let mut iovs = unsafe { mem::zeroed::<[libc::iovec; BATCH_SIZE]>() };
    let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
    let addr = socket2::SockAddr::from(transmit.destination);

    // `chunks` and `div_ceil` both panic on zero, so never let the segment size be zero: ignore
    // an explicit `Some(0)` and clamp the "whole payload" default for an empty payload.
    let segment_size = transmit
        .segment_size
        .filter(|&size| size > 0)
        .unwrap_or(transmit.contents.len())
        .max(1);
    debug_assert!(transmit.contents.len().div_ceil(segment_size) <= BATCH_SIZE);

    // `chunks` yields nothing for an empty payload, but an empty UDP datagram is still a
    // datagram, so emit exactly one zero-length chunk in that case.
    let empty_datagram = transmit.contents.is_empty().then_some(transmit.contents);
    let segments = empty_datagram
        .into_iter()
        .chain(transmit.contents.chunks(segment_size));

    let mut cnt = 0;
    for (i, chunk) in segments.enumerate().take(BATCH_SIZE) {
        prepare_msg(
            &Transmit {
                destination: transmit.destination,
                ecn: transmit.ecn,
                contents: chunk,
                segment_size: Some(chunk.len()),
                src_ip: transmit.src_ip,
            },
            &addr,
            &mut hdrs[i],
            &mut iovs[i],
            &mut ctrls[i],
            true,
            state.sendmsg_einval(),
        );
        hdrs[i].msg_datalen = chunk.len();
        cnt += 1;
    }
    loop {
        let n = unsafe { sendmsg_x(io.as_raw_fd(), hdrs.as_ptr(), cnt as u32, 0) };

        if n >= 0 {
            return Ok(());
        }

        let e = io::Error::last_os_error();
        match e.kind() {
            // Retry the transmission
            io::ErrorKind::Interrupted => continue,
            _ => return Err(e),
        }
    }
}
441
/// Sends `transmit` with a single plain `sendmsg` call.
///
/// The call is repeated while it is interrupted by a signal; any other error is handed back
/// to the caller unchanged.
#[cfg(any(target_os = "openbsd", target_os = "netbsd", apple_slow))]
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
    let addr = socket2::SockAddr::from(transmit.destination);
    let mut ctrl = cmsg::Aligned([0u8; CMSG_LEN]);
    let mut iov: libc::iovec = unsafe { mem::zeroed() };
    let mut hdr: libc::msghdr = unsafe { mem::zeroed() };

    let encode_src_ip = cfg!(apple) || cfg!(target_os = "openbsd") || cfg!(target_os = "netbsd");
    let fallback_mode = state.sendmsg_einval();
    prepare_msg(
        transmit,
        &addr,
        &mut hdr,
        &mut iov,
        &mut ctrl,
        encode_src_ip,
        fallback_mode,
    );

    loop {
        if unsafe { libc::sendmsg(io.as_raw_fd(), &hdr, 0) } >= 0 {
            return Ok(());
        }

        let err = io::Error::last_os_error();
        if err.kind() != io::ErrorKind::Interrupted {
            return Err(err);
        }
        // Interrupted: retry the transmission
    }
}
472
473#[cfg(not(any(
474    apple,
475    target_os = "openbsd",
476    target_os = "netbsd",
477    target_os = "dragonfly",
478    solarish
479)))]
480fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
481    let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];
482    let mut ctrls = [cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); BATCH_SIZE];
483    let mut hdrs = unsafe { mem::zeroed::<[libc::mmsghdr; BATCH_SIZE]>() };
484    let max_msg_count = bufs.len().min(BATCH_SIZE);
485    for i in 0..max_msg_count {
486        prepare_recv(
487            &mut bufs[i],
488            &mut names[i],
489            &mut ctrls[i],
490            &mut hdrs[i].msg_hdr,
491        );
492    }
493    let msg_count = loop {
494        let n = unsafe {
495            libc::recvmmsg(
496                io.as_raw_fd(),
497                hdrs.as_mut_ptr(),
498                bufs.len().min(BATCH_SIZE) as _,
499                0,
500                ptr::null_mut::<libc::timespec>(),
501            )
502        };
503
504        if n >= 0 {
505            break n;
506        }
507
508        let e = io::Error::last_os_error();
509        match e.kind() {
510            // Retry receiving
511            io::ErrorKind::Interrupted => continue,
512            _ => return Err(e),
513        }
514    };
515    for i in 0..(msg_count as usize) {
516        meta[i] = decode_recv(&names[i], &hdrs[i].msg_hdr, hdrs[i].msg_len as usize)?;
517    }
518    Ok(msg_count as usize)
519}
520
/// Receives up to `BATCH_SIZE` datagrams with a single `recvmsg_x` call.
///
/// One datagram is stored per entry of `bufs`, and its metadata is written to the entry of
/// `meta` with the same index. The batch is limited to the shorter of `bufs` and `meta`, so
/// every received datagram is guaranteed to have a metadata slot.
///
/// Returns the number of datagrams received. The call is retried on `EINTR`; every other
/// error (including `WouldBlock`) is returned to the caller, as are decoding errors.
#[cfg(apple_fast)]
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
    let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];
    // MacOS 10.15 `recvmsg_x` does not override the `msghdr_x`
    // `msg_controllen`. Thus, after the call to `recvmsg_x`, one does not know
    // which control messages have been written to. To prevent reading
    // uninitialized memory, do not use `MaybeUninit` for `ctrls`, instead
    // initialize `ctrls` with `0`s. A control message of all `0`s is
    // automatically skipped by `libc::CMSG_NXTHDR`.
    let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
    let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
    // Never ask the kernel for more datagrams than there are metadata slots: a datagram that
    // has been dequeued but cannot be reported would otherwise cause an out-of-bounds panic.
    let max_msg_count = bufs.len().min(meta.len()).min(BATCH_SIZE);
    for i in 0..max_msg_count {
        prepare_recv(&mut bufs[i], &mut names[i], &mut ctrls[i], &mut hdrs[i]);
    }
    let msg_count = loop {
        let n = unsafe { recvmsg_x(io.as_raw_fd(), hdrs.as_mut_ptr(), max_msg_count as _, 0) };

        if n >= 0 {
            break n as usize;
        }

        let e = io::Error::last_os_error();
        match e.kind() {
            // Retry receiving
            io::ErrorKind::Interrupted => continue,
            _ => return Err(e),
        }
    };
    for (i, slot) in meta.iter_mut().enumerate().take(msg_count) {
        *slot = decode_recv(&names[i], &hdrs[i], hdrs[i].msg_datalen as usize)?;
    }
    Ok(msg_count)
}
555
/// Receives a single datagram into `bufs[0]` and stores its metadata in `meta[0]`.
///
/// Datagrams that did not fit into the buffer (`MSG_TRUNC`) are discarded and the next one is
/// read instead. The call is retried on `EINTR`; every other error (including `WouldBlock`)
/// is returned to the caller, as are decoding errors.
///
/// Returns `Ok(1)` on success.
///
/// # Panics
///
/// Panics if `bufs` or `meta` is empty.
#[cfg(any(
    target_os = "openbsd",
    target_os = "netbsd",
    target_os = "dragonfly",
    solarish,
    apple_slow
))]
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
    let mut name = MaybeUninit::<libc::sockaddr_storage>::uninit();
    let mut ctrl = cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit());
    let mut hdr = unsafe { mem::zeroed::<libc::msghdr>() };
    let n = loop {
        // Re-initialise the header before every attempt: `msg_namelen`, `msg_controllen` and
        // `msg_flags` are overwritten by a successful call and must not leak into a retry.
        prepare_recv(&mut bufs[0], &mut name, &mut ctrl, &mut hdr);
        let n = unsafe { libc::recvmsg(io.as_raw_fd(), &mut hdr, 0) };

        // Check the syscall result first. `msg_flags` is only meaningful after a successful
        // call; testing it on failure would turn a stale `MSG_TRUNC` into an endless retry
        // loop (e.g. when the queue is empty after a truncated datagram).
        if n < 0 {
            let e = io::Error::last_os_error();
            match e.kind() {
                // Retry receiving
                io::ErrorKind::Interrupted => continue,
                _ => return Err(e),
            }
        }

        // The datagram was larger than the buffer: drop it and read the next one.
        if hdr.msg_flags & libc::MSG_TRUNC != 0 {
            continue;
        }

        break n;
    };
    meta[0] = decode_recv(&name, &hdr, n as usize)?;
    Ok(1)
}
589
// Size in bytes of every control message (ancillary data) buffer used for sending and
// receiving. `UdpSocketState::new` asserts that this is large enough for the control
// messages required on the current platform.
const CMSG_LEN: usize = 88;
591
592fn prepare_msg(
593    transmit: &Transmit<'_>,
594    dst_addr: &socket2::SockAddr,
595    #[cfg(not(apple_fast))] hdr: &mut libc::msghdr,
596    #[cfg(apple_fast)] hdr: &mut msghdr_x,
597    iov: &mut libc::iovec,
598    ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
599    #[allow(unused_variables)] // only used on FreeBSD & macOS
600    encode_src_ip: bool,
601    sendmsg_einval: bool,
602) {
603    iov.iov_base = transmit.contents.as_ptr() as *const _ as *mut _;
604    iov.iov_len = transmit.contents.len();
605
606    // SAFETY: Casting the pointer to a mutable one is legal,
607    // as sendmsg is guaranteed to not alter the mutable pointer
608    // as per the POSIX spec. See the section on the sys/socket.h
609    // header for details. The type is only mutable in the first
610    // place because it is reused by recvmsg as well.
611    let name = dst_addr.as_ptr() as *mut libc::c_void;
612    let namelen = dst_addr.len();
613    hdr.msg_name = name as *mut _;
614    hdr.msg_namelen = namelen;
615    hdr.msg_iov = iov;
616    hdr.msg_iovlen = 1;
617
618    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
619    hdr.msg_controllen = CMSG_LEN as _;
620    let mut encoder = unsafe { cmsg::Encoder::new(hdr) };
621    let ecn = transmit.ecn.map_or(0, |x| x as libc::c_int);
622    // True for IPv4 or IPv4-Mapped IPv6
623    let is_ipv4 = transmit.destination.is_ipv4()
624        || matches!(transmit.destination.ip(), IpAddr::V6(addr) if addr.to_ipv4_mapped().is_some());
625    if is_ipv4 {
626        if !sendmsg_einval {
627            #[cfg(not(target_os = "netbsd"))]
628            {
629                encoder.push(libc::IPPROTO_IP, libc::IP_TOS, ecn as IpTosTy);
630            }
631        }
632    } else {
633        encoder.push(libc::IPPROTO_IPV6, libc::IPV6_TCLASS, ecn);
634    }
635
636    // Only set the segment size if it is less than the size of the contents.
637    // Some network drivers don't like being told to do GSO even if there is effectively only a single segment (i.e. `segment_size == transmit.contents.len()`)
638    // Additionally, a `segment_size` that is greater than the content also means there is effectively only a single segment.
639    // This case is actually quite common when splitting up a prepared GSO batch again after GSO has been disabled because the last datagram in a GSO batch is allowed to be smaller than the segment size.
640    if let Some(segment_size) = transmit
641        .segment_size
642        .filter(|segment_size| *segment_size < transmit.contents.len())
643    {
644        gso::set_segment_size(&mut encoder, segment_size as u16);
645    }
646
647    if let Some(ip) = &transmit.src_ip {
648        match ip {
649            IpAddr::V4(v4) => {
650                #[cfg(any(target_os = "linux", target_os = "android"))]
651                {
652                    let pktinfo = libc::in_pktinfo {
653                        ipi_ifindex: 0,
654                        ipi_spec_dst: libc::in_addr {
655                            s_addr: u32::from_ne_bytes(v4.octets()),
656                        },
657                        ipi_addr: libc::in_addr { s_addr: 0 },
658                    };
659                    encoder.push(libc::IPPROTO_IP, libc::IP_PKTINFO, pktinfo);
660                }
661                #[cfg(any(bsd, apple, solarish))]
662                {
663                    if encode_src_ip {
664                        let addr = libc::in_addr {
665                            s_addr: u32::from_ne_bytes(v4.octets()),
666                        };
667                        encoder.push(libc::IPPROTO_IP, libc::IP_RECVDSTADDR, addr);
668                    }
669                }
670            }
671            IpAddr::V6(v6) => {
672                let pktinfo = libc::in6_pktinfo {
673                    ipi6_ifindex: 0,
674                    ipi6_addr: libc::in6_addr {
675                        s6_addr: v6.octets(),
676                    },
677                };
678                encoder.push(libc::IPPROTO_IPV6, libc::IPV6_PKTINFO, pktinfo);
679            }
680        }
681    }
682
683    encoder.finish();
684}
685
686#[cfg(not(apple_fast))]
687fn prepare_recv(
688    buf: &mut IoSliceMut,
689    name: &mut MaybeUninit<libc::sockaddr_storage>,
690    ctrl: &mut cmsg::Aligned<MaybeUninit<[u8; CMSG_LEN]>>,
691    hdr: &mut libc::msghdr,
692) {
693    hdr.msg_name = name.as_mut_ptr() as _;
694    hdr.msg_namelen = mem::size_of::<libc::sockaddr_storage>() as _;
695    hdr.msg_iov = buf as *mut IoSliceMut as *mut libc::iovec;
696    hdr.msg_iovlen = 1;
697    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
698    hdr.msg_controllen = CMSG_LEN as _;
699    hdr.msg_flags = 0;
700}
701
/// Points `hdr` at the given payload buffer, address storage and control buffer so it can be
/// passed to `recvmsg_x`.
///
/// `hdr` only stores raw pointers; `buf`, `name` and `ctrl` must outlive its use.
#[cfg(apple_fast)]
fn prepare_recv(
    buf: &mut IoSliceMut,
    name: &mut MaybeUninit<libc::sockaddr_storage>,
    ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
    hdr: &mut msghdr_x,
) {
    // Payload: exactly one I/O vector, reinterpreted as a `libc::iovec`
    let capacity = buf.len();
    let iov = buf as *mut IoSliceMut as *mut libc::iovec;
    hdr.msg_iov = iov;
    hdr.msg_iovlen = 1;
    hdr.msg_datalen = capacity;

    // Source address storage
    let name_capacity = mem::size_of::<libc::sockaddr_storage>();
    hdr.msg_name = name.as_mut_ptr() as _;
    hdr.msg_namelen = name_capacity as _;

    // Control messages
    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
    hdr.msg_controllen = CMSG_LEN as _;

    hdr.msg_flags = 0;
}
718
719fn decode_recv(
720    name: &MaybeUninit<libc::sockaddr_storage>,
721    #[cfg(not(apple_fast))] hdr: &libc::msghdr,
722    #[cfg(apple_fast)] hdr: &msghdr_x,
723    len: usize,
724) -> io::Result<RecvMeta> {
725    let name = unsafe { name.assume_init() };
726    let mut ecn_bits = 0;
727    let mut dst_ip = None;
728    let mut interface_index = None;
729    #[allow(unused_mut)] // only mutable on Linux
730    let mut stride = len;
731
732    let cmsg_iter = unsafe { cmsg::Iter::new(hdr) };
733    for cmsg in cmsg_iter {
734        match (cmsg.cmsg_level, cmsg.cmsg_type) {
735            (libc::IPPROTO_IP, libc::IP_TOS) => unsafe {
736                ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
737            },
738            // FreeBSD uses IP_RECVTOS here, and we can be liberal because cmsgs are opt-in.
739            #[cfg(not(any(
740                target_os = "openbsd",
741                target_os = "netbsd",
742                target_os = "dragonfly",
743                solarish
744            )))]
745            (libc::IPPROTO_IP, libc::IP_RECVTOS) => unsafe {
746                ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
747            },
748            (libc::IPPROTO_IPV6, libc::IPV6_TCLASS) => unsafe {
749                // Temporary hack around broken macos ABI. Remove once upstream fixes it.
750                // https://bugreport.apple.com/web/?problemID=48761855
751                #[allow(clippy::unnecessary_cast)] // cmsg.cmsg_len defined as size_t
752                if cfg!(apple)
753                    && cmsg.cmsg_len as usize == libc::CMSG_LEN(mem::size_of::<u8>() as _) as usize
754                {
755                    ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
756                } else {
757                    ecn_bits = cmsg::decode::<libc::c_int, libc::cmsghdr>(cmsg) as u8;
758                }
759            },
760            #[cfg(any(target_os = "linux", target_os = "android"))]
761            (libc::IPPROTO_IP, libc::IP_PKTINFO) => {
762                let pktinfo = unsafe { cmsg::decode::<libc::in_pktinfo, libc::cmsghdr>(cmsg) };
763                dst_ip = Some(IpAddr::V4(Ipv4Addr::from(
764                    pktinfo.ipi_addr.s_addr.to_ne_bytes(),
765                )));
766                interface_index = Some(pktinfo.ipi_ifindex as u32);
767            }
768            #[cfg(any(bsd, apple))]
769            (libc::IPPROTO_IP, libc::IP_RECVDSTADDR) => {
770                let in_addr = unsafe { cmsg::decode::<libc::in_addr, libc::cmsghdr>(cmsg) };
771                dst_ip = Some(IpAddr::V4(Ipv4Addr::from(in_addr.s_addr.to_ne_bytes())));
772            }
773            (libc::IPPROTO_IPV6, libc::IPV6_PKTINFO) => {
774                let pktinfo = unsafe { cmsg::decode::<libc::in6_pktinfo, libc::cmsghdr>(cmsg) };
775                dst_ip = Some(IpAddr::V6(Ipv6Addr::from(pktinfo.ipi6_addr.s6_addr)));
776                interface_index = Some(pktinfo.ipi6_ifindex as u32);
777            }
778            #[cfg(any(target_os = "linux", target_os = "android"))]
779            (libc::SOL_UDP, gro::UDP_GRO) => unsafe {
780                stride = cmsg::decode::<libc::c_int, libc::cmsghdr>(cmsg) as usize;
781            },
782            _ => {}
783        }
784    }
785
786    let addr = match libc::c_int::from(name.ss_family) {
787        libc::AF_INET => {
788            // Safety: if the ss_family field is AF_INET then storage must be a sockaddr_in.
789            let addr: &libc::sockaddr_in =
790                unsafe { &*(&name as *const _ as *const libc::sockaddr_in) };
791            SocketAddr::V4(SocketAddrV4::new(
792                Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()),
793                u16::from_be(addr.sin_port),
794            ))
795        }
796        libc::AF_INET6 => {
797            // Safety: if the ss_family field is AF_INET6 then storage must be a sockaddr_in6.
798            let addr: &libc::sockaddr_in6 =
799                unsafe { &*(&name as *const _ as *const libc::sockaddr_in6) };
800            SocketAddr::V6(SocketAddrV6::new(
801                Ipv6Addr::from(addr.sin6_addr.s6_addr),
802                u16::from_be(addr.sin6_port),
803                addr.sin6_flowinfo,
804                addr.sin6_scope_id,
805            ))
806        }
807        f => {
808            return Err(io::Error::other(format!(
809                "expected AF_INET or AF_INET6, got {f} in decode_recv"
810            )));
811        }
812    };
813
814    Ok(RecvMeta {
815        len,
816        stride,
817        addr,
818        ecn: EcnCodepoint::from_bits(ecn_bits),
819        dst_ip,
820        interface_index,
821    })
822}
823
/// Batch size used when the `apple_slow` cfg is not set.
///
/// NOTE(review): presumably the maximum number of messages handled by one batched
/// send/receive call; confirm against the call sites.
#[cfg(not(apple_slow))]
// Chosen somewhat arbitrarily; might benefit from additional tuning.
pub(crate) const BATCH_SIZE: usize = 32;

/// Batch size used when the `apple_slow` cfg is set: a batch holds a single entry.
#[cfg(apple_slow)]
pub(crate) const BATCH_SIZE: usize = 1;
830
831#[cfg(any(target_os = "linux", target_os = "android"))]
832mod gso {
833    use super::*;
834    use std::{ffi::CStr, mem, str::FromStr, sync::OnceLock};
835
    /// Name of the `UDP_SEGMENT` option at level `SOL_UDP`, taken from the `libc` crate.
    #[cfg(not(target_os = "android"))]
    const UDP_SEGMENT: libc::c_int = libc::UDP_SEGMENT;
    /// Name of the `UDP_SEGMENT` option at level `SOL_UDP`, hard-coded for Android.
    #[cfg(target_os = "android")]
    // TODO: Add this to libc
    const UDP_SEGMENT: libc::c_int = 103;

    // Support for UDP GSO has been added to linux kernel in version 4.18
    // https://github.com/torvalds/linux/commit/cb586c63e3fc5b227c51fd8c4cb40b34d3750645
    /// Oldest kernel version for which GSO is attempted; older kernels are rejected
    /// before any socket option is probed.
    const SUPPORTED_SINCE: KernelVersion = KernelVersion {
        version: 4,
        major_revision: 18,
    };
848
849    /// Checks whether GSO support is available by checking the kernel version followed by setting
850    /// the UDP_SEGMENT option on a socket
851    pub(crate) fn max_gso_segments() -> usize {
852        const GSO_SIZE: libc::c_int = 1500;
853
854        if !SUPPORTED_BY_CURRENT_KERNEL.get_or_init(supported_by_current_kernel) {
855            return 1;
856        }
857
858        let socket = match std::net::UdpSocket::bind("[::]:0")
859            .or_else(|_| std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
860        {
861            Ok(socket) => socket,
862            Err(_) => return 1,
863        };
864
865        // As defined in linux/udp.h
866        // #define UDP_MAX_SEGMENTS        (1 << 6UL)
867        match set_socket_option(&socket, libc::SOL_UDP, UDP_SEGMENT, GSO_SIZE) {
868            Ok(()) => 64,
869            Err(_e) => {
870                crate::log::debug!(
871                    "failed to set `UDP_SEGMENT` socket option ({_e}); setting `max_gso_segments = 1`"
872                );
873
874                1
875            }
876        }
877    }
878
    /// Pushes a `UDP_SEGMENT` entry at level `SOL_UDP` with value `segment_size` onto
    /// `encoder`.
    pub(crate) fn set_segment_size(encoder: &mut cmsg::Encoder<libc::msghdr>, segment_size: u16) {
        encoder.push(libc::SOL_UDP, UDP_SEGMENT, segment_size);
    }
882
883    // Avoid calling `supported_by_current_kernel` for each socket by using `OnceLock`.
884    static SUPPORTED_BY_CURRENT_KERNEL: OnceLock<bool> = OnceLock::new();
885
886    fn supported_by_current_kernel() -> bool {
887        let kernel_version_string = match kernel_version_string() {
888            Ok(kernel_version_string) => kernel_version_string,
889            Err(_e) => {
890                crate::log::warn!("GSO disabled: uname returned {_e}");
891                return false;
892            }
893        };
894
895        let Some(kernel_version) = KernelVersion::from_str(&kernel_version_string) else {
896            crate::log::warn!(
897                "GSO disabled: failed to parse kernel version ({kernel_version_string})"
898            );
899            return false;
900        };
901
902        if kernel_version < SUPPORTED_SINCE {
903            crate::log::info!("GSO disabled: kernel too old ({kernel_version_string}); need 4.18+",);
904            return false;
905        }
906
907        true
908    }
909
910    fn kernel_version_string() -> io::Result<String> {
911        let mut n = unsafe { mem::zeroed() };
912        let r = unsafe { libc::uname(&mut n) };
913        if r != 0 {
914            return Err(io::Error::last_os_error());
915        }
916        Ok(unsafe {
917            CStr::from_ptr(n.release[..].as_ptr())
918                .to_string_lossy()
919                .into_owned()
920        })
921    }
922
923    // https://www.linfo.org/kernel_version_numbering.html
924    #[derive(Eq, PartialEq, Ord, PartialOrd, Debug)]
925    struct KernelVersion {
926        version: u8,
927        major_revision: u8,
928    }
929
930    impl KernelVersion {
931        fn from_str(release: &str) -> Option<Self> {
932            let mut split = release
933                .split_once('-')
934                .map(|pair| pair.0)
935                .unwrap_or(release)
936                .split('.');
937
938            let version = u8::from_str(split.next()?).ok()?;
939            let major_revision = u8::from_str(split.next()?).ok()?;
940
941            Some(Self {
942                version,
943                major_revision,
944            })
945        }
946    }
947
948    #[cfg(test)]
949    mod test {
950        use super::*;
951
952        #[test]
953        fn parse_current_kernel_version_release_string() {
954            let release = kernel_version_string().unwrap();
955            KernelVersion::from_str(&release).unwrap();
956        }
957
958        #[test]
959        fn parse_kernel_version_release_string() {
960            // These are made up for the test
961            assert_eq!(
962                KernelVersion::from_str("4.14"),
963                Some(KernelVersion {
964                    version: 4,
965                    major_revision: 14
966                })
967            );
968            assert_eq!(
969                KernelVersion::from_str("4.18"),
970                Some(KernelVersion {
971                    version: 4,
972                    major_revision: 18
973                })
974            );
975            // These were seen in the wild
976            assert_eq!(
977                KernelVersion::from_str("4.14.186-27095505"),
978                Some(KernelVersion {
979                    version: 4,
980                    major_revision: 14
981                })
982            );
983            assert_eq!(
984                KernelVersion::from_str("6.8.0-59-generic"),
985                Some(KernelVersion {
986                    version: 6,
987                    major_revision: 8
988                })
989            );
990        }
991    }
992}
993
// On Apple platforms using the `sendmsg_x` call, UDP datagram segmentation is not
// offloaded to the NIC or even the kernel, but instead done here in user space (in
// [`send`]) and then passed to the OS as individual `iovec`s (up to `BATCH_SIZE`).
#[cfg(not(any(target_os = "linux", target_os = "android")))]
mod gso {
    use super::*;

    /// Maximum number of segments per transmit on platforms other than Linux and Android.
    ///
    /// Yields `BATCH_SIZE` when built with the `apple_fast` cfg and `1` otherwise.
    pub(super) fn max_gso_segments() -> usize {
        if cfg!(apple_fast) { BATCH_SIZE } else { 1 }
    }

    /// Does nothing on these platforms.
    ///
    /// The encoder's header type follows the `apple_fast` cfg (`msghdr_x` when set,
    /// `libc::msghdr` otherwise); both arguments are ignored.
    pub(super) fn set_segment_size(
        #[cfg(not(apple_fast))] _encoder: &mut cmsg::Encoder<libc::msghdr>,
        #[cfg(apple_fast)] _encoder: &mut cmsg::Encoder<msghdr_x>,
        _segment_size: u16,
    ) {
    }
}
1019
1020#[cfg(any(target_os = "linux", target_os = "android"))]
1021mod gro {
1022    use super::*;
1023
1024    #[cfg(not(target_os = "android"))]
1025    pub(crate) const UDP_GRO: libc::c_int = libc::UDP_GRO;
1026    #[cfg(target_os = "android")]
1027    // TODO: Add this to libc
1028    pub(crate) const UDP_GRO: libc::c_int = 104;
1029
1030    pub(crate) fn gro_segments() -> NonZeroUsize {
1031        let socket = match std::net::UdpSocket::bind("[::]:0")
1032            .or_else(|_| std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
1033        {
1034            Ok(socket) => socket,
1035            Err(_) => return NonZeroUsize::MIN,
1036        };
1037
1038        // As defined in net/ipv4/udp_offload.c
1039        // #define UDP_GRO_CNT_MAX 64
1040        //
1041        // NOTE: this MUST be set to UDP_GRO_CNT_MAX to ensure that the receive buffer size
1042        // (get_max_udp_payload_size() * gro_segments()) is large enough to hold the largest GRO
1043        // list the kernel might potentially produce. See
1044        // https://github.com/quinn-rs/quinn/pull/1354.
1045        match set_socket_option(&socket, libc::SOL_UDP, UDP_GRO, OPTION_ON) {
1046            Ok(()) => NonZeroUsize::new(64).expect("known"),
1047            Err(_) => NonZeroUsize::MIN,
1048        }
1049    }
1050}
1051
1052/// Returns whether the given socket option is supported on the current platform
1053///
1054/// Yields `Ok(true)` if the option was set successfully, `Ok(false)` if setting
1055/// the option raised an `ENOPROTOOPT` or `EOPNOTSUPP` error, and `Err` for any other error.
1056fn set_socket_option_supported(
1057    socket: &impl AsRawFd,
1058    level: libc::c_int,
1059    name: libc::c_int,
1060    value: libc::c_int,
1061) -> io::Result<bool> {
1062    match set_socket_option(socket, level, name, value) {
1063        Ok(()) => Ok(true),
1064        Err(err) if err.raw_os_error() == Some(libc::ENOPROTOOPT) => Ok(false),
1065        Err(err) if err.raw_os_error() == Some(libc::EOPNOTSUPP) => Ok(false),
1066        Err(err) => Err(err),
1067    }
1068}
1069
1070fn set_socket_option(
1071    socket: &impl AsRawFd,
1072    level: libc::c_int,
1073    name: libc::c_int,
1074    value: libc::c_int,
1075) -> io::Result<()> {
1076    let rc = unsafe {
1077        libc::setsockopt(
1078            socket.as_raw_fd(),
1079            level,
1080            name,
1081            &value as *const _ as _,
1082            mem::size_of_val(&value) as _,
1083        )
1084    };
1085
1086    match rc == 0 {
1087        true => Ok(()),
1088        false => Err(io::Error::last_os_error()),
1089    }
1090}
1091
/// Value passed to `set_socket_option` to switch an on/off socket option on.
const OPTION_ON: libc::c_int = 1;
1093
/// GRO stand-in for platforms other than Linux and Android.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
mod gro {
    use std::num::NonZeroUsize;

    /// Always returns `1` (`NonZeroUsize::MIN`): no probing is performed on these platforms.
    pub(super) fn gro_segments() -> NonZeroUsize {
        NonZeroUsize::MIN
    }
}