diff --git a/Cargo.lock b/Cargo.lock
index 3179b43f5c..8bb80a21bd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -478,6 +478,7 @@ dependencies = [
 name = "io_uring"
 version = "0.1.0"
 dependencies = [
+ "data_model",
  "libc",
  "sys_util",
  "syscall_defines",
diff --git a/io_uring/Cargo.toml b/io_uring/Cargo.toml
index 1b633d396d..e1c44f9054 100644
--- a/io_uring/Cargo.toml
+++ b/io_uring/Cargo.toml
@@ -5,6 +5,7 @@ authors = ["The Chromium OS Authors"]
 edition = "2018"
 
 [dependencies]
+data_model = { path = "../data_model" }
 libc = "*"
 syscall_defines = { path = "../syscall_defines" }
 sys_util = { path = "../sys_util" }
diff --git a/io_uring/src/uring.rs b/io_uring/src/uring.rs
index 00e1d59a01..4e91161339 100644
--- a/io_uring/src/uring.rs
+++ b/io_uring/src/uring.rs
@@ -11,9 +11,9 @@ use std::fmt;
 use std::fs::File;
 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
 use std::pin::Pin;
-use std::ptr::null_mut;
-use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::atomic::{AtomicPtr, AtomicU32, Ordering};
 
+use data_model::IoBufMut;
 use sys_util::{MappedRegion, MemoryMapping, Protection, WatchingEvents};
 
 use crate::bindings::*;
@@ -95,7 +95,7 @@ pub struct URingContext {
     submit_ring: SubmitQueueState,
     submit_queue_entries: SubmitQueueEntries,
     complete_ring: CompleteQueueState,
-    io_vecs: Pin<Box<[libc::iovec]>>,
+    io_vecs: Pin<Box<[IoBufMut<'static>]>>,
     in_flight: usize, // The number of pending operations.
     added: usize,     // The number of ops added since the last call to `io_uring_enter`.
     num_sqes: usize,  // The total number of sqes allocated in shared memory.
@@ -164,16 +164,7 @@ impl URingContext {
             submit_ring,
             submit_queue_entries,
             complete_ring,
-            io_vecs: Pin::from(
-                vec![
-                    libc::iovec {
-                        iov_base: null_mut(),
-                        iov_len: 0
-                    };
-                    num_sqe
-                ]
-                .into_boxed_slice(),
-            ),
+            io_vecs: Pin::from(vec![IoBufMut::new(&mut []); num_sqe].into_boxed_slice()),
             added: 0,
             num_sqes: ring_params.sq_entries as usize,
             in_flight: 0,
@@ -204,7 +195,7 @@ impl URingContext {
         let index = (tail & self.submit_ring.ring_mask) as usize;
         let sqe = self.submit_queue_entries.get_mut(index).unwrap();
 
-        f(sqe, &mut self.io_vecs[index]);
+        f(sqe, self.io_vecs[index].as_mut());
 
         // Tells the kernel to use the new index when processing the entry at that index.
         self.submit_ring.set_array_entry(index, index as u32);
@@ -293,7 +284,15 @@ impl URingContext {
         I: Iterator<Item = libc::iovec>,
     {
         self.add_writev(
-            Pin::from(iovecs.collect::<Vec<libc::iovec>>().into_boxed_slice()),
+            Pin::from(
+                // Safe because the caller is required to guarantee that the memory pointed to by
+                // `iovecs` lives until the transaction is complete and the completion has been
+                // returned from `wait()`.
+                iovecs
+                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
+                    .collect::<Vec<_>>()
+                    .into_boxed_slice(),
+            ),
             fd,
             offset,
             user_data,
@@ -310,7 +309,7 @@ impl URingContext {
     /// The iovecs reference must be kept alive until the op returns.
     pub unsafe fn add_writev(
         &mut self,
-        iovecs: Pin<Box<[libc::iovec]>>,
+        iovecs: Pin<Box<[IoBufMut<'static>]>>,
         fd: RawFd,
         offset: u64,
         user_data: UserData,
@@ -343,7 +342,15 @@ impl URingContext {
         I: Iterator<Item = libc::iovec>,
     {
         self.add_readv(
-            Pin::from(iovecs.collect::<Vec<libc::iovec>>().into_boxed_slice()),
+            Pin::from(
+                // Safe because the caller is required to guarantee that the memory pointed to by
+                // `iovecs` lives until the transaction is complete and the completion has been
+                // returned from `wait()`.
+                iovecs
+                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
+                    .collect::<Vec<_>>()
+                    .into_boxed_slice(),
+            ),
             fd,
             offset,
             user_data,
@@ -360,7 +367,7 @@ impl URingContext {
     /// The iovecs reference must be kept alive until the op returns.
     pub unsafe fn add_readv(
         &mut self,
-        iovecs: Pin<Box<[libc::iovec]>>,
+        iovecs: Pin<Box<[IoBufMut<'static>]>>,
         fd: RawFd,
         offset: u64,
         user_data: UserData,
@@ -564,7 +571,7 @@ struct SubmitQueueState {
     _mmap: MemoryMapping,
     pointers: QueuePointers,
     ring_mask: u32,
-    array: *mut u32,
+    array: AtomicPtr<u32>,
 }
 
 impl SubmitQueueState {
@@ -579,7 +586,7 @@ impl SubmitQueueState {
         let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
         // This offset is guaranteed to be within the mmap so unwrap the result.
         let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
-        let array = ptr.add(params.sq_off.array as usize) as *mut u32;
+        let array = AtomicPtr::new(ptr.add(params.sq_off.array as usize) as *mut u32);
         SubmitQueueState {
             _mmap: mmap,
             pointers: QueuePointers { head, tail },
@@ -593,7 +600,7 @@ impl SubmitQueueState {
         // Safe because self being constructed from the correct mmap guarantees that the memory is
         // valid to be written.
         unsafe {
-            std::ptr::write_volatile(self.array.add(index), value as u32);
+            std::ptr::write_volatile(self.array.load(Ordering::Relaxed).add(index), value as u32);
         }
     }
 }
@@ -606,7 +613,7 @@ struct CompleteQueueState {
     completed: usize,
     // For ops that pass in arrays of iovecs, they need to be valid for the duration of the
     // operation because the kernel might read them at any time.
-    pending_op_addrs: BTreeMap<UserData, Pin<Box<[libc::iovec]>>>,
+    pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
 }
 
 impl CompleteQueueState {
@@ -628,7 +635,7 @@ impl CompleteQueueState {
         }
     }
 
-    fn add_op_data(&mut self, user_data: UserData, addrs: Pin<Box<[libc::iovec]>>) {
+    fn add_op_data(&mut self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
         self.pending_op_addrs.insert(user_data, addrs);
     }
 
@@ -691,6 +698,10 @@ struct QueuePointers {
     tail: *const AtomicU32,
 }
 
+// Rust pointers don't implement Send but in this case both fields are atomics and so it's safe to
+// send the pointers between threads.
+unsafe impl Send for QueuePointers {}
+
 impl QueuePointers {
     // Loads the tail pointer atomically with the given ordering.
     fn tail(&self, ordering: Ordering) -> u32 {
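
Why the manual `unsafe impl Send` at the end of the patch is needed: any raw-pointer field
(`*const AtomicU32` in `QueuePointers`) opts the containing struct out of the `Send` auto
trait, even though the pointees here are atomics that live as long as the ring mmap. A
minimal, self-contained sketch of that rule; the `Pointers` type below is a hypothetical
stand-in for illustration, not crosvm code:

    use std::sync::atomic::AtomicU32;

    // A raw-pointer field suppresses the Send auto trait for the whole struct,
    // regardless of what the pointer points at.
    struct Pointers {
        head: *const AtomicU32,
    }

    // Sound only because the pointee is an atomic that outlives every user of
    // the pointer; upholding that is the programmer's responsibility.
    unsafe impl Send for Pointers {}

    fn assert_send<T: Send>() {}

    fn main() {
        // Compiles only because of the manual impl above; removing it makes
        // `Pointers` !Send and this call a type error.
        assert_send::<Pointers>();
    }

The same reasoning drives the `*mut u32` to `AtomicPtr<u32>` change in `SubmitQueueState`:
`AtomicPtr` is `Send`/`Sync` by itself, so that field no longer needs a manual impl.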
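For reference, a hedged sketch of the caller-side contract the new safety comments in
`add_writev_iter`/`add_readv_iter` describe: the memory behind each iovec must stay valid
until the matching completion is drained from `wait()`. Signatures are paraphrased from this
file and may not match the crate exactly; the `/tmp` path and the `0x42` tag are
illustrative only:

    use std::fs::OpenOptions;
    use std::os::unix::io::AsRawFd;

    fn main() {
        let f = OpenOptions::new()
            .write(true)
            .create(true)
            .open("/tmp/uring_example")
            .expect("open");
        let mut ring = io_uring::URingContext::new(16).expect("create ring");

        // `buf` must outlive the operation: URingContext pins its IoBufMut
        // copies of the iovecs, but the buffer they point at is the caller's
        // to keep alive -- that is what makes add_writev_iter `unsafe`.
        let mut buf = [0x55u8; 4096];
        let iov = libc::iovec {
            iov_base: buf.as_mut_ptr() as *mut libc::c_void,
            iov_len: buf.len(),
        };
        unsafe {
            ring.add_writev_iter(std::iter::once(iov), f.as_raw_fd(), 0, 0x42)
                .expect("queue writev");
        }

        // Only after the completion tagged 0x42 comes back from wait() may
        // `buf` be reused or dropped.
        let (user_data, res) = ring.wait().expect("wait").next().expect("completion");
        assert_eq!(user_data, 0x42);
        let bytes = res.expect("writev failed");
        assert!(bytes as usize <= buf.len());
    }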