1#[cfg(not(any(apple, target_os = "openbsd", solarish)))]
2use std::ptr;
3use std::{
4 io::{self, IoSliceMut},
5 mem::{self, MaybeUninit},
6 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
7 num::NonZeroUsize,
8 os::fd::AsRawFd,
9 sync::{
10 Mutex,
11 atomic::{AtomicBool, AtomicUsize, Ordering},
12 },
13 time::Instant,
14};
15
16use socket2::SockRef;
17
18use super::{
19 EcnCodepoint, IO_ERROR_LOG_INTERVAL, RecvMeta, Transmit, UdpSockRef, cmsg, log_sendmsg_error,
20};
21
/// Per-datagram header used by the batched `sendmsg_x` / `recvmsg_x` calls (see
/// `SendmsgXFn` / `RecvmsgXFn`).
///
/// It carries the same fields as `libc::msghdr` plus a trailing `msg_datalen`. The send path
/// sets `msg_datalen` to the payload length of the entry. The receive path initialises it to
/// the buffer length and reads it back as the received datagram length.
///
/// NOTE(review): this type is not provided by `libc`. Its `#[repr(C)]` layout must match the
/// OS definition exactly; confirm against the Darwin headers when touching it.
#[cfg(apple_fast)]
#[repr(C)]
#[allow(non_camel_case_types)]
pub(crate) struct msghdr_x {
    /// Peer address buffer (destination on send, source on receive).
    pub msg_name: *mut libc::c_void,
    /// Length of `msg_name` in bytes.
    pub msg_namelen: libc::socklen_t,
    /// Scatter/gather array; this module always uses exactly one entry.
    pub msg_iov: *mut libc::iovec,
    /// Number of entries in `msg_iov`.
    pub msg_iovlen: libc::c_int,
    /// Ancillary (control message) buffer.
    pub msg_control: *mut libc::c_void,
    /// Length of `msg_control` in bytes.
    pub msg_controllen: libc::socklen_t,
    /// Message flags; cleared before receiving.
    pub msg_flags: libc::c_int,
    /// Payload length of this entry (see the type-level docs).
    pub msg_datalen: usize,
}
36
// Payload type used when pushing an `IP_TOS` control message in `prepare_msg` /
// `prepare_msg_x`: a single byte on FreeBSD and a `c_int` elsewhere. No alias exists for
// NetBSD because `prepare_msg` never emits `IP_TOS` there.
#[cfg(target_os = "freebsd")]
type IpTosTy = libc::c_uchar;
#[cfg(not(any(target_os = "freebsd", target_os = "netbsd")))]
type IpTosTy = libc::c_int;
41
/// Per-socket configuration and capabilities detected by [`UdpSocketState::new`], shared by
/// the send and receive paths in this module.
#[derive(Debug)]
pub struct UdpSocketState {
    /// Handed to `log_sendmsg_error` on every swallowed send error. It is initialised to
    /// `2 * IO_ERROR_LOG_INTERVAL` in the past, presumably to throttle error logging.
    last_send_error: Mutex<Instant>,
    /// Maximum number of segments per send. It drops to 1 after a Linux `EIO`/`EINVAL` from
    /// `sendmsg` or when the Apple fast path is disabled; it is raised to `BATCH_SIZE` when
    /// the Apple fast path is enabled.
    max_gso_segments: AtomicUsize,
    /// 64 when `UDP_GRO` could be enabled (Linux/Android), otherwise 1.
    gro_segments: NonZeroUsize,
    /// `true` if at least one don't-fragment / PMTU-probe option was reported unsupported.
    may_fragment: bool,

    /// Set once `sendmsg` has failed with `EINVAL`; `prepare_msg` then stops encoding the
    /// IPv4 `IP_TOS` control message.
    sendmsg_einval: AtomicBool,

