From cd7b47d3e776fe85e2afc31e9be06b2363739eec Mon Sep 17 00:00:00 2001 From: scbj Date: Thu, 31 Jul 2025 16:14:59 +0200 Subject: [PATCH] (rust) tested communcication over tcp with cobs and cbor --- rust/src/main.rs | 245 ++++++++++++++++++++++------------------------- 1 file changed, 115 insertions(+), 130 deletions(-) diff --git a/rust/src/main.rs b/rust/src/main.rs index f8337c1..4ec342d 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -1,52 +1,60 @@ #![allow(dead_code, unused)] +use std::{num, path::PathBuf, str::FromStr}; + use chrono::{DateTime, Local, Utc}; use ciborium::ser; -use dirs::config_dir; +use dirs::{config_dir, home_dir}; use serde::{Deserialize, Serialize}; use tokio::{ fs, - io::{AsyncWriteExt, AsyncReadExt}, + io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, sync::{ mpsc::{channel, Receiver, Sender}, Mutex, }, - time, + time::{sleep, Duration}, }; #[tokio::main] async fn main() -> anyhow::Result<()> { - let send_handle = tokio::spawn(sender_process()); + let (running, mut stop) = channel::(1); + + let send_handle = tokio::spawn(sender_process(running.clone())); let recv_handle = tokio::spawn(receiver_process()); + _ = recv_handle.await??; + stop.close(); _ = send_handle.await??; - // _ = recv_handle.await??; Ok(()) } -async fn sender_process() -> anyhow::Result<()> { - // const BLOCK_SIZE: usize = 1024; +async fn sender_process(running: Sender) -> anyhow::Result<()> { const BLOCK_SIZE: usize = 32; - // const FILENAME: &str = "/home/scbj/repositories/test-projects/lorem.md"; - const FILENAME: &str = "/home/zegonix/repositories/test-projects/small-lorem.md"; + const FILE_PATH: &str = "repositories/test-projects/"; + const FILE_NAME: &str = "small-lorem.md"; + let listener: TcpListener = TcpListener::bind("127.0.0.1:9090").await?; + let mut connection = match listener.accept().await { + Ok((mut stream, address)) => { + println!("accepted connection request from {}", address); + stream + } + Err(error) => { + println!("this is unexpected.. {}", error.to_string()); + return Err(std::io::Error::other("failed to open tcp connection").try_into()?); + } + }; + + let mut filepath: PathBuf = home_dir().expect("failed to determine home directory"); let mut buffer: Vec = Vec::new(); - // let listener: TcpListener = TcpListener::bind("127.0.0.1:9090").await?; - // let mut connection = match listener.accept().await { - // Ok((mut stream, address)) => { - // println!("accepted connection request from {}", address); - // stream - // } - // Err(error) => { - // println!("this is unexpected.. {}", error.to_string()); - // return Err(std::io::Error::other("failed to open tcp connection").try_into()?) - // } - // }; - // let mut connection = TcpHandler::new(connection); - let mut file: Vec = match fs::read(FILENAME).await { + filepath.push(FILE_PATH); + filepath.push(FILE_NAME); + + let mut file: Vec = match fs::read(filepath).await { Ok(value) => value, Err(error) => { println!("{}", error.to_string()); @@ -54,46 +62,73 @@ async fn sender_process() -> anyhow::Result<()> { } }; let length: usize = file.len(); - let blocks: usize = (length + BLOCK_SIZE/2) / BLOCK_SIZE; + let blocks: usize = (length + BLOCK_SIZE / 2) / BLOCK_SIZE; let mut index: usize = 0; - while index < blocks - 1 { - let end_index: usize = if file.len() < BLOCK_SIZE { file.len() } else { BLOCK_SIZE }; - let message: Message = Message::FileSection(FileSection { name: "lorem.md".to_owned(), size: length as u64, sections: blocks as u64, index: index as u64, data: file.drain(0..BLOCK_SIZE).collect() }); - // connection.write_message(message)?; - _ = message_to_buffer(message, &mut buffer); + connection.writable().await; + while index < blocks { + if index >= blocks { + panic!("index >= blocks !@#$!%"); + } + let end_index: usize = if file.len() < BLOCK_SIZE { + file.len() + } else { + BLOCK_SIZE + }; + let message: Message = Message::FileSection(FileSection { + name: "lorem.md".to_owned(), + size: length as u64, + sections: blocks as u64, + index: index as u64, + data: file.drain(0..end_index).collect(), + }); + let serialised: Vec = message.to_vec(); + connection.try_write(&serialised[..]); index += 1; + + sleep(Duration::from_millis(20)).await; } - fs::write("./serialised.txt", &buffer[..]); - - while !buffer.is_empty() { - match buffer_to_message(&mut buffer) { - Ok(Some(message)) => println!("{:#?}", message), - Ok(None) => println!("hello .."), - Err(error) => println!("error - {}", error.to_string()), - } + while !running.is_closed() { + sleep(Duration::from_millis(20)).await; } Ok(()) } async fn receiver_process() -> anyhow::Result<()> { - let connection = TcpStream::connect("127.0.0.1:9090").await?; - let mut connection = TcpHandler::new(connection); + let mut connection = TcpStream::connect("127.0.0.1:9090").await?; + let mut stream: Vec = Vec::with_capacity(8192); let mut file: Vec = Vec::new(); let mut index: usize = 0; let mut blocks: usize = 0; let mut name: String = "".to_owned(); + let mut buffer = [0u8; 8192]; + + let mut counter: usize = 20; 'receive: loop { - let message: FileSection = if let Ok(Some(Message::FileSection(message))) = connection.read_message() { - message - } else - { - println!("failed to read message"); - continue 'receive; + let length: usize = stream.len(); + + connection.readable().await; + let num_bytes: usize = match connection.try_read(&mut buffer) { + Ok(0) => continue 'receive, + Ok(value) => value, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue 'receive, + Err(_) => panic!("fuck you!"), }; + stream.extend_from_slice(&buffer[0..num_bytes]); + + let message: FileSection = match Message::from_vec(&mut stream) { + Ok(Message::FileSection(value)) => value, + Ok(unexpected) => panic!(".. {:#?}", unexpected), + Err(Error::IncompleteFrame) => continue 'receive, + Err(Error::EmptyBuffer) => panic!("don't be stupid!"), + Err(error) => panic!(".. {:#?}", error), + }; + + // println!("{:#?}", message); + if blocks == 0 { blocks = message.sections as usize; name = message.name; @@ -101,12 +136,15 @@ async fn receiver_process() -> anyhow::Result<()> { index = message.index as usize; file.extend(message.data); - println!("\nblocks = {blocks}, index = {index}\n"); - if index >= blocks - 1 { break 'receive }; + if index >= blocks - 1 { + break 'receive; + }; + + sleep(Duration::from_millis(20)).await; } - // fs::write(&format!("./{name}"), &file[..]); + fs::write("./received.md", file).await; Ok(()) } @@ -130,99 +168,46 @@ pub struct FileSection { } #[derive(Debug, thiserror::Error)] -enum Error { - #[error("writing message to socket failed")] - FailedWrite(#[from] std::io::Error), +pub enum Error { + #[error("serialisation with cbor failed")] + Serialisation(#[from] ciborium::de::Error), - #[error("incomplete write error")] - IncompleteWrite, + #[error("framing with cobs failed")] + Framing(#[from] cobs::DecodeError), - #[error("failed to read from tcp stream")] - FailedRead, + #[error("can't deserialise from an empty buffer")] + EmptyBuffer, + + #[error("no framedelimiter found, the frame is probably not fully received yet")] + IncompleteFrame, } -struct TcpHandler { - socket: TcpStream, - buffer: Vec, -} - -impl TcpHandler { - const BUFFER_SIZE: usize = 4096; - - pub fn new(socket: TcpStream) -> Self { - Self { - socket, - buffer: Vec::::with_capacity(Self::BUFFER_SIZE), - } +impl Message { + /// serialises the message and returns a vector of `u8` + pub fn to_vec(&self) -> Vec { + let mut serialised: Vec = Vec::with_capacity(1024); + _ = ciborium::into_writer(self, &mut serialised); + let mut framed: Vec = cobs::encode_vec(&serialised); + framed.push(0x00u8); + framed } - /// reads a message from the socket - /// returns a message or none if no message was received - /// incomplete messages are stored in a buffer - pub fn read_message(&mut self) -> Result, Error> { - let mut buffer: [u8; Self::BUFFER_SIZE] = [0u8; Self::BUFFER_SIZE]; - let bytes: usize = match self.socket.try_read(&mut buffer) { - Ok(value) => value, - Err(error) => return Err(Error::FailedRead), - }; - self.buffer.extend(&buffer[0..bytes]); + /// deserialises a message from a vector + pub fn from_vec(buffer: &mut Vec) -> Result { + if buffer.is_empty() { return Err(Error::EmptyBuffer); } + if !buffer.contains(&0u8) { return Err(Error::IncompleteFrame); } - if !self.buffer.contains(&0u8) { - return Ok(None); + let mut index: usize = 0; + for (n, byte) in buffer.iter().enumerate() { + if byte == &0u8 { + index = n; + break; + } } - // let index = self.buffer - // TODO: proper error handling - let message_cbor: Vec = match cobs::decode_vec(&self.buffer) { - Ok(value) => value, - Err(error) => panic!("{}", error), - }; - - let message: Option = match ciborium::from_reader(std::io::Cursor::new(&mut self.buffer)) { - Ok(value) => Some(value), - Err(error) => None, - }; + let serialised: Vec = cobs::decode_vec(&buffer.drain(0..=index).collect::>())?; + let message: Message = ciborium::from_reader(&serialised[..])?; Ok(message) } - - pub fn write_message(&mut self, message: Message) -> Result<(), Error> { - let mut message_cbor: Vec = Vec::new(); - _ = ciborium::into_writer(&message, &mut message_cbor); - let mut framed_message: Vec = cobs::encode_vec(&message_cbor); - framed_message.push(0x00u8); - let bytes: usize = self.socket.try_write(&framed_message)?; - - if bytes != framed_message.len() { - return Err(Error::IncompleteWrite); - } - - Ok(()) - } -} - -pub fn message_to_buffer(message: Message, buffer: &mut Vec) -> anyhow::Result<()> { - let mut serialised: Vec = Vec::new(); - _ = ciborium::into_writer(&message, &mut serialised); - let mut framed: Vec = cobs::encode_vec(&serialised); - framed.push(0x00u8); - buffer.extend(framed); - Ok(()) -} - -pub fn buffer_to_message(buffer: &mut Vec) -> anyhow::Result> { - let mut index: usize = 0; - - if !buffer.contains(&0x00u8) { return Ok(None) } - for (n, byte) in buffer.iter().enumerate() { - if byte == &0u8 { - index = n; - break; - } - }; - - let deframed = cobs::decode_vec(&buffer.drain(0..=index).collect::>())?; - let deserialised: Message = ciborium::from_reader(&deframed[..])?; - - Ok(Some(deserialised)) }