io_uring: add reading and writing iovecs

Let the user of the io_uring interface specify a slice of iovecs to read
from or write to when queueing an operation to the ring.

These ops can be used by the block device, which already has a Vec of iovecs
from the descriptor chain.
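
For illustration, a minimal sketch of queueing a vectored write with the new
interface, assuming `file` is an open `File`:

    let mut uring = URingContext::new(128).unwrap();
    let buf = [0xabu8; 4096];
    let io_slices = [IoSlice::new(&buf)];
    // Safe because `buf` outlives the operation and is not mutated until the
    // completion has been returned from `wait`.
    unsafe {
        uring
            .add_writev(&io_slices, file.as_raw_fd(), 0, 42)
            .unwrap();
    }
    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 42);
    assert_eq!(res.unwrap(), 4096);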

Change-Id: Ia91e03e441cdae03e4fdba33bb601de006ef53ef
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2140914
Reviewed-by: Chirantan Ekbote <chirantan@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Tested-by: Dylan Reid <dgreid@chromium.org>
Commit-Queue: Dylan Reid <dgreid@chromium.org>
Dylan Reid 2020-04-05 21:36:46 +00:00 committed by Commit Bot
parent 40d0e01de6
commit 23a2b7b8a2

@@ -4,6 +4,7 @@
use std::fmt;
use std::fs::File;
use std::io::IoSlice;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::ptr::null_mut;
use std::sync::atomic::{AtomicU32, Ordering};
@@ -258,6 +259,60 @@ impl URingContext {
self.add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_READV as u8)
}
/// Asynchronously writes to `fd` from the addresses given in `iovecs`.
/// # Safety
/// `add_writev` will read from the addresses given by `iovecs`. This is only safe if the
/// caller guarantees that the memory lives until the transaction is complete and that
/// completion has been returned from the `wait` function. In addition there must not be any
/// mutable references to the data pointed to by `iovecs` until the operation completes.
/// Ensure that the fd remains open until the op completes as well.
pub unsafe fn add_writev(
&mut self,
iovecs: &[IoSlice],
fd: RawFd,
offset: u64,
user_data: UserData,
) -> Result<()> {
self.prep_next_sqe(|sqe, _iovec| {
sqe.opcode = IORING_OP_WRITEV as u8;
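// For IORING_OP_WRITEV, `addr` points to the iovec array and `len` is the
// number of iovecs in that array, not a byte count.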
sqe.addr = iovecs.as_ptr() as *const _ as *const libc::c_void as u64;
sqe.len = iovecs.len() as u32;
sqe.__bindgen_anon_1.off = offset;
sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
sqe.ioprio = 0;
sqe.user_data = user_data;
sqe.flags = 0;
sqe.fd = fd;
})
}
/// Asynchronously reads from `fd` to the addresses given in `iovecs`.
/// # Safety
/// `add_readv` will write to the addresses given by `iovecs`. This is only safe if the caller
/// guarantees there are no other references to that memory and that the memory lives until the
/// transaction is complete and that completion has been returned from the `wait` function. In
/// addition there must not be any references to the data pointed to by `iovecs` until the
/// operation completes. Ensure that the fd remains open until the op completes as well.
pub unsafe fn add_readv(
&mut self,
iovecs: &[IoSlice],
fd: RawFd,
offset: u64,
user_data: UserData,
) -> Result<()> {
self.prep_next_sqe(|sqe, _iovec| {
sqe.opcode = IORING_OP_READV as u8;
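// As with writev, `addr` points to the iovec array and `len` is the iovec
// count rather than a byte length.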
sqe.addr = iovecs.as_ptr() as *const _ as *const libc::c_void as u64;
sqe.len = iovecs.len() as u32;
sqe.__bindgen_anon_1.off = offset;
sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
sqe.ioprio = 0;
sqe.user_data = user_data;
sqe.flags = 0;
sqe.fd = fd;
})
}
/// Syncs all completed operations; the ordering with respect to in-flight async ops is not
/// defined.
pub fn add_fsync(&mut self, fd: RawFd, user_data: UserData) -> Result<()> {
@@ -581,7 +636,7 @@ impl QueuePointers {
#[cfg(test)]
mod tests {
use std::fs::OpenOptions;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::time::Duration;
@@ -596,14 +651,43 @@ mod tests {
joined
}
fn check_one_read(
uring: &mut URingContext,
buf: &mut [u8],
fd: RawFd,
offset: u64,
user_data: UserData,
) {
let (user_data_ret, res) = unsafe {
// Safe because the `wait` call waits until the kernel is done with `buf`.
uring
.add_read(buf.as_mut_ptr(), buf.len(), fd, offset, user_data)
.unwrap();
uring.wait().unwrap().next().unwrap()
};
assert_eq!(user_data_ret, user_data);
assert_eq!(res.unwrap(), buf.len() as u32);
}
fn check_one_readv(
uring: &mut URingContext,
buf: &mut [u8],
fd: RawFd,
offset: u64,
user_data: UserData,
) {
let iovecs = [IoSlice::new(buf)];
let (user_data_ret, res) = unsafe {
// Safe because the `wait` call waits until the kernel is done with `buf`.
uring.add_readv(&iovecs, fd, offset, user_data).unwrap();
uring.wait().unwrap().next().unwrap()
};
assert_eq!(user_data_ret, user_data);
assert_eq!(res.unwrap(), buf.len() as u32);
}
fn create_test_file(temp_dir: &TempDir, size: u64) -> std::fs::File {
let file_path = append_file_name(temp_dir.path(), "test");
let f = OpenOptions::new()
.read(true)
.write(true)
@@ -611,28 +695,62 @@ mod tests {
.truncate(true)
.open(&file_path)
.unwrap();
f.set_len(size).unwrap();
f
}
#[test]
fn read_readv() {
let temp_dir = TempDir::new().unwrap();
let queue_size = 128;
let mut uring = URingContext::new(queue_size).unwrap();
let mut buf = [0u8; 0x1000];
let f = create_test_file(&temp_dir, 0x1000 * 2);
// Check that the whole file can be read and that the queue's wrapping is handled by reading
// double the queue depth of buffers.
for i in 0..queue_size * 2 {
let index = i as u64;
check_one_read(
&mut uring,
&mut buf,
f.as_raw_fd(),
(index % 2) * 0x1000,
index,
);
check_one_readv(
&mut uring,
&mut buf,
f.as_raw_fd(),
(index % 2) * 0x1000,
index,
);
}
}
#[test]
fn readv_vec() {
let temp_dir = TempDir::new().unwrap();
let queue_size = 128;
const BUF_SIZE: usize = 0x2000;
let mut uring = URingContext::new(queue_size).unwrap();
let buf = [0u8; BUF_SIZE];
let buf2 = [0u8; BUF_SIZE];
let buf3 = [0u8; BUF_SIZE];
let io_slices = vec![IoSlice::new(&buf), IoSlice::new(&buf2), IoSlice::new(&buf3)];
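// Total bytes across all three buffers; a successful readv completion
// should report exactly this count.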
let total_len = io_slices.iter().fold(0, |a, iovec| a + iovec.len());
let f = create_test_file(&temp_dir, total_len as u64 * 2);
let (user_data_ret, res) = unsafe {
// Safe because the `wait` call waits until the kernel is done with `buf`.
uring.add_readv(&io_slices, f.as_raw_fd(), 0, 55).unwrap();
uring.wait().unwrap().next().unwrap()
};
assert_eq!(user_data_ret, 55);
assert_eq!(res.unwrap(), total_len as u32);
}
#[test]
fn write_one_block() {
let tempdir = TempDir::new().unwrap();
@@ -700,6 +818,41 @@ mod tests {
assert_eq!(res.unwrap(), buf.len() as u32);
}
}
#[test]
fn writev_vec() {
let temp_dir = TempDir::new().unwrap();
let queue_size = 128;
const BUF_SIZE: usize = 0x2000;
const OFFSET: u64 = 0x2000;
let mut uring = URingContext::new(queue_size).unwrap();
let buf = [0xaau8; BUF_SIZE];
let buf2 = [0xffu8; BUF_SIZE];
let buf3 = [0x55u8; BUF_SIZE];
let io_slices = vec![IoSlice::new(&buf), IoSlice::new(&buf2), IoSlice::new(&buf3)];
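// The completion result of the writev should equal the combined length of
// the three buffers.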
let total_len = io_slices.iter().fold(0, |a, iovec| a + iovec.len());
let mut f = create_test_file(&temp_dir, total_len as u64 * 2);
let (user_data_ret, res) = unsafe {
// Safe because the `wait` call waits until the kernel is done with `buf`.
uring
.add_writev(&io_slices, f.as_raw_fd(), OFFSET, 55)
.unwrap();
uring.wait().unwrap().next().unwrap()
};
assert_eq!(user_data_ret, 55);
assert_eq!(res.unwrap(), total_len as u32);
let mut read_back = [0u8; BUF_SIZE];
f.seek(SeekFrom::Start(OFFSET)).unwrap();
f.read_exact(&mut read_back).unwrap();
assert!(read_back.iter().all(|&b| b == 0xaa));
f.read_exact(&mut read_back).unwrap();
assert!(read_back.iter().all(|&b| b == 0xff));
f.read_exact(&mut read_back).unwrap();
assert!(read_back.iter().all(|&b| b == 0x55));
}
#[test]
fn fallocate_fsync() {
let tempdir = TempDir::new().unwrap();