    /// Whether sends/receives go through `sendmsg_x` / `recvmsg_x`.
    #[cfg(apple_fast)]
    apple_fast_path: AtomicBool,
}
67
68impl UdpSocketState {
69 pub fn new(sock: UdpSockRef<'_>) -> io::Result<Self> {
70 let io = sock.0;
71 let mut cmsg_platform_space = 0;
72 if cfg!(target_os = "linux")
73 || cfg!(bsd)
74 || cfg!(apple)
75 || cfg!(target_os = "android")
76 || cfg!(solarish)
77 {
78 cmsg_platform_space +=
79 unsafe { libc::CMSG_SPACE(mem::size_of::<libc::in6_pktinfo>() as _) as usize };
80 }
81
82 assert!(
83 CMSG_LEN
84 >= unsafe { libc::CMSG_SPACE(mem::size_of::<libc::c_int>() as _) as usize }
85 + cmsg_platform_space
86 );
87 assert!(
88 mem::align_of::<libc::cmsghdr>() <= mem::align_of::<cmsg::Aligned<[u8; 0]>>(),
89 "control message buffers will be misaligned"
90 );
91
92 io.set_nonblocking(true)?;
93
94 let addr = io.local_addr()?;
95 let is_ipv4 = addr.family() == libc::AF_INET as libc::sa_family_t;
96
97 #[cfg(not(any(
100 target_os = "openbsd",
101 target_os = "netbsd",
102 target_os = "dragonfly",
103 solarish
104 )))]
105 if (is_ipv4 || !io.only_v6()?)
106 && let Err(_err) =
107 set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_RECVTOS, OPTION_ON)
108 {
109 crate::log::debug!("Ignoring error setting IP_RECVTOS on socket: {_err:?}");
110 }
111
112 let mut may_fragment = false;
113 #[cfg_attr(
114 not(any(target_os = "linux", target_os = "android")),
115 expect(unused_mut)
116 )]
117 let mut gro_segments = NonZeroUsize::MIN;
118
119 #[cfg(any(target_os = "linux", target_os = "android"))]
120 {
121 may_fragment |= !set_socket_option_supported(
124 &*io,
125 libc::IPPROTO_IP,
126 libc::IP_MTU_DISCOVER,
127 libc::IP_PMTUDISC_PROBE,
128 )?;
129
130 if is_ipv4 {
131 set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_PKTINFO, OPTION_ON)?;
132 } else {
133 may_fragment |= !set_socket_option_supported(
135 &*io,
136 libc::IPPROTO_IPV6,
137 libc::IPV6_MTU_DISCOVER,
138 libc::IPV6_PMTUDISC_PROBE,
139 )?;
140 }
141
142 if set_socket_option(&*io, libc::SOL_UDP, libc::UDP_GRO, OPTION_ON).is_ok() {
143 gro_segments = NonZeroUsize::new(64).expect("known");
151 }
152 }
153 #[cfg(any(target_os = "freebsd", apple))]
154 {
155 if is_ipv4 {
156 may_fragment |= !set_socket_option_supported(
158 &*io,
159 libc::IPPROTO_IP,
160 libc::IP_DONTFRAG,
161 OPTION_ON,
162 )?;
163 }
164 }
165 #[cfg(any(bsd, apple, solarish))]
166 {
170 if is_ipv4 {
171 set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_RECVDSTADDR, OPTION_ON)?;
172 }
173 }
174
175 if !is_ipv4 {
177 set_socket_option(&*io, libc::IPPROTO_IPV6, libc::IPV6_RECVPKTINFO, OPTION_ON)?;
178 set_socket_option(&*io, libc::IPPROTO_IPV6, libc::IPV6_RECVTCLASS, OPTION_ON)?;
179 may_fragment |= !set_socket_option_supported(
184 &*io,
185 libc::IPPROTO_IPV6,
186 libc::IPV6_DONTFRAG,
187 OPTION_ON,
188 )?;
189 }
190
191 let now = Instant::now();
192 Ok(Self {
193 last_send_error: Mutex::new(now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now)),
194 max_gso_segments: AtomicUsize::new(gso::max_gso_segments(&*io)),
195 gro_segments,
196 may_fragment,
197 sendmsg_einval: AtomicBool::new(false),
198 #[cfg(apple_fast)]
199 apple_fast_path: AtomicBool::new(false),
200 })
201 }
202
203 pub fn send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
215 match send(self, socket.0, transmit) {
216 Ok(()) => Ok(()),
217 Err(e) if e.kind() == io::ErrorKind::WouldBlock => Err(e),
218 Err(e) if e.raw_os_error() == Some(libc::EMSGSIZE) => Ok(()),
221 Err(e) => {
222 log_sendmsg_error(&self.last_send_error, e, transmit);
223
224 Ok(())
225 }
226 }
227 }
228
229 pub fn try_send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
231 send(self, socket.0, transmit)
232 }
233
234 #[cfg(not(any(
235 apple,
236 target_os = "openbsd",
237 target_os = "netbsd",
238 target_os = "dragonfly",
239 solarish
240 )))]
241 pub fn recv(
242 &self,
243 socket: UdpSockRef<'_>,
244 bufs: &mut [IoSliceMut<'_>],
245 meta: &mut [RecvMeta],
246 ) -> io::Result<usize> {
247 recv_via_recvmmsg(socket.0, bufs, meta)
248 }
249
250 #[cfg(apple_fast)]
251 pub fn recv(
252 &self,
253 socket: UdpSockRef<'_>,
254 bufs: &mut [IoSliceMut<'_>],
255 meta: &mut [RecvMeta],
256 ) -> io::Result<usize> {
257 if self.is_apple_fast_path_enabled() {
258 recv_via_recvmsg_x(self, socket.0, bufs, meta)
259 } else {
260 recv_single(socket.0, bufs, meta)
261 }
262 }
263
264 #[cfg(any(
265 target_os = "openbsd",
266 target_os = "netbsd",
267 target_os = "dragonfly",
268 solarish,
269 apple_slow
270 ))]
271 pub fn recv(
272 &self,
273 socket: UdpSockRef<'_>,
274 bufs: &mut [IoSliceMut<'_>],
275 meta: &mut [RecvMeta],
276 ) -> io::Result<usize> {
277 recv_single(socket.0, bufs, meta)
278 }
279
280 #[inline]
286 pub fn max_gso_segments(&self) -> NonZeroUsize {
287 self.max_gso_segments
288 .load(Ordering::Relaxed)
289 .try_into()
290 .expect("must have non zero GSO segments")
291 }
292
293 #[inline]
298 pub fn gro_segments(&self) -> NonZeroUsize {
299 self.gro_segments
300 }
301
302 #[inline]
304 pub fn set_send_buffer_size(&self, socket: UdpSockRef<'_>, bytes: usize) -> io::Result<()> {
305 socket.0.set_send_buffer_size(bytes)
306 }
307
308 #[inline]
310 pub fn set_recv_buffer_size(&self, socket: UdpSockRef<'_>, bytes: usize) -> io::Result<()> {
311 socket.0.set_recv_buffer_size(bytes)
312 }
313
314 #[inline]
316 pub fn send_buffer_size(&self, socket: UdpSockRef<'_>) -> io::Result<usize> {
317 socket.0.send_buffer_size()
318 }
319
320 #[inline]
322 pub fn recv_buffer_size(&self, socket: UdpSockRef<'_>) -> io::Result<usize> {
323 socket.0.recv_buffer_size()
324 }
325
326 #[inline]
330 pub fn may_fragment(&self) -> bool {
331 self.may_fragment
332 }
333
334 fn sendmsg_einval(&self) -> bool {
336 self.sendmsg_einval.load(Ordering::Relaxed)
337 }
338
339 #[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))]
341 fn set_sendmsg_einval(&self) {
342 self.sendmsg_einval.store(true, Ordering::Relaxed)
343 }
344
345 #[cfg(apple_fast)]
355 pub unsafe fn set_apple_fast_path(&self) {
356 self.apple_fast_path.store(true, Ordering::Relaxed);
357 self.max_gso_segments.store(BATCH_SIZE, Ordering::Relaxed);
358 }
359
360 #[cfg(apple_fast)]
362 pub fn is_apple_fast_path_enabled(&self) -> bool {
363 self.apple_fast_path.load(Ordering::Relaxed)
364 }
365
366 #[cfg(apple_fast)]
368 fn disable_apple_fast_path(&self) {
369 self.apple_fast_path.store(false, Ordering::Relaxed);
370 self.max_gso_segments.store(1, Ordering::Relaxed);
371 }
372
373 #[cfg(apple_fast)]
376 fn resolve_apple_fast_fn<T>(&self, resolver: fn() -> Option<T>) -> Option<T> {
377 let f = resolver();
378 if f.is_none() {
379 self.disable_apple_fast_path();
380 }
381 f
382 }
383}
384
385#[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))]
386fn send(
387 #[allow(unused_variables)] state: &UdpSocketState,
389 io: SockRef<'_>,
390 transmit: &Transmit<'_>,
391) -> io::Result<()> {
392 #[allow(unused_mut)] let mut encode_src_ip = true;
394 #[cfg(target_os = "freebsd")]
395 {
396 let addr = io.local_addr()?;
397 let is_ipv4 = addr.family() == libc::AF_INET as libc::sa_family_t;
398 if is_ipv4 {
399 if let Some(socket) = addr.as_socket_ipv4() {
400 encode_src_ip = socket.ip() == &Ipv4Addr::UNSPECIFIED;
401 }
402 }
403 }
404 let mut msg_hdr: libc::msghdr = unsafe { mem::zeroed() };
405 let mut iovec: libc::iovec = unsafe { mem::zeroed() };
406 let mut cmsgs = cmsg::Aligned([0u8; CMSG_LEN]);
407 let dst_addr = socket2::SockAddr::from(transmit.destination);
408 prepare_msg(
409 transmit,
410 &dst_addr,
411 &mut msg_hdr,
412 &mut iovec,
413 &mut cmsgs,
414 encode_src_ip,
415 state.sendmsg_einval(),
416 );
417
418 loop {
419 let n = unsafe { libc::sendmsg(io.as_raw_fd(), &msg_hdr, 0) };
420
421 if n >= 0 {
422 return Ok(());
423 }
424
425 let e = io::Error::last_os_error();
426 match e.kind() {
427 io::ErrorKind::Interrupted => continue,
429 io::ErrorKind::WouldBlock => return Err(e),
430 _ => {
431 #[cfg(any(target_os = "linux", target_os = "android"))]
435 if let Some(libc::EIO) | Some(libc::EINVAL) = e.raw_os_error() {
436 if state.max_gso_segments().get() > 1 {
439 crate::log::info!(
440 "`libc::sendmsg` failed with {e}; halting segmentation offload"
441 );
442 state
443 .max_gso_segments
444 .store(1, std::sync::atomic::Ordering::Relaxed);
445 }
446 }
447
448 if e.raw_os_error() == Some(libc::EINVAL) && !state.sendmsg_einval() {
451 state.set_sendmsg_einval();
452 prepare_msg(
453 transmit,
454 &dst_addr,
455 &mut msg_hdr,
456 &mut iovec,
457 &mut cmsgs,
458 encode_src_ip,
459 state.sendmsg_einval(),
460 );
461 continue;
462 }
463
464 return Err(e);
465 }
466 }
467 }
468}
469
/// Apple send dispatcher.
///
/// Uses the batched `sendmsg_x` path while the fast path is enabled on `state`, and a plain
/// single-datagram `sendmsg` otherwise.
#[cfg(apple_fast)]
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
    if !state.is_apple_fast_path_enabled() {
        return send_single(state, io, transmit);
    }
    send_via_sendmsg_x(state, io, transmit)
}
478
/// Sends `transmit` as up to `BATCH_SIZE` datagrams with one `sendmsg_x` call.
///
/// `transmit.contents` is split into `segment_size`-byte chunks (the whole buffer when no
/// segment size is set). Each chunk becomes its own `msghdr_x` entry.
///
/// Falls back to [`send_single`] in two cases:
/// - the effective segment size is 0 (empty payload or `Some(0)`), which `chunks` cannot
///   split;
/// - `sendmsg_x` cannot be resolved, in which case the fast path is also disabled.
///
/// `EINTR` is retried; other errors are returned. NOTE(review): the count returned by
/// `sendmsg_x` is not compared with the number of prepared entries, so a short send is
/// treated as success.
#[cfg(apple_fast)]
fn send_via_sendmsg_x(
    state: &UdpSocketState,
    io: SockRef<'_>,
    transmit: &Transmit<'_>,
) -> io::Result<()> {
    let segment_size = transmit.segment_size.unwrap_or(transmit.contents.len());
    if segment_size == 0 {
        // `chunks(0)` and `div_ceil(0)` would panic. Send the (empty or unsegmentable)
        // payload as a single datagram instead.
        return send_single(state, io, transmit);
    }
    let Some(sendmsg_x) = state.resolve_apple_fast_fn(sendmsg_x_fn) else {
        // The symbol is unavailable; `resolve_apple_fast_fn` has just disabled the fast path.
        return send_single(state, io, transmit);
    };

    // SAFETY: `msghdr_x` and `iovec` are plain C structs for which all-zero bytes are valid.
    let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
    let mut iovs = unsafe { mem::zeroed::<[libc::iovec; BATCH_SIZE]>() };
    let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
    let addr = socket2::SockAddr::from(transmit.destination);
    let mut cnt = 0;
    debug_assert!(transmit.contents.len().div_ceil(segment_size) <= BATCH_SIZE);
    for (i, chunk) in transmit
        .contents
        .chunks(segment_size)
        .enumerate()
        .take(BATCH_SIZE)
    {
        prepare_msg_x(
            &Transmit {
                destination: transmit.destination,
                ecn: transmit.ecn,
                contents: chunk,
                segment_size: Some(chunk.len()),
                src_ip: transmit.src_ip,
            },
            &addr,
            &mut hdrs[i],
            &mut iovs[i],
            &mut ctrls[i],
            true,
            state.sendmsg_einval(),
        );
        hdrs[i].msg_datalen = chunk.len();
        cnt += 1;
    }
    // SAFETY: the first `cnt` headers point at `iovs`, `ctrls`, `addr` and
    // `transmit.contents`, all of which outlive the call.
    retry_if_interrupted(|| unsafe { sendmsg_x(io.as_raw_fd(), hdrs.as_ptr(), cnt as u32, 0) })?;
    Ok(())
}
523
/// Send path for OpenBSD, NetBSD and Apple builds without the batched fast path.
///
/// Every transmit is sent as a single datagram by [`send_single`].
#[cfg(any(target_os = "openbsd", target_os = "netbsd", apple_slow))]
fn send(
    state: &UdpSocketState,
    io: SockRef<'_>,
    transmit: &Transmit<'_>,
) -> io::Result<()> {
    send_single(state, io, transmit)
}
528
/// Sends `transmit` as one datagram with `sendmsg(2)`, retrying on `EINTR`.
///
/// Compiled only for OpenBSD, NetBSD and Apple, so the source-address flag passed to
/// `prepare_msg` always evaluates to `true` here.
#[cfg(any(target_os = "openbsd", target_os = "netbsd", apple))]
#[cfg_attr(apple_fast, allow(dead_code))]
fn send_single(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
    let destination = socket2::SockAddr::from(transmit.destination);
    // SAFETY: `msghdr` and `iovec` are plain C structs for which all-zero bytes are valid.
    let (mut hdr, mut iov): (libc::msghdr, libc::iovec) =
        unsafe { (mem::zeroed(), mem::zeroed()) };
    let mut ctrl = cmsg::Aligned([0u8; CMSG_LEN]);
    let encode_src_ip =
        cfg!(apple) || cfg!(target_os = "openbsd") || cfg!(target_os = "netbsd");
    prepare_msg(
        transmit,
        &destination,
        &mut hdr,
        &mut iov,
        &mut ctrl,
        encode_src_ip,
        state.sendmsg_einval(),
    );
    // SAFETY: `hdr` only points at locals and at `transmit`, all alive for the call.
    retry_if_interrupted(|| unsafe { libc::sendmsg(io.as_raw_fd(), &hdr, 0) }).map(|_sent| ())
}
548
549#[cfg(not(any(
551 apple,
552 target_os = "openbsd",
553 target_os = "netbsd",
554 target_os = "dragonfly",
555 solarish
556)))]
557fn recv_via_recvmmsg(
558 io: SockRef<'_>,
559 bufs: &mut [IoSliceMut<'_>],
560 meta: &mut [RecvMeta],
561) -> io::Result<usize> {
562 let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];
563 let mut ctrls = [cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); BATCH_SIZE];
564 let mut hdrs = unsafe { mem::zeroed::<[libc::mmsghdr; BATCH_SIZE]>() };
565 let max_msg_count = bufs.len().min(BATCH_SIZE);
566 for i in 0..max_msg_count {
567 prepare_recv(
568 &mut bufs[i],
569 &mut names[i],
570 &mut ctrls[i],
571 &mut hdrs[i].msg_hdr,
572 );
573 }
574 let msg_count = retry_if_interrupted(|| unsafe {
575 libc::recvmmsg(
576 io.as_raw_fd(),
577 hdrs.as_mut_ptr(),
578 bufs.len().min(BATCH_SIZE) as _,
579 0,
580 ptr::null_mut::<libc::timespec>(),
581 ) as isize
582 })?;
583 for i in 0..(msg_count as usize) {
584 meta[i] = decode_recv(&names[i], &hdrs[i].msg_hdr, hdrs[i].msg_len as usize)?;
585 }
586 Ok(msg_count as usize)
587}
588
/// Receives a batch of datagrams with one `recvmsg_x` call.
///
/// At most `min(bufs.len(), meta.len(), BATCH_SIZE)` datagrams are received. Datagram `i` is
/// written into `bufs[i]` and its metadata into `meta[i]`. The received length of each entry
/// is read from its `msg_datalen`.
///
/// If `recvmsg_x` cannot be resolved, the fast path is disabled and a single datagram is
/// received with [`recv_single`]. The symbol is resolved before any buffer is prepared.
///
/// Returns the number of datagrams received.
///
/// # Errors
///
/// Any non-`EINTR` receive error, including `WouldBlock`, or an unsupported source address
/// family.
#[cfg(apple_fast)]
fn recv_via_recvmsg_x(
    state: &UdpSocketState,
    io: SockRef<'_>,
    bufs: &mut [IoSliceMut<'_>],
    meta: &mut [RecvMeta],
) -> io::Result<usize> {
    let Some(recvmsg_x) = state.resolve_apple_fast_fn(recvmsg_x_fn) else {
        return recv_single(io, bufs, meta);
    };

    let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];
    let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
    // SAFETY: `msghdr_x` is a plain C struct for which all-zero bytes are valid.
    let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
    // Never offer more slots than there are buffers or metadata entries.
    let max_msg_count = bufs.len().min(meta.len()).min(BATCH_SIZE);
    for i in 0..max_msg_count {
        prepare_recv_x(&mut bufs[i], &mut names[i], &mut ctrls[i], &mut hdrs[i]);
    }
    // SAFETY: only the first `max_msg_count` headers are offered, each pointing at live
    // buffers prepared above.
    let msg_count = retry_if_interrupted(|| unsafe {
        recvmsg_x(io.as_raw_fd(), hdrs.as_mut_ptr(), max_msg_count as _, 0)
    })?;
    for i in 0..(msg_count as usize) {
        meta[i] = decode_recv(&names[i], &hdrs[i], hdrs[i].msg_datalen as usize)?;
    }
    Ok(msg_count as usize)
}
621
/// Returns `sendmsg_x` as a callable function pointer, or `None` if `dlsym` did not find it.
///
/// The lookup happens at most once per process.
#[cfg(apple_fast)]
fn sendmsg_x_fn() -> Option<SendmsgXFn> {
    static SENDMSG_X_ADDR: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
    let addr = resolve_symbol(&SENDMSG_X_ADDR, c"sendmsg_x")?;
    // SAFETY: `addr` is the non-null address `dlsym` returned for `sendmsg_x`. It is assumed
    // to have the `SendmsgXFn` signature.
    Some(unsafe { std::mem::transmute::<usize, SendmsgXFn>(addr) })
}
633
/// Returns `recvmsg_x` as a callable function pointer, or `None` if `dlsym` did not find it.
///
/// The lookup happens at most once per process.
#[cfg(apple_fast)]
fn recvmsg_x_fn() -> Option<RecvmsgXFn> {
    static RECVMSG_X_ADDR: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
    let addr = resolve_symbol(&RECVMSG_X_ADDR, c"recvmsg_x")?;
    // SAFETY: `addr` is the non-null address `dlsym` returned for `recvmsg_x`. It is assumed
    // to have the `RecvmsgXFn` signature.
    Some(unsafe { std::mem::transmute::<usize, RecvmsgXFn>(addr) })
}
645
// Signature through which the dynamically resolved `sendmsg_x` is called. Call sites pass
// (socket fd, pointer to the first `msghdr_x`, number of entries, flags = 0). A negative
// return is treated as an errno-style failure; otherwise the result is used as a count.
// NOTE(review): this must match the OS symbol exactly, since it is not declared by `libc`.
#[cfg(apple_fast)]
type SendmsgXFn =
    unsafe extern "C" fn(libc::c_int, *const msghdr_x, libc::c_uint, libc::c_int) -> isize;
