iroh_quinn_udp/
unix.rs

1#[cfg(not(any(apple, target_os = "openbsd", solarish)))]
2use std::ptr;
3use std::{
4    io::{self, IoSliceMut},
5    mem::{self, MaybeUninit},
6    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
7    num::NonZeroUsize,
8    os::unix::io::AsRawFd,
9    sync::{
10        Mutex,
11        atomic::{AtomicBool, AtomicUsize, Ordering},
12    },
13    time::Instant,
14};
15
16use socket2::SockRef;
17
18use super::{
19    EcnCodepoint, IO_ERROR_LOG_INTERVAL, RecvMeta, Transmit, UdpSockRef, cmsg, log_sendmsg_error,
20};
21
// Adapted from https://github.com/apple-oss-distributions/xnu/blob/8d741a5de7ff4191bf97d57b9f54c2f6d4a15585/bsd/sys/socket_private.h
/// Rust mirror of XNU's `struct msghdr_x`, the per-message header consumed by the batched
/// `recvmsg_x` / `sendmsg_x` calls declared below.
///
/// The field order and types must stay identical to the XNU header (`#[repr(C)]`).
#[cfg(apple_fast)]
#[repr(C)]
#[allow(non_camel_case_types)]
pub(crate) struct msghdr_x {
    pub msg_name: *mut libc::c_void,
    pub msg_namelen: libc::socklen_t,
    pub msg_iov: *mut libc::iovec,
    pub msg_iovlen: libc::c_int,
    pub msg_control: *mut libc::c_void,
    pub msg_controllen: libc::socklen_t,
    pub msg_flags: libc::c_int,
    /// Datagram length. This module sets it to the payload length before `sendmsg_x`, sets it
    /// to the buffer length before `recvmsg_x`, and reads it back as the received length.
    pub msg_datalen: usize,
}
36
// Batched socket calls exported by XNU, declared here because `libc` does not provide them.
// `msgp` points at `cnt` consecutive `msghdr_x` entries. Callers in this file treat a negative
// return as failure (cause in `errno`). A non-negative value is presumably the number of
// messages processed; TODO confirm against the XNU sources.
#[cfg(apple_fast)]
unsafe extern "C" {
    /// Batched `recvmsg`: receives into up to `cnt` headers.
    fn recvmsg_x(
        s: libc::c_int,
        msgp: *const msghdr_x,
        cnt: libc::c_uint,
        flags: libc::c_int,
    ) -> isize;

    /// Batched `sendmsg`: sends up to `cnt` headers.
    fn sendmsg_x(
        s: libc::c_int,
        msgp: *const msghdr_x,
        cnt: libc::c_uint,
        flags: libc::c_int,
    ) -> isize;
}
53
// Payload type of the `IP_TOS` control message written by `prepare_msg`: a single byte on
// FreeBSD and a `c_int` elsewhere. No alias exists for NetBSD, where `prepare_msg` never
// emits `IP_TOS`.
#[cfg(target_os = "freebsd")]
type IpTosTy = libc::c_uchar;
#[cfg(not(any(target_os = "freebsd", target_os = "netbsd")))]
type IpTosTy = libc::c_int;
58
/// Tokio-compatible UDP socket with some useful specializations.
///
/// Unlike a standard tokio UDP socket, this allows ECN bits to be read and written on some
/// platforms.
#[derive(Debug)]
pub struct UdpSocketState {
    /// Timestamp handed to `log_sendmsg_error`. It is initialized to roughly two
    /// `IO_ERROR_LOG_INTERVAL`s in the past, presumably so error logging is rate-limited
    /// without suppressing the first error.
    last_send_error: Mutex<Instant>,
    /// Current GSO segment limit. It is lowered to 1 by `send` when a GSO transmit fails
    /// with `EIO`/`EINVAL`.
    max_gso_segments: AtomicUsize,
    /// GRO segment count, determined once when the state is created.
    gro_segments: NonZeroUsize,
    /// `true` when a don't-fragment style socket option could not be enabled.
    may_fragment: bool,

    /// True if we have received EINVAL error from `sendmsg` system call at least once.
    ///
    /// If enabled, we assume that old kernel is used and switch to fallback mode.
    /// In particular, we do not use IP_TOS cmsg_type in this case,
    /// which is not supported on Linux <3.13 and results in not sending the UDP packet at all.
    sendmsg_einval: AtomicBool,
}
77
impl UdpSocketState {
    /// Configures `sock` for use with this module and returns the associated state.
    ///
    /// Puts the socket into non-blocking mode. Where the platform supports them, it enables the
    /// socket options used to receive ECN bits, destination addresses and (on Linux/Android)
    /// GRO metadata, and to forbid IP fragmentation. Errors from `IP_RECVTOS` and `UDP_GRO` are
    /// ignored; every other socket-option failure is returned.
    ///
    /// # Panics
    ///
    /// Panics if `CMSG_LEN` cannot hold a `c_int`-sized control message plus the platform's
    /// packet-info message, or if control message buffers would be misaligned for `cmsghdr`.
    pub fn new(sock: UdpSockRef<'_>) -> io::Result<Self> {
        let io = sock.0;
        // Extra control-message space needed for an `in6_pktinfo` on platforms that report it.
        let mut cmsg_platform_space = 0;
        if cfg!(target_os = "linux")
            || cfg!(bsd)
            || cfg!(apple)
            || cfg!(target_os = "android")
            || cfg!(solarish)
        {
            cmsg_platform_space +=
                unsafe { libc::CMSG_SPACE(mem::size_of::<libc::in6_pktinfo>() as _) as usize };
        }

        assert!(
            CMSG_LEN
                >= unsafe { libc::CMSG_SPACE(mem::size_of::<libc::c_int>() as _) as usize }
                    + cmsg_platform_space
        );
        assert!(
            mem::align_of::<libc::cmsghdr>() <= mem::align_of::<cmsg::Aligned<[u8; 0]>>(),
            "control message buffers will be misaligned"
        );

        io.set_nonblocking(true)?;

        let addr = io.local_addr()?;
        let is_ipv4 = addr.family() == libc::AF_INET as libc::sa_family_t;

        // Request IPv4 TOS/ECN bits on IPv4 and dual-stack sockets (IPv6-only sockets never
        // carry IPv4 traffic). Failures are only logged:
        // mac and ios do not support IP_RECVTOS on dual-stack sockets :(
        // older macos versions also don't have the flag and will error out if we don't ignore it
        #[cfg(not(any(
            target_os = "openbsd",
            target_os = "netbsd",
            target_os = "dragonfly",
            solarish
        )))]
        if is_ipv4 || !io.only_v6()? {
            if let Err(_err) =
                set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_RECVTOS, OPTION_ON)
            {
                crate::log::debug!("Ignoring error setting IP_RECVTOS on socket: {_err:?}");
            }
        }

        let mut may_fragment = false;
        #[cfg(any(target_os = "linux", target_os = "android"))]
        {
            // opportunistically try to enable GRO. See gro::gro_segments().
            let _ = set_socket_option(&*io, libc::SOL_UDP, libc::UDP_GRO, OPTION_ON);

            // Forbid IPv4 fragmentation. Set even for IPv6 to account for IPv6 mapped IPv4 addresses.
            // Set `may_fragment` to `true` if this option is not supported on the platform.
            may_fragment |= !set_socket_option_supported(
                &*io,
                libc::IPPROTO_IP,
                libc::IP_MTU_DISCOVER,
                libc::IP_PMTUDISC_PROBE,
            )?;

            if is_ipv4 {
                set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_PKTINFO, OPTION_ON)?;
            } else {
                // Set `may_fragment` to `true` if this option is not supported on the platform.
                may_fragment |= !set_socket_option_supported(
                    &*io,
                    libc::IPPROTO_IPV6,
                    libc::IPV6_MTU_DISCOVER,
                    libc::IPV6_PMTUDISC_PROBE,
                )?;
            }
        }
        #[cfg(any(target_os = "freebsd", apple))]
        {
            if is_ipv4 {
                // Set `may_fragment` to `true` if this option is not supported on the platform.
                may_fragment |= !set_socket_option_supported(
                    &*io,
                    libc::IPPROTO_IP,
                    libc::IP_DONTFRAG,
                    OPTION_ON,
                )?;
            }
        }
        #[cfg(any(bsd, apple, solarish))]
        // IP_RECVDSTADDR == IP_SENDSRCADDR on FreeBSD
        // macOS uses only IP_RECVDSTADDR, no IP_SENDSRCADDR on macOS (the same on Solaris)
        // macOS also supports IP_PKTINFO
        {
            if is_ipv4 {
                set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_RECVDSTADDR, OPTION_ON)?;
            }
        }

        // Options standardized in RFC 3542
        if !is_ipv4 {
            set_socket_option(&*io, libc::IPPROTO_IPV6, libc::IPV6_RECVPKTINFO, OPTION_ON)?;
            set_socket_option(&*io, libc::IPPROTO_IPV6, libc::IPV6_RECVTCLASS, OPTION_ON)?;
            // Linux's IP_PMTUDISC_PROBE allows us to operate under interface MTU rather than the
            // kernel's path MTU guess, but actually disabling fragmentation requires this too. See
            // __ip6_append_data in ip6_output.c.
            // Set `may_fragment` to `true` if this option is not supported on the platform.
            may_fragment |= !set_socket_option_supported(
                &*io,
                libc::IPPROTO_IPV6,
                libc::IPV6_DONTFRAG,
                OPTION_ON,
            )?;
        }

        let now = Instant::now();
        Ok(Self {
            // Back-dated by two log intervals (falling back to `now` if that underflows);
            // presumably so the first send error is not suppressed by rate limiting.
            last_send_error: Mutex::new(now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now)),
            max_gso_segments: AtomicUsize::new(gso::max_gso_segments()),
            gro_segments: gro::gro_segments(),
            may_fragment,
            sendmsg_einval: AtomicBool::new(false),
        })
    }

    /// Sends a [`Transmit`] on the given socket.
    ///
    /// This function will only ever return errors of kind [`io::ErrorKind::WouldBlock`].
    /// All other errors will be logged and converted to `Ok`.
    ///
    /// UDP transmission errors are considered non-fatal because higher-level protocols must
    /// employ retransmits and timeouts anyway in order to deal with UDP's unreliable nature.
    /// Thus, logging is most likely the only thing you can do with these errors.
    ///
    /// If you would like to handle these errors yourself, use [`UdpSocketState::try_send`]
    /// instead.
    pub fn send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
        match send(self, socket.0, transmit) {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => Err(e),
            // - EMSGSIZE is expected for MTU probes. Future work might be able to avoid
            //   these by automatically clamping the MTUD upper bound to the interface MTU.
            Err(e) if e.raw_os_error() == Some(libc::EMSGSIZE) => Ok(()),
            Err(e) => {
                log_sendmsg_error(&self.last_send_error, e, transmit);

                Ok(())
            }
        }
    }

    /// Sends a [`Transmit`] on the given socket without any additional error handling.
    pub fn try_send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
        send(self, socket.0, transmit)
    }

    /// Receives datagrams from `socket` into `bufs`, describing each one in the matching entry
    /// of `meta`.
    ///
    /// Returns the number of datagrams received. Depending on the platform this is up to
    /// `BATCH_SIZE` datagrams per call, or a single datagram.
    pub fn recv(
        &self,
        socket: UdpSockRef<'_>,
        bufs: &mut [IoSliceMut<'_>],
        meta: &mut [RecvMeta],
    ) -> io::Result<usize> {
        recv(socket.0, bufs, meta)
    }

    /// The maximum amount of segments which can be transmitted if a platform
    /// supports Generic Segmentation Offload (GSO).
    ///
    /// This is 1 if the platform doesn't support GSO. Subject to change if errors are detected
    /// while using GSO.
    #[inline]
    pub fn max_gso_segments(&self) -> NonZeroUsize {
        self.max_gso_segments
            .load(Ordering::Relaxed)
            .try_into()
            .expect("must have non zero GSO segments")
    }

    /// The number of segments to read when GRO is enabled. Used as a factor to
    /// compute the receive buffer size.
    ///
    /// Returns 1 if the platform doesn't support GRO.
    #[inline]
    pub fn gro_segments(&self) -> NonZeroUsize {
        self.gro_segments
    }

    /// Resize the send buffer of `socket` to `bytes`
    #[inline]
    pub fn set_send_buffer_size(&self, socket: UdpSockRef<'_>, bytes: usize) -> io::Result<()> {
        socket.0.set_send_buffer_size(bytes)
    }

    /// Resize the receive buffer of `socket` to `bytes`
    #[inline]
    pub fn set_recv_buffer_size(&self, socket: UdpSockRef<'_>, bytes: usize) -> io::Result<()> {
        socket.0.set_recv_buffer_size(bytes)
    }

    /// Get the size of the `socket` send buffer
    #[inline]
    pub fn send_buffer_size(&self, socket: UdpSockRef<'_>) -> io::Result<usize> {
        socket.0.send_buffer_size()
    }

    /// Get the size of the `socket` receive buffer
    #[inline]
    pub fn recv_buffer_size(&self, socket: UdpSockRef<'_>) -> io::Result<usize> {
        socket.0.recv_buffer_size()
    }

    /// Whether transmitted datagrams might get fragmented by the IP layer
    ///
    /// Returns `false` on targets which employ e.g. the `IPV6_DONTFRAG` socket option.
    #[inline]
    pub fn may_fragment(&self) -> bool {
        self.may_fragment
    }

    /// Returns true if we previously got an EINVAL error from `sendmsg` syscall.
    fn sendmsg_einval(&self) -> bool {
        self.sendmsg_einval.load(Ordering::Relaxed)
    }

    /// Sets the flag indicating we got EINVAL error from `sendmsg` syscall.
    #[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))]
    fn set_sendmsg_einval(&self) {
        self.sendmsg_einval.store(true, Ordering::Relaxed)
    }
}
303
304#[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))]
305fn send(
306    #[allow(unused_variables)] // only used on Linux
307    state: &UdpSocketState,
308    io: SockRef<'_>,
309    transmit: &Transmit<'_>,
310) -> io::Result<()> {
311    #[allow(unused_mut)] // only mutable on FreeBSD
312    let mut encode_src_ip = true;
313    #[cfg(target_os = "freebsd")]
314    {
315        let addr = io.local_addr()?;
316        let is_ipv4 = addr.family() == libc::AF_INET as libc::sa_family_t;
317        if is_ipv4 {
318            if let Some(socket) = addr.as_socket_ipv4() {
319                encode_src_ip = socket.ip() == &Ipv4Addr::UNSPECIFIED;
320            }
321        }
322    }
323    let mut msg_hdr: libc::msghdr = unsafe { mem::zeroed() };
324    let mut iovec: libc::iovec = unsafe { mem::zeroed() };
325    let mut cmsgs = cmsg::Aligned([0u8; CMSG_LEN]);
326    let dst_addr = socket2::SockAddr::from(transmit.destination);
327    prepare_msg(
328        transmit,
329        &dst_addr,
330        &mut msg_hdr,
331        &mut iovec,
332        &mut cmsgs,
333        encode_src_ip,
334        state.sendmsg_einval(),
335    );
336
337    loop {
338        let n = unsafe { libc::sendmsg(io.as_raw_fd(), &msg_hdr, 0) };
339
340        if n >= 0 {
341            return Ok(());
342        }
343
344        let e = io::Error::last_os_error();
345        match e.kind() {
346            // Retry the transmission
347            io::ErrorKind::Interrupted => continue,
348            io::ErrorKind::WouldBlock => return Err(e),
349            _ => {
350                // Some network adapters and drivers do not support GSO. Unfortunately, Linux
351                // offers no easy way for us to detect this short of an EIO or sometimes EINVAL
352                // when we try to actually send datagrams using it.
353                #[cfg(any(target_os = "linux", target_os = "android"))]
354                if let Some(libc::EIO) | Some(libc::EINVAL) = e.raw_os_error() {
355                    // Prevent new transmits from being scheduled using GSO. Existing GSO transmits
356                    // may already be in the pipeline, so we need to tolerate additional failures.
357                    if state.max_gso_segments().get() > 1 {
358                        crate::log::info!(
359                            "`libc::sendmsg` failed with {e}; halting segmentation offload"
360                        );
361                        state
362                            .max_gso_segments
363                            .store(1, std::sync::atomic::Ordering::Relaxed);
364                    }
365                }
366
367                // Some arguments to `sendmsg` are not supported. Switch to
368                // fallback mode and retry if we haven't already.
369                if e.raw_os_error() == Some(libc::EINVAL) && !state.sendmsg_einval() {
370                    state.set_sendmsg_einval();
371                    prepare_msg(
372                        transmit,
373                        &dst_addr,
374                        &mut msg_hdr,
375                        &mut iovec,
376                        &mut cmsgs,
377                        encode_src_ip,
378                        state.sendmsg_einval(),
379                    );
380                    continue;
381                }
382
383                return Err(e);
384            }
385        }
386    }
387}
388
/// Sends `transmit` with one `sendmsg_x` call, using one `msghdr_x` per segment.
///
/// The payload is split into `segment_size`-byte chunks (the whole payload when no segment size
/// is set), and at most `BATCH_SIZE` chunks are submitted. An empty payload is submitted as a
/// single zero-length datagram. Only `EINTR` is retried; every other error is returned.
///
/// NOTE(review): any non-negative return is treated as success, even if `sendmsg_x` processed
/// fewer messages than were submitted. Confirm whether partial batches need handling.
#[cfg(apple_fast)]
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
    let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
    let mut iovs = unsafe { mem::zeroed::<[libc::iovec; BATCH_SIZE]>() };
    let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
    let addr = socket2::SockAddr::from(transmit.destination);
    let contents = transmit.contents;
    // `slice::chunks` panics on a chunk size of 0, which an empty payload without an explicit
    // segment size used to produce. Any non-zero size works for an empty payload, since it
    // yields no chunks either way.
    let segment_size = if contents.is_empty() {
        1
    } else {
        transmit.segment_size.unwrap_or(contents.len())
    };
    debug_assert!(contents.len().div_ceil(segment_size) <= BATCH_SIZE);
    // `chunks` yields nothing for an empty slice, so append the empty payload explicitly. This
    // keeps parity with the single-`sendmsg` paths, which submit a zero-length datagram.
    let chunks = contents
        .chunks(segment_size)
        .chain(contents.is_empty().then_some(contents));
    let mut cnt: libc::c_uint = 0;
    for (i, chunk) in chunks.enumerate().take(BATCH_SIZE) {
        prepare_msg(
            &Transmit {
                destination: transmit.destination,
                ecn: transmit.ecn,
                contents: chunk,
                segment_size: Some(chunk.len()),
                src_ip: transmit.src_ip,
            },
            &addr,
            &mut hdrs[i],
            &mut iovs[i],
            &mut ctrls[i],
            true,
            state.sendmsg_einval(),
        );
        hdrs[i].msg_datalen = chunk.len();
        cnt += 1;
    }
    loop {
        // SAFETY: the first `cnt` headers were filled by `prepare_msg` and point into `iovs`,
        // `ctrls`, `addr` and `transmit.contents`, all of which outlive this call.
        let n = unsafe { sendmsg_x(io.as_raw_fd(), hdrs.as_ptr(), cnt, 0) };

        if n >= 0 {
            return Ok(());
        }

        let e = io::Error::last_os_error();
        match e.kind() {
            // Retry the transmission
            io::ErrorKind::Interrupted => continue,
            _ => return Err(e),
        }
    }
}
437
/// Sends `transmit` as a single datagram via `sendmsg`.
///
/// Only `EINTR` is retried; every other error (including `WouldBlock`) is returned as-is.
#[cfg(any(target_os = "openbsd", target_os = "netbsd", apple_slow))]
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
    let dst = socket2::SockAddr::from(transmit.destination);
    let mut ctrl = cmsg::Aligned([0u8; CMSG_LEN]);
    let mut iov: libc::iovec = unsafe { mem::zeroed() };
    let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
    // The source address is encoded on Apple, OpenBSD and NetBSD targets.
    let encode_src_ip = cfg!(apple) || cfg!(target_os = "openbsd") || cfg!(target_os = "netbsd");
    prepare_msg(
        transmit,
        &dst,
        &mut hdr,
        &mut iov,
        &mut ctrl,
        encode_src_ip,
        state.sendmsg_einval(),
    );

    let fd = io.as_raw_fd();
    loop {
        // SAFETY: `hdr` was filled by `prepare_msg` and only points at `dst`, `iov`, `ctrl`
        // and `transmit.contents`, all alive for the duration of the call.
        if unsafe { libc::sendmsg(fd, &hdr, 0) } >= 0 {
            break Ok(());
        }
        let err = io::Error::last_os_error();
        if err.kind() != io::ErrorKind::Interrupted {
            break Err(err);
        }
    }
}
468
469#[cfg(not(any(
470    apple,
471    target_os = "openbsd",
472    target_os = "netbsd",
473    target_os = "dragonfly",
474    solarish
475)))]
476fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
477    let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];
478    let mut ctrls = [cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); BATCH_SIZE];
479    let mut hdrs = unsafe { mem::zeroed::<[libc::mmsghdr; BATCH_SIZE]>() };
480    let max_msg_count = bufs.len().min(BATCH_SIZE);
481    for i in 0..max_msg_count {
482        prepare_recv(
483            &mut bufs[i],
484            &mut names[i],
485            &mut ctrls[i],
486            &mut hdrs[i].msg_hdr,
487        );
488    }
489    let msg_count = loop {
490        let n = unsafe {
491            libc::recvmmsg(
492                io.as_raw_fd(),
493                hdrs.as_mut_ptr(),
494                bufs.len().min(BATCH_SIZE) as _,
495                0,
496                ptr::null_mut::<libc::timespec>(),
497            )
498        };
499
500        if n >= 0 {
501            break n;
502        }
503
504        let e = io::Error::last_os_error();
505        match e.kind() {
506            // Retry receiving
507            io::ErrorKind::Interrupted => continue,
508            _ => return Err(e),
509        }
510    };
511    for i in 0..(msg_count as usize) {
512        meta[i] = decode_recv(&names[i], &hdrs[i].msg_hdr, hdrs[i].msg_len as usize)?;
513    }
514    Ok(msg_count as usize)
515}
516
/// Receives a batch of datagrams with a single `recvmsg_x` call, retrying on `EINTR`.
///
/// At most `min(bufs.len(), meta.len(), BATCH_SIZE)` datagrams are requested, so every
/// received datagram has a `meta` slot. Returns the number of datagrams received;
/// `meta[..n]` describes them, with lengths taken from each header's `msg_datalen`.
#[cfg(apple_fast)]
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
    let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];
    // MacOS 10.15 `recvmsg_x` does not override the `msghdr_x`
    // `msg_controllen`. Thus, after the call to `recvmsg_x`, one does not know
    // which control messages have been written to. To prevent reading
    // uninitialized memory, do not use `MaybeUninit` for `ctrls`, instead
    // initialize `ctrls` with `0`s. A control message of all `0`s is
    // automatically skipped by `libc::CMSG_NXTHDR`.
    let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
    let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
    // Never ask the kernel for more datagrams than we have buffers, metadata slots or headers.
    let max_msg_count = bufs.len().min(meta.len()).min(BATCH_SIZE);
    for i in 0..max_msg_count {
        prepare_recv(&mut bufs[i], &mut names[i], &mut ctrls[i], &mut hdrs[i]);
    }
    let msg_count = loop {
        // SAFETY: the first `max_msg_count` headers were initialized by `prepare_recv` and
        // point into `bufs`, `names` and `ctrls`, all of which outlive this call.
        let n = unsafe { recvmsg_x(io.as_raw_fd(), hdrs.as_mut_ptr(), max_msg_count as _, 0) };

        if n >= 0 {
            break n;
        }

        let e = io::Error::last_os_error();
        match e.kind() {
            // Retry receiving
            io::ErrorKind::Interrupted => continue,
            _ => return Err(e),
        }
    };
    for i in 0..(msg_count as usize) {
        meta[i] = decode_recv(&names[i], &hdrs[i], hdrs[i].msg_datalen as usize)?;
    }
    Ok(msg_count as usize)
}
551
/// Receives one datagram into `bufs[0]` with `recvmsg` and describes it in `meta[0]`.
///
/// Datagrams flagged `MSG_TRUNC` (larger than `bufs[0]`) are discarded and the next one is
/// read. `EINTR` is retried; every other error (including `WouldBlock`) is returned.
/// Returns `Ok(0)` without touching the socket when `bufs` or `meta` is empty.
#[cfg(any(
    target_os = "openbsd",
    target_os = "netbsd",
    target_os = "dragonfly",
    solarish,
    apple_slow
))]
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
    let (Some(buf), Some(slot)) = (bufs.first_mut(), meta.first_mut()) else {
        return Ok(0);
    };
    let mut name = MaybeUninit::<libc::sockaddr_storage>::uninit();
    let mut ctrl = cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit());
    let mut hdr = unsafe { mem::zeroed::<libc::msghdr>() };
    let n = loop {
        // Re-arm the header on every attempt. A previous successful `recvmsg` may have shrunk
        // `msg_namelen`/`msg_controllen` and set `msg_flags`, and a failing call is not
        // guaranteed to rewrite `msg_flags`. Checking a stale `MSG_TRUNC` before the return
        // value used to turn an `EAGAIN` into an endless busy loop.
        prepare_recv(buf, &mut name, &mut ctrl, &mut hdr);
        // SAFETY: `hdr` points at `buf`, `name` and `ctrl`, all alive for the call.
        let n = unsafe { libc::recvmsg(io.as_raw_fd(), &mut hdr, 0) };

        if n >= 0 {
            if hdr.msg_flags & libc::MSG_TRUNC != 0 {
                // Oversized datagram: drop it and read the next one.
                continue;
            }
            break n;
        }

        let e = io::Error::last_os_error();
        match e.kind() {
            // Retry receiving
            io::ErrorKind::Interrupted => continue,
            _ => return Err(e),
        }
    };
    *slot = decode_recv(&name, &hdr, n as usize)?;
    Ok(1)
}
585
/// Size in bytes of each control-message buffer used for sending and receiving.
///
/// `UdpSocketState::new` asserts that it can hold a `c_int`-sized control message plus, on
/// most platforms, an `in6_pktinfo` message.
const CMSG_LEN: usize = 88;
587
588fn prepare_msg(
589    transmit: &Transmit<'_>,
590    dst_addr: &socket2::SockAddr,
591    #[cfg(not(apple_fast))] hdr: &mut libc::msghdr,
592    #[cfg(apple_fast)] hdr: &mut msghdr_x,
593    iov: &mut libc::iovec,
594    ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
595    #[allow(unused_variables)] // only used on FreeBSD & macOS
596    encode_src_ip: bool,
597    sendmsg_einval: bool,
598) {
599    iov.iov_base = transmit.contents.as_ptr() as *const _ as *mut _;
600    iov.iov_len = transmit.contents.len();
601
602    // SAFETY: Casting the pointer to a mutable one is legal,
603    // as sendmsg is guaranteed to not alter the mutable pointer
604    // as per the POSIX spec. See the section on the sys/socket.h
605    // header for details. The type is only mutable in the first
606    // place because it is reused by recvmsg as well.
607    let name = dst_addr.as_ptr() as *mut libc::c_void;
608    let namelen = dst_addr.len();
609    hdr.msg_name = name as *mut _;
610    hdr.msg_namelen = namelen;
611    hdr.msg_iov = iov;
612    hdr.msg_iovlen = 1;
613
614    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
615    hdr.msg_controllen = CMSG_LEN as _;
616    let mut encoder = unsafe { cmsg::Encoder::new(hdr) };
617    let ecn = transmit.ecn.map_or(0, |x| x as libc::c_int);
618    // True for IPv4 or IPv4-Mapped IPv6
619    let is_ipv4 = transmit.destination.is_ipv4()
620        || matches!(transmit.destination.ip(), IpAddr::V6(addr) if addr.to_ipv4_mapped().is_some());
621    if is_ipv4 {
622        if !sendmsg_einval {
623            #[cfg(not(target_os = "netbsd"))]
624            {
625                encoder.push(libc::IPPROTO_IP, libc::IP_TOS, ecn as IpTosTy);
626            }
627        }
628    } else {
629        encoder.push(libc::IPPROTO_IPV6, libc::IPV6_TCLASS, ecn);
630    }
631
632    if let Some(segment_size) = transmit.effective_segment_size() {
633        gso::set_segment_size(&mut encoder, segment_size as u16);
634    }
635
636    if let Some(ip) = &transmit.src_ip {
637        match ip {
638            IpAddr::V4(v4) => {
639                #[cfg(any(target_os = "linux", target_os = "android"))]
640                {
641                    let pktinfo = libc::in_pktinfo {
642                        ipi_ifindex: 0,
643                        ipi_spec_dst: libc::in_addr {
644                            s_addr: u32::from_ne_bytes(v4.octets()),
645                        },
646                        ipi_addr: libc::in_addr { s_addr: 0 },
647                    };
648                    encoder.push(libc::IPPROTO_IP, libc::IP_PKTINFO, pktinfo);
649                }
650                #[cfg(any(bsd, apple, solarish))]
651                {
652                    if encode_src_ip {
653                        let addr = libc::in_addr {
654                            s_addr: u32::from_ne_bytes(v4.octets()),
655                        };
656                        encoder.push(libc::IPPROTO_IP, libc::IP_RECVDSTADDR, addr);
657                    }
658                }
659            }
660            IpAddr::V6(v6) => {
661                let pktinfo = libc::in6_pktinfo {
662                    ipi6_ifindex: 0,
663                    ipi6_addr: libc::in6_addr {
664                        s6_addr: v6.octets(),
665                    },
666                };
667                encoder.push(libc::IPPROTO_IPV6, libc::IPV6_PKTINFO, pktinfo);
668            }
669        }
670    }
671
672    encoder.finish();
673}
674
675#[cfg(not(apple_fast))]
676fn prepare_recv(
677    buf: &mut IoSliceMut<'_>,
678    name: &mut MaybeUninit<libc::sockaddr_storage>,
679    ctrl: &mut cmsg::Aligned<MaybeUninit<[u8; CMSG_LEN]>>,
680    hdr: &mut libc::msghdr,
681) {
682    hdr.msg_name = name.as_mut_ptr() as _;
683    hdr.msg_namelen = mem::size_of::<libc::sockaddr_storage>() as _;
684    hdr.msg_iov = buf as *mut IoSliceMut<'_> as *mut libc::iovec;
685    hdr.msg_iovlen = 1;
686    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
687    hdr.msg_controllen = CMSG_LEN as _;
688    hdr.msg_flags = 0;
689}
690
/// Points the `msghdr_x` at `buf`, `name` and `ctrl` for `recvmsg_x`.
///
/// Resets the name and control lengths to their full buffer sizes, clears `msg_flags`, and
/// sets `msg_datalen` to the capacity of `buf`.
#[cfg(apple_fast)]
fn prepare_recv(
    buf: &mut IoSliceMut<'_>,
    name: &mut MaybeUninit<libc::sockaddr_storage>,
    ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
    hdr: &mut msghdr_x,
) {
    let capacity = buf.len();

    // Payload: a single `IoSliceMut`, which is ABI-compatible with `iovec` on Unix.
    hdr.msg_iov = buf as *mut IoSliceMut<'_> as *mut libc::iovec;
    hdr.msg_iovlen = 1;
    hdr.msg_datalen = capacity;

    // Peer address, written by the kernel.
    hdr.msg_name = name.as_mut_ptr() as _;
    hdr.msg_namelen = mem::size_of::<libc::sockaddr_storage>() as _;

    // Ancillary data.
    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
    hdr.msg_controllen = CMSG_LEN as _;

    hdr.msg_flags = 0;
}
707
708fn decode_recv<M: cmsg::MsgHdr<ControlMessage = libc::cmsghdr>>(
709    name: &MaybeUninit<libc::sockaddr_storage>,
710    hdr: &M,
711    len: usize,
712) -> io::Result<RecvMeta> {
713    let name = unsafe { name.assume_init() };
714    let mut ecn_bits = 0;
715    let mut dst_ip = None;
716    let mut interface_index = None;
717    #[allow(unused_mut)] // only mutable on Linux
718    let mut stride = len;
719
720    let cmsg_iter = unsafe { cmsg::Iter::new(hdr) };
721    for cmsg in cmsg_iter {
722        match (cmsg.cmsg_level, cmsg.cmsg_type) {
723            (libc::IPPROTO_IP, libc::IP_TOS) => unsafe {
724                ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
725            },
726            // FreeBSD uses IP_RECVTOS here, and we can be liberal because cmsgs are opt-in.
727            #[cfg(not(any(
728                target_os = "openbsd",
729                target_os = "netbsd",
730                target_os = "dragonfly",
731                solarish
732            )))]
733            (libc::IPPROTO_IP, libc::IP_RECVTOS) => unsafe {
734                ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
735            },
736            (libc::IPPROTO_IPV6, libc::IPV6_TCLASS) => unsafe {
737                // Temporary hack around broken macos ABI. Remove once upstream fixes it.
738                // https://bugreport.apple.com/web/?problemID=48761855
739                #[allow(clippy::unnecessary_cast)] // cmsg.cmsg_len defined as size_t
740                if cfg!(apple)
741                    && cmsg.cmsg_len as usize == libc::CMSG_LEN(mem::size_of::<u8>() as _) as usize
742                {
743                    ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
744                } else {
745                    ecn_bits = cmsg::decode::<libc::c_int, libc::cmsghdr>(cmsg) as u8;
746                }
747            },
748            #[cfg(any(target_os = "linux", target_os = "android"))]
749            (libc::IPPROTO_IP, libc::IP_PKTINFO) => {
750                let pktinfo = unsafe { cmsg::decode::<libc::in_pktinfo, libc::cmsghdr>(cmsg) };
751                dst_ip = Some(IpAddr::V4(Ipv4Addr::from(
752                    pktinfo.ipi_addr.s_addr.to_ne_bytes(),
753                )));
754                interface_index = Some(pktinfo.ipi_ifindex as u32);
755            }
756            #[cfg(any(bsd, apple))]
757            (libc::IPPROTO_IP, libc::IP_RECVDSTADDR) => {
758                let in_addr = unsafe { cmsg::decode::<libc::in_addr, libc::cmsghdr>(cmsg) };
759                dst_ip = Some(IpAddr::V4(Ipv4Addr::from(in_addr.s_addr.to_ne_bytes())));
760            }
761            (libc::IPPROTO_IPV6, libc::IPV6_PKTINFO) => {
762                let pktinfo = unsafe { cmsg::decode::<libc::in6_pktinfo, libc::cmsghdr>(cmsg) };
763                dst_ip = Some(IpAddr::V6(Ipv6Addr::from(pktinfo.ipi6_addr.s6_addr)));
764                interface_index = Some(pktinfo.ipi6_ifindex as u32);
765            }
766            #[cfg(any(target_os = "linux", target_os = "android"))]
767            (libc::SOL_UDP, libc::UDP_GRO) => unsafe {
768                stride = cmsg::decode::<libc::c_int, libc::cmsghdr>(cmsg) as usize;
769            },
770            _ => {}
771        }
772    }
773
774    let addr = match libc::c_int::from(name.ss_family) {
775        libc::AF_INET => {
776            // Safety: if the ss_family field is AF_INET then storage must be a sockaddr_in.
777            let addr: &libc::sockaddr_in =
778                unsafe { &*(&name as *const _ as *const libc::sockaddr_in) };
779            SocketAddr::V4(SocketAddrV4::new(
780                Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()),
781                u16::from_be(addr.sin_port),
782            ))
783        }
784        libc::AF_INET6 => {
785            // Safety: if the ss_family field is AF_INET6 then storage must be a sockaddr_in6.
786            let addr: &libc::sockaddr_in6 =
787                unsafe { &*(&name as *const _ as *const libc::sockaddr_in6) };
788            SocketAddr::V6(SocketAddrV6::new(
789                Ipv6Addr::from(addr.sin6_addr.s6_addr),
790                u16::from_be(addr.sin6_port),
791                addr.sin6_flowinfo,
792                addr.sin6_scope_id,
793            ))
794        }
795        f => {
796            return Err(io::Error::other(format!(
797                "expected AF_INET or AF_INET6, got {f} in decode_recv"
798            )));
799        }
800    };
801
802    Ok(RecvMeta {
803        len,
804        stride,
805        addr,
806        ecn: EcnCodepoint::from_bits(ecn_bits),
807        dst_ip,
808        interface_index,
809    })
810}
811
/// Maximum number of datagrams handled by one batched send or receive call.
#[cfg(not(apple_slow))]
// Chosen somewhat arbitrarily; might benefit from additional tuning.
pub(crate) const BATCH_SIZE: usize = 32;

