Commit fb810005 authored by Amanjeev Sethi's avatar Amanjeev Sethi

service/files: wip, hacky copy-paste stuff from chat service but compiles

parent 69c05c9d
Pipeline #1008 passed with stages
in 5 minutes and 13 seconds
......@@ -1857,11 +1857,14 @@ dependencies = [
name = "qaul-files"
version = "0.1.0"
dependencies = [
"async-std",
"libqaul",
"mime",
"mime_guess",
"ratman-identity",
"serde",
"tracing",
"tracing-futures",
]
[[package]]
......
......@@ -11,3 +11,6 @@ libqaul = { path = "../../" }
mime = "0.3"
mime_guess = "2.0.1"
serde = { version = "1.0" }
async-std = "=1.5"
tracing = "0.1"
tracing-futures = "0.2"
//! `qaul.net` filesharing service
use std::sync::Arc;
use async_std::{sync::Arc, task};
use mime::Mime;
use libqaul::messages::{Message, MsgQuery};
use libqaul::users::UserAuth;
use libqaul::Identity;
use libqaul::{error::Result, Qaul};
use libqaul::services::ServiceEvent;
use crate::types::{File, FileFilter, FileId};
pub use crate::types::{File, FileFilter, FileId, Subscription, Files};
mod msg;
mod protocol;
pub mod types;
mod worker;
// TODO: Partial files
// TODO: file progress
......@@ -32,8 +34,23 @@ impl Fileshare {
///
/// In order to initialise, a valid and running
/// `Qaul` reference needs to be provided.
pub fn new(qaul: Arc<Qaul>, advertised: Arc<Vec<FileId>>) -> Result<Self> {
Ok(Self { qaul, advertised })
pub fn new(qaul: Arc<Qaul>, advertised: Arc<Vec<FileId>>) -> Result<Arc<Self>> {
let this = Arc::new(Self {qaul, advertised});
let sender = Arc::new(worker::run_asnc(Arc::clone(&this)));
this.qaul.services().register(ASC_NAME, move |cmd| {
let sender = Arc::clone(&sender);
task::block_on(async move {
match cmd {
ServiceEvent::Open(auth) => sender.send(worker::Command::Start(auth)).await,
ServiceEvent::Close(auth) => sender.send(worker::Command::Stop(auth)).await,
}
});
});
Ok(this)
//Ok(Self { qaul, advertised })
}
/// Advertise a file into a network
......@@ -45,7 +62,7 @@ impl Fileshare {
file_type: Mime,
) -> Result<Arc<Vec<FileId>>> {
// returns the `advertised` vector
unimplemented!()
}
......
use libqaul::messages::Message;
use crate::File;
impl From<Message> for File {
fn from(msg: Message) -> Self {
Self {
name: Some(msg.id.to_string()), // TODO: how to get name here?
id: msg.id,
data: Some(msg.payload),
owner: msg.sender,
}
}
}
\ No newline at end of file
......@@ -3,6 +3,8 @@ use serde::{Deserialize, Serialize};
use libqaul::error::Result;
use libqaul::Identity;
use libqaul::users::UserAuth;
use libqaul::messages::{Message, MsgId};
use libqaul::helpers::{Subscription as Sub};
pub type FileId = Identity;
......@@ -71,3 +73,19 @@ pub enum FileFilter {
pub struct Files<'chain> {
pub(crate) q: &'chain crate::Qaul,
}
/// A subscription handler that pushes out updates
pub struct Subscription {
pub(crate) inner: Sub<Message>,
}
impl Subscription {
pub(crate) fn new(inner: Sub<Message>) -> Self {
Self { inner }
}
/// Get the next chat message
pub async fn next(&self) -> File {
self.inner.next().await.into()
}
}
use crate::{ASC_NAME, Fileshare, Subscription};
use libqaul::{users::UserAuth, Identity, helpers::TagSet};
use async_std::{
sync::{channel, Arc, RwLock, Sender},
task,
};
use std::collections::BTreeSet;
use tracing::{debug, info, trace};
pub(crate) enum Command {
Start(UserAuth),
Stop(UserAuth),
}
type RunMap = Arc<RwLock<BTreeSet<Identity>>>;
pub(crate) fn run_asnc(file_serv: Arc<Fileshare>) -> Sender<Command> {
let (tx, rx) = channel(1);
task::spawn(async move {
let map: RunMap = Default::default();
while let Some(cmd) = rx.recv().await {
let map = Arc::clone(&map);
match cmd {
Command::Start(auth) => {
trace!("Receiving libqaul user {} START event!", auth.0);
map.write().await.insert(auth.0);
task::spawn(run_user(auth, Arc::clone(&file_serv), Arc::clone(&map)));
}
Command::Stop(auth) => {
trace!("Receiving libqaul user {} STOP event!", auth.0);
map.write().await.remove(&auth.0);
}
}
}
// Stop all remaining workers
info!("Deallocating subscription workers");
map.write().await.clear();
});
tx
}
pub(crate) async fn run_user(user: UserAuth, file_serv: Arc<Fileshare>, run: RunMap) {
let sub = Subscription::new(
file_serv.qaul
.messages()
.subscribe(user.clone(), ASC_NAME, TagSet::empty())
.await
.unwrap(),
);
trace!("Creating message subscription!");
while run.read().await.contains(&user.0) {
let file = sub.next().await;
if file.owner == user.0 && continue {}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment