iroh_quinn_udp/
unix.rs

1#[cfg(not(any(apple, target_os = "openbsd", solarish)))]
2use std::ptr;
3use std::{
4    io::{self, IoSliceMut},
5    mem::{self, MaybeUninit},
6    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
7    os::unix::io::AsRawFd,
8    sync::{
9        Mutex,
10        atomic::{AtomicBool, AtomicUsize, Ordering},
11    },
12    time::Instant,
13};
14
15use socket2::SockRef;
16
17use super::{
18    EcnCodepoint, IO_ERROR_LOG_INTERVAL, RecvMeta, Transmit, UdpSockRef, cmsg, log_sendmsg_error,
19};
20
// Adapted from https://github.com/apple-oss-distributions/xnu/blob/8d741a5de7ff4191bf97d57b9f54c2f6d4a15585/bsd/sys/socket_private.h
/// Per-message header handed to `recvmsg_x` and `sendmsg_x`.
///
/// `#[repr(C)]` keeps the field order and layout C-compatible, which the FFI calls rely on.
#[cfg(apple_fast)]
#[repr(C)]
#[allow(non_camel_case_types)]
pub(crate) struct msghdr_x {
    /// Pointer to the peer address storage (destination on send, source on receive).
    pub msg_name: *mut libc::c_void,
    /// Size in bytes of the storage behind `msg_name`.
    pub msg_namelen: libc::socklen_t,
    /// Pointer to the scatter/gather array; this file always passes a single `iovec`.
    pub msg_iov: *mut libc::iovec,
    /// Number of entries behind `msg_iov`.
    pub msg_iovlen: libc::c_int,
    /// Pointer to the control message (cmsg) buffer.
    pub msg_control: *mut libc::c_void,
    /// Size in bytes of the buffer behind `msg_control`.
    pub msg_controllen: libc::socklen_t,
    /// Message flags; cleared before every receive in this file.
    pub msg_flags: libc::c_int,
    /// Payload length: set to the chunk/buffer length before the call, and read back as the
    /// received length after `recvmsg_x`.
    pub msg_datalen: usize,
}
35
// Batched message I/O entry points, only declared when `apple_fast` is set.
// NOTE(review): presumably the private XNU calls from the `socket_private.h` header referenced
// above `msghdr_x` — confirm availability on every supported OS version.
#[cfg(apple_fast)]
unsafe extern "C" {
    /// Receives up to `cnt` messages into the `msghdr_x` array starting at `msgp`.
    ///
    /// Callers in this file treat a negative return value as failure (reading `errno`) and a
    /// non-negative one as the number of headers that were filled in.
    fn recvmsg_x(
        s: libc::c_int,
        msgp: *const msghdr_x,
        cnt: libc::c_uint,
        flags: libc::c_int,
    ) -> isize;

    /// Sends the `cnt` messages described by the `msghdr_x` array starting at `msgp`.
    ///
    /// Callers in this file treat a negative return value as failure (reading `errno`) and any
    /// non-negative value as success.
    fn sendmsg_x(
        s: libc::c_int,
        msgp: *const msghdr_x,
        cnt: libc::c_uint,
        flags: libc::c_int,
    ) -> isize;
}
52
// Defined in netinet6/in6.h on OpenBSD, this is not yet exported by the libc crate
// directly.  See https://github.com/rust-lang/libc/issues/3704 for when we might be able to
// rely on this from the libc crate.
// The same hard-coded value is used for NetBSD builds; every other target takes the constant
// from the libc crate.
#[cfg(any(target_os = "openbsd", target_os = "netbsd"))]
const IPV6_DONTFRAG: libc::c_int = 62;
#[cfg(not(any(target_os = "openbsd", target_os = "netbsd")))]
const IPV6_DONTFRAG: libc::c_int = libc::IPV6_DONTFRAG;
60
// Integer type used for the payload of the `IP_TOS` control message pushed in `prepare_msg`:
// `c_uchar` on FreeBSD, `c_int` elsewhere. No alias is defined for NetBSD, where `prepare_msg`
// does not emit the `IP_TOS` control message at all.
#[cfg(target_os = "freebsd")]
type IpTosTy = libc::c_uchar;
#[cfg(not(any(target_os = "freebsd", target_os = "netbsd")))]
type IpTosTy = libc::c_int;
65
/// Tokio-compatible UDP socket with some useful specializations.
///
/// Unlike a standard tokio UDP socket, this allows ECN bits to be read and written on some
/// platforms.
#[derive(Debug)]
pub struct UdpSocketState {
    /// Timestamp handed to `log_sendmsg_error` when a send fails.
    ///
    /// Initialised to `2 * IO_ERROR_LOG_INTERVAL` in the past where representable.
    /// NOTE(review): presumably used to rate-limit error logging — confirm in `log_sendmsg_error`.
    last_send_error: Mutex<Instant>,
    /// Current upper bound on GSO segments per transmit.
    ///
    /// Lowered to 1 on Linux/Android when `sendmsg` fails with `EIO` or `EINVAL`.
    max_gso_segments: AtomicUsize,
    /// Value reported by `gro::gro_segments()` when the state was created.
    gro_segments: usize,
    /// True if at least one of the "do not fragment" socket options attempted in
    /// [`UdpSocketState::new`] is not supported on this platform.
    may_fragment: bool,

