upload_worker.rs
2.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use std::io::SeekFrom;
use async_std::{fs::File, channel::{Sender, Receiver, bounded}};
use futures::{ AsyncSeekExt, AsyncReadExt, FutureExt, StreamExt, select
, stream::FuturesUnordered};
use crate::{models::image::Image, upload_filename, upload_uuid};
use crate::uuid::Uuid;
use std::convert::TryFrom;
use crate::config::CONFIG;
pub fn launch() -> Sender<Image> {
let (tx_upload_worker, rx_upload_worker)
: (Sender<Image>, Receiver<Image>) = bounded(32);
actix_rt::spawn(async move {
let mut workers = FuturesUnordered::new();
loop {
select! {
image = rx_upload_worker.recv().fuse() => {
match image {
Err(_) => break,
Ok(image) => workers.push(worker(image)),
}
},
_result = workers.next() => {},
}
}
while workers.len() > 0 {
workers.next().await;
}
});
tx_upload_worker
}
async fn worker(image :Image) {
let upload_filename = upload_filename!(image).unwrap();
let mut f = File::open(&upload_filename).await.unwrap();
let mut buf = vec!['.' as u8; 3 * 4096];
get_sample(&mut f, buf.as_mut()).await.unwrap();
println!( "[upload worker] filename: {}"
, upload_filename );
println!( "[upload worker] uuid: {}"
, Uuid::get( "some.unique.namespace"
, buf.as_mut() ) );
}
async fn read_at( f :&mut File
, pos :SeekFrom
, buf :&mut [u8]) -> std::io::Result<()> {
f.seek(pos).await?;
f.read_exact(buf).await
}
async fn get_sample( f :&mut File
, buf :&mut [u8]) -> std::io::Result<()> {
let file_len = f.metadata().await?.len();
let chunk_size = buf.len() / 3;
read_at(f, SeekFrom::Start(0), &mut buf[0..chunk_size]).await?;
if file_len >= 2 * chunk_size as u64 {
read_at( f
, SeekFrom::End(-(chunk_size as i64))
, &mut buf[2*chunk_size..]).await?;
}
if file_len >= 3 * chunk_size as u64 {
read_at( f
, SeekFrom::Start((file_len-chunk_size as u64) / 2)
, &mut buf[chunk_size..2*chunk_size]).await?;
}
Ok(())
}