1
+ use std:: io:: Write ;
1
2
use std:: path:: PathBuf ;
2
3
use std:: sync:: Arc ;
3
4
4
5
use crate :: config:: ClientConfig ;
6
+ use crate :: partial_file:: PartialFile ;
5
7
use crate :: transfer:: { TransferProgressInfo , TransferSender , TransferType } ;
6
8
use crate :: types:: { Bucket , ListObjects , ListObjectsV2Params , Object , Params } ;
7
9
use crate :: util:: get_name;
8
10
use crate :: Result ;
11
+ use anyhow:: Context ;
9
12
use cc_core:: ServiceType ;
10
- use futures:: StreamExt ;
11
13
12
14
use crate :: services;
13
- use opendal:: { Metadata , Metakey , Operator } ;
14
- use tokio:: {
15
- fs,
16
- io:: { self , AsyncWriteExt as _} ,
15
+ use crate :: stream:: {
16
+ AsyncReadProgressExt , BoxedStreamingUploader , StreamingUploader , TrackableBodyStream ,
17
17
} ;
18
-
19
- const DEFAULT_BUF_SIZE : usize = 8 * 1024 * 1024 ;
18
+ use futures :: { AsyncReadExt , StreamExt , TryStreamExt } ;
19
+ use opendal :: { Metadata , Metakey , Operator } ;
20
20
21
21
#[ derive( Clone ) ]
22
22
pub struct Client {
@@ -102,9 +102,9 @@ impl Client {
102
102
pub async fn delete_object ( & self , object : impl AsRef < str > ) -> Result < bool > {
103
103
let object = object. as_ref ( ) ;
104
104
self . operator . delete ( object) . await ?;
105
- // let result = self.operator.is_exist(object).await?;
105
+ let result = self . operator . is_exist ( object) . await ?;
106
106
107
- Ok ( true )
107
+ Ok ( result )
108
108
}
109
109
110
110
pub async fn delete_multi_object ( self , obj : Vec < Object > ) -> Result < bool > {
@@ -122,7 +122,6 @@ impl Client {
122
122
123
123
pub async fn list_v2 ( & self , query : ListObjectsV2Params ) -> Result < ListObjects > {
124
124
tracing:: debug!( "List object: {:?}" , query) ;
125
- // let path = query.prefix.map_or("".into(), |x| format!("{x}/"));
126
125
let mut path = query. prefix ;
127
126
if !path. is_empty ( ) && !path. ends_with ( '/' ) {
128
127
path. push ( '/' ) ;
@@ -199,66 +198,92 @@ impl Client {
199
198
Ok ( ( src. to_string ( ) , is_move) )
200
199
}
201
200
201
+ fn streaming_upload ( & self , path : & str ) -> Result < BoxedStreamingUploader > {
202
+ Ok ( Box :: new ( StreamingUploader :: new (
203
+ self . operator . clone ( ) ,
204
+ path. to_string ( ) ,
205
+ ) ) )
206
+ }
207
+
208
+ async fn streaming_read ( & self , path : & str , transfer : TransferSender ) -> Result < Vec < u8 > > {
209
+ let reader = self . operator . reader ( path) . await ?;
210
+
211
+ let size = self . meta_data ( path) . await ?. content_length ( ) ;
212
+ let mut body = Vec :: new ( ) ;
213
+
214
+ let mut stream = reader
215
+ . into_futures_async_read ( 0 ..)
216
+ . await ?
217
+ . report_progress ( |bytes_read| {
218
+ transfer
219
+ . send ( TransferType :: Download (
220
+ path. to_string ( ) ,
221
+ TransferProgressInfo {
222
+ total_bytes : size,
223
+ transferred_bytes : bytes_read as u64 ,
224
+ } ,
225
+ ) )
226
+ . unwrap ( ) ;
227
+ } ) ;
228
+
229
+ stream
230
+ . read_to_end ( & mut body)
231
+ . await
232
+ . context ( "failed to read object content into buffer" ) ?;
233
+
234
+ Ok ( body)
235
+ }
236
+
202
237
pub async fn download_file (
203
238
& self ,
204
239
obj : & str ,
205
240
target : PathBuf ,
206
241
transfer : TransferSender ,
207
242
) -> Result < ( ) > {
208
- let remote_op = self . operator . clone ( ) ;
209
- let progress_tx = transfer. clone ( ) ;
210
- let oid = obj. to_string ( ) ;
211
- let total_bytes = self . meta_data ( obj) . await ?. content_length ( ) ;
212
-
213
- tokio:: spawn ( async move {
214
- let _: Result < Option < String > > = async {
215
- fs:: create_dir_all ( target. parent ( ) . unwrap ( ) ) . await ?;
216
- let mut reader = remote_op. reader_with ( & oid) . buffer ( DEFAULT_BUF_SIZE ) . await ?;
217
- let mut writer = io:: BufWriter :: new ( fs:: File :: create ( & target) . await ?) ;
218
- copy_with_progress (
219
- "download" ,
220
- & progress_tx,
221
- & oid,
222
- total_bytes,
223
- & mut reader,
224
- & mut writer,
225
- )
226
- . await ?;
227
- writer. shutdown ( ) . await ?;
228
- Ok ( Some ( target. to_string_lossy ( ) . into ( ) ) )
229
- }
230
- . await ;
231
- } ) ;
243
+ let mut new_file = PartialFile :: create ( & target)
244
+ . with_context ( || format ! ( "create `{}`" , target. display( ) ) ) ?;
245
+
246
+ let content = self . streaming_read ( obj, transfer) . await ?;
247
+
248
+ new_file
249
+ . write_all ( & content)
250
+ . context ( "write content of file" ) ?;
251
+ new_file. finish ( ) . context ( "finish writing to new file" ) ?;
232
252
233
253
Ok ( ( ) )
234
254
}
235
255
236
256
pub async fn put ( & self , path : PathBuf , dest : & str , transfer : & TransferSender ) -> Result < ( ) > {
237
257
let name = get_name ( & path) ;
238
258
let key = format ! ( "{dest}{name}" ) ;
239
- let remote_op = self . operator . clone ( ) ;
259
+
260
+ let mut body = TrackableBodyStream :: try_from ( path)
261
+ . map_err ( |e| {
262
+ panic ! ( "Could not open sample file: {e}" ) ;
263
+ } )
264
+ . unwrap ( ) ;
240
265
let progress_tx = transfer. clone ( ) ;
241
- let total_bytes = fs:: metadata ( & path) . await ?. len ( ) ;
242
-
243
- tokio:: spawn ( async move {
244
- let _: Result < Option < String > > = async {
245
- let mut reader = io:: BufReader :: new ( fs:: File :: open ( path) . await ?) ;
246
- let mut writer = remote_op. writer_with ( & key) . buffer ( DEFAULT_BUF_SIZE ) . await ?;
247
- copy_with_progress (
248
- "upload" ,
249
- & progress_tx,
250
- & key,
251
- total_bytes,
252
- & mut reader,
253
- & mut writer,
254
- )
255
- . await ?;
256
- writer. close ( ) . await ?;
257
- Ok ( None )
258
- }
259
- . await ;
260
- } ) ;
261
266
267
+ body. set_callback (
268
+ & key,
269
+ move |key : & str , tot_size : u64 , sent : u64 , _cur_buf : u64 | {
270
+ progress_tx
271
+ . send ( TransferType :: Upload (
272
+ key. to_string ( ) ,
273
+ TransferProgressInfo {
274
+ total_bytes : tot_size,
275
+ transferred_bytes : sent,
276
+ } ,
277
+ ) )
278
+ . unwrap ( ) ;
279
+ } ,
280
+ ) ;
281
+
282
+ let mut uploader = self . streaming_upload ( & key) ?;
283
+ while let Ok ( Some ( bytes) ) = body. try_next ( ) . await {
284
+ uploader. write_bytes ( bytes) . await ?;
285
+ }
286
+ uploader. finish ( ) . await ?;
262
287
// TODO: check if put success
263
288
264
289
Ok ( ( ) )
@@ -331,53 +356,3 @@ impl ClientBuilder {
331
356
Client :: new ( self . config )
332
357
}
333
358
}
334
-
335
- async fn copy_with_progress < R , W > (
336
- tp : & str ,
337
- progress_sender : & TransferSender ,
338
- key : & str ,
339
- total_bytes : u64 ,
340
- mut reader : R ,
341
- mut writer : W ,
342
- ) -> io:: Result < usize >
343
- where
344
- R : io:: AsyncReadExt + Unpin ,
345
- W : io:: AsyncWriteExt + Unpin ,
346
- {
347
- let mut bytes_so_far: usize = 0 ;
348
- let mut buf = vec ! [ 0 ; DEFAULT_BUF_SIZE ] ;
349
-
350
- loop {
351
- let bytes_since_last = reader. read ( & mut buf) . await ?;
352
- if bytes_since_last == 0 {
353
- break ;
354
- }
355
- writer. write_all ( & buf[ ..bytes_since_last] ) . await ?;
356
- bytes_so_far += bytes_since_last;
357
- let msg = if tp == "download" {
358
- TransferType :: Download (
359
- key. to_string ( ) ,
360
- TransferProgressInfo {
361
- total_bytes : total_bytes as usize ,
362
- transferred_bytes : bytes_so_far,
363
- } ,
364
- )
365
- } else {
366
- TransferType :: Upload (
367
- key. to_string ( ) ,
368
- TransferProgressInfo {
369
- total_bytes : total_bytes as usize ,
370
- transferred_bytes : bytes_so_far,
371
- } ,
372
- )
373
- } ;
374
- send_response ( progress_sender, msg) . await ;
375
- }
376
-
377
- Ok ( bytes_so_far)
378
- }
379
-
380
- async fn send_response ( sender : & TransferSender , msg : TransferType ) {
381
- // tracing::debug!("response: {}", &msg);
382
- sender. send ( msg) . unwrap ( ) ;
383
- }
0 commit comments