// Same as `SendmsgXFn` for `recvmsg_x`, but with a mutable header array. The kernel writes
// received lengths into each entry's `msg_datalen`, and the return value is the number of
// datagrams received.
#[cfg(apple_fast)]
type RecvmsgXFn =
    unsafe extern "C" fn(libc::c_int, *mut msghdr_x, libc::c_uint, libc::c_int) -> isize;
652
/// Looks up `name` with `dlsym(RTLD_DEFAULT, ..)` at most once per `lock` and returns the
/// cached address, or `None` when the lookup returned null.
///
/// A failed lookup is cached as well, so it is never retried.
#[cfg(apple_fast)]
fn resolve_symbol(lock: &std::sync::OnceLock<usize>, name: &std::ffi::CStr) -> Option<usize> {
    let cached = lock.get_or_init(|| {
        // SAFETY: `name` is a valid NUL-terminated string and `RTLD_DEFAULT` a valid handle.
        let symbol = unsafe { libc::dlsym(libc::RTLD_DEFAULT, name.as_ptr()) };
        symbol as usize
    });
    match *cached {
        0 => None,
        addr => Some(addr),
    }
}
662
/// Receives one datagram into `bufs[0]` with `recvmsg(2)` and stores its metadata in
/// `meta[0]`.
///
/// Behavior:
/// - A datagram that did not fit into `bufs[0]` (`MSG_TRUNC`) is discarded, and the next
///   one is read instead.
/// - `EINTR` is retried.
/// - Any other error, including `WouldBlock`, is returned.
///
/// `recvmsg` rewrites `msg_namelen`, `msg_controllen` and `msg_flags`, so the header is
/// prepared afresh before every attempt. Without that, two things go wrong:
/// - A stale `MSG_TRUNC` from a discarded datagram makes every later failing call loop
///   forever (e.g. spinning on `WouldBlock`).
/// - Later reads see the shrunken name and control buffer lengths left by the previous call.
///
/// Returns `Ok(1)` on success.
///
/// # Panics
///
/// Panics if `bufs` or `meta` is empty.
#[cfg(any(
    target_os = "openbsd",
    target_os = "netbsd",
    target_os = "dragonfly",
    solarish,
    apple
))]
#[cfg_attr(apple_fast, allow(dead_code))]
fn recv_single(
    io: SockRef<'_>,
    bufs: &mut [IoSliceMut<'_>],
    meta: &mut [RecvMeta],
) -> io::Result<usize> {
    let mut name = MaybeUninit::<libc::sockaddr_storage>::uninit();
    let mut ctrl = cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit());
    // SAFETY: `msghdr` is a plain C struct for which all-zero bytes are valid.
    let mut hdr = unsafe { mem::zeroed::<libc::msghdr>() };
    let n = loop {
        prepare_recv(&mut bufs[0], &mut name, &mut ctrl, &mut hdr);
        // SAFETY: `hdr` points at `bufs[0]`, `name` and `ctrl`, all alive for the call.
        let n = unsafe { libc::recvmsg(io.as_raw_fd(), &mut hdr, 0) };

        if n < 0 {
            let e = io::Error::last_os_error();
            if e.kind() == io::ErrorKind::Interrupted {
                continue;
            }
            return Err(e);
        }

        // The flags are only meaningful after a successful call.
        if hdr.msg_flags & libc::MSG_TRUNC != 0 {
            continue;
        }

        break n;
    };
    meta[0] = decode_recv(&name, &hdr, n as usize)?;
    Ok(1)
}
701
/// Size in bytes of every control-message buffer used for sending and receiving.
///
/// `UdpSocketState::new` asserts that it can hold an int-sized cmsg plus, on most platforms,
/// an `in6_pktinfo` cmsg.
const CMSG_LEN: usize = 88;
703
704#[cfg_attr(apple_fast, allow(dead_code))] fn prepare_msg(
706 transmit: &Transmit<'_>,
707 dst_addr: &socket2::SockAddr,
708 hdr: &mut libc::msghdr,
709 iov: &mut libc::iovec,
710 ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
711 #[allow(unused_variables)] encode_src_ip: bool,
713 sendmsg_einval: bool,
714) {
715 iov.iov_base = transmit.contents.as_ptr() as *const _ as *mut _;
716 iov.iov_len = transmit.contents.len();
717
718 let name = dst_addr.as_ptr() as *mut libc::c_void;
724 let namelen = dst_addr.len();
725 hdr.msg_name = name as *mut _;
726 hdr.msg_namelen = namelen;
727 hdr.msg_iov = iov;
728 hdr.msg_iovlen = 1;
729
730 hdr.msg_control = ctrl.0.as_mut_ptr() as _;
731 hdr.msg_controllen = CMSG_LEN as _;
732 let mut encoder = unsafe { cmsg::Encoder::new(hdr) };
733 let ecn = transmit.ecn.map_or(0, |x| x as libc::c_int);
734 let is_ipv4 = transmit.destination.is_ipv4()
736 || matches!(transmit.destination.ip(), IpAddr::V6(addr) if addr.to_ipv4_mapped().is_some());
737 if is_ipv4 {
738 if !sendmsg_einval {
739 #[cfg(not(target_os = "netbsd"))]
740 {
741 encoder.push(libc::IPPROTO_IP, libc::IP_TOS, ecn as IpTosTy);
742 }
743 }
744 } else {
745 encoder.push(libc::IPPROTO_IPV6, libc::IPV6_TCLASS, ecn);
746 }
747
748 #[cfg(not(apple_fast))]
752 if let Some(segment_size) = transmit.effective_segment_size() {
753 gso::set_segment_size(&mut encoder, segment_size as u16);
754 }
755
756 if let Some(ip) = &transmit.src_ip {
757 match ip {
758 IpAddr::V4(v4) => {
759 #[cfg(any(target_os = "linux", target_os = "android"))]
760 {
761 let pktinfo = libc::in_pktinfo {
762 ipi_ifindex: 0,
763 ipi_spec_dst: libc::in_addr {
764 s_addr: u32::from_ne_bytes(v4.octets()),
765 },
766 ipi_addr: libc::in_addr { s_addr: 0 },
767 };
768 encoder.push(libc::IPPROTO_IP, libc::IP_PKTINFO, pktinfo);
769 }
770 #[cfg(any(bsd, apple, solarish))]
771 {
772 if encode_src_ip {
773 let addr = libc::in_addr {
774 s_addr: u32::from_ne_bytes(v4.octets()),
775 };
776 encoder.push(libc::IPPROTO_IP, libc::IP_RECVDSTADDR, addr);
777 }
778 }
779 }
780 IpAddr::V6(v6) => {
781 let pktinfo = libc::in6_pktinfo {
782 ipi6_ifindex: 0,
783 ipi6_addr: libc::in6_addr {
784 s6_addr: v6.octets(),
785 },
786 };
787 encoder.push(libc::IPPROTO_IPV6, libc::IPV6_PKTINFO, pktinfo);
788 }
789 }
790 }
791
792 encoder.finish();
793}
794
/// `msghdr_x` counterpart of `prepare_msg`: fills one entry of a `sendmsg_x` batch.
///
/// Control messages encoded:
/// - ECN: `IP_TOS` for IPv4 / IPv4-mapped destinations unless `sendmsg_einval` is set,
///   `IPV6_TCLASS` otherwise.
/// - The source address when `transmit.src_ip` is set: `IP_RECVDSTADDR` for IPv4 (only if
///   `encode_src_ip`), `IPV6_PKTINFO` for IPv6.
///
/// No GSO segment size is encoded. `msg_datalen` is left for the caller to set.
#[cfg(apple_fast)]
fn prepare_msg_x(
    transmit: &Transmit<'_>,
    dst_addr: &socket2::SockAddr,
    hdr: &mut msghdr_x,
    iov: &mut libc::iovec,
    ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
    #[allow(unused_variables)] encode_src_ip: bool,
    sendmsg_einval: bool,
) {
    // A single iovec spanning the whole payload.
    iov.iov_len = transmit.contents.len();
    iov.iov_base = transmit.contents.as_ptr() as *const _ as *mut _;

    hdr.msg_name = dst_addr.as_ptr() as *mut libc::c_void;
    hdr.msg_namelen = dst_addr.len();
    hdr.msg_iov = iov;
    hdr.msg_iovlen = 1;
    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
    hdr.msg_controllen = CMSG_LEN as _;

    // SAFETY: `msg_control` / `msg_controllen` describe `ctrl`, which is aligned for
    // `cmsghdr` and outlives the encoder.
    let mut encoder = unsafe { cmsg::Encoder::new(hdr) };

    let ecn = transmit.ecn.map_or(0, |codepoint| codepoint as libc::c_int);
    let ipv4_path = match transmit.destination.ip() {
        IpAddr::V4(_) => true,
        IpAddr::V6(v6) => v6.to_ipv4_mapped().is_some(),
    };
    if !ipv4_path {
        encoder.push(libc::IPPROTO_IPV6, libc::IPV6_TCLASS, ecn);
    } else if !sendmsg_einval {
        encoder.push(libc::IPPROTO_IP, libc::IP_TOS, ecn as IpTosTy);
    }

    match &transmit.src_ip {
        None => {}
        Some(IpAddr::V4(v4)) => {
            if encode_src_ip {
                let addr = libc::in_addr {
                    s_addr: u32::from_ne_bytes(v4.octets()),
                };
                encoder.push(libc::IPPROTO_IP, libc::IP_RECVDSTADDR, addr);
            }
        }
        Some(IpAddr::V6(v6)) => {
            let pktinfo = libc::in6_pktinfo {
                ipi6_ifindex: 0,
                ipi6_addr: libc::in6_addr {
                    s6_addr: v6.octets(),
                },
            };
            encoder.push(libc::IPPROTO_IPV6, libc::IPV6_PKTINFO, pktinfo);
        }
    }

    encoder.finish();
}
854
855#[cfg_attr(apple_fast, allow(dead_code))] fn prepare_recv(
857 buf: &mut IoSliceMut<'_>,
858 name: &mut MaybeUninit<libc::sockaddr_storage>,
859 ctrl: &mut cmsg::Aligned<MaybeUninit<[u8; CMSG_LEN]>>,
860 hdr: &mut libc::msghdr,
861) {
862 hdr.msg_name = name.as_mut_ptr() as _;
863 hdr.msg_namelen = mem::size_of::<libc::sockaddr_storage>() as _;
864 hdr.msg_iov = buf as *mut IoSliceMut<'_> as *mut libc::iovec;
865 hdr.msg_iovlen = 1;
866 hdr.msg_control = ctrl.0.as_mut_ptr() as _;
867 hdr.msg_controllen = CMSG_LEN as _;
868 hdr.msg_flags = 0;
869}
870
/// `msghdr_x` counterpart of `prepare_recv`: fills one slot of a `recvmsg_x` batch.
///
/// Also records the buffer capacity in `msg_datalen`.
#[cfg(apple_fast)]
fn prepare_recv_x(
    buf: &mut IoSliceMut<'_>,
    name: &mut MaybeUninit<libc::sockaddr_storage>,
    ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
    hdr: &mut msghdr_x,
) {
    hdr.msg_datalen = buf.len();
    // Payload: `IoSliceMut` is ABI-compatible with `iovec` on Unix.
    hdr.msg_iov = buf as *mut IoSliceMut<'_> as *mut libc::iovec;
    hdr.msg_iovlen = 1;
    // Source address buffer, sized for any address family.
    hdr.msg_name = name.as_mut_ptr() as _;
    hdr.msg_namelen = mem::size_of::<libc::sockaddr_storage>() as _;
    // Ancillary data buffer.
    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
    hdr.msg_controllen = CMSG_LEN as _;
    hdr.msg_flags = 0;
}
888
889fn decode_recv<M: cmsg::MsgHdr<ControlMessage = libc::cmsghdr>>(
890 name: &MaybeUninit<libc::sockaddr_storage>,
891 hdr: &M,
892 len: usize,
893) -> io::Result<RecvMeta> {
894 let name = unsafe { name.assume_init() };
895 let mut ctrl = ControlMetadata {
896 ecn_bits: 0,
897 dst_ip: None,
898 interface_index: None,
899 stride: len,
900 };
901
902 let cmsg_iter = unsafe { cmsg::Iter::new(hdr) };
903 for cmsg in cmsg_iter {
904 ctrl.decode(cmsg);
905 }
906
907 Ok(RecvMeta {
908 len,
909 stride: ctrl.stride,
910 addr: decode_socket_addr(&name)?,
911 ecn: EcnCodepoint::from_bits(ctrl.ecn_bits),
912 dst_ip: ctrl.dst_ip,
913 interface_index: ctrl.interface_index,
914 })
915}
916
/// Values collected from a received datagram's control messages by
/// `ControlMetadata::decode`.
struct ControlMetadata {
    /// TOS / traffic-class byte (ECN bits), 0 if none was reported.
    ecn_bits: u8,
    /// Destination address of the datagram, if reported.
    dst_ip: Option<IpAddr>,
    /// Receiving interface index, if reported.
    interface_index: Option<u32>,
    /// Segment stride. Starts as the datagram length and is overwritten by a `UDP_GRO`
    /// cmsg on Linux/Android.
    stride: usize,
}
924
impl ControlMetadata {
    /// Folds one received control message into `self`. Unrecognised `(level, type)` pairs
    /// are ignored.
    fn decode(&mut self, cmsg: &libc::cmsghdr) {
        match (cmsg.cmsg_level, cmsg.cmsg_type) {
            // IPv4 TOS byte (ECN), single-byte payload.
            (libc::IPPROTO_IP, libc::IP_TOS) => unsafe {
                self.ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
            },
            // Some platforms report the TOS byte under `IP_RECVTOS` instead.
            #[cfg(not(any(
                target_os = "openbsd",
                target_os = "netbsd",
                target_os = "dragonfly",
                solarish
            )))]
            (libc::IPPROTO_IP, libc::IP_RECVTOS) => unsafe {
                self.ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
            },
            // IPv6 traffic class. On Apple a one-byte payload is accepted (detected via
            // `cmsg_len`); everywhere else the payload is read as a `c_int` and truncated.
            (libc::IPPROTO_IPV6, libc::IPV6_TCLASS) => unsafe {
                #[allow(clippy::unnecessary_cast)] if cfg!(apple)
                    && cmsg.cmsg_len as usize == libc::CMSG_LEN(mem::size_of::<u8>() as _) as usize
                {
                    self.ecn_bits = cmsg::decode::<u8, libc::cmsghdr>(cmsg);
                } else {
                    self.ecn_bits = cmsg::decode::<libc::c_int, libc::cmsghdr>(cmsg) as u8;
                }
            },
            // Linux/Android IPv4 packet info: destination address (`ipi_addr`) and
            // receiving interface.
            #[cfg(any(target_os = "linux", target_os = "android"))]
            (libc::IPPROTO_IP, libc::IP_PKTINFO) => {
                let pktinfo = unsafe { cmsg::decode::<libc::in_pktinfo, libc::cmsghdr>(cmsg) };
                self.dst_ip = Some(IpAddr::V4(Ipv4Addr::from(
                    pktinfo.ipi_addr.s_addr.to_ne_bytes(),
                )));
                self.interface_index = Some(pktinfo.ipi_ifindex as u32);
            }
            // BSD/Apple IPv4 destination address; no interface index is available here.
            #[cfg(any(bsd, apple))]
            (libc::IPPROTO_IP, libc::IP_RECVDSTADDR) => {
                let in_addr = unsafe { cmsg::decode::<libc::in_addr, libc::cmsghdr>(cmsg) };
                self.dst_ip = Some(IpAddr::V4(Ipv4Addr::from(in_addr.s_addr.to_ne_bytes())));
            }
            // IPv6 packet info: destination address and receiving interface.
            (libc::IPPROTO_IPV6, libc::IPV6_PKTINFO) => {
                let pktinfo = unsafe { cmsg::decode::<libc::in6_pktinfo, libc::cmsghdr>(cmsg) };
                self.dst_ip = Some(IpAddr::V6(Ipv6Addr::from(pktinfo.ipi6_addr.s6_addr)));
                // The cast is a no-op except on Android, hence the conditional `expect`.
                #[cfg_attr(not(target_os = "android"), expect(clippy::unnecessary_cast))]
                {
                    self.interface_index = Some(pktinfo.ipi6_ifindex as u32);
                }
            }
            // Linux/Android GRO: the cmsg payload (a `c_int`) becomes the stride.
            // NOTE(review): per udp(7), this is the size of the coalesced segments.
            #[cfg(any(target_os = "linux", target_os = "android"))]
            (libc::SOL_UDP, libc::UDP_GRO) => unsafe {
                self.stride = cmsg::decode::<libc::c_int, libc::cmsghdr>(cmsg) as usize;
            },
            _ => {}
        }
    }
}
983
984fn decode_socket_addr(name: &libc::sockaddr_storage) -> io::Result<SocketAddr> {
986 match libc::c_int::from(name.ss_family) {
987 libc::AF_INET => {
988 let addr: &libc::sockaddr_in =
990 unsafe { &*(name as *const _ as *const libc::sockaddr_in) };
991 Ok(SocketAddr::V4(SocketAddrV4::new(
992 Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()),
993 u16::from_be(addr.sin_port),
994 )))
995 }
996 libc::AF_INET6 => {
997 let addr: &libc::sockaddr_in6 =
999 unsafe { &*(name as *const _ as *const libc::sockaddr_in6) };
1000 Ok(SocketAddr::V6(SocketAddrV6::new(
1001 Ipv6Addr::from(addr.sin6_addr.s6_addr),
1002 u16::from_be(addr.sin6_port),
1003 addr.sin6_flowinfo,
1004 addr.sin6_scope_id,
1005 )))
1006 }
1007 f => Err(io::Error::other(format!(
1008 "expected AF_INET or AF_INET6, got {f}"
1009 ))),
1010 }
1011}
1012
/// Maximum number of datagrams handled per batched send/receive call. It sizes the
/// per-call header, name and control arrays.
#[cfg(not(apple_slow))]
pub(crate) const BATCH_SIZE: usize = 32;

/// `apple_slow` builds only ever send and receive one datagram at a time.
#[cfg(apple_slow)]
pub(crate) const BATCH_SIZE: usize = 1;
1019
1020#[cfg(any(target_os = "linux", target_os = "android"))]
1021mod gso {
1022 use super::*;
1023 use std::{ffi::CStr, mem, str::FromStr, sync::OnceLock};
1024
1025 const SUPPORTED_SINCE: KernelVersion = KernelVersion {
1028 version: 4,
1029 major_revision: 18,
1030 };
1031
1032 pub(crate) fn max_gso_segments(socket: &impl AsRawFd) -> usize {
1035 const GSO_SIZE: libc::c_int = 1500;
1036
1037 if !SUPPORTED_BY_CURRENT_KERNEL.get_or_init(supported_by_current_kernel) {
1038 return 1;
1039 }
1040
1041 match set_socket_option(socket, libc::SOL_UDP, libc::UDP_SEGMENT, GSO_SIZE) {
1044 Ok(()) => {
1045 let _ = set_socket_option(socket, libc::SOL_UDP, libc::UDP_SEGMENT, 0);
1050
1051 64
1052 }
1053 Err(_e) => {
1054 crate::log::debug!(
1055 "failed to set `UDP_SEGMENT` socket option ({_e}); setting `max_gso_segments = 1`"
1056 );
1057
1058 1
1059 }
1060 }
1061 }
1062
1063 pub(crate) fn set_segment_size(
1064 encoder: &mut cmsg::Encoder<'_, libc::msghdr>,
1065 segment_size: u16,
1066 ) {
1067 encoder.push(libc::SOL_UDP, libc::UDP_SEGMENT, segment_size);
1068 }
1069
1070 static SUPPORTED_BY_CURRENT_KERNEL: OnceLock<bool> = OnceLock::new();
1072
1073 fn supported_by_current_kernel() -> bool {
1074 let kernel_version_string = match kernel_version_string() {
1075 Ok(kernel_version_string) => kernel_version_string,
1076 Err(_e) => {
1077 crate::log::warn!("GSO disabled: uname returned {_e}");
1078 return false;
1079 }
1080 };
1081
1082 let Some(kernel_version) = KernelVersion::from_str(&kernel_version_string) else {
1083 crate::log::warn!(
1084 "GSO disabled: failed to parse kernel version ({kernel_version_string})"
1085 );
1086 return false;
1087 };
1088
1089 if kernel_version < SUPPORTED_SINCE {
1090 crate::log::info!("GSO disabled: kernel too old ({kernel_version_string}); need 4.18+",);
1091 return false;
1092 }
1093
1094 true
1095 }
1096
1097 fn kernel_version_string() -> io::Result<String> {
1098 let mut n = unsafe { mem::zeroed() };
1099 let r = unsafe { libc::uname(&mut n) };
1100 if r != 0 {
1101 return Err(io::Error::last_os_error());
1102 }
1103 Ok(unsafe {
1104 CStr::from_ptr(n.release[..].as_ptr())
1105 .to_string_lossy()
1106 .into_owned()
1107 })
1108 }
1109
1110 #[derive(Eq, PartialEq, Ord, PartialOrd, Debug)]
1112 struct KernelVersion {
1113 version: u8,
1114 major_revision: u8,
1115 }
1116
1117 impl KernelVersion {
1118 fn from_str(release: &str) -> Option<Self> {
1119 let mut split = release
1120 .split_once('-')
1121 .map(|pair| pair.0)
1122 .unwrap_or(release)
1123 .split('.');
1124
1125 let version = u8::from_str(split.next()?).ok()?;
1126 let major_revision = u8::from_str(split.next()?).ok()?;
1127
1128 Some(Self {
1129 version,
1130 major_revision,
1131 })
1132 }
1133 }
1134
1135 #[cfg(test)]
1136 mod test {
1137 use super::*;
1138
1139 #[test]
1140 fn parse_current_kernel_version_release_string() {
1141 let release = kernel_version_string().unwrap();
1142 KernelVersion::from_str(&release).unwrap();
1143 }
1144
1145 #[test]
1146 fn parse_kernel_version_release_string() {
1147 assert_eq!(
1149 KernelVersion::from_str("4.14"),
1150 Some(KernelVersion {
1151 version: 4,
1152 major_revision: 14
1153 })
1154 );
1155 assert_eq!(
1156 KernelVersion::from_str("4.18"),
1157 Some(KernelVersion {
1158 version: 4,
1159 major_revision: 18
1160 })
1161 );
1162 assert_eq!(
1164 KernelVersion::from_str("4.14.186-27095505"),
1165 Some(KernelVersion {
1166 version: 4,
1167 major_revision: 14
1168 })
1169 );
1170 assert_eq!(
1171 KernelVersion::from_str("6.8.0-59-generic"),
1172 Some(KernelVersion {
1173 version: 6,
1174 major_revision: 8
1175 })
1176 );
1177 }
1178 }
1179}
1180
/// Fallback for platforms without UDP GSO: segmentation offload is never reported.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
mod gso {
    use super::*;

