Skip to content

Commit 09a639b

Browse files
authored
Refactor main (#16)
* fix: update egui and eframe to 0.27.2 * fix: update opendal to 0.47.1
1 parent f77bc87 commit 09a639b

File tree

20 files changed

+1993
-662
lines changed

20 files changed

+1993
-662
lines changed

Cargo.lock

Lines changed: 1496 additions & 502 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ cc_ui = { path = "crates/cc_ui" }
6363
cc_storage = { path = "crates/cc_storage" }
6464
cc_runtime = { path = "crates/cc_runtime" }
6565
cc_files = { path = "crates/cc_files" }
66-
eframe = { version = "0.23.0", features = ["persistence"] }
67-
egui = "0.23.0"
68-
egui_extras = { version = "0.23.0", features = ["all_loaders"] }
66+
eframe = { version = "0.27.2", features = ["persistence"] }
67+
egui = "0.27.2"
68+
egui_extras = { version = "0.27.2", features = ["all_loaders"] }
6969
serde = { version = "1", features = ["derive"] }
7070
serde_json = "1.0"
7171
tracing = "0.1.29"

crates/cc_storage/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ anyhow = "1.0"
1111
tracing = { workspace = true }
1212
bytesize = "1.1.0"
1313
chrono = "0.4.24"
14-
opendal = { version = "0.44.2", default-features = false, features = [
14+
opendal = { version = "0.47.1", default-features = false, features = [
1515
"services-azblob",
1616
"services-gcs",
1717
"services-oss",

crates/cc_storage/src/client.rs

Lines changed: 80 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
1+
use std::io::Write;
12
use std::path::PathBuf;
23
use std::sync::Arc;
34

45
use crate::config::ClientConfig;
6+
use crate::partial_file::PartialFile;
57
use crate::transfer::{TransferProgressInfo, TransferSender, TransferType};
68
use crate::types::{Bucket, ListObjects, ListObjectsV2Params, Object, Params};
79
use crate::util::get_name;
810
use crate::Result;
11+
use anyhow::Context;
912
use cc_core::ServiceType;
10-
use futures::StreamExt;
1113

1214
use crate::services;
13-
use opendal::{Metadata, Metakey, Operator};
14-
use tokio::{
15-
fs,
16-
io::{self, AsyncWriteExt as _},
15+
use crate::stream::{
16+
AsyncReadProgressExt, BoxedStreamingUploader, StreamingUploader, TrackableBodyStream,
1717
};
18-
19-
const DEFAULT_BUF_SIZE: usize = 8 * 1024 * 1024;
18+
use futures::{AsyncReadExt, StreamExt, TryStreamExt};
19+
use opendal::{Metadata, Metakey, Operator};
2020

2121
#[derive(Clone)]
2222
pub struct Client {
@@ -102,9 +102,9 @@ impl Client {
102102
pub async fn delete_object(&self, object: impl AsRef<str>) -> Result<bool> {
103103
let object = object.as_ref();
104104
self.operator.delete(object).await?;
105-
// let result = self.operator.is_exist(object).await?;
105+
let result = self.operator.is_exist(object).await?;
106106

107-
Ok(true)
107+
Ok(result)
108108
}
109109

110110
pub async fn delete_multi_object(self, obj: Vec<Object>) -> Result<bool> {
@@ -122,7 +122,6 @@ impl Client {
122122

123123
pub async fn list_v2(&self, query: ListObjectsV2Params) -> Result<ListObjects> {
124124
tracing::debug!("List object: {:?}", query);
125-
// let path = query.prefix.map_or("".into(), |x| format!("{x}/"));
126125
let mut path = query.prefix;
127126
if !path.is_empty() && !path.ends_with('/') {
128127
path.push('/');
@@ -199,66 +198,92 @@ impl Client {
199198
Ok((src.to_string(), is_move))
200199
}
201200

201+
fn streaming_upload(&self, path: &str) -> Result<BoxedStreamingUploader> {
202+
Ok(Box::new(StreamingUploader::new(
203+
self.operator.clone(),
204+
path.to_string(),
205+
)))
206+
}
207+
208+
async fn streaming_read(&self, path: &str, transfer: TransferSender) -> Result<Vec<u8>> {
209+
let reader = self.operator.reader(path).await?;
210+
211+
let size = self.meta_data(path).await?.content_length();
212+
let mut body = Vec::new();
213+
214+
let mut stream = reader
215+
.into_futures_async_read(0..)
216+
.await?
217+
.report_progress(|bytes_read| {
218+
transfer
219+
.send(TransferType::Download(
220+
path.to_string(),
221+
TransferProgressInfo {
222+
total_bytes: size,
223+
transferred_bytes: bytes_read as u64,
224+
},
225+
))
226+
.unwrap();
227+
});
228+
229+
stream
230+
.read_to_end(&mut body)
231+
.await
232+
.context("failed to read object content into buffer")?;
233+
234+
Ok(body)
235+
}
236+
202237
pub async fn download_file(
203238
&self,
204239
obj: &str,
205240
target: PathBuf,
206241
transfer: TransferSender,
207242
) -> Result<()> {
208-
let remote_op = self.operator.clone();
209-
let progress_tx = transfer.clone();
210-
let oid = obj.to_string();
211-
let total_bytes = self.meta_data(obj).await?.content_length();
212-
213-
tokio::spawn(async move {
214-
let _: Result<Option<String>> = async {
215-
fs::create_dir_all(target.parent().unwrap()).await?;
216-
let mut reader = remote_op.reader_with(&oid).buffer(DEFAULT_BUF_SIZE).await?;
217-
let mut writer = io::BufWriter::new(fs::File::create(&target).await?);
218-
copy_with_progress(
219-
"download",
220-
&progress_tx,
221-
&oid,
222-
total_bytes,
223-
&mut reader,
224-
&mut writer,
225-
)
226-
.await?;
227-
writer.shutdown().await?;
228-
Ok(Some(target.to_string_lossy().into()))
229-
}
230-
.await;
231-
});
243+
let mut new_file = PartialFile::create(&target)
244+
.with_context(|| format!("create `{}`", target.display()))?;
245+
246+
let content = self.streaming_read(obj, transfer).await?;
247+
248+
new_file
249+
.write_all(&content)
250+
.context("write content of file")?;
251+
new_file.finish().context("finish writing to new file")?;
232252

233253
Ok(())
234254
}
235255

236256
pub async fn put(&self, path: PathBuf, dest: &str, transfer: &TransferSender) -> Result<()> {
237257
let name = get_name(&path);
238258
let key = format!("{dest}{name}");
239-
let remote_op = self.operator.clone();
259+
260+
let mut body = TrackableBodyStream::try_from(path)
261+
.map_err(|e| {
262+
panic!("Could not open sample file: {e}");
263+
})
264+
.unwrap();
240265
let progress_tx = transfer.clone();
241-
let total_bytes = fs::metadata(&path).await?.len();
242-
243-
tokio::spawn(async move {
244-
let _: Result<Option<String>> = async {
245-
let mut reader = io::BufReader::new(fs::File::open(path).await?);
246-
let mut writer = remote_op.writer_with(&key).buffer(DEFAULT_BUF_SIZE).await?;
247-
copy_with_progress(
248-
"upload",
249-
&progress_tx,
250-
&key,
251-
total_bytes,
252-
&mut reader,
253-
&mut writer,
254-
)
255-
.await?;
256-
writer.close().await?;
257-
Ok(None)
258-
}
259-
.await;
260-
});
261266

267+
body.set_callback(
268+
&key,
269+
move |key: &str, tot_size: u64, sent: u64, _cur_buf: u64| {
270+
progress_tx
271+
.send(TransferType::Upload(
272+
key.to_string(),
273+
TransferProgressInfo {
274+
total_bytes: tot_size,
275+
transferred_bytes: sent,
276+
},
277+
))
278+
.unwrap();
279+
},
280+
);
281+
282+
let mut uploader = self.streaming_upload(&key)?;
283+
while let Ok(Some(bytes)) = body.try_next().await {
284+
uploader.write_bytes(bytes).await?;
285+
}
286+
uploader.finish().await?;
262287
// TODO: check if put success
263288

264289
Ok(())
@@ -331,53 +356,3 @@ impl ClientBuilder {
331356
Client::new(self.config)
332357
}
333358
}
334-
335-
async fn copy_with_progress<R, W>(
336-
tp: &str,
337-
progress_sender: &TransferSender,
338-
key: &str,
339-
total_bytes: u64,
340-
mut reader: R,
341-
mut writer: W,
342-
) -> io::Result<usize>
343-
where
344-
R: io::AsyncReadExt + Unpin,
345-
W: io::AsyncWriteExt + Unpin,
346-
{
347-
let mut bytes_so_far: usize = 0;
348-
let mut buf = vec![0; DEFAULT_BUF_SIZE];
349-
350-
loop {
351-
let bytes_since_last = reader.read(&mut buf).await?;
352-
if bytes_since_last == 0 {
353-
break;
354-
}
355-
writer.write_all(&buf[..bytes_since_last]).await?;
356-
bytes_so_far += bytes_since_last;
357-
let msg = if tp == "download" {
358-
TransferType::Download(
359-
key.to_string(),
360-
TransferProgressInfo {
361-
total_bytes: total_bytes as usize,
362-
transferred_bytes: bytes_so_far,
363-
},
364-
)
365-
} else {
366-
TransferType::Upload(
367-
key.to_string(),
368-
TransferProgressInfo {
369-
total_bytes: total_bytes as usize,
370-
transferred_bytes: bytes_so_far,
371-
},
372-
)
373-
};
374-
send_response(progress_sender, msg).await;
375-
}
376-
377-
Ok(bytes_so_far)
378-
}
379-
380-
async fn send_response(sender: &TransferSender, msg: TransferType) {
381-
// tracing::debug!("response: {}", &msg);
382-
sender.send(msg).unwrap();
383-
}

crates/cc_storage/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,5 @@ pub enum OSSError {
88
#[error("{0}")]
99
WithDescription(String),
1010
}
11+
12+
pub type ObjectResult<T> = std::result::Result<T, anyhow::Error>;

crates/cc_storage/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ pub type Result<T> = anyhow::Result<T>;
33
mod client;
44
mod config;
55
mod error;
6-
// mod partial_file;
6+
mod partial_file;
77
mod services;
8-
// mod stream;
8+
mod stream;
99
mod transfer;
1010
mod types;
1111
pub mod util;

0 commit comments

Comments
 (0)