Commit f9654550 authored by Katharina Fey's avatar Katharina Fey

qrpc-sdk: moving socket code from broker to SDK

parent 6eccc17d
......@@ -1673,9 +1673,14 @@ dependencies = [
name = "qrpc-sdk"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"byteorder",
"capnp",
"capnpc",
"ratman-identity",
"socket2",
"tracing",
]
[[package]]
......
......@@ -7,8 +7,13 @@ edition = "2018"
[dependencies]
identity = { path = "../../ratman/identity", version = "0.4", package = "ratman-identity" }
capnp = "0.13"
async-std = "=1.5"
async-trait = "*"
byteorder = "1.0"
capnp = "0.13"
socket2 = { version = "0.3", features = ["unix"] }
tracing = "0.1"
[build-dependencies]
capnpc = "0.13"
\ No newline at end of file
//! Error handling
mod service;
pub use service::{Result as ServiceResult, ServiceError};
//! Service related error handling
pub type Result<T> = std::result::Result<T, ServiceError>;
/// A set of errors that occur when connecting to services
#[derive(Debug)]
pub enum ServiceError {
/// No such service was found by the broker
NoSuchService,
/// The service didn't reply within the timeout time
///
/// This may indicate that the requested service has crashed, is
/// dealing with backpressure, or the broker is quietly dropping
/// requests.
ServiceBusy,
/// Tried connecting to a service that's already connected
AlreadyConnected,
/// Failed to perform action that requires a connection
NotConnected,
/// Any other failure with it's error message string
Other(String),
}
......@@ -11,11 +11,8 @@
//! service needs to register itself and it's capabilities. This
//! mechanism is handled by this sdk.
pub mod io;
use identity::Identity;
// FIXME: currently the protocols have to be in the root of the crate
// because of [this issue][i] in the capnproto codegen units:
// [i]: https://github.com/capnproto/capnproto-rust/issues/194
......@@ -43,31 +40,8 @@ pub mod rpc {
pub use crate::carrier_capnp::{register, unregister, upgrade};
}
pub mod errors;
mod service;
mod socket;
/// A service representation on the qrpc system
pub struct Service {
name: String,
version: u16,
description: String,
hash_id: Option<Identity>,
}
impl Service {
/// Create a new service without hash_id
///
/// The `hash_id` field will be filled in by the remote RPC server
/// after calling `register()`.
pub fn new<S: Into<String>>(name: S, version: u16, description: S) -> Self {
Self {
name: name.into(),
version,
description: description.into(),
hash_id: None,
}
}
/// Register this service with the RPC broker/ libqaul
pub async fn register(&mut self) -> Option<()> {
None
}
}
pub use service::Service;
use crate::errors::ServiceResult as Result;
use identity::Identity;
use std::sync::Arc;
/// A service representation on the qrpc system
///
/// Use this struct to handle RPC connections to the network, and to
/// update any data you want your service to broadcast to other
/// participants on the QRPC system.
///
/// ```
/// let my_serv = Service::new(
/// "de.spacekookie.myapp",
/// 1,
/// "An app that does things!");
///
/// // Nothing happened yet. Connect to QRPC and register yourself
///
pub struct Service {
name: String,
version: u16,
description: String,
hash_id: Option<Identity>,
}
impl Service {
/// Create a new service without hash_id
///
/// The `hash_id` field will be filled in by the remote RPC server
/// after calling `register()`.
pub fn new<S: Into<String>>(name: S, version: u16, description: S) -> Self {
Self {
name: name.into(),
version,
description: description.into(),
hash_id: None,
}
}
/// Register this service with the RPC broker/ libqaul
pub async fn register(&mut self) -> Option<()> {
None
}
}
/// An external service that can be connected to
///
/// In order to use the function-set from an external service, your
/// application needs to include it's client-lib (usually named
/// `<service>-rpc`) which provides a strongly typed API that
/// abstracts away the RPC protocol logic.
///
/// Any service that should be connectable needs to implement this
/// trait. Any service API object also needs to implement the
/// `Default` trait so that the sdk internally can create a default of
/// it, then call `establish_connection()` to fully initialise it.
#[async_trait::async_trait]
pub trait ServiceConnector: Default {
/// Start a connection to the service backend
async fn establish_connection(self: Arc<Self>) -> Result<()>;
/// Terminate the connection to the service backend
async fn terminate_connection(self: Arc<Self>) -> Result<()>;
}
//! An internal abstraction over the RPC socket
use async_std::{sync::Arc, task};
use byteorder::{BigEndian, ByteOrder};
use socket2::{Domain, SockAddr, Socket, Type};
use std::{
future::Future,
io::Result,
path::Path,
sync::atomic::{AtomicBool, Ordering},
};
pub struct RpcSocket {
inner: Socket,
run: AtomicBool,
listening: AtomicBool,
}
impl RpcSocket {
/// Create a new socket to the RPC system
///
/// To listen for new connections you need to explicitly call
/// `listen(...)`, otherwise it will only act as a sending socket,
/// where each reply is meant for one request.
pub fn new<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
let run = AtomicBool::from(true);
let listening = AtomicBool::from(false);
let mut inner = Socket::new(
Domain::unix(),
Type::seqpacket(), // this _may_ not be supported on MacOS
None,
)?;
inner.connect(&SockAddr::unix(path)?)?;
Ok(Arc::new(Self {
inner,
run,
listening,
}))
}
/// Start listening for connections with a future
pub fn listen<F>(self: &Arc<Self>, fut: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.listening.fetch_or(true, Ordering::Relaxed);
task::spawn(fut);
}
/// Check if the socket is still running
///
/// Use this function in your service's listening code to
/// determine whether the connection should be shut-down
pub fn running(&self) -> bool {
self.run.load(Ordering::Relaxed)
}
/// Query whether this socket is listening for connections
pub fn listening(&self) -> bool {
self.listening.load(Ordering::Relaxed)
}
}
......@@ -7,9 +7,10 @@ edition = "2018"
[dependencies]
identity = { path = "../../ratman/identity", version = "0.4", package = "ratman-identity" }
qrpc-sdk = { path = "../qrpc-sdk" }
capnp = "0.13"
async-std = "=1.5"
byteorder = "1.0"
capnp = "0.13"
qrpc-sdk = { path = "../qrpc-sdk" }
socket2 = { version = "0.3", features = ["unix"] }
async-std = "=1.5"
tracing = "0.1"
\ No newline at end of file
//! An extensible rpc message broker for the libqaul ecosystem.
mod parser;
mod socket;
pub use parser::UtilReader;
/// Hold the main broker state
pub struct Broker {}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment