1- use std:: io:: { Read , Write } ;
1+ use std:: io:: { self , Read , Write } ;
2+ use std:: pin:: Pin ;
23use std:: sync:: { Arc , Mutex } ;
4+ use std:: task:: { Context , Poll } ;
35
46use async_trait:: async_trait;
57use spin_factors:: anyhow;
6- use wasmtime_wasi :: p2 :: {
7- InputStream , OutputStream , Pollable , StdinStream , StdoutStream , StreamError ,
8- } ;
8+ use tokio :: io :: { AsyncRead , AsyncWrite } ;
9+ use wasmtime_wasi :: cli :: { IsTerminal , StdinStream , StdoutStream } ;
10+ use wasmtime_wasi :: p2 :: { InputStream , OutputStream , Pollable , StreamError } ;
911
1012/// A [`OutputStream`] that writes to a `Write` type.
1113///
@@ -54,16 +56,37 @@ impl<T: Write + Send + Sync + 'static> OutputStream for PipedWriteStream<T> {
5456 }
5557}
5658
57- impl < T : Write + Send + Sync + ' static > StdoutStream for PipedWriteStream < T > {
58- fn stream ( & self ) -> Box < dyn OutputStream > {
59- Box :: new ( self . clone ( ) )
59+ impl < T : Write + Send + Sync + ' static > AsyncWrite for PipedWriteStream < T > {
60+ fn poll_write (
61+ self : Pin < & mut Self > ,
62+ _cx : & mut Context < ' _ > ,
63+ buf : & [ u8 ] ,
64+ ) -> Poll < io:: Result < usize > > {
65+ Poll :: Ready ( self . 0 . lock ( ) . unwrap ( ) . write ( buf) )
66+ }
67+ fn poll_flush ( self : Pin < & mut Self > , _cx : & mut Context < ' _ > ) -> Poll < io:: Result < ( ) > > {
68+ Poll :: Ready ( self . 0 . lock ( ) . unwrap ( ) . flush ( ) )
6069 }
70+ fn poll_shutdown ( self : Pin < & mut Self > , _cx : & mut Context < ' _ > ) -> Poll < io:: Result < ( ) > > {
71+ Poll :: Ready ( Ok ( ( ) ) )
72+ }
73+ }
6174
62- fn isatty ( & self ) -> bool {
75+ impl < T > IsTerminal for PipedWriteStream < T > {
76+ fn is_terminal ( & self ) -> bool {
6377 false
6478 }
6579}
6680
81+ impl < T : Write + Send + Sync + ' static > StdoutStream for PipedWriteStream < T > {
82+ fn p2_stream ( & self ) -> Box < dyn OutputStream > {
83+ Box :: new ( self . clone ( ) )
84+ }
85+ fn async_stream ( & self ) -> Box < dyn AsyncWrite + Send + Sync > {
86+ Box :: new ( self . clone ( ) )
87+ }
88+ }
89+
6790#[ async_trait]
6891impl < T : Write + Send + Sync + ' static > Pollable for PipedWriteStream < T > {
6992 async fn ready ( & mut self ) { }
@@ -95,6 +118,12 @@ impl<T> Clone for PipeReadStream<T> {
95118 }
96119}
97120
121+ impl < T > IsTerminal for PipeReadStream < T > {
122+ fn is_terminal ( & self ) -> bool {
123+ false
124+ }
125+ }
126+
98127impl < T : Read + Send + Sync + ' static > InputStream for PipeReadStream < T > {
99128 fn read ( & mut self , size : usize ) -> wasmtime_wasi:: p2:: StreamResult < bytes:: Bytes > {
100129 let size = size. min ( self . buffer . len ( ) ) ;
@@ -113,17 +142,33 @@ impl<T: Read + Send + Sync + 'static> InputStream for PipeReadStream<T> {
113142 }
114143}
115144
145+ impl < T : Read + Send + Sync + ' static > AsyncRead for PipeReadStream < T > {
146+ fn poll_read (
147+ self : Pin < & mut Self > ,
148+ _cx : & mut Context < ' _ > ,
149+ buf : & mut tokio:: io:: ReadBuf < ' _ > ,
150+ ) -> Poll < io:: Result < ( ) > > {
151+ let result = self
152+ . inner
153+ . lock ( )
154+ . unwrap ( )
155+ . read ( buf. initialize_unfilled ( ) )
156+ . map ( |n| buf. advance ( n) ) ;
157+ Poll :: Ready ( result)
158+ }
159+ }
160+
116161#[ async_trait]
117162impl < T : Read + Send + Sync + ' static > Pollable for PipeReadStream < T > {
118163 async fn ready ( & mut self ) { }
119164}
120165
121166impl < T : Read + Send + Sync + ' static > StdinStream for PipeReadStream < T > {
122- fn stream ( & self ) -> Box < dyn InputStream > {
167+ fn p2_stream ( & self ) -> Box < dyn InputStream > {
123168 Box :: new ( self . clone ( ) )
124169 }
125170
126- fn isatty ( & self ) -> bool {
127- false
171+ fn async_stream ( & self ) -> Box < dyn AsyncRead + Send + Sync > {
172+ Box :: new ( self . clone ( ) )
128173 }
129174}
0 commit comments