    /// True if we have received EINVAL error from `sendmsg` system call at least once.
    ///
    /// If enabled, we assume that old kernel is used and switch to fallback mode.
    /// In particular, we do not use IP_TOS cmsg_type in this case,
    /// which is not supported on Linux <3.13 and results in not sending the UDP packet at all.
    sendmsg_einval: AtomicBool,
}
84
impl UdpSocketState {
    /// Configures `sock` for use with this crate and captures the resulting state.
    ///
    /// The socket is switched to non-blocking mode, and the platform-specific socket options
    /// used for ECN reporting, destination-address reporting, GRO and fragmentation control are
    /// enabled, depending on the target and on whether the socket is bound to an IPv4 address.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if any socket call propagated with `?` fails. A failure
    /// to set `IP_RECVTOS` is only logged, and a failure to enable `UDP_GRO` is ignored.
    ///
    /// # Panics
    ///
    /// Panics if `CMSG_LEN` cannot hold the control messages accounted for below, or if
    /// `cmsg::Aligned` is less strictly aligned than `libc::cmsghdr`.
    pub fn new(sock: UdpSockRef<'_>) -> io::Result<Self> {
        let io = sock.0;
        // Space needed for a packet-info control message on platforms that deliver one.
        let mut cmsg_platform_space = 0;
        if cfg!(target_os = "linux")
            || cfg!(bsd)
            || cfg!(apple)
            || cfg!(target_os = "android")
            || cfg!(solarish)
        {
            cmsg_platform_space +=
                unsafe { libc::CMSG_SPACE(mem::size_of::<libc::in6_pktinfo>() as _) as usize };
        }

        // The control buffer must fit one `c_int`-sized message plus the packet-info message.
        assert!(
            CMSG_LEN
                >= unsafe { libc::CMSG_SPACE(mem::size_of::<libc::c_int>() as _) as usize }
                    + cmsg_platform_space
        );
        assert!(
            mem::align_of::<libc::cmsghdr>() <= mem::align_of::<cmsg::Aligned<[u8; 0]>>(),
            "control message buffers will be misaligned"
        );

        io.set_nonblocking(true)?;

        let addr = io.local_addr()?;
        let is_ipv4 = addr.family() == libc::AF_INET as libc::sa_family_t;

        // mac and ios do not support IP_RECVTOS on dual-stack sockets :(
        // older macos versions also don't have the flag and will error out if we don't ignore it
        #[cfg(not(any(
            target_os = "openbsd",
            target_os = "netbsd",
            target_os = "dragonfly",
            solarish
        )))]
        if is_ipv4 || !io.only_v6()? {
            if let Err(_err) =
                set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_RECVTOS, OPTION_ON)
            {
                crate::log::debug!("Ignoring error setting IP_RECVTOS on socket: {_err:?}");
            }
        }

        // Accumulates whether any "do not fragment" option turned out to be unsupported.
        let mut may_fragment = false;
        #[cfg(any(target_os = "linux", target_os = "android"))]
        {
            // opportunistically try to enable GRO. See gro::gro_segments().
            let _ = set_socket_option(&*io, libc::SOL_UDP, gro::UDP_GRO, OPTION_ON);

            // Forbid IPv4 fragmentation. Set even for IPv6 to account for IPv6 mapped IPv4 addresses.
            // Set `may_fragment` to `true` if this option is not supported on the platform.
            may_fragment |= !set_socket_option_supported(
                &*io,
                libc::IPPROTO_IP,
                libc::IP_MTU_DISCOVER,
                libc::IP_PMTUDISC_PROBE,
            )?;

            if is_ipv4 {
                set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_PKTINFO, OPTION_ON)?;
            } else {
                // Set `may_fragment` to `true` if this option is not supported on the platform.
                may_fragment |= !set_socket_option_supported(
                    &*io,
                    libc::IPPROTO_IPV6,
                    libc::IPV6_MTU_DISCOVER,
                    libc::IPV6_PMTUDISC_PROBE,
                )?;
            }
        }
        #[cfg(any(target_os = "freebsd", apple))]
        {
            if is_ipv4 {
                // Set `may_fragment` to `true` if this option is not supported on the platform.
                may_fragment |= !set_socket_option_supported(
                    &*io,
                    libc::IPPROTO_IP,
                    libc::IP_DONTFRAG,
                    OPTION_ON,
                )?;
            }
        }
        #[cfg(any(bsd, apple, solarish))]
        // IP_RECVDSTADDR == IP_SENDSRCADDR on FreeBSD
        // macOS uses only IP_RECVDSTADDR, no IP_SENDSRCADDR on macOS (the same on Solaris)
        // macOS also supports IP_PKTINFO
        {
            if is_ipv4 {
                set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_RECVDSTADDR, OPTION_ON)?;
            }
        }

        // Options standardized in RFC 3542
        if !is_ipv4 {
            set_socket_option(&*io, libc::IPPROTO_IPV6, libc::IPV6_RECVPKTINFO, OPTION_ON)?;
            set_socket_option(&*io, libc::IPPROTO_IPV6, libc::IPV6_RECVTCLASS, OPTION_ON)?;
            // Linux's IP_PMTUDISC_PROBE allows us to operate under interface MTU rather than the
            // kernel's path MTU guess, but actually disabling fragmentation requires this too. See
            // __ip6_append_data in ip6_output.c.
            // Set `may_fragment` to `true` if this option is not supported on the platform.
            may_fragment |=
                !set_socket_option_supported(&*io, libc::IPPROTO_IPV6, IPV6_DONTFRAG, OPTION_ON)?;
        }

        let now = Instant::now();
        Ok(Self {
            // Start with a timestamp two log intervals in the past; fall back to `now` if that
            // instant is not representable.
            last_send_error: Mutex::new(now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now)),
            max_gso_segments: AtomicUsize::new(gso::max_gso_segments()),
            gro_segments: gro::gro_segments(),
            may_fragment,
            sendmsg_einval: AtomicBool::new(false),
        })
    }

    /// Sends a [`Transmit`] on the given socket.
    ///
    /// This function will only ever return errors of kind [`io::ErrorKind::WouldBlock`].
    /// All other errors will be logged and converted to `Ok`.
    ///
    /// UDP transmission errors are considered non-fatal because higher-level protocols must
    /// employ retransmits and timeouts anyway in order to deal with UDP's unreliable nature.
    /// Thus, logging is most likely the only thing you can do with these errors.
    ///
    /// If you would like to handle these errors yourself, use [`UdpSocketState::try_send`]
    /// instead.
    pub fn send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
        match send(self, socket.0, transmit) {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => Err(e),
            // - EMSGSIZE is expected for MTU probes. Future work might be able to avoid
            //   these by automatically clamping the MTUD upper bound to the interface MTU.
            Err(e) if e.raw_os_error() == Some(libc::EMSGSIZE) => Ok(()),
            Err(e) => {
                log_sendmsg_error(&self.last_send_error, e, transmit);

                Ok(())
            }
        }
    }

    /// Sends a [`Transmit`] on the given socket without any additional error handling.
    pub fn try_send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
        send(self, socket.0, transmit)
    }

    /// Receives datagrams from `socket` into `bufs`, storing the metadata of the `i`-th received
    /// message in `meta[i]`.
    ///
    /// Returns the number of messages received, i.e. the number of leading `meta` entries that
    /// were written. Errors from the underlying receive call are returned unchanged, except that
    /// interrupted calls are retried.
    pub fn recv(
        &self,
        socket: UdpSockRef<'_>,
        bufs: &mut [IoSliceMut<'_>],
        meta: &mut [RecvMeta],
    ) -> io::Result<usize> {
        recv(socket.0, bufs, meta)
    }

    /// The maximum amount of segments which can be transmitted if a platform
    /// supports Generic Send Offload (GSO).
    ///
    /// This is 1 if the platform doesn't support GSO. Subject to change if errors are detected
    /// while using GSO.
    #[inline]
    pub fn max_gso_segments(&self) -> usize {
        self.max_gso_segments.load(Ordering::Relaxed)
    }

    /// The number of segments to read when GRO is enabled. Used as a factor to
    /// compute the receive buffer size.
    ///
    /// Returns 1 if the platform doesn't support GRO.
    #[inline]
    pub fn gro_segments(&self) -> usize {
        self.gro_segments
    }

    /// Resize the send buffer of `socket` to `bytes`
    #[inline]
    pub fn set_send_buffer_size(&self, socket: UdpSockRef<'_>, bytes: usize) -> io::Result<()> {
        socket.0.set_send_buffer_size(bytes)
    }

    /// Resize the receive buffer of `socket` to `bytes`
    #[inline]
    pub fn set_recv_buffer_size(&self, socket: UdpSockRef<'_>, bytes: usize) -> io::Result<()> {
        socket.0.set_recv_buffer_size(bytes)
    }

    /// Get the size of the `socket` send buffer
    #[inline]
    pub fn send_buffer_size(&self, socket: UdpSockRef<'_>) -> io::Result<usize> {
        socket.0.send_buffer_size()
    }

    /// Get the size of the `socket` receive buffer
    #[inline]
    pub fn recv_buffer_size(&self, socket: UdpSockRef<'_>) -> io::Result<usize> {
        socket.0.recv_buffer_size()
    }

    /// Whether transmitted datagrams might get fragmented by the IP layer
    ///
    /// Returns `false` on targets which employ e.g. the `IPV6_DONTFRAG` socket option.
    #[inline]
    pub fn may_fragment(&self) -> bool {
        self.may_fragment
    }

    /// Returns true if we previously got an EINVAL error from `sendmsg` syscall.
    fn sendmsg_einval(&self) -> bool {
        self.sendmsg_einval.load(Ordering::Relaxed)
    }

    /// Sets the flag indicating we got EINVAL error from `sendmsg` syscall.
    ///
    /// Only compiled on targets whose `send` implementation retries in fallback mode.
    #[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))]
    fn set_sendmsg_einval(&self) {
        self.sendmsg_einval.store(true, Ordering::Relaxed)
    }
}
303
304#[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))]
305fn send(
306    #[allow(unused_variables)] // only used on Linux
307    state: &UdpSocketState,
308    io: SockRef<'_>,
309    transmit: &Transmit<'_>,
310) -> io::Result<()> {
311    #[allow(unused_mut)] // only mutable on FreeBSD
312    let mut encode_src_ip = true;
313    #[cfg(target_os = "freebsd")]
314    {
315        let addr = io.local_addr()?;
316        let is_ipv4 = addr.family() == libc::AF_INET as libc::sa_family_t;
317        if is_ipv4 {
318            if let Some(socket) = addr.as_socket_ipv4() {
319                encode_src_ip = socket.ip() == &Ipv4Addr::UNSPECIFIED;
320            }
321        }
322    }
323    let mut msg_hdr: libc::msghdr = unsafe { mem::zeroed() };
324    let mut iovec: libc::iovec = unsafe { mem::zeroed() };
325    let mut cmsgs = cmsg::Aligned([0u8; CMSG_LEN]);
326    let dst_addr = socket2::SockAddr::from(transmit.destination);
327    prepare_msg(
328        transmit,
329        &dst_addr,
330        &mut msg_hdr,
331        &mut iovec,
332        &mut cmsgs,
333        encode_src_ip,
334        state.sendmsg_einval(),
335    );
336
337    loop {
338        let n = unsafe { libc::sendmsg(io.as_raw_fd(), &msg_hdr, 0) };
339
340        if n >= 0 {
341            return Ok(());
342        }
343
344        let e = io::Error::last_os_error();
345        match e.kind() {
346            // Retry the transmission
347            io::ErrorKind::Interrupted => continue,
348            io::ErrorKind::WouldBlock => return Err(e),
349            _ => {
350                // Some network adapters and drivers do not support GSO. Unfortunately, Linux
351                // offers no easy way for us to detect this short of an EIO or sometimes EINVAL
352                // when we try to actually send datagrams using it.
353                #[cfg(any(target_os = "linux", target_os = "android"))]
354                if let Some(libc::EIO) | Some(libc::EINVAL) = e.raw_os_error() {
355                    // Prevent new transmits from being scheduled using GSO. Existing GSO transmits
356                    // may already be in the pipeline, so we need to tolerate additional failures.
357                    if state.max_gso_segments() > 1 {
358                        crate::log::info!(
359                            "`libc::sendmsg` failed with {e}; halting segmentation offload"
360                        );
361                        state
362                            .max_gso_segments
363                            .store(1, std::sync::atomic::Ordering::Relaxed);
364                    }
365                }
366
367                // Some arguments to `sendmsg` are not supported. Switch to
368                // fallback mode and retry if we haven't already.
369                if e.raw_os_error() == Some(libc::EINVAL) && !state.sendmsg_einval() {
370                    state.set_sendmsg_einval();
371                    prepare_msg(
372                        transmit,
373                        &dst_addr,
374                        &mut msg_hdr,
375                        &mut iovec,
376                        &mut cmsgs,
377                        encode_src_ip,
378                        state.sendmsg_einval(),
379                    );
380                    continue;
381                }
382
383                return Err(e);
384            }
385        }
386    }
387}
388
/// Sends `transmit` with a single `sendmsg_x` call, using one message per segment.
///
/// The payload is split into chunks of `transmit.segment_size` bytes (or sent as a single
/// message when no segment size is given); at most `BATCH_SIZE` chunks are submitted. An empty
/// payload is sent as one empty datagram, matching the `sendmsg`-based implementations.
///
/// Interrupted calls are retried; every other error is returned unchanged.
#[cfg(apple_fast)]
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
    let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
    let mut iovs = unsafe { mem::zeroed::<[libc::iovec; BATCH_SIZE]>() };
    let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
    let addr = socket2::SockAddr::from(transmit.destination);
    // `div_ceil` and `chunks` both panic on a size of zero, which would otherwise occur for an
    // empty payload without an explicit segment size (or for `Some(0)`), so clamp to 1.
    let segment_size = transmit
        .segment_size
        .unwrap_or(transmit.contents.len())
        .max(1);
    debug_assert!(transmit.contents.len().div_ceil(segment_size) <= BATCH_SIZE);
    // `chunks` yields nothing for an empty payload; feed one empty chunk in that case so that a
    // zero-length datagram is still transmitted.
    let empty_datagram = transmit.contents.is_empty().then_some(transmit.contents);
    let mut cnt = 0;
    for (i, chunk) in empty_datagram
        .into_iter()
        .chain(transmit.contents.chunks(segment_size))
        .enumerate()
        .take(BATCH_SIZE)
    {
        prepare_msg(
            &Transmit {
                destination: transmit.destination,
                ecn: transmit.ecn,
                contents: chunk,
                // Equal to the chunk length, so `prepare_msg` never requests GSO here.
                segment_size: Some(chunk.len()),
                src_ip: transmit.src_ip,
            },
            &addr,
            &mut hdrs[i],
            &mut iovs[i],
            &mut ctrls[i],
            true,
            state.sendmsg_einval(),
        );
        hdrs[i].msg_datalen = chunk.len();
        cnt += 1;
    }
    loop {
        let n = unsafe { sendmsg_x(io.as_raw_fd(), hdrs.as_ptr(), cnt as u32, 0) };

        if n >= 0 {
            return Ok(());
        }

        let e = io::Error::last_os_error();
        match e.kind() {
            // Retry the transmission
            io::ErrorKind::Interrupted => continue,
            _ => return Err(e),
        }
    }
}
437
/// Sends `transmit` as a single message via `libc::sendmsg`.
///
/// The call is repeated while it fails with `Interrupted`; any other failure is handed back
/// to the caller untouched.
#[cfg(any(target_os = "openbsd", target_os = "netbsd", apple_slow))]
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
    let destination = socket2::SockAddr::from(transmit.destination);
    let mut control = cmsg::Aligned([0u8; CMSG_LEN]);
    let mut iovec: libc::iovec = unsafe { mem::zeroed() };
    let mut msg: libc::msghdr = unsafe { mem::zeroed() };
    let encode_src_ip = cfg!(any(apple, target_os = "openbsd", target_os = "netbsd"));
    prepare_msg(
        transmit,
        &destination,
        &mut msg,
        &mut iovec,
        &mut control,
        encode_src_ip,
        state.sendmsg_einval(),
    );

    loop {
        if unsafe { libc::sendmsg(io.as_raw_fd(), &msg, 0) } >= 0 {
            return Ok(());
        }

        let err = io::Error::last_os_error();
        // Only an interrupted call is worth retrying.
        if err.kind() != io::ErrorKind::Interrupted {
            return Err(err);
        }
    }
}
468
469#[cfg(not(any(
470    apple,
471    target_os = "openbsd",
472    target_os = "netbsd",
473    target_os = "dragonfly",
474    solarish
475)))]
476fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
477    let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];
478    let mut ctrls = [cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); BATCH_SIZE];
479    let mut hdrs = unsafe { mem::zeroed::<[libc::mmsghdr; BATCH_SIZE]>() };
480    let max_msg_count = bufs.len().min(BATCH_SIZE);
481    for i in 0..max_msg_count {
482        prepare_recv(
483            &mut bufs[i],
484            &mut names[i],
485            &mut ctrls[i],
486            &mut hdrs[i].msg_hdr,
487        );
488    }
489    let msg_count = loop {
490        let n = unsafe {
491            libc::recvmmsg(
492                io.as_raw_fd(),
493                hdrs.as_mut_ptr(),
494                bufs.len().min(BATCH_SIZE) as _,
495                0,
496                ptr::null_mut::<libc::timespec>(),
497            )
498        };
499
500        if n >= 0 {
501            break n;
502        }
503
504        let e = io::Error::last_os_error();
505        match e.kind() {
506            // Retry receiving
507            io::ErrorKind::Interrupted => continue,
508            _ => return Err(e),
509        }
510    };
511    for i in 0..(msg_count as usize) {
512        meta[i] = decode_recv(&names[i], &hdrs[i].msg_hdr, hdrs[i].msg_len as usize)?;
513    }
514    Ok(msg_count as usize)
515}
516
/// Receives up to `BATCH_SIZE` datagrams with one `recvmsg_x` call.
///
/// One header is armed per entry of `bufs` (capped at `BATCH_SIZE`). The metadata of the
/// `i`-th received message is written to `meta[i]`, and the number of received messages is
/// returned. Interrupted calls are retried; other errors are returned unchanged.
#[cfg(apple_fast)]
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
    let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
    // MacOS 10.15 `recvmsg_x` does not override the `msghdr_x`
    // `msg_controllen`. Thus, after the call to `recvmsg_x`, one does not know
    // which control messages have been written to. To prevent reading
    // uninitialized memory, do not use `MaybeUninit` for `ctrls`, instead
    // initialize `ctrls` with `0`s. A control message of all `0`s is
    // automatically skipped by `libc::CMSG_NXTHDR`.
    let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
    let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];

    let batch_len = bufs.len().min(BATCH_SIZE);
    // Zipping against the fixed-size arrays stops after `batch_len` buffers.
    for (((buf, name), ctrl), hdr) in bufs
        .iter_mut()
        .zip(names.iter_mut())
        .zip(ctrls.iter_mut())
        .zip(hdrs.iter_mut())
    {
        prepare_recv(buf, name, ctrl, hdr);
    }

    let received = loop {
        let n = unsafe { recvmsg_x(io.as_raw_fd(), hdrs.as_mut_ptr(), batch_len as _, 0) };
        if n >= 0 {
            break n as usize;
        }

        let err = io::Error::last_os_error();
        // Retry receiving only when the call was interrupted.
        if err.kind() != io::ErrorKind::Interrupted {
            return Err(err);
        }
    };

    for (i, (name, hdr)) in names.iter().zip(hdrs.iter()).take(received).enumerate() {
        meta[i] = decode_recv(name, hdr, hdr.msg_datalen)?;
    }
    Ok(received)
}
551
/// Receives a single datagram into `bufs[0]` with `libc::recvmsg`.
///
/// Truncated datagrams (`MSG_TRUNC`) are discarded and the next datagram is read instead.
/// Interrupted calls are retried; every other error, including `WouldBlock`, is returned
/// unchanged. On success the metadata is stored in `meta[0]` and `1` is returned.
///
/// # Panics
///
/// Panics if `bufs` or `meta` is empty.
#[cfg(any(
    target_os = "openbsd",
    target_os = "netbsd",
    target_os = "dragonfly",
    solarish,
    apple_slow
))]
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
    let mut name = MaybeUninit::<libc::sockaddr_storage>::uninit();
    let mut ctrl = cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit());
    let mut hdr = unsafe { mem::zeroed::<libc::msghdr>() };
    let n = loop {
        // Re-arm the header before every attempt: `msg_namelen`, `msg_controllen` and
        // `msg_flags` are in/out fields, so values left over from a previous (truncated)
        // datagram must not leak into the next call.
        prepare_recv(&mut bufs[0], &mut name, &mut ctrl, &mut hdr);
        let n = unsafe { libc::recvmsg(io.as_raw_fd(), &mut hdr, 0) };

        // Check for failure first: the header contents are only meaningful after a successful
        // call, and a stale `MSG_TRUNC` flag must never mask an error such as `WouldBlock`.
        if n < 0 {
            let e = io::Error::last_os_error();
            match e.kind() {
                // Retry receiving
                io::ErrorKind::Interrupted => continue,
                _ => return Err(e),
            }
        }

        // Drop truncated datagrams and try the next one.
        if hdr.msg_flags & libc::MSG_TRUNC != 0 {
            continue;
        }

        break n;
    };
    meta[0] = decode_recv(&name, &hdr, n as usize)?;
    Ok(1)
}
585
/// Size in bytes of every control-message buffer used for sending and receiving.
///
/// `UdpSocketState::new` asserts that this is large enough for a `c_int`-sized control message
/// plus, on platforms that deliver packet info, an `in6_pktinfo` control message.
const CMSG_LEN: usize = 88;
587
/// Fills `hdr`, `iov` and `ctrl` so that `hdr` describes `transmit` for a `sendmsg`-style call.
///
/// * `dst_addr` — destination address; `hdr.msg_name` points at it.
/// * `hdr` — message header to populate (`msghdr_x` under `apple_fast`, `libc::msghdr` otherwise).
/// * `iov` — set to point at `transmit.contents`.
/// * `ctrl` — buffer receiving the encoded control messages (ECN, segment size, source address).
/// * `encode_src_ip` — on BSD/Apple/Solaris-like targets, whether an IPv4 `transmit.src_ip` is
///   encoded as a control message.
/// * `sendmsg_einval` — fallback mode: when true, the `IP_TOS` control message is omitted for
///   IPv4 destinations.
///
/// `hdr` stores raw pointers into `dst_addr`, `iov`, `ctrl` and `transmit.contents`.
/// NOTE(review): callers presumably must keep all of them alive and unmoved until the send call
/// has returned — confirm.
fn prepare_msg(
    transmit: &Transmit<'_>,
    dst_addr: &socket2::SockAddr,
    #[cfg(not(apple_fast))] hdr: &mut libc::msghdr,
    #[cfg(apple_fast)] hdr: &mut msghdr_x,
    iov: &mut libc::iovec,
    ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
    #[allow(unused_variables)] // only used on FreeBSD & macOS
    encode_src_ip: bool,
    sendmsg_einval: bool,
) {
    iov.iov_base = transmit.contents.as_ptr() as *const _ as *mut _;
    iov.iov_len = transmit.contents.len();

    // SAFETY: Casting the pointer to a mutable one is legal,
    // as sendmsg is guaranteed to not alter the mutable pointer
    // as per the POSIX spec. See the section on the sys/socket.h
    // header for details. The type is only mutable in the first
    // place because it is reused by recvmsg as well.
    let name = dst_addr.as_ptr() as *mut libc::c_void;
    let namelen = dst_addr.len();
    hdr.msg_name = name as *mut _;
    hdr.msg_namelen = namelen;
    hdr.msg_iov = iov;
    hdr.msg_iovlen = 1;

    // Hand the whole control buffer to the encoder.
    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
    hdr.msg_controllen = CMSG_LEN as _;
    let mut encoder = unsafe { cmsg::Encoder::new(hdr) };
    // A missing ECN codepoint is encoded as 0.
    let ecn = transmit.ecn.map_or(0, |x| x as libc::c_int);
    // True for IPv4 or IPv4-Mapped IPv6
    let is_ipv4 = transmit.destination.is_ipv4()
        || matches!(transmit.destination.ip(), IpAddr::V6(addr) if addr.to_ipv4_mapped().is_some());
    if is_ipv4 {
        // In fallback mode the `IP_TOS` control message is left out entirely.
        if !sendmsg_einval {
            #[cfg(not(target_os = "netbsd"))]
            {
                encoder.push(libc::IPPROTO_IP, libc::IP_TOS, ecn as IpTosTy);
            }
        }
    } else {
        encoder.push(libc::IPPROTO_IPV6, libc::IPV6_TCLASS, ecn);
    }

    // Only set the segment size if it is less than the size of the contents.
    // Some network drivers don't like being told to do GSO even if there is effectively only a single segment (i.e. `segment_size == transmit.contents.len()`)
    // Additionally, a `segment_size` that is greater than the content also means there is effectively only a single segment.
    // This case is actually quite common when splitting up a prepared GSO batch again after GSO has been disabled because the last datagram in a GSO batch is allowed to be smaller than the segment size.
    if let Some(segment_size) = transmit
        .segment_size
        .filter(|segment_size| *segment_size < transmit.contents.len())
    {
        gso::set_segment_size(&mut encoder, segment_size as u16);
    }

    // Encode the requested source address, if any, in the platform's packet-info format.
    if let Some(ip) = &transmit.src_ip {
        match ip {
            IpAddr::V4(v4) => {
                #[cfg(any(target_os = "linux", target_os = "android"))]
                {
                    let pktinfo = libc::in_pktinfo {
                        ipi_ifindex: 0,
                        ipi_spec_dst: libc::in_addr {
                            s_addr: u32::from_ne_bytes(v4.octets()),
                        },
                        ipi_addr: libc::in_addr { s_addr: 0 },
                    };
                    encoder.push(libc::IPPROTO_IP, libc::IP_PKTINFO, pktinfo);
                }
                #[cfg(any(bsd, apple, solarish))]
                {
                    if encode_src_ip {
                        let addr = libc::in_addr {
                            s_addr: u32::from_ne_bytes(v4.octets()),
                        };
                        encoder.push(libc::IPPROTO_IP, libc::IP_RECVDSTADDR, addr);
                    }
                }
            }
            IpAddr::V6(v6) => {
                let pktinfo = libc::in6_pktinfo {
                    ipi6_ifindex: 0,
                    ipi6_addr: libc::in6_addr {
                        s6_addr: v6.octets(),
                    },
                };
                encoder.push(libc::IPPROTO_IPV6, libc::IPV6_PKTINFO, pktinfo);
            }
        }
    }

    encoder.finish();
}
681
682#[cfg(not(apple_fast))]
683fn prepare_recv(
684    buf: &mut IoSliceMut,
685    name: &mut MaybeUninit<libc::sockaddr_storage>,
686    ctrl: &mut cmsg::Aligned<MaybeUninit<[u8; CMSG_LEN]>>,
687    hdr: &mut libc::msghdr,
688) {
689    hdr.msg_name = name.as_mut_ptr() as _;
690    hdr.msg_namelen = mem::size_of::<libc::sockaddr_storage>() as _;
691    hdr.msg_iov = buf as *mut IoSliceMut as *mut libc::iovec;
692    hdr.msg_iovlen = 1;
693    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
694    hdr.msg_controllen = CMSG_LEN as _;
695    hdr.msg_flags = 0;
696}
697
/// Arms `hdr` for one entry of a `recvmsg_x` batch.
///
/// Afterwards `hdr` points at `buf` as its only I/O vector, at `name` for the sender address
/// and at `ctrl` for control messages; `msg_flags` is cleared and `msg_datalen` is set to the
/// length of `buf`.
#[cfg(apple_fast)]
fn prepare_recv(
    buf: &mut IoSliceMut,
    name: &mut MaybeUninit<libc::sockaddr_storage>,
    ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
    hdr: &mut msghdr_x,
) {
    hdr.msg_flags = 0;

    // Payload: the `IoSliceMut` is handed over as a single `iovec`.
    hdr.msg_datalen = buf.len();
    hdr.msg_iov = (buf as *mut IoSliceMut).cast::<libc::iovec>();
    hdr.msg_iovlen = 1;

    // Sender address storage.
    hdr.msg_name = name.as_mut_ptr().cast();
    hdr.msg_namelen = mem::size_of::<libc::sockaddr_storage>() as _;

    // Control message buffer.
    hdr.msg_control = ctrl.0.as_mut_ptr().cast();
    hdr.msg_controllen = CMSG_LEN as _;
}
714
/// Builds the [`RecvMeta`] for one received message.
///
/// * `name` — sender address storage filled in by the receive call.
/// * `hdr` — message header whose control messages are decoded.
/// * `len` — number of bytes received for this message.
///
/// The control messages supply the ECN bits, the destination address, the interface index and,
/// on Linux/Android, the GRO stride; without a `UDP_GRO` message the stride equals `len`.
///
/// # Errors
///
/// Returns an error if the address family of `name` is neither `AF_INET` nor `AF_INET6`.
fn decode_recv(
    name: &MaybeUninit<libc::sockaddr_storage>,
    #[cfg(not(apple_fast))] hdr: &libc::msghdr,
    #[cfg(apple_fast)] hdr: &msghdr_x,
    len: usize,
) -> io::Result<RecvMeta> {
    // NOTE(review): assumes the preceding receive call initialised `name` — confirm for
    // messages that arrive without a source address.
    let name = unsafe { name.assume_init() };
    let mut ecn_bits = 0;
    let mut dst_ip = None;
    let mut interface_index = None;
    #[allow(unused_mut)] // only mutable on Linux
    let mut stride = len;

    let cmsg_iter = unsafe { cmsg::Iter::new(hdr) };
    for cmsg in cmsg_iter {
        match (cmsg.cmsg_level, cmsg.cmsg_type) {
            (libc::IPPROTO_IP, libc::IP_TOS) => unsafe {
                ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
            },
            // FreeBSD uses IP_RECVTOS here, and we can be liberal because cmsgs are opt-in.
            #[cfg(not(any(
                target_os = "openbsd",
                target_os = "netbsd",
                target_os = "dragonfly",
                solarish
            )))]
            (libc::IPPROTO_IP, libc::IP_RECVTOS) => unsafe {
                ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
            },
            (libc::IPPROTO_IPV6, libc::IPV6_TCLASS) => unsafe {
                // Temporary hack around broken macos ABI. Remove once upstream fixes it.
                // https://bugreport.apple.com/web/?problemID=48761855
                #[allow(clippy::unnecessary_cast)] // cmsg.cmsg_len defined as size_t
                if cfg!(apple)
                    && cmsg.cmsg_len as usize == libc::CMSG_LEN(mem::size_of::<u8>() as _) as usize
                {
                    ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
                } else {
                    ecn_bits = cmsg::decode::<libc::c_int, libc::cmsghdr>(cmsg) as u8;
                }
            },
            #[cfg(any(target_os = "linux", target_os = "android"))]
            (libc::IPPROTO_IP, libc::IP_PKTINFO) => {
                let pktinfo = unsafe { cmsg::decode::<libc::in_pktinfo, libc::cmsghdr>(cmsg) };
                dst_ip = Some(IpAddr::V4(Ipv4Addr::from(
                    pktinfo.ipi_addr.s_addr.to_ne_bytes(),
                )));
                interface_index = Some(pktinfo.ipi_ifindex as u32);
            }
            #[cfg(any(bsd, apple))]
            (libc::IPPROTO_IP, libc::IP_RECVDSTADDR) => {
                let in_addr = unsafe { cmsg::decode::<libc::in_addr, libc::cmsghdr>(cmsg) };
                dst_ip = Some(IpAddr::V4(Ipv4Addr::from(in_addr.s_addr.to_ne_bytes())));
            }
            (libc::IPPROTO_IPV6, libc::IPV6_PKTINFO) => {
                let pktinfo = unsafe { cmsg::decode::<libc::in6_pktinfo, libc::cmsghdr>(cmsg) };
                dst_ip = Some(IpAddr::V6(Ipv6Addr::from(pktinfo.ipi6_addr.s6_addr)));
                interface_index = Some(pktinfo.ipi6_ifindex as u32);
            }
            #[cfg(any(target_os = "linux", target_os = "android"))]
            (libc::SOL_UDP, gro::UDP_GRO) => unsafe {
                stride = cmsg::decode::<libc::c_int, libc::cmsghdr>(cmsg) as usize;
            },
            // Unknown control messages are ignored.
            _ => {}
        }
    }

    // Convert the raw socket address into a `SocketAddr` based on its address family.
    let addr = match libc::c_int::from(name.ss_family) {
        libc::AF_INET => {
            // Safety: if the ss_family field is AF_INET then storage must be a sockaddr_in.
            let addr: &libc::sockaddr_in =
                unsafe { &*(&name as *const _ as *const libc::sockaddr_in) };
            SocketAddr::V4(SocketAddrV4::new(
                Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()),
                u16::from_be(addr.sin_port),
            ))
        }
        libc::AF_INET6 => {
            // Safety: if the ss_family field is AF_INET6 then storage must be a sockaddr_in6.
            let addr: &libc::sockaddr_in6 =
                unsafe { &*(&name as *const _ as *const libc::sockaddr_in6) };
            SocketAddr::V6(SocketAddrV6::new(
                Ipv6Addr::from(addr.sin6_addr.s6_addr),
                u16::from_be(addr.sin6_port),
                addr.sin6_flowinfo,
                addr.sin6_scope_id,
            ))
        }
        f => {
            return Err(io::Error::other(format!(
                "expected AF_INET or AF_INET6, got {f} in decode_recv"
            )));
        }
    };

    Ok(RecvMeta {
        len,
        stride,
        addr,
        ecn: EcnCodepoint::from_bits(ecn_bits),
        dst_ip,
        interface_index,
    })
}
819
/// Maximum number of datagrams handled by a single batched send/receive call.
///
/// Chosen somewhat arbitrarily; might benefit from additional tuning.
#[cfg(not(apple_slow))]
pub(crate) const BATCH_SIZE: usize = 32;

/// Maximum number of datagrams handled by a single send/receive call.
///
/// Fixed to one datagram per call in the `apple_slow` configuration.
#[cfg(apple_slow)]
pub(crate) const BATCH_SIZE: usize = 1;
826
827#[cfg(any(target_os = "linux", target_os = "android"))]
828mod gso {
829    use super::*;
830    use std::{ffi::CStr, mem, str::FromStr, sync::OnceLock};
831
832    #[cfg(not(target_os = "android"))]
833    const UDP_SEGMENT: libc::c_int = libc::UDP_SEGMENT;
834    #[cfg(target_os = "android")]
835    // TODO: Add this to libc
836    const UDP_SEGMENT: libc::c_int = 103;
837
838    // Support for UDP GSO has been added to linux kernel in version 4.18
839    // https://github.com/torvalds/linux/commit/cb586c63e3fc5b227c51fd8c4cb40b34d3750645
840    const SUPPORTED_SINCE: KernelVersion = KernelVersion {
841        version: 4,
842        major_revision: 18,
843    };
844
845    /// Checks whether GSO support is available by checking the kernel version followed by setting
846    /// the UDP_SEGMENT option on a socket
847    pub(crate) fn max_gso_segments() -> usize {
848        const GSO_SIZE: libc::c_int = 1500;
849
850        if !SUPPORTED_BY_CURRENT_KERNEL.get_or_init(supported_by_current_kernel) {
851            return 1;
852        }
853
854        let socket = match std::net::UdpSocket::bind("[::]:0")
855            .or_else(|_| std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
856        {
857            Ok(socket) => socket,
858            Err(_) => return 1,
859        };
860
861        // As defined in linux/udp.h
862        // #define UDP_MAX_SEGMENTS        (1 << 6UL)
863        match set_socket_option(&socket, libc::SOL_UDP, UDP_SEGMENT, GSO_SIZE) {
864            Ok(()) => 64,
865            Err(_e) => {
866                crate::log::debug!(
867                    "failed to set `UDP_SEGMENT` socket option ({_e}); setting `max_gso_segments = 1`"
868                );
869
870                1
871            }
872        }
873    }
874
875    pub(crate) fn set_segment_size(encoder: &mut cmsg::Encoder<libc::msghdr>, segment_size: u16) {
876        encoder.push(libc::SOL_UDP, UDP_SEGMENT, segment_size);
877    }
878
879    // Avoid calling `supported_by_current_kernel` for each socket by using `OnceLock`.
880    static SUPPORTED_BY_CURRENT_KERNEL: OnceLock<bool> = OnceLock::new();
881
882    fn supported_by_current_kernel() -> bool {
883        let kernel_version_string = match kernel_version_string() {
884            Ok(kernel_version_string) => kernel_version_string,
885            Err(_e) => {
886                crate::log::warn!("GSO disabled: uname returned {_e}");
887                return false;
888            }
889        };
890
891        let Some(kernel_version) = KernelVersion::from_str(&kernel_version_string) else {
892            crate::log::warn!(
893                "GSO disabled: failed to parse kernel version ({kernel_version_string})"
894            );
895            return false;
896        };
897
898        if kernel_version < SUPPORTED_SINCE {
899            crate::log::info!("GSO disabled: kernel too old ({kernel_version_string}); need 4.18+",);
900            return false;
901        }
902
903        true
904    }
905
906    fn kernel_version_string() -> io::Result<String> {
907        let mut n = unsafe { mem::zeroed() };
908        let r = unsafe { libc::uname(&mut n) };
909        if r != 0 {
910            return Err(io::Error::last_os_error());
911        }
912        Ok(unsafe {
913            CStr::from_ptr(n.release[..].as_ptr())
914                .to_string_lossy()
915                .into_owned()
916        })
917    }
918
919    // https://www.linfo.org/kernel_version_numbering.html
920    #[derive(Eq, PartialEq, Ord, PartialOrd, Debug)]
921    struct KernelVersion {
922        version: u8,
923        major_revision: u8,
924    }
925
926    impl KernelVersion {
927        fn from_str(release: &str) -> Option<Self> {
928            let mut split = release
929                .split_once('-')
930                .map(|pair| pair.0)
931                .unwrap_or(release)
932                .split('.');
933
934            let version = u8::from_str(split.next()?).ok()?;
935            let major_revision = u8::from_str(split.next()?).ok()?;
936
937            Some(Self {
938                version,
939                major_revision,
940            })
941        }
942    }
943
944    #[cfg(test)]
945    mod test {
946        use super::*;
947
948        #[test]
949        fn parse_current_kernel_version_release_string() {
950            let release = kernel_version_string().unwrap();
951            KernelVersion::from_str(&release).unwrap();
952        }
953
954        #[test]
955        fn parse_kernel_version_release_string() {
956            // These are made up for the test
957            assert_eq!(
958                KernelVersion::from_str("4.14"),
959                Some(KernelVersion {
960                    version: 4,
961                    major_revision: 14
962                })
963            );
964            assert_eq!(
965                KernelVersion::from_str("4.18"),
966                Some(KernelVersion {
967                    version: 4,
968                    major_revision: 18
969                })
970            );
971            // These were seen in the wild
972            assert_eq!(
973                KernelVersion::from_str("4.14.186-27095505"),
974                Some(KernelVersion {
975                    version: 4,
976                    major_revision: 14
977                })
978            );
979            assert_eq!(
980                KernelVersion::from_str("6.8.0-59-generic"),
981                Some(KernelVersion {
982                    version: 6,
983                    major_revision: 8
984                })
985            );
986        }
987    }
988}
989
// On Apple platforms using the `sendmsg_x` call, UDP datagram segmentation is not
// offloaded to the NIC or even the kernel, but instead done here in user space (in
// [`send`]) and then passed to the OS as individual `iovec`s (up to `BATCH_SIZE`).
/// Fallback GSO shim for platforms other than Linux and Android.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
mod gso {
    use super::*;

    /// Returns the maximum number of segments per send.
    ///
    /// This is `BATCH_SIZE` in the `apple_fast` configuration and `1` everywhere else.
    pub(super) fn max_gso_segments() -> usize {
        if cfg!(apple_fast) { BATCH_SIZE } else { 1 }
    }

    /// Does nothing: no segment-size control message is emitted on these platforms.
    ///
    /// The encoder parameter type varies with the configuration so that call sites compile
    /// unchanged.
    pub(super) fn set_segment_size(
        #[cfg(apple_fast)] _encoder: &mut cmsg::Encoder<msghdr_x>,
        #[cfg(not(apple_fast))] _encoder: &mut cmsg::Encoder<libc::msghdr>,
        _segment_size: u16,
    ) {
    }
}
1015
1016#[cfg(any(target_os = "linux", target_os = "android"))]
1017mod gro {
1018    use super::*;
1019
1020    #[cfg(not(target_os = "android"))]
1021    pub(crate) const UDP_GRO: libc::c_int = libc::UDP_GRO;
1022    #[cfg(target_os = "android")]
1023    // TODO: Add this to libc
1024    pub(crate) const UDP_GRO: libc::c_int = 104;
1025
1026    pub(crate) fn gro_segments() -> usize {
1027        let socket = match std::net::UdpSocket::bind("[::]:0")
1028            .or_else(|_| std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
1029        {
1030            Ok(socket) => socket,
1031            Err(_) => return 1,
1032        };
1033
1034        // As defined in net/ipv4/udp_offload.c
1035        // #define UDP_GRO_CNT_MAX 64
1036        //
1037        // NOTE: this MUST be set to UDP_GRO_CNT_MAX to ensure that the receive buffer size
1038        // (get_max_udp_payload_size() * gro_segments()) is large enough to hold the largest GRO
1039        // list the kernel might potentially produce. See
1040        // https://github.com/quinn-rs/quinn/pull/1354.
1041        match set_socket_option(&socket, libc::SOL_UDP, UDP_GRO, OPTION_ON) {
1042            Ok(()) => 64,
1043            Err(_) => 1,
1044        }
1045    }
1046}
1047
1048/// Returns whether the given socket option is supported on the current platform
1049///
1050/// Yields `Ok(true)` if the option was set successfully, `Ok(false)` if setting
1051/// the option raised an `ENOPROTOOPT` or `EOPNOTSUPP` error, and `Err` for any other error.
1052fn set_socket_option_supported(
1053    socket: &impl AsRawFd,
1054    level: libc::c_int,
1055    name: libc::c_int,
1056    value: libc::c_int,
1057) -> io::Result<bool> {
1058    match set_socket_option(socket, level, name, value) {
1059        Ok(()) => Ok(true),
1060        Err(err) if err.raw_os_error() == Some(libc::ENOPROTOOPT) => Ok(false),
1061        Err(err) if err.raw_os_error() == Some(libc::EOPNOTSUPP) => Ok(false),
1062        Err(err) => Err(err),
1063    }
1064}
1065
1066fn set_socket_option(
1067    socket: &impl AsRawFd,
1068    level: libc::c_int,
1069    name: libc::c_int,
1070    value: libc::c_int,
1071) -> io::Result<()> {
1072    let rc = unsafe {
1073        libc::setsockopt(
1074            socket.as_raw_fd(),
1075            level,
1076            name,
1077            &value as *const _ as _,
1078            mem::size_of_val(&value) as _,
1079        )
1080    };
1081
1082    match rc == 0 {
1083        true => Ok(()),
1084        false => Err(io::Error::last_os_error()),
1085    }
1086}
1087
1088const OPTION_ON: libc::c_int = 1;
1089
/// Fallback GRO shim for platforms other than Linux and Android.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
mod gro {
    /// Segment count reported when receive offload is not used.
    const NO_GRO_SEGMENTS: usize = 1;

    /// Always returns `1`: each receive is treated as a single datagram here.
    pub(super) fn gro_segments() -> usize {
        NO_GRO_SEGMENTS
    }
}