(rust) started another cobs test

This commit is contained in:
scbj
2025-07-29 16:00:25 +02:00
parent f5a20d500e
commit 462d45ffd0
2 changed files with 220 additions and 19 deletions
+2
View File
@@ -11,5 +11,7 @@ clap = { version = "4.5.0", features = ["derive"] }
cobs = { version = "0.4.0" } cobs = { version = "0.4.0" }
dirs = "5.0.1" dirs = "5.0.1"
serde = { version = "1.0.219", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.141"
thiserror = "2.0.12" thiserror = "2.0.12"
tokio = { version = "1.46.1", features = ["full"] }
+218 -19
View File
@@ -1,29 +1,228 @@
#![allow(dead_code, unused)] #![allow(dead_code, unused)]
use chrono::{DateTime, Local, Utc}; use chrono::{DateTime, Local, Utc};
use ciborium; use ciborium::ser;
use cobs;
use dirs::config_dir; use dirs::config_dir;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::{
fs,
io::{AsyncWriteExt, AsyncReadExt},
net::{TcpListener, TcpStream},
sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
},
time,
};
#[derive(Clone, Debug, Deserialize, Serialize)] #[tokio::main]
pub struct TestStruct { async fn main() -> anyhow::Result<()> {
pub test_int: i32, let send_handle = tokio::spawn(sender_process());
pub test_string: String, let recv_handle = tokio::spawn(receiver_process());
}
fn main() -> anyhow::Result<()> { _ = send_handle.await??;
let test_var: TestStruct = TestStruct { test_int: 0, test_string: "hello\0 world".to_owned() }; // _ = recv_handle.await??;
let mut serialised: Vec<u8> = Vec::with_capacity(1024);
let mut framed = vec![0u8; 1024];
_ = ciborium::into_writer(&test_var, &mut serialised);
println!("cbor: {:02x?}", &serialised[..]);
let num = cobs::encode(&serialised[..], &mut framed);
println!("cobs: {:02x?}", &framed[..num]);
Ok(()) Ok(())
} }
async fn sender_process() -> anyhow::Result<()> {
// const BLOCK_SIZE: usize = 1024;
const BLOCK_SIZE: usize = 32;
// const FILENAME: &str = "/home/scbj/repositories/test-projects/lorem.md";
const FILENAME: &str = "/home/zegonix/repositories/test-projects/small-lorem.md";
let mut buffer: Vec<u8> = Vec::new();
// let listener: TcpListener = TcpListener::bind("127.0.0.1:9090").await?;
// let mut connection = match listener.accept().await {
// Ok((mut stream, address)) => {
// println!("accepted connection request from {}", address);
// stream
// }
// Err(error) => {
// println!("this is unexpected.. {}", error.to_string());
// return Err(std::io::Error::other("failed to open tcp connection").try_into()?)
// }
// };
// let mut connection = TcpHandler::new(connection);
let mut file: Vec<u8> = match fs::read(FILENAME).await {
Ok(value) => value,
Err(error) => {
println!("{}", error.to_string());
panic!("failed to read file");
}
};
let length: usize = file.len();
let blocks: usize = (length + BLOCK_SIZE/2) / BLOCK_SIZE;
let mut index: usize = 0;
while index < blocks - 1 {
let end_index: usize = if file.len() < BLOCK_SIZE { file.len() } else { BLOCK_SIZE };
let message: Message = Message::FileSection(FileSection { name: "lorem.md".to_owned(), size: length as u64, sections: blocks as u64, index: index as u64, data: file.drain(0..BLOCK_SIZE).collect() });
// connection.write_message(message)?;
_ = message_to_buffer(message, &mut buffer);
index += 1;
}
fs::write("./serialised.txt", &buffer[..]);
while !buffer.is_empty() {
match buffer_to_message(&mut buffer) {
Ok(Some(message)) => println!("{:#?}", message),
Ok(None) => println!("hello .."),
Err(error) => println!("error - {}", error.to_string()),
}
}
Ok(())
}
async fn receiver_process() -> anyhow::Result<()> {
let connection = TcpStream::connect("127.0.0.1:9090").await?;
let mut connection = TcpHandler::new(connection);
let mut file: Vec<u8> = Vec::new();
let mut index: usize = 0;
let mut blocks: usize = 0;
let mut name: String = "".to_owned();
'receive: loop {
let message: FileSection = if let Ok(Some(Message::FileSection(message))) = connection.read_message() {
message
} else
{
println!("failed to read message");
continue 'receive;
};
if blocks == 0 {
blocks = message.sections as usize;
name = message.name;
}
index = message.index as usize;
file.extend(message.data);
println!("\nblocks = {blocks}, index = {index}\n");
if index >= blocks - 1 { break 'receive };
}
// fs::write(&format!("./{name}"), &file[..]);
Ok(())
}
type FileName = String;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum Message {
Acknowledge(bool),
FileRequest(FileName),
FileSection(FileSection),
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct FileSection {
pub name: String,
pub size: u64,
pub sections: u64,
pub index: u64,
pub data: Vec<u8>,
}
#[derive(Debug, thiserror::Error)]
enum Error {
#[error("writing message to socket failed")]
FailedWrite(#[from] std::io::Error),
#[error("incomplete write error")]
IncompleteWrite,
#[error("failed to read from tcp stream")]
FailedRead,
}
struct TcpHandler {
socket: TcpStream,
buffer: Vec<u8>,
}
impl TcpHandler {
const BUFFER_SIZE: usize = 4096;
pub fn new(socket: TcpStream) -> Self {
Self {
socket,
buffer: Vec::<u8>::with_capacity(Self::BUFFER_SIZE),
}
}
/// reads a message from the socket
/// returns a message or none if no message was received
/// incomplete messages are stored in a buffer
pub fn read_message(&mut self) -> Result<Option<Message>, Error> {
let mut buffer: [u8; Self::BUFFER_SIZE] = [0u8; Self::BUFFER_SIZE];
let bytes: usize = match self.socket.try_read(&mut buffer) {
Ok(value) => value,
Err(error) => return Err(Error::FailedRead),
};
self.buffer.extend(&buffer[0..bytes]);
if !self.buffer.contains(&0u8) {
return Ok(None);
}
// let index = self.buffer
// TODO: proper error handling
let message_cbor: Vec<u8> = match cobs::decode_vec(&self.buffer) {
Ok(value) => value,
Err(error) => panic!("{}", error),
};
let message: Option<Message> = match ciborium::from_reader(std::io::Cursor::new(&mut self.buffer)) {
Ok(value) => Some(value),
Err(error) => None,
};
Ok(message)
}
pub fn write_message(&mut self, message: Message) -> Result<(), Error> {
let mut message_cbor: Vec<u8> = Vec::new();
_ = ciborium::into_writer(&message, &mut message_cbor);
let mut framed_message: Vec<u8> = cobs::encode_vec(&message_cbor);
framed_message.push(0x00u8);
let bytes: usize = self.socket.try_write(&framed_message)?;
if bytes != framed_message.len() {
return Err(Error::IncompleteWrite);
}
Ok(())
}
}
pub fn message_to_buffer(message: Message, buffer: &mut Vec<u8>) -> anyhow::Result<()> {
let mut serialised: Vec<u8> = Vec::new();
_ = ciborium::into_writer(&message, &mut serialised);
let mut framed: Vec<u8> = cobs::encode_vec(&serialised);
framed.push(0x00u8);
buffer.extend(framed);
Ok(())
}
pub fn buffer_to_message(buffer: &mut Vec<u8>) -> anyhow::Result<Option<Message>> {
let mut index: usize = 0;
if !buffer.contains(&0x00u8) { return Ok(None) }
for (n, byte) in buffer.iter().enumerate() {
if byte == &0u8 {
index = n;
break;
}
};
let deframed = cobs::decode_vec(&buffer.drain(0..=index).collect::<Vec<u8>>())?;
let deserialised: Message = ciborium::from_reader(&deframed[..])?;
Ok(Some(deserialised))
}