(rust) prost: test of protobuf en/decoding and transmission via tcp

This commit is contained in:
scbj
2025-06-03 15:50:25 +02:00
parent a7d09cb062
commit a545b29f03
+112 -28
View File
@@ -1,44 +1,103 @@
#![allow(dead_code, unused)] #![allow(dead_code, unused)]
use chrono::{ use chrono::{DateTime, Local};
DateTime, use prost::{DecodeError, EncodeError, Message};
Local, use proto::protest::*;
};
use tokio::{ use tokio::{
fs, fs,
io::{ io::{AsyncReadExt, AsyncWriteExt},
AsyncReadExt, net::{TcpListener, TcpStream},
AsyncWriteExt, time::{sleep, Duration},
},
net::{
TcpListener,
TcpStream,
},
time::{
Duration,
sleep,
},
}; };
use proto::protest::Payload;
const BLOCK_SIZE: usize = 1024;
const TIMEOUT_LIMIT: usize = 32;
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
let listener = TcpListener::bind("127.0.0.1:9090").await?; let listener = TcpListener::bind("127.0.0.1:9090").await?;
let mut file = fs::read("../lorem.md").await?;
let file = fs::read("../lorem.md").await?; println!("filesize = {}", file.len());
tokio::spawn(async move { tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
let mut connection = TcpStream::connect("127.0.0.1:9090").await.unwrap(); let mut connection = TcpStream::connect("127.0.0.1:9090").await.unwrap();
_ = send_file(&mut connection, file).await; let blocks = if 0 == file.len() % BLOCK_SIZE {
file.len() / BLOCK_SIZE
} else {
file.len() / BLOCK_SIZE + 1
};
let mut index = 0;
println!("blocks = {blocks}");
loop {
let mut buffer = vec![0u8; BLOCK_SIZE + 1024];
let data: Vec<u8> = if file.len() > BLOCK_SIZE {
file.drain(0..BLOCK_SIZE).collect()
} else {
file.drain(..).collect()
};
index += 1;
if index > blocks {
panic!("something went horribly wrong, index ({index}) > blocks ({blocks}), remaining filesize = {}", file.len());
}
println!("sending index = {}", index);
let frame = Payload {
index: index as u64,
blocks: blocks as u64,
payload: data,
};
let length = encode_with_length(&frame, &mut buffer).unwrap();
_ = connection.write_all(&buffer[0..length]).await;
if index == blocks {
break;
}
sleep(Duration::from_millis(1)).await;
}
}); });
match listener.accept().await { match listener.accept().await {
Ok((mut stream, address)) => { Ok((mut stream, address)) => {
println!("accepted connection request from {}", address); let mut output = Vec::<u8>::new();
let file = receive_file(&mut stream).await?; let mut buffer = [0u8; BLOCK_SIZE + 1024];
fs::write("test-lorem.md", file).await?; let mut vector: Vec<u8> = Vec::new();
let mut timeout_counter: usize = 0;
'receive: loop {
loop {
let num = stream.read(&mut buffer[..]).await.unwrap();
if num != 0 {
vector.extend(buffer[0..num].iter());
if timeout_counter > 0 {
timeout_counter -= 1;
}
break;
}
timeout_counter += 1;
if timeout_counter > TIMEOUT_LIMIT {
println!("\n\n[[ TIMEOUT ]]\n\n");
break 'receive;
}
sleep(Duration::from_millis(100)).await;
}
let mut delimiter = prost::decode_length_delimiter(&vector[..]).unwrap();
delimiter += prost::length_delimiter_len(delimiter);
let payload = Payload::decode_length_delimited(&vector[0..delimiter]).unwrap();
_ = vector.drain(0..delimiter);
println!("-- receiving index = {}, delimiter = {}", payload.index, delimiter);
output.extend(payload.payload.iter());
if payload.index == payload.blocks {
break;
}
sleep(Duration::from_millis(1)).await;
}
_ = fs::write("./lorem-copy.md", output).await?;
} }
Err(error) => { Err(error) => {
println!("this is unexpected.. {}", error.to_string()); println!("this is unexpected.. {}", error.to_string());
@@ -60,12 +119,30 @@ async fn receive_file(stream: &mut TcpStream) -> Result<Vec<u8>, anyhow::Error>
let num_bytes = stream.read(&mut buffer[..]).await?; let num_bytes = stream.read(&mut buffer[..]).await?;
processing_buffer.extend(&buffer[0..num_bytes]); processing_buffer.extend(&buffer[0..num_bytes]);
if 0 == index_counter { if 0 == index_counter {
size = usize::from_be_bytes(processing_buffer.drain(0..8).collect::<Vec<_>>().try_into().unwrap()); size = usize::from_be_bytes(
processing_buffer
.drain(0..8)
.collect::<Vec<_>>()
.try_into()
.unwrap(),
);
} else { } else {
_ = processing_buffer.drain(0..8); _ = processing_buffer.drain(0..8);
} }
let block_size = usize::from_be_bytes(processing_buffer.drain(0..8).collect::<Vec<_>>().try_into().unwrap()); let block_size = usize::from_be_bytes(
let index = usize::from_be_bytes(processing_buffer.drain(0..8).collect::<Vec<_>>().try_into().unwrap()); processing_buffer
.drain(0..8)
.collect::<Vec<_>>()
.try_into()
.unwrap(),
);
let index = usize::from_be_bytes(
processing_buffer
.drain(0..8)
.collect::<Vec<_>>()
.try_into()
.unwrap(),
);
if index_counter != index { if index_counter != index {
panic!("meh, counter = {index_counter}, index = {index}"); panic!("meh, counter = {index_counter}, index = {index}");
@@ -112,3 +189,10 @@ async fn send_file(stream: &mut TcpStream, mut file: Vec<u8>) -> Result<(), anyh
Ok(()) Ok(())
} }
fn encode_with_length(message: &impl Message, buffer: &mut Vec<u8>) -> Result<usize, EncodeError> {
let mut length = message.encoded_len();
length += prost::length_delimiter_len(length);
message.encode_length_delimited(&mut &mut buffer[0..length])?;
Ok(length)
}