(rust) tested communcication over tcp with cobs and cbor
This commit is contained in:
+115
-130
@@ -1,52 +1,60 @@
|
|||||||
#![allow(dead_code, unused)]
|
#![allow(dead_code, unused)]
|
||||||
|
|
||||||
|
use std::{num, path::PathBuf, str::FromStr};
|
||||||
|
|
||||||
use chrono::{DateTime, Local, Utc};
|
use chrono::{DateTime, Local, Utc};
|
||||||
use ciborium::ser;
|
use ciborium::ser;
|
||||||
use dirs::config_dir;
|
use dirs::{config_dir, home_dir};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
fs,
|
fs,
|
||||||
io::{AsyncWriteExt, AsyncReadExt},
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
net::{TcpListener, TcpStream},
|
net::{TcpListener, TcpStream},
|
||||||
sync::{
|
sync::{
|
||||||
mpsc::{channel, Receiver, Sender},
|
mpsc::{channel, Receiver, Sender},
|
||||||
Mutex,
|
Mutex,
|
||||||
},
|
},
|
||||||
time,
|
time::{sleep, Duration},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
let send_handle = tokio::spawn(sender_process());
|
let (running, mut stop) = channel::<bool>(1);
|
||||||
|
|
||||||
|
let send_handle = tokio::spawn(sender_process(running.clone()));
|
||||||
let recv_handle = tokio::spawn(receiver_process());
|
let recv_handle = tokio::spawn(receiver_process());
|
||||||
|
|
||||||
|
_ = recv_handle.await??;
|
||||||
|
stop.close();
|
||||||
_ = send_handle.await??;
|
_ = send_handle.await??;
|
||||||
// _ = recv_handle.await??;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn sender_process() -> anyhow::Result<()> {
|
async fn sender_process(running: Sender<bool>) -> anyhow::Result<()> {
|
||||||
// const BLOCK_SIZE: usize = 1024;
|
|
||||||
const BLOCK_SIZE: usize = 32;
|
const BLOCK_SIZE: usize = 32;
|
||||||
// const FILENAME: &str = "/home/scbj/repositories/test-projects/lorem.md";
|
const FILE_PATH: &str = "repositories/test-projects/";
|
||||||
const FILENAME: &str = "/home/zegonix/repositories/test-projects/small-lorem.md";
|
const FILE_NAME: &str = "small-lorem.md";
|
||||||
|
|
||||||
|
let listener: TcpListener = TcpListener::bind("127.0.0.1:9090").await?;
|
||||||
|
let mut connection = match listener.accept().await {
|
||||||
|
Ok((mut stream, address)) => {
|
||||||
|
println!("accepted connection request from {}", address);
|
||||||
|
stream
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
println!("this is unexpected.. {}", error.to_string());
|
||||||
|
return Err(std::io::Error::other("failed to open tcp connection").try_into()?);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut filepath: PathBuf = home_dir().expect("failed to determine home directory");
|
||||||
let mut buffer: Vec<u8> = Vec::new();
|
let mut buffer: Vec<u8> = Vec::new();
|
||||||
|
|
||||||
// let listener: TcpListener = TcpListener::bind("127.0.0.1:9090").await?;
|
filepath.push(FILE_PATH);
|
||||||
// let mut connection = match listener.accept().await {
|
filepath.push(FILE_NAME);
|
||||||
// Ok((mut stream, address)) => {
|
|
||||||
// println!("accepted connection request from {}", address);
|
let mut file: Vec<u8> = match fs::read(filepath).await {
|
||||||
// stream
|
|
||||||
// }
|
|
||||||
// Err(error) => {
|
|
||||||
// println!("this is unexpected.. {}", error.to_string());
|
|
||||||
// return Err(std::io::Error::other("failed to open tcp connection").try_into()?)
|
|
||||||
// }
|
|
||||||
// };
|
|
||||||
// let mut connection = TcpHandler::new(connection);
|
|
||||||
let mut file: Vec<u8> = match fs::read(FILENAME).await {
|
|
||||||
Ok(value) => value,
|
Ok(value) => value,
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
println!("{}", error.to_string());
|
println!("{}", error.to_string());
|
||||||
@@ -54,46 +62,73 @@ async fn sender_process() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
let length: usize = file.len();
|
let length: usize = file.len();
|
||||||
let blocks: usize = (length + BLOCK_SIZE/2) / BLOCK_SIZE;
|
let blocks: usize = (length + BLOCK_SIZE / 2) / BLOCK_SIZE;
|
||||||
let mut index: usize = 0;
|
let mut index: usize = 0;
|
||||||
|
|
||||||
while index < blocks - 1 {
|
connection.writable().await;
|
||||||
let end_index: usize = if file.len() < BLOCK_SIZE { file.len() } else { BLOCK_SIZE };
|
while index < blocks {
|
||||||
let message: Message = Message::FileSection(FileSection { name: "lorem.md".to_owned(), size: length as u64, sections: blocks as u64, index: index as u64, data: file.drain(0..BLOCK_SIZE).collect() });
|
if index >= blocks {
|
||||||
// connection.write_message(message)?;
|
panic!("index >= blocks !@#$!%");
|
||||||
_ = message_to_buffer(message, &mut buffer);
|
}
|
||||||
|
let end_index: usize = if file.len() < BLOCK_SIZE {
|
||||||
|
file.len()
|
||||||
|
} else {
|
||||||
|
BLOCK_SIZE
|
||||||
|
};
|
||||||
|
let message: Message = Message::FileSection(FileSection {
|
||||||
|
name: "lorem.md".to_owned(),
|
||||||
|
size: length as u64,
|
||||||
|
sections: blocks as u64,
|
||||||
|
index: index as u64,
|
||||||
|
data: file.drain(0..end_index).collect(),
|
||||||
|
});
|
||||||
|
let serialised: Vec<u8> = message.to_vec();
|
||||||
|
connection.try_write(&serialised[..]);
|
||||||
index += 1;
|
index += 1;
|
||||||
|
|
||||||
|
sleep(Duration::from_millis(20)).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
fs::write("./serialised.txt", &buffer[..]);
|
while !running.is_closed() {
|
||||||
|
sleep(Duration::from_millis(20)).await;
|
||||||
while !buffer.is_empty() {
|
|
||||||
match buffer_to_message(&mut buffer) {
|
|
||||||
Ok(Some(message)) => println!("{:#?}", message),
|
|
||||||
Ok(None) => println!("hello .."),
|
|
||||||
Err(error) => println!("error - {}", error.to_string()),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receiver_process() -> anyhow::Result<()> {
|
async fn receiver_process() -> anyhow::Result<()> {
|
||||||
let connection = TcpStream::connect("127.0.0.1:9090").await?;
|
let mut connection = TcpStream::connect("127.0.0.1:9090").await?;
|
||||||
let mut connection = TcpHandler::new(connection);
|
let mut stream: Vec<u8> = Vec::with_capacity(8192);
|
||||||
let mut file: Vec<u8> = Vec::new();
|
let mut file: Vec<u8> = Vec::new();
|
||||||
let mut index: usize = 0;
|
let mut index: usize = 0;
|
||||||
let mut blocks: usize = 0;
|
let mut blocks: usize = 0;
|
||||||
let mut name: String = "".to_owned();
|
let mut name: String = "".to_owned();
|
||||||
|
let mut buffer = [0u8; 8192];
|
||||||
|
|
||||||
|
let mut counter: usize = 20;
|
||||||
|
|
||||||
'receive: loop {
|
'receive: loop {
|
||||||
let message: FileSection = if let Ok(Some(Message::FileSection(message))) = connection.read_message() {
|
let length: usize = stream.len();
|
||||||
message
|
|
||||||
} else
|
connection.readable().await;
|
||||||
{
|
let num_bytes: usize = match connection.try_read(&mut buffer) {
|
||||||
println!("failed to read message");
|
Ok(0) => continue 'receive,
|
||||||
continue 'receive;
|
Ok(value) => value,
|
||||||
|
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue 'receive,
|
||||||
|
Err(_) => panic!("fuck you!"),
|
||||||
};
|
};
|
||||||
|
stream.extend_from_slice(&buffer[0..num_bytes]);
|
||||||
|
|
||||||
|
let message: FileSection = match Message::from_vec(&mut stream) {
|
||||||
|
Ok(Message::FileSection(value)) => value,
|
||||||
|
Ok(unexpected) => panic!(".. {:#?}", unexpected),
|
||||||
|
Err(Error::IncompleteFrame) => continue 'receive,
|
||||||
|
Err(Error::EmptyBuffer) => panic!("don't be stupid!"),
|
||||||
|
Err(error) => panic!(".. {:#?}", error),
|
||||||
|
};
|
||||||
|
|
||||||
|
// println!("{:#?}", message);
|
||||||
|
|
||||||
if blocks == 0 {
|
if blocks == 0 {
|
||||||
blocks = message.sections as usize;
|
blocks = message.sections as usize;
|
||||||
name = message.name;
|
name = message.name;
|
||||||
@@ -101,12 +136,15 @@ async fn receiver_process() -> anyhow::Result<()> {
|
|||||||
index = message.index as usize;
|
index = message.index as usize;
|
||||||
|
|
||||||
file.extend(message.data);
|
file.extend(message.data);
|
||||||
println!("\nblocks = {blocks}, index = {index}\n");
|
|
||||||
|
|
||||||
if index >= blocks - 1 { break 'receive };
|
if index >= blocks - 1 {
|
||||||
|
break 'receive;
|
||||||
|
};
|
||||||
|
|
||||||
|
sleep(Duration::from_millis(20)).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// fs::write(&format!("./{name}"), &file[..]);
|
fs::write("./received.md", file).await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -130,99 +168,46 @@ pub struct FileSection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
enum Error {
|
pub enum Error {
|
||||||
#[error("writing message to socket failed")]
|
#[error("serialisation with cbor failed")]
|
||||||
FailedWrite(#[from] std::io::Error),
|
Serialisation(#[from] ciborium::de::Error<std::io::Error>),
|
||||||
|
|
||||||
#[error("incomplete write error")]
|
#[error("framing with cobs failed")]
|
||||||
IncompleteWrite,
|
Framing(#[from] cobs::DecodeError),
|
||||||
|
|
||||||
#[error("failed to read from tcp stream")]
|
#[error("can't deserialise from an empty buffer")]
|
||||||
FailedRead,
|
EmptyBuffer,
|
||||||
|
|
||||||
|
#[error("no framedelimiter found, the frame is probably not fully received yet")]
|
||||||
|
IncompleteFrame,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct TcpHandler {
|
impl Message {
|
||||||
socket: TcpStream,
|
/// serialises the message and returns a vector of `u8`
|
||||||
buffer: Vec<u8>,
|
pub fn to_vec(&self) -> Vec<u8> {
|
||||||
}
|
let mut serialised: Vec<u8> = Vec::with_capacity(1024);
|
||||||
|
_ = ciborium::into_writer(self, &mut serialised);
|
||||||
impl TcpHandler {
|
let mut framed: Vec<u8> = cobs::encode_vec(&serialised);
|
||||||
const BUFFER_SIZE: usize = 4096;
|
framed.push(0x00u8);
|
||||||
|
framed
|
||||||
pub fn new(socket: TcpStream) -> Self {
|
|
||||||
Self {
|
|
||||||
socket,
|
|
||||||
buffer: Vec::<u8>::with_capacity(Self::BUFFER_SIZE),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// reads a message from the socket
|
/// deserialises a message from a vector
|
||||||
/// returns a message or none if no message was received
|
pub fn from_vec(buffer: &mut Vec<u8>) -> Result<Message, Error> {
|
||||||
/// incomplete messages are stored in a buffer
|
if buffer.is_empty() { return Err(Error::EmptyBuffer); }
|
||||||
pub fn read_message(&mut self) -> Result<Option<Message>, Error> {
|
if !buffer.contains(&0u8) { return Err(Error::IncompleteFrame); }
|
||||||
let mut buffer: [u8; Self::BUFFER_SIZE] = [0u8; Self::BUFFER_SIZE];
|
|
||||||
let bytes: usize = match self.socket.try_read(&mut buffer) {
|
|
||||||
Ok(value) => value,
|
|
||||||
Err(error) => return Err(Error::FailedRead),
|
|
||||||
};
|
|
||||||
self.buffer.extend(&buffer[0..bytes]);
|
|
||||||
|
|
||||||
if !self.buffer.contains(&0u8) {
|
let mut index: usize = 0;
|
||||||
return Ok(None);
|
for (n, byte) in buffer.iter().enumerate() {
|
||||||
|
if byte == &0u8 {
|
||||||
|
index = n;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// let index = self.buffer
|
|
||||||
|
|
||||||
// TODO: proper error handling
|
let serialised: Vec<u8> = cobs::decode_vec(&buffer.drain(0..=index).collect::<Vec<u8>>())?;
|
||||||
let message_cbor: Vec<u8> = match cobs::decode_vec(&self.buffer) {
|
let message: Message = ciborium::from_reader(&serialised[..])?;
|
||||||
Ok(value) => value,
|
|
||||||
Err(error) => panic!("{}", error),
|
|
||||||
};
|
|
||||||
|
|
||||||
let message: Option<Message> = match ciborium::from_reader(std::io::Cursor::new(&mut self.buffer)) {
|
|
||||||
Ok(value) => Some(value),
|
|
||||||
Err(error) => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(message)
|
Ok(message)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_message(&mut self, message: Message) -> Result<(), Error> {
|
|
||||||
let mut message_cbor: Vec<u8> = Vec::new();
|
|
||||||
_ = ciborium::into_writer(&message, &mut message_cbor);
|
|
||||||
let mut framed_message: Vec<u8> = cobs::encode_vec(&message_cbor);
|
|
||||||
framed_message.push(0x00u8);
|
|
||||||
let bytes: usize = self.socket.try_write(&framed_message)?;
|
|
||||||
|
|
||||||
if bytes != framed_message.len() {
|
|
||||||
return Err(Error::IncompleteWrite);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn message_to_buffer(message: Message, buffer: &mut Vec<u8>) -> anyhow::Result<()> {
|
|
||||||
let mut serialised: Vec<u8> = Vec::new();
|
|
||||||
_ = ciborium::into_writer(&message, &mut serialised);
|
|
||||||
let mut framed: Vec<u8> = cobs::encode_vec(&serialised);
|
|
||||||
framed.push(0x00u8);
|
|
||||||
buffer.extend(framed);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn buffer_to_message(buffer: &mut Vec<u8>) -> anyhow::Result<Option<Message>> {
|
|
||||||
let mut index: usize = 0;
|
|
||||||
|
|
||||||
if !buffer.contains(&0x00u8) { return Ok(None) }
|
|
||||||
for (n, byte) in buffer.iter().enumerate() {
|
|
||||||
if byte == &0u8 {
|
|
||||||
index = n;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let deframed = cobs::decode_vec(&buffer.drain(0..=index).collect::<Vec<u8>>())?;
|
|
||||||
let deserialised: Message = ciborium::from_reader(&deframed[..])?;
|
|
||||||
|
|
||||||
Ok(Some(deserialised))
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user