Commit a4f90e09 authored by Kaiden Fey's avatar Kaiden Fey Committed by Katharina Fey

netmod-tcp: ignoring ACKs on limited connections

parent 5c177111
......@@ -130,7 +130,7 @@ impl Peer {
});
// Start sender loop and send a hello
Arc::clone(&p).run_io_sender(port);
Arc::clone(&p).run_io_sender(port, _type);
task::block_on(async { Arc::clone(&p).send(Packet::Hello { port, _type }).await });
return p;
......@@ -198,8 +198,11 @@ impl Peer {
match p {
Packet::Hello { .. } => {
trace!("Sending HELLO to {}", addr);
let _self = Arc::clone(self);
task::spawn(_self.wait_for_ack());
if self._type == LinkType::Bidirect {
let _self = Arc::clone(self);
task::spawn(_self.wait_for_ack());
}
}
_ => {}
}
......@@ -210,6 +213,7 @@ impl Peer {
}
}
/// Run a listener to wait for an ACK to be returned to this connection
async fn wait_for_ack(self: Arc<Self>) {
let t = timeout(Duration::from_secs(10), async {
loop {
......@@ -257,7 +261,7 @@ impl Peer {
/// This function will try sending a packet, initialising the
/// output stream if it doesn't yet exist
async fn send_or_introduce(self: &Arc<Self>, p: Packet, port: u16) {
async fn send_or_introduce(self: &Arc<Self>, p: Packet, port: u16, _type: LinkType) {
loop {
if { self.sender.get_ref().read().await.is_some() } {
// Send the packet and re-run the loop if we failed to send
......@@ -266,8 +270,10 @@ impl Peer {
None => continue, // send_packet sets sender = None if failed
}
} else {
trace!("Sender is None, opening a connection first...");
Arc::clone(&self).introduce_blocking(port).await;
if _type == LinkType::Bidirect {
trace!("Sender is None, opening a connection first...");
Arc::clone(&self).introduce_blocking(port).await;
}
}
}
}
......@@ -280,12 +286,12 @@ impl Peer {
///
/// There's currently no way to get diagnostics from failed sends
/// back to ratman. **FIXME**: implement this!
pub(crate) fn run_io_sender(self: Arc<Self>, port: u16) {
pub(crate) fn run_io_sender(self: Arc<Self>, port: u16, _type: LinkType) {
trace!("Running IO sender");
task::spawn(async move {
while let Some(p) = self.io.rx.recv().await {
trace!("Queued packet {:?}", p);
self.send_or_introduce(p, port).await;
self.send_or_introduce(p, port, _type).await;
if !self.alive() {
break;
......@@ -296,8 +302,6 @@ impl Peer {
});
}
fn register_revhello(self: &Arc<Self>) {}
/// Loop on a connection until it could be established!
async fn introduce_blocking(self: Arc<Self>, port: u16) {
let id = self.id.clone();
......
......@@ -156,6 +156,7 @@ impl Server {
self.handle_hello(peer.id, state, &src_addr, port, _type, Arc::clone(&stream))
.await
}
(state, Ack) => trace!("Received ACK packet on wrong i/o stream. woops"),
(state, packet) => panic!(format!("state={:?}, packet={:?}", state, packet)),
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment