diff --git a/io_uring/src/uring.rs b/io_uring/src/uring.rs index 5c00129487..4d401e6ce9 100644 --- a/io_uring/src/uring.rs +++ b/io_uring/src/uring.rs @@ -4,6 +4,7 @@ use std::fmt; use std::fs::File; +use std::io::IoSlice; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use std::ptr::null_mut; use std::sync::atomic::{AtomicU32, Ordering}; @@ -258,6 +259,60 @@ impl URingContext { self.add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_READV as u8) } + /// Asynchronously writes to `fd` from the addresses given in `iovecs`. + /// # Safety + /// `add_writev` will write to the address given by `iovecs`. This is only safe if the caller + /// guarantees there are no other references to that memory and that the memory lives until the + /// transaction is complete and that completion has been returned from the `wait` function. In + /// addition there must not be any mutable references to the data pointed to by `iovecs` until + /// the operation completes. Ensure that the fd remains open until the op completes as well. + pub unsafe fn add_writev( + &mut self, + iovecs: &[IoSlice], + fd: RawFd, + offset: u64, + user_data: UserData, + ) -> Result<()> { + self.prep_next_sqe(|sqe, _iovec| { + sqe.opcode = IORING_OP_WRITEV as u8; + sqe.addr = iovecs.as_ptr() as *const _ as *const libc::c_void as u64; + sqe.len = iovecs.len() as u32; + sqe.__bindgen_anon_1.off = offset; + sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0; + sqe.ioprio = 0; + sqe.user_data = user_data; + sqe.flags = 0; + sqe.fd = fd; + }) + } + + /// Asynchronously reads from `fd` to the addresses given in `iovecs`. + /// # Safety + /// `add_readv` will write to the address given by `iovecs`. This is only safe if the caller + /// guarantees there are no other references to that memory and that the memory lives until the + /// transaction is complete and that completion has been returned from the `wait` function. In + /// addition there must not be any references to the data pointed to by `iovecs` until the + /// operation completes. Ensure that the fd remains open until the op completes as well. + pub unsafe fn add_readv( + &mut self, + iovecs: &[IoSlice], + fd: RawFd, + offset: u64, + user_data: UserData, + ) -> Result<()> { + self.prep_next_sqe(|sqe, _iovec| { + sqe.opcode = IORING_OP_READV as u8; + sqe.addr = iovecs.as_ptr() as *const _ as *const libc::c_void as u64; + sqe.len = iovecs.len() as u32; + sqe.__bindgen_anon_1.off = offset; + sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0; + sqe.ioprio = 0; + sqe.user_data = user_data; + sqe.flags = 0; + sqe.fd = fd; + }) + } + /// Syncs all completed operations, the ordering with in-flight async ops is not /// defined. pub fn add_fsync(&mut self, fd: RawFd, user_data: UserData) -> Result<()> { @@ -581,7 +636,7 @@ impl QueuePointers { #[cfg(test)] mod tests { use std::fs::OpenOptions; - use std::io::Write; + use std::io::{Read, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; use std::time::Duration; @@ -596,14 +651,43 @@ mod tests { joined } - #[test] - fn read_one_block_at_a_time() { - let tempdir = TempDir::new().unwrap(); - let file_path = append_file_name(tempdir.path(), "test"); - let queue_size = 128; + fn check_one_read( + uring: &mut URingContext, + buf: &mut [u8], + fd: RawFd, + offset: u64, + user_data: UserData, + ) { + let (user_data_ret, res) = unsafe { + // Safe because the `wait` call waits until the kernel is done with `buf`. + uring + .add_read(buf.as_mut_ptr(), buf.len(), fd, offset, user_data) + .unwrap(); + uring.wait().unwrap().next().unwrap() + }; + assert_eq!(user_data_ret, user_data); + assert_eq!(res.unwrap(), buf.len() as u32); + } - let mut uring = URingContext::new(queue_size).unwrap(); - let mut buf = [0u8; 0x1000]; + fn check_one_readv( + uring: &mut URingContext, + buf: &mut [u8], + fd: RawFd, + offset: u64, + user_data: UserData, + ) { + let iovecs = [IoSlice::new(buf)]; + let (user_data_ret, res) = unsafe { + // Safe because the `wait` call waits until the kernel is done with `buf`. + uring.add_readv(&iovecs, fd, offset, user_data).unwrap(); + uring.wait().unwrap().next().unwrap() + }; + assert_eq!(user_data_ret, user_data); + assert_eq!(res.unwrap(), buf.len() as u32); + } + + fn create_test_file(temp_dir: &TempDir, size: u64) -> std::fs::File { + let file_path = append_file_name(temp_dir.path(), "test"); let f = OpenOptions::new() .read(true) .write(true) @@ -611,28 +695,62 @@ mod tests { .truncate(true) .open(&file_path) .unwrap(); - f.set_len(0x1000 * 2).unwrap(); + f.set_len(size).unwrap(); + f + } + #[test] + fn read_readv() { + let temp_dir = TempDir::new().unwrap(); + let queue_size = 128; + + let mut uring = URingContext::new(queue_size).unwrap(); + let mut buf = [0u8; 0x1000]; + let f = create_test_file(&temp_dir, 0x1000 * 2); + + // check that the whoe file can be read and that the queues wrapping is handled by reading + // double the quue depth of buffers. for i in 0..queue_size * 2 { - unsafe { - // Safe because the `wait` call waits until the kernel is done with`buf`. - let index = i as u64; - uring - .add_read( - buf.as_mut_ptr(), - buf.len(), - f.as_raw_fd(), - (index % 2) * 0x1000, - index, - ) - .unwrap(); - let (user_data, res) = uring.wait().unwrap().next().unwrap(); - assert_eq!(user_data, i as UserData); - assert_eq!(res.unwrap(), buf.len() as u32); - } + let index = i as u64; + check_one_read( + &mut uring, + &mut buf, + f.as_raw_fd(), + (index % 2) * 0x1000, + index, + ); + check_one_readv( + &mut uring, + &mut buf, + f.as_raw_fd(), + (index % 2) * 0x1000, + index, + ); } } + #[test] + fn readv_vec() { + let temp_dir = TempDir::new().unwrap(); + let queue_size = 128; + const BUF_SIZE: usize = 0x2000; + + let mut uring = URingContext::new(queue_size).unwrap(); + let buf = [0u8; BUF_SIZE]; + let buf2 = [0u8; BUF_SIZE]; + let buf3 = [0u8; BUF_SIZE]; + let io_slices = vec![IoSlice::new(&buf), IoSlice::new(&buf2), IoSlice::new(&buf3)]; + let total_len = io_slices.iter().fold(0, |a, iovec| a + iovec.len()); + let f = create_test_file(&temp_dir, total_len as u64 * 2); + let (user_data_ret, res) = unsafe { + // Safe because the `wait` call waits until the kernel is done with `buf`. + uring.add_readv(&io_slices, f.as_raw_fd(), 0, 55).unwrap(); + uring.wait().unwrap().next().unwrap() + }; + assert_eq!(user_data_ret, 55); + assert_eq!(res.unwrap(), total_len as u32); + } + #[test] fn write_one_block() { let tempdir = TempDir::new().unwrap(); @@ -700,6 +818,41 @@ mod tests { assert_eq!(res.unwrap(), buf.len() as u32); } } + + #[test] + fn writev_vec() { + let temp_dir = TempDir::new().unwrap(); + let queue_size = 128; + const BUF_SIZE: usize = 0x2000; + const OFFSET: u64 = 0x2000; + + let mut uring = URingContext::new(queue_size).unwrap(); + let buf = [0xaau8; BUF_SIZE]; + let buf2 = [0xffu8; BUF_SIZE]; + let buf3 = [0x55u8; BUF_SIZE]; + let io_slices = vec![IoSlice::new(&buf), IoSlice::new(&buf2), IoSlice::new(&buf3)]; + let total_len = io_slices.iter().fold(0, |a, iovec| a + iovec.len()); + let mut f = create_test_file(&temp_dir, total_len as u64 * 2); + let (user_data_ret, res) = unsafe { + // Safe because the `wait` call waits until the kernel is done with `buf`. + uring + .add_writev(&io_slices, f.as_raw_fd(), OFFSET, 55) + .unwrap(); + uring.wait().unwrap().next().unwrap() + }; + assert_eq!(user_data_ret, 55); + assert_eq!(res.unwrap(), total_len as u32); + + let mut read_back = [0u8; BUF_SIZE]; + f.seek(SeekFrom::Start(OFFSET)).unwrap(); + f.read(&mut read_back).unwrap(); + assert!(!read_back.iter().any(|&b| b != 0xaa)); + f.read(&mut read_back).unwrap(); + assert!(!read_back.iter().any(|&b| b != 0xff)); + f.read(&mut read_back).unwrap(); + assert!(!read_back.iter().any(|&b| b != 0x55)); + } + #[test] fn fallocate_fsync() { let tempdir = TempDir::new().unwrap();