/// On the slow Apple path every call moves exactly one datagram.
#[cfg(apple_slow)]
pub(crate) const BATCH_SIZE: usize = 1;
818
819#[cfg(any(target_os = "linux", target_os = "android"))]
820mod gso {
821    use super::*;
822    use std::{ffi::CStr, mem, str::FromStr, sync::OnceLock};
823
824    // Support for UDP GSO has been added to linux kernel in version 4.18
825    // https://github.com/torvalds/linux/commit/cb586c63e3fc5b227c51fd8c4cb40b34d3750645
826    const SUPPORTED_SINCE: KernelVersion = KernelVersion {
827        version: 4,
828        major_revision: 18,
829    };
830
831    /// Checks whether GSO support is available by checking the kernel version followed by setting
832    /// the UDP_SEGMENT option on a socket
833    pub(crate) fn max_gso_segments() -> usize {
834        const GSO_SIZE: libc::c_int = 1500;
835
836        if !SUPPORTED_BY_CURRENT_KERNEL.get_or_init(supported_by_current_kernel) {
837            return 1;
838        }
839
840        let socket = match std::net::UdpSocket::bind("[::]:0")
841            .or_else(|_| std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
842        {
843            Ok(socket) => socket,
844            Err(_) => return 1,
845        };
846
847        // As defined in linux/udp.h
848        // #define UDP_MAX_SEGMENTS        (1 << 6UL)
849        match set_socket_option(&socket, libc::SOL_UDP, libc::UDP_SEGMENT, GSO_SIZE) {
850            Ok(()) => 64,
851            Err(_e) => {
852                crate::log::debug!(
853                    "failed to set `UDP_SEGMENT` socket option ({_e}); setting `max_gso_segments = 1`"
854                );
855
856                1
857            }
858        }
859    }
860
861    pub(crate) fn set_segment_size(
862        encoder: &mut cmsg::Encoder<'_, libc::msghdr>,
863        segment_size: u16,
864    ) {
865        encoder.push(libc::SOL_UDP, libc::UDP_SEGMENT, segment_size);
866    }
867
868    // Avoid calling `supported_by_current_kernel` for each socket by using `OnceLock`.
869    static SUPPORTED_BY_CURRENT_KERNEL: OnceLock<bool> = OnceLock::new();
870
871    fn supported_by_current_kernel() -> bool {
872        let kernel_version_string = match kernel_version_string() {
873            Ok(kernel_version_string) => kernel_version_string,
874            Err(_e) => {
875                crate::log::warn!("GSO disabled: uname returned {_e}");
876                return false;
877            }
878        };
879
880        let Some(kernel_version) = KernelVersion::from_str(&kernel_version_string) else {
881            crate::log::warn!(
882                "GSO disabled: failed to parse kernel version ({kernel_version_string})"
883            );
884            return false;
885        };
886
887        if kernel_version < SUPPORTED_SINCE {
888            crate::log::info!("GSO disabled: kernel too old ({kernel_version_string}); need 4.18+",);
889            return false;
890        }
891
892        true
893    }
894
895    fn kernel_version_string() -> io::Result<String> {
896        let mut n = unsafe { mem::zeroed() };
897        let r = unsafe { libc::uname(&mut n) };
898        if r != 0 {
899            return Err(io::Error::last_os_error());
900        }
901        Ok(unsafe {
902            CStr::from_ptr(n.release[..].as_ptr())
903                .to_string_lossy()
904                .into_owned()
905        })
906    }
907
908    // https://www.linfo.org/kernel_version_numbering.html
909    #[derive(Eq, PartialEq, Ord, PartialOrd, Debug)]
910    struct KernelVersion {
911        version: u8,
912        major_revision: u8,
913    }
914
915    impl KernelVersion {
916        fn from_str(release: &str) -> Option<Self> {
917            let mut split = release
918                .split_once('-')
919                .map(|pair| pair.0)
920                .unwrap_or(release)
921                .split('.');
922
923            let version = u8::from_str(split.next()?).ok()?;
924            let major_revision = u8::from_str(split.next()?).ok()?;
925
926            Some(Self {
927                version,
928                major_revision,
929            })
930        }
931    }
932
933    #[cfg(test)]
934    mod test {
935        use super::*;
936
937        #[test]
938        fn parse_current_kernel_version_release_string() {
939            let release = kernel_version_string().unwrap();
940            KernelVersion::from_str(&release).unwrap();
941        }
942
943        #[test]
944        fn parse_kernel_version_release_string() {
945            // These are made up for the test
946            assert_eq!(
947                KernelVersion::from_str("4.14"),
948                Some(KernelVersion {
949                    version: 4,
950                    major_revision: 14
951                })
952            );
953            assert_eq!(
954                KernelVersion::from_str("4.18"),
955                Some(KernelVersion {
956                    version: 4,
957                    major_revision: 18
958                })
959            );
960            // These were seen in the wild
961            assert_eq!(
962                KernelVersion::from_str("4.14.186-27095505"),
963                Some(KernelVersion {
964                    version: 4,
965                    major_revision: 14
966                })
967            );
968            assert_eq!(
969                KernelVersion::from_str("6.8.0-59-generic"),
970                Some(KernelVersion {
971                    version: 6,
972                    major_revision: 8
973                })
974            );
975        }
976    }
977}
978
// On Apple platforms using the `sendmsg_x` call, UDP datagram segmentation is not
// offloaded to the NIC or even the kernel, but is instead done here in user space (in
// [`send`]) and then passed to the OS as individual `iovec`s (up to `BATCH_SIZE`).
#[cfg(not(any(target_os = "linux", target_os = "android")))]
mod gso {
    use super::*;

    /// Number of segments a single transmit may be split into on this platform.
    ///
    /// `apple_fast` builds segment in user space and submit up to a whole batch at once, so they
    /// report `BATCH_SIZE`. Every other platform reports 1.
    pub(super) fn max_gso_segments() -> usize {
        if cfg!(apple_fast) { BATCH_SIZE } else { 1 }
    }

    /// Does nothing: no segment-size control message is emitted on these platforms.
    ///
    /// The encoder type follows the `msghdr` flavour in use (`msghdr_x` on `apple_fast`).
    pub(super) fn set_segment_size(
        #[cfg(not(apple_fast))] _encoder: &mut cmsg::Encoder<'_, libc::msghdr>,
        #[cfg(apple_fast)] _encoder: &mut cmsg::Encoder<'_, msghdr_x>,
        _segment_size: u16,
    ) {
        // Intentionally empty.
    }
}
1004
1005#[cfg(any(target_os = "linux", target_os = "android"))]
1006mod gro {
1007    use super::*;
1008
1009    pub(crate) fn gro_segments() -> NonZeroUsize {
1010        let socket = match std::net::UdpSocket::bind("[::]:0")
1011            .or_else(|_| std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
1012        {
1013            Ok(socket) => socket,
1014            Err(_) => return NonZeroUsize::MIN,
1015        };
1016
1017        // As defined in net/ipv4/udp_offload.c
1018        // #define UDP_GRO_CNT_MAX 64
1019        //
1020        // NOTE: this MUST be set to UDP_GRO_CNT_MAX to ensure that the receive buffer size
1021        // (get_max_udp_payload_size() * gro_segments()) is large enough to hold the largest GRO
1022        // list the kernel might potentially produce. See
1023        // https://github.com/quinn-rs/quinn/pull/1354.
1024        match set_socket_option(&socket, libc::SOL_UDP, libc::UDP_GRO, OPTION_ON) {
1025            Ok(()) => NonZeroUsize::new(64).expect("known"),
1026            Err(_) => NonZeroUsize::MIN,
1027        }
1028    }
1029}
1030
1031/// Returns whether the given socket option is supported on the current platform
1032///
1033/// Yields `Ok(true)` if the option was set successfully, `Ok(false)` if setting
1034/// the option raised an `ENOPROTOOPT` or `EOPNOTSUPP` error, and `Err` for any other error.
1035fn set_socket_option_supported(
1036    socket: &impl AsRawFd,
1037    level: libc::c_int,
1038    name: libc::c_int,
1039    value: libc::c_int,
1040) -> io::Result<bool> {
1041    match set_socket_option(socket, level, name, value) {
1042        Ok(()) => Ok(true),
1043        Err(err) if err.raw_os_error() == Some(libc::ENOPROTOOPT) => Ok(false),
1044        Err(err) if err.raw_os_error() == Some(libc::EOPNOTSUPP) => Ok(false),
1045        Err(err) => Err(err),
1046    }
1047}
1048
1049fn set_socket_option(
1050    socket: &impl AsRawFd,
1051    level: libc::c_int,
1052    name: libc::c_int,
1053    value: libc::c_int,
1054) -> io::Result<()> {
1055    let rc = unsafe {
1056        libc::setsockopt(
1057            socket.as_raw_fd(),
1058            level,
1059            name,
1060            &value as *const _ as _,
1061            mem::size_of_val(&value) as _,
1062        )
1063    };
1064
1065    match rc == 0 {
1066        true => Ok(()),
1067        false => Err(io::Error::last_os_error()),
1068    }
1069}
1070
/// Value passed to `set_socket_option` to switch an on/off socket option on (e.g. `UDP_GRO`).
const OPTION_ON: libc::c_int = 1;
1072
#[cfg(not(any(target_os = "linux", target_os = "android")))]
mod gro {
    use std::num::NonZeroUsize;

    /// Always 1: GRO is not probed on these platforms, so each receive holds a single segment.
    pub(super) fn gro_segments() -> NonZeroUsize {
        const SINGLE_SEGMENT: NonZeroUsize = NonZeroUsize::MIN;
        SINGLE_SEGMENT
    }
}