not working

This commit is contained in:
Changyuan Lyu 2022-12-11 16:47:37 -08:00
parent 8d5ad8bb59
commit 0b9d915021
13 changed files with 360 additions and 44 deletions

140
Cargo.lock generated
View file

@ -190,6 +190,15 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
[[package]]
name = "enum_primitive"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4551092f4d519593039259a9ed8daedf0da12e5109c5280338073eaeb81180"
dependencies = [
"num-traits 0.1.43",
]
[[package]]
name = "errno"
version = "0.2.8"
@ -220,6 +229,18 @@ dependencies = [
"instant",
]
[[package]]
name = "filetime"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e884668cd0c7480504233e951174ddc3b382f7c2666e3b7310b5c4e7b0c37f9"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"windows-sys",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
@ -347,7 +368,7 @@ dependencies = [
"indexmap",
"slab",
"tokio",
"tokio-util",
"tokio-util 0.7.4",
"tracing",
]
@ -546,9 +567,11 @@ dependencies = [
"nix 0.26.1",
"prost",
"prost-build",
"rust-9p",
"thiserror",
"tokio",
"tokio-stream",
"tokio-util 0.7.4",
"tokio-vsock",
"tonic",
"tonic-build",
@ -563,6 +586,16 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f9f08d8963a6c613f4b1a78f4f4a4dbfadf8e6545b2d72861731e4858b8b47f"
[[package]]
name = "lock_api"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.17"
@ -665,6 +698,24 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "num-traits"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92e5113e9fd4cc14ded8e499429f396a20f98c772a47cc8622a736e1ec843c31"
dependencies = [
"num-traits 0.2.15",
]
[[package]]
name = "num-traits"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.14.0"
@ -729,6 +780,29 @@ dependencies = [
"vsock 0.3.0",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
]
[[package]]
name = "percent-encoding"
version = "2.2.0"
@ -838,9 +912,9 @@ dependencies = [
[[package]]
name = "prost-build"
version = "0.11.3"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e330bf1316db56b12c2bcfa399e8edddd4821965ea25ddb2c134b610b1c1c604"
checksum = "276470f7f281b0ed53d2ae42dd52b4a8d08853a3c70e7fe95882acbb98a6ae94"
dependencies = [
"bytes 1.3.0",
"heck",
@ -953,11 +1027,30 @@ dependencies = [
"winapi",
]
[[package]]
name = "rust-9p"
version = "0.0.1"
dependencies = [
"async-trait",
"bitflags",
"byteorder",
"bytes 1.3.0",
"enum_primitive",
"filetime",
"futures",
"log",
"nix 0.26.1",
"num-traits 0.2.15",
"tokio",
"tokio-stream",
"tokio-util 0.6.10",
]
[[package]]
name = "rustix"
version = "0.36.4"
version = "0.36.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb93e85278e08bb5788653183213d3a60fc242b10cb9be96586f5a73dcb67c23"
checksum = "a3807b5d10909833d3e9acd1eb5fb988f79376ff10fce42937de71a449c4c588"
dependencies = [
"bitflags",
"errno",
@ -973,6 +1066,12 @@ version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8"
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
version = "1.0.149"
@ -997,6 +1096,12 @@ dependencies = [
"autocfg",
]
[[package]]
name = "smallvec"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "socket2"
version = "0.4.7"
@ -1081,9 +1186,9 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.22.0"
version = "1.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76ce4a75fb488c605c54bf610f221cea8b0dafb53333c1a67e8ee199dcd2ae3"
checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46"
dependencies = [
"autocfg",
"bytes 1.3.0",
@ -1091,11 +1196,12 @@ dependencies = [
"memchr",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"winapi",
"windows-sys",
]
[[package]]
@ -1130,6 +1236,20 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507"
dependencies = [
"bytes 1.3.0",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.4"
@ -1180,7 +1300,7 @@ dependencies = [
"prost-derive",
"tokio",
"tokio-stream",
"tokio-util",
"tokio-util 0.7.4",
"tower",
"tower-layer",
"tower-service",
@ -1215,7 +1335,7 @@ dependencies = [
"rand",
"slab",
"tokio",
"tokio-util",
"tokio-util 0.7.4",
"tower-layer",
"tower-service",
"tracing",

View file

@ -2,7 +2,8 @@
members = [
"crates/p9cpu",
"crates/p9cpud",
"crates/libp9cpu"
"crates/libp9cpu",
"crates/9p",
]
[profile.release]

View file

@ -32,7 +32,3 @@ tokio-stream = { version = "^0.1", features = ["fs"] }
bytes = "^1"
futures = "^0.3"
filetime = "^0"
[profile.release]
opt-level = 3
lto = true

View file

@ -27,4 +27,4 @@ pub use crate::error::Error;
pub use crate::fcall::*;
pub use crate::utils::Result;
pub mod unpfs;
pub mod unpfs;

View file

@ -323,7 +323,11 @@ where
Ok(response)
}
pub async fn dispatch<Fs, Reader, Writer>(filesystem: Fs, reader: Reader, writer: Writer) -> Result<()>
pub async fn dispatch<Fs, Reader, Writer>(
filesystem: Fs,
reader: Reader,
writer: Writer,
) -> Result<()>
where
Fs: 'static + Filesystem + Send + Sync,
Reader: 'static + AsyncRead + Send + std::marker::Unpin,

View file

@ -1,11 +1,11 @@
use {
async_trait::async_trait,
filetime::FileTime,
nix::libc::{O_CREAT, O_RDONLY, O_RDWR, O_TRUNC, O_WRONLY},
crate::{
srv::{srv_async, Fid, Filesystem},
*,
},
async_trait::async_trait,
filetime::FileTime,
nix::libc::{O_CREAT, O_RDONLY, O_RDWR, O_TRUNC, O_WRONLY},
std::{
io::SeekFrom,
os::unix::{fs::PermissionsExt, io::FromRawFd},
@ -52,7 +52,9 @@ pub struct Unpfs {
impl Unpfs {
pub fn new(root: &Path) -> Self {
Unpfs { realroot: root.to_path_buf() }
Unpfs {
realroot: root.to_path_buf(),
}
}
}
@ -398,7 +400,7 @@ impl Filesystem for Unpfs {
}
}
async fn unpfs_main(args: Vec<String>) -> crate::Result<i32> {
async fn _unpfs_main(args: Vec<String>) -> crate::Result<i32> {
if args.len() < 3 {
eprintln!("Usage: {} proto!address!port mountpoint", args[0]);
eprintln!(" where: proto = tcp | unix");
@ -421,11 +423,11 @@ async fn unpfs_main(args: Vec<String>) -> crate::Result<i32> {
.and(Ok(0))
}
async fn original_main() {
async fn _original_main() {
// env_logger::init();
let args = std::env::args().collect();
let exit_code = unpfs_main(args).await.unwrap_or_else(|e| {
let exit_code = _unpfs_main(args).await.unwrap_or_else(|e| {
eprintln!("Error: {:?}", e);
-1
});

View file

@ -23,6 +23,8 @@ libc = "0.2.138"
close_fds = "0.2"
tokio-vsock = { path = "/usr/local/google/home/changyuanl/tokio-vsock", version = "0.3.3", features = ["tonic-conn"] }
vsock = "0.3.0"
rust-9p = { path = "../9p", version = "*"}
tokio-util = { version = "0.7.4", features = ["io"] }
[build-dependencies]
tonic-build = "0.8.4"

View file

@ -1,14 +1,18 @@
use std::fmt::Debug;
use std::path::Path;
use std::pin::Pin;
use std::vec;
use crate::rpc;
use crate::Addr;
use crate::AsBytes;
use crate::P9cpuCommand;
use crate::{AsBytes, IntoByteVec};
use anyhow::Result;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use thiserror::Error;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::oneshot;
use tokio::{sync::mpsc, task::JoinHandle};
@ -51,6 +55,146 @@ pub trait ClientInnerT {
fn side_channel(&mut self) -> Self;
}
struct StreamReader<S> {
inner: S,
buffer: Vec<u8>,
consumed: usize,
}
impl<S> StreamReader<S> {
pub fn new(stream: S) -> Self {
StreamReader {
inner: stream,
buffer: vec![],
consumed: 0,
}
}
}
impl<'a, S, I> AsyncRead for StreamReader<S>
where
S: Stream<Item = I> + Unpin,
I: IntoByteVec,
{
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
if buf.remaining() == 0 {
return std::task::Poll::Ready(Ok(()));
}
loop {
if self.consumed < self.buffer.len() {
let remaining = self.buffer.len() - self.consumed;
let read_to = std::cmp::min(buf.remaining(), remaining) + self.consumed;
buf.put_slice(&self.buffer[self.consumed..read_to]);
self.consumed = read_to;
return std::task::Poll::Ready(Ok(()));
} else {
match Pin::new(&mut self.inner).poll_next(cx) {
std::task::Poll::Ready(Some(item)) => {
self.buffer = item.into_byte_vec();
self.consumed = 0;
if self.buffer.len() == 0 {
return std::task::Poll::Ready(Ok(()));
}
}
std::task::Poll::Ready(None) => {
return std::task::Poll::Ready(Ok(()));
}
std::task::Poll::Pending => return std::task::Poll::Pending,
}
}
}
}
}
struct SenderWriter<Item> {
inner: Option<tokio_util::sync::PollSender<Item>>,
}
impl<Item> SenderWriter<Item>
where
Item: Send + 'static,
{
pub fn new(sender: mpsc::Sender<Item>) -> Self {
Self {
inner: Some(tokio_util::sync::PollSender::new(sender)),
}
}
}
impl<Item> AsyncWrite for SenderWriter<Item>
where
Item: From<Vec<u8>> + Send + 'static,
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
if buf.len() == 0 {
return std::task::Poll::Ready(Ok(0));
}
let Some(inner )= self.inner.as_mut() else {
return std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Sender is down.",
)))};
match inner.poll_reserve(cx) {
std::task::Poll::Pending => return std::task::Poll::Pending,
std::task::Poll::Ready(Ok(())) => {}
std::task::Poll::Ready(Err(_)) => {
return std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Channel is closed.",
)))
}
};
let item = buf.to_vec().into();
match inner.send_item(item) {
Ok(()) => std::task::Poll::Ready(Ok(buf.len())),
Err(_) => std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Channel is closed.",
))),
}
}
fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
if self.inner.is_some() {
std::task::Poll::Ready(Ok(()))
} else {
std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Sender is down.",
)))
}
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
// std::task::Poll::Ready(Ok(()))
match self.inner.take() {
Some(mut inner) => {
inner.close();
return std::task::Poll::Ready(Ok(()));
}
None => std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Sender is down.",
))),
}
}
}
struct SessionInfo<S> {
sid: S,
handles: Vec<JoinHandle<()>>,
@ -85,6 +229,9 @@ where
<<Inner as ClientInnerT>::OutStream as Stream>::Item: crate::AsBytes<'a> + Sync + Send,
<Inner as ClientInnerT>::InStreamItem: Send + 'static + From<Vec<u8>>,
<Inner as ClientInnerT>::SessionId: Clone + Debug + Sync + Send + 'static,
<Inner as ClientInnerT>::NinepInStreamItem: Send + 'static + From<Vec<u8>>,
<Inner as ClientInnerT>::NinepOutStream: Send + 'static + Stream + Unpin,
<<Inner as ClientInnerT>::NinepOutStream as Stream>::Item: IntoByteVec,
// <<Inner as ClientInnerT>::InStream as Stream>::Item: Sync + Send + 'static,
{
pub async fn new(mut inner: Inner) -> Result<P9cpuClient<Inner>> {
@ -97,7 +244,6 @@ where
if let Err(_e) = stdin_channel.stdin(sid, in_stream).await {}
}
});
// let (ninep_tx, mut ninep_rx) = mpsc::channel(buffer)
Ok(Self {
stdin_tx,
inner,
@ -112,7 +258,26 @@ where
let tty = command.tty;
let sid = self.inner.dial().await?;
if !command.namespace.is_empty() {
// letself.inner.ninep_forward(sid.clone(), stream);
let (ninep_tx, ninep_rx) = mpsc::channel(1);
// ninep_tx.send(<Inner as ClientInnerT>::NinepInStreamItem::from(vec![])).await;
let ninep_in_stream = ReceiverStream::from(ninep_rx);
let ninep_out_stream = self
.inner
.ninep_forward(sid.clone(), ninep_in_stream)
.await?;
println!("ninep forward established");
let reader = StreamReader::new(ninep_out_stream);
let writer = SenderWriter::new(ninep_tx);
// tokio_util::io::StreamReader::new(ninep_out_stream);
let root = Path::new("/");
tokio::spawn(async move {
if let Err(e) =
rs9p::srv::dispatch(rs9p::unpfs::Unpfs::new(&root), reader, writer).await
{
println!("rs9p error : {:?}", e);
}
});
}
self.inner.start(sid.clone(), command).await?;
let mut out_stream = self.inner.stdout(sid.clone()).await?;

View file

@ -1,7 +1,7 @@
pub mod client;
pub mod fstab;
pub mod rpc;
pub mod server;
pub mod fstab;
pub type P9cpuCommand = crate::rpc::P9cpuCommand;
@ -19,3 +19,7 @@ pub trait AsBytes<'a> {
pub trait FromVecu8 {
fn from_vec_u8(vec: Vec<u8>) -> Self;
}
pub trait IntoByteVec {
fn into_byte_vec(self) -> Vec<u8>;
}

View file

@ -4,7 +4,7 @@ use futures::Stream;
use tonic::Status;
use crate::{AsBytes, FromVecu8};
use crate::{AsBytes, FromVecu8, IntoByteVec};
pub mod rpc_client;
pub mod rpc_server;
@ -47,12 +47,36 @@ impl From<Vec<u8>> for P9cpuStdinRequest {
}
}
impl From<Vec<u8>> for NinepForwardRequest {
fn from(data: Vec<u8>) -> Self {
NinepForwardRequest { id: None, data }
}
}
impl FromVecu8 for Result<P9cpuBytes, Status> {
fn from_vec_u8(vec: Vec<u8>) -> Self {
Ok(P9cpuBytes { data: vec })
}
}
impl<T, E> IntoByteVec for Result<T, E>
where
T: IntoByteVec,
{
fn into_byte_vec(self) -> Vec<u8> {
match self {
Ok(inner) => inner.into_byte_vec(),
Err(_) => vec![],
}
}
}
impl IntoByteVec for P9cpuBytes {
fn into_byte_vec(self) -> Vec<u8> {
self.data
}
}
struct PrependedStream<I, S> {
stream: S,
item: Option<I>,

View file

@ -92,8 +92,7 @@ impl crate::client::ClientInnerT for RpcInner {
id: sid.into_bytes().into(),
cmd: Some(command),
};
self
.rpc_client
self.rpc_client
.start(req)
.await
.map_err(RpcInnerError::Rpc)?
@ -154,12 +153,12 @@ impl crate::client::ClientInnerT for RpcInner {
async fn ninep_forward(
&mut self,
sid: Self::SessionId,
mut in_stream: impl Stream<Item = Self::NinepInStreamItem> + Send + Sync + 'static + Unpin,
in_stream: impl Stream<Item = Self::NinepInStreamItem> + Send + Sync + 'static + Unpin,
) -> Result<Self::NinepOutStream, Self::Error> {
let Some(mut first_req) = in_stream.next().await else {
return Err(RpcInnerError::AlreadyStarted);
let first_req = crate::rpc::NinepForwardRequest {
id: Some(sid.into_bytes().into()),
data: vec![],
};
first_req.id = Some(sid.into_bytes().into());
let stream = PrependedStream {
stream: in_stream,
item: Some(first_req),

View file

@ -1,11 +1,6 @@
use std::{
collections::HashMap,
fmt::{format, Debug},
hash::Hash,
os::unix::prelude::FromRawFd,
pin::Pin,
process::Stdio,
sync::Arc,
collections::HashMap, fmt::Debug, hash::Hash, os::unix::prelude::FromRawFd, pin::Pin,
process::Stdio, sync::Arc,
};
use anyhow::Result;
@ -289,7 +284,11 @@ where
Ok(op(info))
}
fn make_cmd(&self, command: P9cpuCommand, ninep_port: Option<u16>) -> Result<(Command, Option<File>), P9cpuServerError> {
fn make_cmd(
&self,
command: P9cpuCommand,
ninep_port: Option<u16>,
) -> Result<(Command, Option<File>), P9cpuServerError> {
let mut cmd = Command::new(command.program);
cmd.args(command.args);
if let Some(tmp_mnt) = command.tmp_mnt {

View file

@ -1,4 +1,4 @@
use std::{collections::HashMap};
use std::collections::HashMap;
use libp9cpu::{fstab::FsTab, P9cpuCommand};