devices: virtio: block: parallelize with multiple worker threads

Currently, virtio-blk does all operations serially, which makes every
operation wait for all preceding operations to complete.

This commit enables the virtio-blk device to run multiple worker threads
simultaneously, which allows the device to handle multiple requests in
parallel. For now, the feature is gated behind a runtime command-line
flag so it can be switched on and off for experiments.

BUG=b:267716786
TEST=`cargo test -p devices` passes.
TEST=`tast run $DUT arc.AuthPerf.unmanaged_virtio_blk_vm` passes.
TEST=`tast run $DUT arc.AuthPerf.unmanaged_virtio_blk_vm` passes with
multiple workers enabled by a hard-coded change. Confirmed that multiple
virtio_blk threads were running.
TEST=`crosvm run --block ...,multiple-workers=true` runs with multiple
virtio_blk threads

Change-Id: I089768afed3ea0f126c907d7ae62a1c55be1b829
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/4272807
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
Reviewed-by: Keiichi Watanabe <keiichiw@chromium.org>
Commit-Queue: Takaya Saeki <takayas@chromium.org>
Takaya Saeki 2023-02-27 19:57:26 +09:00 committed by crosvm LUCI
parent 3e400e9293
commit af1a48da8a
8 changed files with 263 additions and 67 deletions

@@ -92,6 +92,7 @@ fuzz_target!(|bytes| {
false,
true,
512,
false,
None,
None,
None,

@@ -202,10 +202,17 @@ pub type BlockId = [u8; ID_LEN];
/// Tracks the state of an asynchronous disk.
pub struct DiskState {
pub disk_image: Box<dyn AsyncDisk>,
pub disk_size: Arc<AtomicU64>,
pub read_only: bool,
pub sparse: bool,
pub id: Option<BlockId>,
/// A `DiskState` is owned by a single worker's executor and cannot be shared across workers;
/// state that must be visible to all workers lives in `worker_shared_state` behind an `Arc`.
worker_shared_state: Arc<AsyncMutex<WorkerSharedState>>,
}
/// Disk state which can be modified by other worker threads
struct WorkerSharedState {
disk_size: Arc<AtomicU64>,
sparse: bool,
}
impl DiskState {
@@ -219,10 +226,9 @@ impl DiskState {
) -> DiskState {
DiskState {
disk_image,
disk_size,
read_only,
sparse,
id,
worker_shared_state: Arc::new(AsyncMutex::new(WorkerSharedState { disk_size, sparse })),
}
}
}
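A note on the design above: each worker's single-threaded executor owns its own `DiskState` behind an `Rc`, while the mutable state every worker must observe (size, sparseness) sits behind an `Arc`'d lock. A minimal sketch of that split, using `std::sync` types as stand-ins for crosvm's `cros_async` primitives (names simplified, illustrative only):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc, Mutex,
};
use std::thread;

// Mutable state every worker must observe; crosses threads via Arc.
struct WorkerSharedState {
    disk_size: Arc<AtomicU64>,
    sparse: bool,
}

// Per-worker state: owned by one worker; only the shared handle is cloned.
struct DiskState {
    read_only: bool,
    shared: Arc<Mutex<WorkerSharedState>>,
}

fn spawn_workers(n: usize) -> Vec<thread::JoinHandle<()>> {
    let shared = Arc::new(Mutex::new(WorkerSharedState {
        disk_size: Arc::new(AtomicU64::new(0x1000)),
        sparse: true,
    }));
    (0..n)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // Each worker builds its own DiskState around the shared handle.
                let state = DiskState {
                    read_only: false,
                    shared,
                };
                let _size = state
                    .shared
                    .lock()
                    .unwrap()
                    .disk_size
                    .load(Ordering::Relaxed);
            })
        })
        .collect()
}
```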
@@ -421,6 +427,9 @@ async fn resize(disk_state: Rc<AsyncMutex<DiskState>>, new_size: u64) -> DiskCon
// Acquire exclusive, mutable access to the state so the virtqueue task won't be able to read
// the state while resizing.
let mut disk_state = disk_state.lock().await;
// Prevent any other worker thread from doing IO while the disk is resized.
let worker_shared_state = Arc::clone(&disk_state.worker_shared_state);
let mut worker_shared_state = worker_shared_state.lock().await;
if disk_state.read_only {
error!("Attempted to resize read-only block device");
@@ -440,10 +449,12 @@ async fn resize(disk_state: Rc<AsyncMutex<DiskState>>, new_size: u64) -> DiskCon
return DiskControlResult::Err(SysError::new(libc::EIO));
}
disk_state.sparse = false;
worker_shared_state.sparse = false;
if let Ok(new_disk_size) = disk_state.disk_image.get_len() {
disk_state.disk_size.store(new_disk_size, Ordering::Release);
worker_shared_state
.disk_size
.store(new_disk_size, Ordering::Release);
}
DiskControlResult::Ok
}
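The lock ordering above is what makes resize safe across workers: the resize task first takes its own worker's `DiskState` lock (stalling that worker's virtqueue task), then takes `WorkerSharedState` exclusively, while every worker's request path holds the shared state under a read lock for the duration of each IO. A rough model of that contract, with `std::sync::RwLock` standing in for the async read/write mutex (assumed equivalent reader/writer semantics):

```rust
use std::sync::{Arc, RwLock};

struct Shared {
    sparse: bool,
}

// Request path: many workers may hold the read lock at once.
fn process_request(shared: &Arc<RwLock<Shared>>) {
    let guard = shared.read().unwrap();
    // e.g. DISCARD is silently ignored on non-sparse disks
    let _discard_is_hint = !guard.sparse;
}

// Resize path: the write lock excludes all readers, so no worker can
// issue IO while the disk size or sparseness changes under it.
fn resize(shared: &Arc<RwLock<Shared>>) {
    let mut guard = shared.write().unwrap();
    guard.sparse = false; // a resized image is no longer treated as sparse
}
```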
@@ -572,7 +583,9 @@ pub struct BlockAsync {
pub(crate) control_tube: Option<Tube>,
pub(crate) queue_sizes: Vec<u16>,
pub(crate) executor_kind: ExecutorKind,
worker_thread: Option<WorkerThread<(Box<dyn DiskFile>, Option<Tube>)>>,
worker_threads: Vec<WorkerThread<(Box<dyn DiskFile>, Option<Tube>)>>,
// Whether to spawn a separate worker thread for each queue
worker_per_queue: bool,
}
impl BlockAsync {
@@ -583,6 +596,7 @@ impl BlockAsync {
read_only: bool,
sparse: bool,
block_size: u32,
multiple_workers: bool,
id: Option<BlockId>,
control_tube: Option<Tube>,
queue_size: Option<u16>,
@@ -633,7 +647,8 @@ impl BlockAsync {
block_size,
id,
queue_sizes,
worker_thread: None,
worker_threads: vec![],
worker_per_queue: multiple_workers,
control_tube,
executor_kind,
})
@@ -675,8 +690,10 @@ impl BlockAsync {
flush_timer: Rc<RefCell<TimerAsync>>,
flush_timer_armed: Rc<RefCell<bool>>,
) -> result::Result<(), ExecuteError> {
// Acquire immutable access to disk_state to prevent the disk from being resized.
// Acquire immutable access to prevent tasks from resizing the disk.
let disk_state = disk_state.read_lock().await;
// Acquire immutable access to prevent other worker threads from resizing the disk.
let worker_shared_state = disk_state.worker_shared_state.read_lock().await;
let req_header: virtio_blk_req_header = reader.read_obj().map_err(ExecuteError::Read)?;
@@ -706,7 +723,7 @@ impl BlockAsync {
}
}
let disk_size = disk_state.disk_size.load(Ordering::Relaxed);
let disk_size = worker_shared_state.disk_size.load(Ordering::Relaxed);
match req_type {
VIRTIO_BLK_T_IN => {
let data_len = writer.available_bytes();
@@ -757,7 +774,7 @@ impl BlockAsync {
}
}
VIRTIO_BLK_T_DISCARD | VIRTIO_BLK_T_WRITE_ZEROES => {
if req_type == VIRTIO_BLK_T_DISCARD && !disk_state.sparse {
if req_type == VIRTIO_BLK_T_DISCARD && !worker_shared_state.sparse {
// Discard is a hint; if this is a non-sparse disk, just ignore it.
return Ok(());
}
@@ -902,62 +919,98 @@ impl VirtioDevice for BlockAsync {
) -> anyhow::Result<()> {
let read_only = self.read_only;
let sparse = self.sparse;
let disk_size = self.disk_size.clone();
let id = self.id.take();
let executor_kind = self.executor_kind;
let disk_image = self.disk_image.take().context("missing disk image")?;
let control_tube = self.control_tube.take();
let disk_image = self
.disk_image
.take()
.context("Failed to take a disk image")?;
self.worker_thread = Some(WorkerThread::start("virtio_blk", move |kill_evt| {
let ex =
Executor::with_executor_kind(executor_kind).expect("Failed to create an executor");
// If worker_per_queue is enabled and disk_image supports cloning, run workers in parallel.
let queues_per_worker = if self.worker_per_queue && disk_image.try_clone().is_ok() {
// 1 queue per 1 worker
queues
.into_iter()
.map(|queue| {
Ok((
vec![queue],
disk_image
.try_clone()
.context("Failed to clone a disk image")?,
))
})
.collect::<anyhow::Result<_>>()?
} else {
vec![(queues, disk_image)]
};
let async_control =
control_tube.map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube"));
let async_image = match disk_image.to_async_disk(&ex) {
Ok(d) => d,
Err(e) => panic!("Failed to create async disk {}", e),
};
let disk_state = Rc::new(AsyncMutex::new(DiskState {
disk_image: async_image,
disk_size,
read_only,
sparse,
id,
}));
if let Err(err_string) = run_worker(
ex,
interrupt,
queues,
mem,
&disk_state,
&async_control,
kill_evt,
) {
error!("{}", err_string);
}
let disk_state = match Rc::try_unwrap(disk_state) {
Ok(d) => d.into_inner(),
Err(_) => panic!("too many refs to the disk"),
};
(
disk_state.disk_image.into_inner(),
async_control.map(|c| c.into()),
)
let shared_state = Arc::new(AsyncMutex::new(WorkerSharedState {
disk_size: self.disk_size.clone(),
sparse,
}));
let mut worker_threads = vec![];
for (queues, disk_image) in queues_per_worker.into_iter() {
let mem = mem.clone();
let shared_state = Arc::clone(&shared_state);
let interrupt = interrupt.clone();
let control_tube = self.control_tube.take();
let worker_thread = WorkerThread::start("virtio_blk", move |kill_evt| {
let ex = Executor::with_executor_kind(executor_kind)
.expect("Failed to create an executor");
let async_control = control_tube
.map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube"));
let async_image = match disk_image.to_async_disk(&ex) {
Ok(d) => d,
Err(e) => panic!("Failed to create async disk {}", e),
};
let disk_state = Rc::new(AsyncMutex::new(DiskState {
disk_image: async_image,
read_only,
id,
worker_shared_state: shared_state,
}));
if let Err(err_string) = run_worker(
ex,
interrupt,
queues,
mem,
&disk_state,
&async_control,
kill_evt,
) {
error!("{}", err_string);
}
let disk_state = match Rc::try_unwrap(disk_state) {
Ok(d) => d.into_inner(),
Err(_) => panic!("too many refs to the disk"),
};
(
disk_state.disk_image.into_inner(),
async_control.map(Tube::from),
)
});
worker_threads.push(worker_thread);
}
self.worker_threads = worker_threads;
Ok(())
}
fn reset(&mut self) -> bool {
if let Some(worker_thread) = self.worker_thread.take() {
let mut success = false;
while let Some(worker_thread) = self.worker_threads.pop() {
let (disk_image, control_tube) = worker_thread.stop();
self.disk_image = Some(disk_image);
self.control_tube = control_tube;
return true;
if let Some(control_tube) = control_tube {
self.control_tube = Some(control_tube);
}
success = true;
}
false
success
}
}
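In `activate`, the fan-out is all-or-nothing: one worker per queue only when the flag is set and the disk image supports `try_clone`, otherwise a single worker owns every queue, matching the old behavior. A condensed sketch of that decision (hypothetical helper; `std::fs::File` stands in for `dyn DiskFile`, and the queue type is simplified):

```rust
use std::fs::File;
use std::thread::{self, JoinHandle};

type Queue = u16; // stand-in for the virtio queue type

fn start_workers(
    queues: Vec<Queue>,
    disk_image: File,
    worker_per_queue: bool,
) -> std::io::Result<Vec<JoinHandle<()>>> {
    // One (queues, image) pair per worker: a pair per queue when cloning
    // works, or a single pair owning every queue as the fallback.
    let per_worker: Vec<(Vec<Queue>, File)> =
        if worker_per_queue && disk_image.try_clone().is_ok() {
            queues
                .into_iter()
                .map(|q| Ok((vec![q], disk_image.try_clone()?)))
                .collect::<std::io::Result<_>>()?
        } else {
            vec![(queues, disk_image)]
        };
    Ok(per_worker
        .into_iter()
        .map(|(qs, image)| {
            thread::spawn(move || {
                // Each worker would run its own executor over its queues,
                // with its own handle to the disk image.
                let _ = (qs, image);
            })
        })
        .collect())
}
```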
@@ -996,6 +1049,7 @@ mod tests {
true,
false,
512,
false,
None,
None,
None,
@@ -1025,6 +1079,7 @@ mod tests {
true,
false,
4096,
false,
None,
None,
None,
@@ -1054,6 +1109,7 @@ mod tests {
false,
true,
512,
false,
None,
None,
None,
@@ -1077,6 +1133,7 @@ mod tests {
false,
false,
512,
false,
None,
None,
None,
@@ -1100,6 +1157,7 @@ mod tests {
true,
true,
512,
false,
None,
None,
None,
@@ -1129,6 +1187,7 @@ mod tests {
false,
true,
512,
false,
None,
None,
None,
@@ -1149,6 +1208,7 @@ mod tests {
false,
false,
512,
false,
None,
None,
Some(128),
@@ -1207,10 +1267,12 @@ mod tests {
let disk_state = Rc::new(AsyncMutex::new(DiskState {
disk_image: Box::new(af),
disk_size: Arc::new(AtomicU64::new(disk_size)),
read_only: false,
sparse: true,
id: None,
worker_shared_state: Arc::new(AsyncMutex::new(WorkerSharedState {
disk_size: Arc::new(AtomicU64::new(disk_size)),
sparse: true,
})),
}));
let fut = process_one_request(avail_desc, disk_state, flush_timer, flush_timer_armed, &mem);
@@ -1268,10 +1330,12 @@ mod tests {
let flush_timer_armed = Rc::new(RefCell::new(false));
let disk_state = Rc::new(AsyncMutex::new(DiskState {
disk_image: Box::new(af),
disk_size: Arc::new(AtomicU64::new(disk_size)),
read_only: false,
sparse: true,
id: None,
worker_shared_state: Arc::new(AsyncMutex::new(WorkerSharedState {
disk_size: Arc::new(AtomicU64::new(disk_size)),
sparse: true,
})),
}));
let fut = process_one_request(avail_desc, disk_state, flush_timer, flush_timer_armed, &mem);
@@ -1331,10 +1395,12 @@ mod tests {
let disk_state = Rc::new(AsyncMutex::new(DiskState {
disk_image: Box::new(af),
disk_size: Arc::new(AtomicU64::new(disk_size)),
read_only: false,
sparse: true,
id: Some(*id),
worker_shared_state: Arc::new(AsyncMutex::new(WorkerSharedState {
disk_size: Arc::new(AtomicU64::new(disk_size)),
sparse: true,
})),
}));
let fut = process_one_request(avail_desc, disk_state, flush_timer, flush_timer_armed, &mem);
@@ -1355,10 +1421,23 @@ mod tests {
// TODO(b/270225199): enable this test on Windows once IoSourceExt::into_source is implemented
#[cfg(unix)]
#[test]
fn reset_and_reactivate() {
fn reset_and_reactivate_single_worker() {
reset_and_reactivate(false);
}
// TODO(b/270225199): enable this test on Windows once IoSourceExt::into_source is implemented
#[cfg(unix)]
#[test]
fn reset_and_reactivate_multiple_workers() {
reset_and_reactivate(true);
}
#[cfg(unix)]
fn reset_and_reactivate(enables_multiple_workers: bool) {
// Create an empty disk image
let f = tempfile().unwrap();
f.set_len(0x1000).unwrap();
let disk_image: Box<dyn DiskFile> = Box::new(f);
// Create an empty guest memory
let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
@@ -1368,10 +1447,11 @@ mod tests {
let features = base_features(ProtectionType::Unprotected);
let mut b = BlockAsync::new(
features,
Box::new(f),
disk_image.try_clone().unwrap(),
true,
false,
512,
enables_multiple_workers,
None,
Some(Tube::pair().unwrap().0),
None,
@@ -1427,7 +1507,20 @@ mod tests {
// or after finding a good way to prevent BlockAsync::drop() from panicking due to that.
#[cfg(unix)]
#[test]
fn resize() {
fn resize_with_single_worker() {
resize(false);
}
// TODO(b/270225199): enable this test on Windows once IoSourceExt::into_source is implemented,
// or after finding a good way to prevent BlockAsync::drop() from panicking due to that.
#[cfg(unix)]
#[test]
fn resize_with_multiple_workers() {
// Test that a resize handled by one worker affects the state shared with other workers
resize(true);
}
fn resize(enables_multiple_workers: bool) {
// disk image size constants
let original_size = 0x1000;
let resized_size = 0x2000;
@@ -1435,6 +1528,8 @@ mod tests {
// Create an empty disk image
let f = tempfile().unwrap();
f.set_len(original_size).unwrap();
let disk_image: Box<dyn DiskFile> = Box::new(f);
assert_eq!(disk_image.get_len().unwrap(), original_size);
// Create an empty guest memory
let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
@@ -1447,10 +1542,11 @@ mod tests {
let features = base_features(ProtectionType::Unprotected);
let mut b = BlockAsync::new(
features,
Box::new(f),
disk_image.try_clone().unwrap(),
false,
false,
512,
enables_multiple_workers,
None,
Some(control_tube_device),
None,
@@ -1502,6 +1598,11 @@ mod tests {
resized_size,
"disk_size should be resized to the new size"
);
assert_eq!(
disk_image.get_len().unwrap(),
resized_size,
"underlying disk image should be resized to the new size"
);
let mut capacity = [0u8; 8];
b.read_config(0, &mut capacity);
assert_eq!(
@@ -1512,9 +1613,9 @@ mod tests {
);
assert_eq!(
interrupt
.get_interrupt_evt()
// Wait a bit until the blk signals the interrupt
.wait_timeout(Duration::from_millis(300)),
Ok(base::EventWaitResult::Signaled),
"interrupt should be signaled"
);
@@ -1524,4 +1625,70 @@ mod tests {
"INTERRUPT_STATUS_CONFIG_CHANGED should be signaled"
);
}
// TODO(b/270225199): enable this test on Windows once IoSourceExt::into_source is implemented,
// or after finding a good way to prevent BlockAsync::drop() from panicking due to that.
#[cfg(unix)]
#[test]
fn run_worker_threads() {
// Create an empty duplicable disk image
let f = tempfile().unwrap();
f.set_len(0x1000).unwrap();
let disk_image: Box<dyn DiskFile> = Box::new(f);
// Create an empty guest memory
let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
.expect("Creating guest memory failed.");
// Create a BlockAsync to test with single worker thread
let features = base_features(ProtectionType::Unprotected);
let mut b = BlockAsync::new(
features,
disk_image.try_clone().unwrap(),
true,
false,
512,
false, // run with single worker thread
None,
None,
None,
None,
None,
)
.unwrap();
// activate with queues of an arbitrary size.
b.activate(
mem.clone(),
Interrupt::new(IrqLevelEvent::new().unwrap(), None, VIRTIO_MSI_NO_VECTOR),
vec![
(Queue::new(DEFAULT_QUEUE_SIZE), Event::new().unwrap()),
(Queue::new(DEFAULT_QUEUE_SIZE), Event::new().unwrap()),
],
)
.expect("activate should succeed");
assert_eq!(b.worker_threads.len(), 1, "1 thread should be spawned.");
drop(b);
// Create a BlockAsync to test with multiple worker threads
let features = base_features(ProtectionType::Unprotected);
let mut b = BlockAsync::new(
features, disk_image, true, false, 512, true, None, None, None, None, None,
)
.unwrap();
// activate should succeed
b.activate(
mem,
Interrupt::new(IrqLevelEvent::new().unwrap(), None, VIRTIO_MSI_NO_VECTOR),
vec![
(Queue::new(DEFAULT_QUEUE_SIZE), Event::new().unwrap()),
(Queue::new(DEFAULT_QUEUE_SIZE), Event::new().unwrap()),
],
)
.expect("activate should succeed");
assert_eq!(b.worker_threads.len(), 2, "2 threads should be spawned.");
}
}

@@ -75,6 +75,10 @@ pub struct DiskOption {
alias = "io_concurrency"
)]
pub io_concurrency: NonZeroU32,
#[serde(default)]
/// Experimental option to run multiple worker threads in parallel. If false (the default), a
/// single worker thread serves all queues. Note that this option has no effect for the
/// vhost-user block device.
pub multiple_workers: bool,
#[serde(default, alias = "async_executor")]
/// The async executor kind to simulate the block device with. This option takes
/// precedence over the async executor kind specified by the subcommand's option.
@@ -118,6 +122,7 @@ mod tests {
id: None,
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
async_executor: None,
}
);
@@ -136,6 +141,7 @@ mod tests {
id: None,
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
async_executor: None,
}
);
@@ -154,6 +160,7 @@ mod tests {
id: None,
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
async_executor: None,
}
);
@@ -172,6 +179,7 @@ mod tests {
id: None,
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
async_executor: None,
}
);
@@ -190,6 +198,7 @@ mod tests {
id: None,
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
async_executor: None,
}
);
@@ -206,6 +215,7 @@ mod tests {
id: None,
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
async_executor: None,
}
);
@@ -224,6 +234,7 @@ mod tests {
id: None,
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
async_executor: None,
}
);
@@ -242,6 +253,7 @@ mod tests {
id: None,
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
async_executor: None,
}
);
@@ -260,6 +272,7 @@ mod tests {
id: None,
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
async_executor: None,
}
);
@@ -279,6 +292,7 @@ mod tests {
async_executor: None,
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
}
);
@@ -297,6 +311,7 @@ mod tests {
block_size: 512,
id: None,
io_concurrency: NonZeroU32::new(4).unwrap(),
multiple_workers: false,
async_executor: None,
}
);
@@ -316,6 +331,7 @@ mod tests {
id: Some(*b"DISK\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"),
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
async_executor: None,
}
);
@@ -347,6 +363,7 @@ mod tests {
id: None,
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
async_executor: Some(ex_kind),
}
);
@@ -369,6 +386,7 @@ mod tests {
id: Some(*b"DISK_LABEL\0\0\0\0\0\0\0\0\0\0"),
#[cfg(windows)]
io_concurrency: NonZeroU32::new(1).unwrap(),
multiple_workers: false,
async_executor: Some(ex_kind),
}
);

@@ -51,6 +51,7 @@ pub fn start_device(opts: Options) -> anyhow::Result<()> {
direct: false,
block_size: 512,
id: None,
multiple_workers: false,
async_executor: None,
};
@@ -60,6 +61,7 @@ pub fn start_device(opts: Options) -> anyhow::Result<()> {
disk.read_only,
disk.sparse,
disk.block_size,
false,
None,
None,
None,

@@ -76,6 +76,7 @@ pub fn start_device(opts: Options) -> anyhow::Result<()> {
disk_option.read_only,
disk_option.sparse,
disk_option.block_size,
false,
None,
None,
None,

@@ -848,6 +848,10 @@ pub struct RunCommand {
/// async-executor=epoll|uring - set the async executor kind
/// to simulate the block device with. This takes
/// precedence over the global --async-executor option.
/// multiple-workers=BOOL - (Experimental) run multiple
/// worker threads in parallel. This option has no
/// effect for the vhost-user block device.
/// (default: false)
block: Vec<DiskOptionWithId>,
/// ratelimit enforced on detected bus locks in guest.

@@ -233,6 +233,7 @@ impl<'a> VirtioDeviceBuilder for DiskConfig<'a> {
self.disk.read_only,
self.disk.sparse,
self.disk.block_size,
self.disk.multiple_workers,
self.disk.id,
self.device_tube,
None,
@@ -256,6 +257,7 @@ impl<'a> VirtioDeviceBuilder for DiskConfig<'a> {
disk.read_only,
disk.sparse,
disk.block_size,
false,
disk.id,
self.device_tube,
None,

@@ -264,6 +264,7 @@ fn create_block_device(cfg: &Config, disk: &DiskOption, disk_device_tube: Tube)
disk.read_only,
disk.sparse,
disk.block_size,
false,
disk.id,
Some(disk_device_tube),
None,