1515// specific language governing permissions and limitations
1616// under the License.
1717
18- use std:: sync:: Arc ;
18+ use std:: { fmt , sync:: Arc } ;
1919
20+ use async_trait:: async_trait;
2021use datafusion:: execution:: object_store:: ObjectStoreRegistry ;
21- use object_store:: ObjectStore ;
22+ use futures:: stream:: BoxStream ;
23+ use object_store:: {
24+ path:: Path , GetOptions , GetResult , ListResult , MultipartUpload , ObjectMeta ,
25+ ObjectStore , PutMultipartOptions , PutOptions , PutPayload , PutResult , Result ,
26+ } ;
2227use url:: Url ;
2328
24- /// Provides access to wrapped [`ObjectStore`] instances that record requests for reporting
29+ /// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the
30+ /// inner [`ObjectStore`]
31+ #[ derive( Debug ) ]
32+ struct InstrumentedObjectStore {
33+ inner : Arc < dyn ObjectStore > ,
34+ }
35+
36+ impl InstrumentedObjectStore {
37+ /// Returns a new [`InstrumentedObjectStore`] that wraps the provided [`ObjectStore`]
38+ fn new ( object_store : Arc < dyn ObjectStore > ) -> Self {
39+ Self {
40+ inner : object_store,
41+ }
42+ }
43+ }
44+
45+ impl fmt:: Display for InstrumentedObjectStore {
46+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
47+ write ! ( f, "Instrumented Object Store: {}" , self . inner)
48+ }
49+ }
50+
51+ #[ async_trait]
52+ impl ObjectStore for InstrumentedObjectStore {
53+ async fn put_opts (
54+ & self ,
55+ location : & Path ,
56+ payload : PutPayload ,
57+ opts : PutOptions ,
58+ ) -> Result < PutResult > {
59+ self . inner . put_opts ( location, payload, opts) . await
60+ }
61+
62+ async fn put_multipart_opts (
63+ & self ,
64+ location : & Path ,
65+ opts : PutMultipartOptions ,
66+ ) -> Result < Box < dyn MultipartUpload > > {
67+ self . inner . put_multipart_opts ( location, opts) . await
68+ }
69+
70+ async fn get_opts ( & self , location : & Path , options : GetOptions ) -> Result < GetResult > {
71+ self . inner . get_opts ( location, options) . await
72+ }
73+
74+ async fn delete ( & self , location : & Path ) -> Result < ( ) > {
75+ self . inner . delete ( location) . await
76+ }
77+
78+ fn list ( & self , prefix : Option < & Path > ) -> BoxStream < ' static , Result < ObjectMeta > > {
79+ self . inner . list ( prefix)
80+ }
81+
82+ async fn list_with_delimiter ( & self , prefix : Option < & Path > ) -> Result < ListResult > {
83+ self . inner . list_with_delimiter ( prefix) . await
84+ }
85+
86+ async fn copy ( & self , from : & Path , to : & Path ) -> Result < ( ) > {
87+ self . inner . copy ( from, to) . await
88+ }
89+
90+ async fn copy_if_not_exists ( & self , from : & Path , to : & Path ) -> Result < ( ) > {
91+ self . inner . copy_if_not_exists ( from, to) . await
92+ }
93+
94+ async fn head ( & self , location : & Path ) -> Result < ObjectMeta > {
95+ self . inner . head ( location) . await
96+ }
97+ }
98+
99+ /// Provides access to [`ObjectStore`] instances that record requests for reporting
25100#[ derive( Debug ) ]
26101pub struct InstrumentedObjectStoreRegistry {
27102 inner : Arc < dyn ObjectStoreRegistry > ,
@@ -41,7 +116,8 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
41116 url : & Url ,
42117 store : Arc < dyn ObjectStore > ,
43118 ) -> Option < Arc < dyn ObjectStore > > {
44- self . inner . register_store ( url, store)
119+ let instrumented = Arc :: new ( InstrumentedObjectStore :: new ( store) ) ;
120+ self . inner . register_store ( url, instrumented)
45121 }
46122
47123 fn get_store ( & self , url : & Url ) -> datafusion:: common:: Result < Arc < dyn ObjectStore > > {
0 commit comments