From 462d45ffd007805f889b5a4544cd8e683897e2db Mon Sep 17 00:00:00 2001 From: scbj Date: Tue, 29 Jul 2025 16:00:25 +0200 Subject: [PATCH] (rust) started another cobs test --- rust/Cargo.toml | 2 + rust/src/main.rs | 237 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 220 insertions(+), 19 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 9941cf7..ea8c327 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -11,5 +11,7 @@ clap = { version = "4.5.0", features = ["derive"] } cobs = { version = "0.4.0" } dirs = "5.0.1" serde = { version = "1.0.219", features = ["derive"] } +serde_json = "1.0.141" thiserror = "2.0.12" +tokio = { version = "1.46.1", features = ["full"] } diff --git a/rust/src/main.rs b/rust/src/main.rs index 4c2c613..f8337c1 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -1,29 +1,228 @@ #![allow(dead_code, unused)] use chrono::{DateTime, Local, Utc}; -use ciborium; -use cobs; +use ciborium::ser; use dirs::config_dir; use serde::{Deserialize, Serialize}; +use tokio::{ + fs, + io::{AsyncWriteExt, AsyncReadExt}, + net::{TcpListener, TcpStream}, + sync::{ + mpsc::{channel, Receiver, Sender}, + Mutex, + }, + time, +}; -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct TestStruct { - pub test_int: i32, - pub test_string: String, -} +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let send_handle = tokio::spawn(sender_process()); + let recv_handle = tokio::spawn(receiver_process()); -fn main() -> anyhow::Result<()> { - let test_var: TestStruct = TestStruct { test_int: 0, test_string: "hello\0 world".to_owned() }; - let mut serialised: Vec = Vec::with_capacity(1024); - let mut framed = vec![0u8; 1024]; - - _ = ciborium::into_writer(&test_var, &mut serialised); - - println!("cbor: {:02x?}", &serialised[..]); - - let num = cobs::encode(&serialised[..], &mut framed); - - println!("cobs: {:02x?}", &framed[..num]); + _ = send_handle.await??; + // _ = recv_handle.await??; Ok(()) } + +async fn sender_process() -> anyhow::Result<()> { + // const BLOCK_SIZE: usize = 1024; + const BLOCK_SIZE: usize = 32; + // const FILENAME: &str = "/home/scbj/repositories/test-projects/lorem.md"; + const FILENAME: &str = "/home/zegonix/repositories/test-projects/small-lorem.md"; + + let mut buffer: Vec = Vec::new(); + + // let listener: TcpListener = TcpListener::bind("127.0.0.1:9090").await?; + // let mut connection = match listener.accept().await { + // Ok((mut stream, address)) => { + // println!("accepted connection request from {}", address); + // stream + // } + // Err(error) => { + // println!("this is unexpected.. {}", error.to_string()); + // return Err(std::io::Error::other("failed to open tcp connection").try_into()?) + // } + // }; + // let mut connection = TcpHandler::new(connection); + let mut file: Vec = match fs::read(FILENAME).await { + Ok(value) => value, + Err(error) => { + println!("{}", error.to_string()); + panic!("failed to read file"); + } + }; + let length: usize = file.len(); + let blocks: usize = (length + BLOCK_SIZE/2) / BLOCK_SIZE; + let mut index: usize = 0; + + while index < blocks - 1 { + let end_index: usize = if file.len() < BLOCK_SIZE { file.len() } else { BLOCK_SIZE }; + let message: Message = Message::FileSection(FileSection { name: "lorem.md".to_owned(), size: length as u64, sections: blocks as u64, index: index as u64, data: file.drain(0..BLOCK_SIZE).collect() }); + // connection.write_message(message)?; + _ = message_to_buffer(message, &mut buffer); + index += 1; + } + + fs::write("./serialised.txt", &buffer[..]); + + while !buffer.is_empty() { + match buffer_to_message(&mut buffer) { + Ok(Some(message)) => println!("{:#?}", message), + Ok(None) => println!("hello .."), + Err(error) => println!("error - {}", error.to_string()), + } + } + + Ok(()) +} + +async fn receiver_process() -> anyhow::Result<()> { + let connection = TcpStream::connect("127.0.0.1:9090").await?; + let mut connection = TcpHandler::new(connection); + let mut file: Vec = Vec::new(); + let mut index: usize = 0; + let mut blocks: usize = 0; + let mut name: String = "".to_owned(); + + 'receive: loop { + let message: FileSection = if let Ok(Some(Message::FileSection(message))) = connection.read_message() { + message + } else + { + println!("failed to read message"); + continue 'receive; + }; + if blocks == 0 { + blocks = message.sections as usize; + name = message.name; + } + index = message.index as usize; + + file.extend(message.data); + println!("\nblocks = {blocks}, index = {index}\n"); + + if index >= blocks - 1 { break 'receive }; + } + + // fs::write(&format!("./{name}"), &file[..]); + + Ok(()) +} + +type FileName = String; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum Message { + Acknowledge(bool), + FileRequest(FileName), + FileSection(FileSection), +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct FileSection { + pub name: String, + pub size: u64, + pub sections: u64, + pub index: u64, + pub data: Vec, +} + +#[derive(Debug, thiserror::Error)] +enum Error { + #[error("writing message to socket failed")] + FailedWrite(#[from] std::io::Error), + + #[error("incomplete write error")] + IncompleteWrite, + + #[error("failed to read from tcp stream")] + FailedRead, +} + +struct TcpHandler { + socket: TcpStream, + buffer: Vec, +} + +impl TcpHandler { + const BUFFER_SIZE: usize = 4096; + + pub fn new(socket: TcpStream) -> Self { + Self { + socket, + buffer: Vec::::with_capacity(Self::BUFFER_SIZE), + } + } + + /// reads a message from the socket + /// returns a message or none if no message was received + /// incomplete messages are stored in a buffer + pub fn read_message(&mut self) -> Result, Error> { + let mut buffer: [u8; Self::BUFFER_SIZE] = [0u8; Self::BUFFER_SIZE]; + let bytes: usize = match self.socket.try_read(&mut buffer) { + Ok(value) => value, + Err(error) => return Err(Error::FailedRead), + }; + self.buffer.extend(&buffer[0..bytes]); + + if !self.buffer.contains(&0u8) { + return Ok(None); + } + // let index = self.buffer + + // TODO: proper error handling + let message_cbor: Vec = match cobs::decode_vec(&self.buffer) { + Ok(value) => value, + Err(error) => panic!("{}", error), + }; + + let message: Option = match ciborium::from_reader(std::io::Cursor::new(&mut self.buffer)) { + Ok(value) => Some(value), + Err(error) => None, + }; + + Ok(message) + } + + pub fn write_message(&mut self, message: Message) -> Result<(), Error> { + let mut message_cbor: Vec = Vec::new(); + _ = ciborium::into_writer(&message, &mut message_cbor); + let mut framed_message: Vec = cobs::encode_vec(&message_cbor); + framed_message.push(0x00u8); + let bytes: usize = self.socket.try_write(&framed_message)?; + + if bytes != framed_message.len() { + return Err(Error::IncompleteWrite); + } + + Ok(()) + } +} + +pub fn message_to_buffer(message: Message, buffer: &mut Vec) -> anyhow::Result<()> { + let mut serialised: Vec = Vec::new(); + _ = ciborium::into_writer(&message, &mut serialised); + let mut framed: Vec = cobs::encode_vec(&serialised); + framed.push(0x00u8); + buffer.extend(framed); + Ok(()) +} + +pub fn buffer_to_message(buffer: &mut Vec) -> anyhow::Result> { + let mut index: usize = 0; + + if !buffer.contains(&0x00u8) { return Ok(None) } + for (n, byte) in buffer.iter().enumerate() { + if byte == &0u8 { + index = n; + break; + } + }; + + let deframed = cobs::decode_vec(&buffer.drain(0..=index).collect::>())?; + let deserialised: Message = ciborium::from_reader(&deframed[..])?; + + Ok(Some(deserialised)) +}