    /// Always 1. This fallback never reports GSO support.
    pub(super) fn max_gso_segments(_socket: &impl AsRawFd) -> usize {
        1
    }

    /// No-op. The encoder's header type follows the build: `msghdr_x` for `apple_fast`,
    /// `libc::msghdr` otherwise.
    #[cfg_attr(apple_fast, allow(dead_code))]
    pub(super) fn set_segment_size(
        #[cfg(not(apple_fast))] _encoder: &mut cmsg::Encoder<'_, libc::msghdr>,
        #[cfg(apple_fast)] _encoder: &mut cmsg::Encoder<'_, msghdr_x>,
        _segment_size: u16,
    ) {
    }
}
1202
1203fn set_socket_option_supported(
1208 socket: &impl AsRawFd,
1209 level: libc::c_int,
1210 name: libc::c_int,
1211 value: libc::c_int,
1212) -> io::Result<bool> {
1213 match set_socket_option(socket, level, name, value) {
1214 Ok(()) => Ok(true),
1215 Err(err) if err.raw_os_error() == Some(libc::ENOPROTOOPT) => Ok(false),
1216 Err(err) if err.raw_os_error() == Some(libc::EOPNOTSUPP) => Ok(false),
1217 Err(err) => Err(err),
1218 }
1219}
1220
1221fn set_socket_option(
1222 socket: &impl AsRawFd,
1223 level: libc::c_int,
1224 name: libc::c_int,
1225 value: libc::c_int,
1226) -> io::Result<()> {
1227 let rc = unsafe {
1228 libc::setsockopt(
1229 socket.as_raw_fd(),
1230 level,
1231 name,
1232 &value as *const _ as _,
1233 mem::size_of_val(&value) as _,
1234 )
1235 };
1236
1237 match rc == 0 {
1238 true => Ok(()),
1239 false => Err(io::Error::last_os_error()),
1240 }
1241}
1242
/// Value passed to `setsockopt` to switch a boolean socket option on.
const OPTION_ON: libc::c_int = 1;
1244
/// Calls `f` until it succeeds or fails with something other than `EINTR`.
///
/// `f` follows the libc convention: a non-negative return is success, and a negative return
/// means failure with the cause left in `errno` (read through
/// [`io::Error::last_os_error`]).
///
/// # Errors
///
/// Returns the first non-`Interrupted` OS error.
fn retry_if_interrupted(mut f: impl FnMut() -> isize) -> io::Result<isize> {
    let mut n = f();
    while n < 0 {
        let e = io::Error::last_os_error();
        if e.kind() != io::ErrorKind::Interrupted {
            return Err(e);
        }
        n = f();
    }
    Ok(n)
}