Commit 2ea3fda5 authored by Kaiden Fey's avatar Kaiden Fey

netmod-tcp: fixing: updating peer between packet reads or updates

parent 27f4043e
......@@ -61,6 +61,7 @@ pub(crate) type DstAddr = SocketAddr;
type LockedStream = Arc<RwLock<Option<TcpStream>>>;
/// Encode the different states a `Peer` can be in
#[derive(Debug)]
pub(crate) enum PeerState {
/// Only a receiving channel exists
///
......@@ -233,11 +234,12 @@ impl Peer {
/// it's impossible for the other side to associate an incoming
/// stream to a destination stream.
pub(crate) fn introduce(self: Arc<Self>, port: u16) {
let _self = Arc::clone(&self);
// let _self = Arc::clone(&self);
// if !_self.get_intro() {
// _self.introduce_blocking(port).await
// }
task::spawn(async move {
if !_self.get_intro() {
_self.introduce_blocking(port).await
}
self.send(Packet::Hello { port }).await
});
}
......@@ -281,7 +283,7 @@ impl Peer {
);
// FIXME: Make this configurable
task::sleep(Duration::from_secs(20)).await;
task::sleep(Duration::from_secs(5)).await;
ctr += 1;
continue;
}
......
......@@ -103,18 +103,19 @@ impl Server {
}
};
// Find the correct peer or create a temporary one. If we
// create a temporary one, we will need to upgrade it before
// being able to accept valid connections
let pid = self
.routes
.find_via_src(&src_addr)
.await
.unwrap_or_else(|| task::block_on(async { self.routes.add_via_src(&src_addr).await }));
let peer = self.routes.get_peer(pid).await.unwrap();
// Loop forever
loop {
// Find the correct peer or create a temporary one. If we
// create a temporary one, we will need to upgrade it
// before being able to accept valid connections. We
// update the peer on every iteration of the loop because
// a previous packet might have upgraded the connection.
let pid = self
.routes
.find_via_src(&src_addr)
.await
.unwrap_or_else(|| task::block_on(async { self.routes.add_via_src(&src_addr).await }));
let peer = self.routes.get_peer(pid).await.unwrap();
let mut fb = PacketBuilder::new(&mut stream);
if let Err(_) = fb.parse().await {
error!("Failed to read from incoming packet stream; dropping connection!");
......@@ -139,8 +140,8 @@ impl Server {
(Duplex, Frame(f)) | (RxOnly, Frame(f)) => self.handle_frame(peer.id, f).await,
(_, Hello { port }) => self.handle_hello(&src_addr, port).await,
(RxOnly, KeepAlive) => self.rx_keepalive(),
(Duplex, KeepAlive) => self.dup_keepalive(Arc::clone(&peer)),
_ => todo!(),
(Duplex, KeepAlive) | (TxOnly, KeepAlive) => self.dup_keepalive(Arc::clone(&peer)),
(state, packet) => panic!(format!("state={:?}, packte={:?}", state, packet)),
}
}
......@@ -205,7 +206,7 @@ impl Server {
/// Wait n seconds and then reply to a keep-alive
async fn send_keepalive(peer: Arc<Peer>) {
task::sleep(Duration::from_secs(10)).await;
task::sleep(Duration::from_secs(2)).await;
peer.send(Packet::KeepAlive).await;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment