diff --git a/rust/src/main.rs b/rust/src/main.rs index b1bb849..1e3b188 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -1,44 +1,103 @@ #![allow(dead_code, unused)] -use chrono::{ - DateTime, - Local, -}; +use chrono::{DateTime, Local}; +use prost::{DecodeError, EncodeError, Message}; +use proto::protest::*; use tokio::{ fs, - io::{ - AsyncReadExt, - AsyncWriteExt, - }, - net::{ - TcpListener, - TcpStream, - }, - time::{ - Duration, - sleep, - }, + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, + time::{sleep, Duration}, }; -use proto::protest::Payload; + +const BLOCK_SIZE: usize = 1024; +const TIMEOUT_LIMIT: usize = 32; #[tokio::main] async fn main() -> anyhow::Result<()> { let listener = TcpListener::bind("127.0.0.1:9090").await?; - - let file = fs::read("../lorem.md").await?; + let mut file = fs::read("../lorem.md").await?; + println!("filesize = {}", file.len()); tokio::spawn(async move { - sleep(Duration::from_secs(1)).await; - let mut connection = TcpStream::connect("127.0.0.1:9090").await.unwrap(); - _ = send_file(&mut connection, file).await; + let blocks = if 0 == file.len() % BLOCK_SIZE { + file.len() / BLOCK_SIZE + } else { + file.len() / BLOCK_SIZE + 1 + }; + let mut index = 0; + println!("blocks = {blocks}"); + loop { + let mut buffer = vec![0u8; BLOCK_SIZE + 1024]; + let data: Vec = if file.len() > BLOCK_SIZE { + file.drain(0..BLOCK_SIZE).collect() + } else { + file.drain(..).collect() + }; + index += 1; + if index > blocks { + panic!("something went horribly wrong, index ({index}) > blocks ({blocks}), remaining filesize = {}", file.len()); + } + + println!("sending index = {}", index); + let frame = Payload { + index: index as u64, + blocks: blocks as u64, + payload: data, + }; + + let length = encode_with_length(&frame, &mut buffer).unwrap(); + _ = connection.write_all(&buffer[0..length]).await; + + if index == blocks { + break; + } + sleep(Duration::from_millis(1)).await; + } }); match listener.accept().await { Ok((mut stream, address)) => { - println!("accepted connection request from {}", address); - let file = receive_file(&mut stream).await?; - fs::write("test-lorem.md", file).await?; + let mut output = Vec::::new(); + let mut buffer = [0u8; BLOCK_SIZE + 1024]; + let mut vector: Vec = Vec::new(); + let mut timeout_counter: usize = 0; + + 'receive: loop { + loop { + let num = stream.read(&mut buffer[..]).await.unwrap(); + if num != 0 { + vector.extend(buffer[0..num].iter()); + if timeout_counter > 0 { + timeout_counter -= 1; + } + break; + } + timeout_counter += 1; + if timeout_counter > TIMEOUT_LIMIT { + println!("\n\n[[ TIMEOUT ]]\n\n"); + break 'receive; + } + sleep(Duration::from_millis(100)).await; + } + let mut delimiter = prost::decode_length_delimiter(&vector[..]).unwrap(); + delimiter += prost::length_delimiter_len(delimiter); + + let payload = Payload::decode_length_delimited(&vector[0..delimiter]).unwrap(); + _ = vector.drain(0..delimiter); + + println!("-- receiving index = {}, delimiter = {}", payload.index, delimiter); + + output.extend(payload.payload.iter()); + + if payload.index == payload.blocks { + break; + } + sleep(Duration::from_millis(1)).await; + } + + _ = fs::write("./lorem-copy.md", output).await?; } Err(error) => { println!("this is unexpected.. {}", error.to_string()); @@ -60,12 +119,30 @@ async fn receive_file(stream: &mut TcpStream) -> Result, anyhow::Error> let num_bytes = stream.read(&mut buffer[..]).await?; processing_buffer.extend(&buffer[0..num_bytes]); if 0 == index_counter { - size = usize::from_be_bytes(processing_buffer.drain(0..8).collect::>().try_into().unwrap()); + size = usize::from_be_bytes( + processing_buffer + .drain(0..8) + .collect::>() + .try_into() + .unwrap(), + ); } else { _ = processing_buffer.drain(0..8); } - let block_size = usize::from_be_bytes(processing_buffer.drain(0..8).collect::>().try_into().unwrap()); - let index = usize::from_be_bytes(processing_buffer.drain(0..8).collect::>().try_into().unwrap()); + let block_size = usize::from_be_bytes( + processing_buffer + .drain(0..8) + .collect::>() + .try_into() + .unwrap(), + ); + let index = usize::from_be_bytes( + processing_buffer + .drain(0..8) + .collect::>() + .try_into() + .unwrap(), + ); if index_counter != index { panic!("meh, counter = {index_counter}, index = {index}"); @@ -112,3 +189,10 @@ async fn send_file(stream: &mut TcpStream, mut file: Vec) -> Result<(), anyh Ok(()) } +fn encode_with_length(message: &impl Message, buffer: &mut Vec) -> Result { + let mut length = message.encoded_len(); + length += prost::length_delimiter_len(length); + message.encode_length_delimited(&mut &mut buffer[0..length])?; + + Ok(length) +}