Skip to content

Commit 53acda5

Browse files
committed
add udp support for windows
1 parent 525fc89 commit 53acda5

File tree

8 files changed

+28
-32
lines changed

8 files changed

+28
-32
lines changed

forwarder/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ anyhow = "1.0.71"
88
log = "0.4.20"
99
etherparse = "0.13.0"
1010
socket2 = { version = "0.5.5", features = ["all"] }
11-
mio = { version = "1.0.2", features = ["net", "os-ext", "os-poll"] }
11+
mio = { version = "1.0.2", features = ["net", "os-poll"] }
1212
parking_lot = "0.12.3"

forwarder/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,9 @@ fn try_cleanup(peer_manager: &RwLock<PeerManager>) {
150150
for peer in peers.get_all() {
151151
let used = peer.reset_used();
152152
if !used {
153-
let client_addr = peer.get_client_addr();
153+
let client_addr = *peer.get_client_addr();
154154
log::info!("cleaning peer that handled '{client_addr}'");
155-
if let Err(error) = peers.remove_peer(&peer) {
155+
if let Err(error) = peers.remove_peer(peer) {
156156
log::warn!("couldn't remove peer of '{client_addr}': {error:?}");
157157
}
158158
} else {

forwarder/src/peer.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,13 @@ impl PeerManager {
7676
}
7777
}
7878

79-
pub fn add_peer(&mut self, new_peer: Peer) -> anyhow::Result<Arc<Peer>> {
79+
pub fn add_peer(&mut self, mut new_peer: Peer) -> anyhow::Result<Arc<Peer>> {
8080
let client_addr = new_peer.client_addr;
81+
self.registry.register(&mut new_peer.socket)?;
8182
let peer = Arc::new(new_peer);
8283
self.client_addr_to_peers.insert(client_addr, peer.clone());
8384
let peer_port = peer.socket.local_addr()?.port();
8485
self.port_to_peers.insert(peer_port, peer.clone());
85-
self.registry.register(&peer.socket)?;
8686
Ok(peer)
8787
}
8888

@@ -100,10 +100,13 @@ impl PeerManager {
100100
self.client_addr_to_peers.values().cloned().collect()
101101
}
102102

103-
pub fn remove_peer(&mut self, peer: &Peer) -> anyhow::Result<()> {
104-
self.registry.deregister(&peer.socket)?;
103+
pub fn remove_peer(&mut self, peer: Arc<Peer>) -> anyhow::Result<()> {
105104
self.client_addr_to_peers.remove(&peer.client_addr);
106105
self.port_to_peers.remove(&peer.socket.local_addr()?.port());
106+
107+
let mut peer =
108+
Arc::try_unwrap(peer).map_err(|_| anyhow::anyhow!("can't unwrap Arc<peer>"))?;
109+
self.registry.deregister(&mut peer.socket)?;
107110
Ok(())
108111
}
109112
}

forwarder/src/poll.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ pub trait Poll: Send {
2525
/// trait that allows others to register socket to `Poll`
2626
pub trait Registry: Send + Sync {
2727
// need Sync because parking_lot::RwLock needs inner to be Sync
28-
fn register(&self, socket: &NonBlockingSocket) -> anyhow::Result<()>;
29-
fn deregister(&self, socket: &NonBlockingSocket) -> anyhow::Result<()>;
28+
fn register(&self, socket: &mut NonBlockingSocket) -> anyhow::Result<()>;
29+
fn deregister(&self, socket: &mut NonBlockingSocket) -> anyhow::Result<()>;
3030
}
3131

3232
mod icmp;

forwarder/src/poll/icmp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ impl Poll for IcmpPoll {
5151
pub struct IcmpRegistry;
5252
// icmp doesn't need a registry because we manage it's poll ourself
5353
impl Registry for IcmpRegistry {
54-
fn register(&self, _socket: &NonBlockingSocket) -> anyhow::Result<()> {
54+
fn register(&self, _socket: &mut NonBlockingSocket) -> anyhow::Result<()> {
5555
Ok(())
5656
}
57-
fn deregister(&self, _socket: &NonBlockingSocket) -> anyhow::Result<()> {
57+
fn deregister(&self, _socket: &mut NonBlockingSocket) -> anyhow::Result<()> {
5858
Ok(())
5959
}
6060
}

forwarder/src/poll/udp.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
socket::{NonBlockingSocket, NonBlockingSocketTrait},
55
MAX_PACKET_SIZE,
66
};
7-
use mio::{unix::SourceFd, Events, Interest, Token};
7+
use mio::{Events, Interest, Token};
88
use parking_lot::RwLock;
99
use std::sync::Arc;
1010

@@ -50,22 +50,20 @@ impl Poll for UdpPoll {
5050
#[derive(Debug)]
5151
pub struct UdpRegistry(pub mio::Registry);
5252
impl Registry for UdpRegistry {
53-
fn register(&self, socket: &NonBlockingSocket) -> anyhow::Result<()> {
54-
let socket = socket.as_udp().unwrap();
53+
fn register(&self, socket: &mut NonBlockingSocket) -> anyhow::Result<()> {
54+
let socket = socket.as_mut_udp().unwrap();
5555
let local_port = socket.local_addr()?.port();
5656
self.0.register(
57-
&mut SourceFd(&socket.as_raw_fd()),
57+
socket.as_inner(),
5858
Token(local_port.into()),
5959
Interest::READABLE,
6060
)?;
6161
Ok(())
6262
}
6363

64-
fn deregister(&self, socket: &NonBlockingSocket) -> anyhow::Result<()> {
65-
let socket = socket.as_udp().unwrap();
66-
let raw_fd = socket.as_raw_fd();
67-
let source = &mut SourceFd(&raw_fd);
68-
self.0.deregister(source)?;
64+
fn deregister(&self, socket: &mut NonBlockingSocket) -> anyhow::Result<()> {
65+
let socket = socket.as_mut_udp().unwrap();
66+
self.0.deregister(socket.as_inner())?;
6967
Ok(())
7068
}
7169
}

forwarder/src/socket.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ impl NonBlockingSocket {
6868
Ok(socket)
6969
}
7070

71-
pub fn as_udp(&self) -> Option<&udp::NonBlockingUdpSocket> {
71+
pub fn as_mut_udp(&mut self) -> Option<&mut udp::NonBlockingUdpSocket> {
7272
match self {
7373
Self::Udp(inner) => Some(inner),
7474
_ => None,

forwarder/src/socket/udp.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
use super::{NonBlockingSocketTrait, SocketTrait};
2-
use std::{
3-
io,
4-
net::SocketAddr,
5-
os::fd::{AsRawFd, RawFd},
6-
};
2+
use std::{io, net::SocketAddr};
73

84
#[derive(Debug)]
95
pub struct UdpSocket(std::net::UdpSocket);
@@ -30,17 +26,16 @@ impl SocketTrait for UdpSocket {
3026
}
3127

3228
#[derive(Debug)]
33-
pub struct NonBlockingUdpSocket(std::net::UdpSocket);
29+
pub struct NonBlockingUdpSocket(mio::net::UdpSocket);
3430

3531
impl NonBlockingUdpSocket {
3632
pub fn bind(address: &SocketAddr) -> io::Result<Self> {
37-
let socket = std::net::UdpSocket::bind(address)?;
38-
socket.set_nonblocking(true)?;
33+
let socket = mio::net::UdpSocket::bind(*address)?;
3934
Ok(Self(socket))
4035
}
4136

42-
pub fn as_raw_fd(&self) -> RawFd {
43-
self.0.as_raw_fd()
37+
pub fn as_inner(&mut self) -> &mut mio::net::UdpSocket {
38+
&mut self.0
4439
}
4540
}
4641

@@ -50,7 +45,7 @@ impl NonBlockingSocketTrait for NonBlockingUdpSocket {
5045
}
5146

5247
fn connect(&mut self, addr: &SocketAddr) -> io::Result<()> {
53-
self.0.connect(addr)
48+
self.0.connect(*addr)
5449
}
5550

5651
fn recv(&self, buffer: &mut [u8]) -> io::Result<usize> {

0 commit comments

Comments
 (0)