Commit 2dfe05da authored by Katharina Fey's avatar Katharina Fey

qrpc-sdk: adding default timeout to socket abstraction

parent f9654550
//! An internal abstraction over the RPC socket
use async_std::{sync::Arc, task};
use async_std::{future, sync::Arc, task};
use byteorder::{BigEndian, ByteOrder};
use socket2::{Domain, SockAddr, Socket, Type};
use std::{
......@@ -8,12 +8,22 @@ use std::{
sync::atomic::{AtomicBool, Ordering},
/// A qrpc connection wrapper
/// This type wraps a UNIX socket connection to a remote client. By
/// default it is configured in client-only mode, meaning that the
/// only time it listens for incoming messages is when waiting for a
/// reply from the rpc broker, libqaul, or another service. To
/// pro-actively reply to incoming requests (for example, if you want
/// that your service can be used by other services)
pub struct RpcSocket {
inner: Socket,
run: AtomicBool,
listening: AtomicBool,
timeout: Duration,
impl RpcSocket {
......@@ -23,8 +33,14 @@ impl RpcSocket {
/// `listen(...)`, otherwise it will only act as a sending socket,
/// where each reply is meant for one request.
pub fn new<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
let run = AtomicBool::from(true);
let listening = AtomicBool::from(false);
Self::with_duration(path, Duration::from_secs(5))
/// Create a new socket with an explicit timeout duration
/// Setup is the same as when calling `new`, except that you can
/// choose an explicit timeout, instead of the default.
pub fn with_duration<P: AsRef<Path>>(path: P, timeout: Duration) -> Result<Arc<Self>> {
let mut inner = Socket::new(
Type::seqpacket(), // this _may_ not be supported on MacOS
......@@ -34,12 +50,16 @@ impl RpcSocket {
Ok(Arc::new(Self {
run: AtomicBool::from(true),
listening: AtomicBool::from(false),
/// Start listening for connections with a future
/// Incoming messages need to be parsed by your service, and then
/// replied to.
pub fn listen<F>(self: &Arc<Self>, fut: F)
F: Future<Output = ()> + Send + 'static,
......@@ -60,4 +80,16 @@ impl RpcSocket {
pub fn listening(&self) -> bool {
pub async fn send_msg(self: &Arc<Self>) -> Option<()> {
/// Drive a future to completion with a timeout
async fn with_timeout<T, F>(&self, fut: F) -> Option<T>
F: Future<Output = T> + Send + 'static,
future::timeout(self.timeout.clone(), fut).await.ok()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment