upload_worker.rs
3.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use std::{io::{SeekFrom, ErrorKind}, sync::Arc};
use actix_web::web;
use async_std::{ fs::{File, DirBuilder, copy, metadata, remove_file}
, channel::{Sender, Receiver, bounded}
, path::PathBuf
, io::Result };
use futures::{ AsyncSeekExt, AsyncReadExt, FutureExt, StreamExt, select
, stream::FuturesUnordered};
use crate::{models::image::{Image, finalize}, upload_filename, config::CONFIG, Pool};
use crate::uuid::Uuid;
use std::convert::TryFrom;
use image::{io::Reader as ImageReader, GenericImageView};
pub fn launch() -> Sender<(Arc<Pool>, Image)> {
let (tx_upload_worker, rx_upload_worker)
: (Sender<(Arc<Pool>, Image)>, Receiver<(Arc<Pool>, Image)>) = bounded(32);
actix_rt::spawn(async move {
let mut workers = FuturesUnordered::new();
loop {
select! {
image = rx_upload_worker.recv().fuse() => {
match image {
Err(_) => break,
Ok((pool, image)) => workers.push(worker(pool, image)),
}
},
_result = workers.next() => {},
}
}
while workers.len() > 0 {
workers.next().await;
}
});
tx_upload_worker
}
async fn worker(pool :Arc<Pool>, mut image :Image) {
let upload_filename = upload_filename!(image).unwrap();
let mut f = File::open(&upload_filename).await.unwrap();
let mut buf = vec!['.' as u8; 3 * 3 * 4096];
get_sample(&mut f, buf.as_mut()).await.unwrap();
let uuid = Uuid::get(CONFIG.namespace(), buf.as_mut());
let uuid_string = format!("{}", uuid);
let mut image_path = PathBuf::from(CONFIG.images_dir());
image_path.push(&uuid_string.as_str()[..2]);
image_path.push(&uuid_string.as_str()[..5]);
DirBuilder::new() . recursive(true)
. create(&image_path)
. await
. unwrap();
image_path.push(&uuid_string);
image.upload_uuid = None;
image.uuid = Some(uuid.0.as_bytes().to_vec());
match metadata(&image_path).await {
Err(e) if e.kind() == ErrorKind::NotFound => {
copy(&upload_filename, &image_path).await.unwrap();
let img = ImageReader::open(&image_path).unwrap()
. with_guessed_format().unwrap()
. decode().unwrap();
let (dim_x, dim_y) = img.dimensions();
image.dim_x = Some(dim_x as i32);
image.dim_y = Some(dim_y as i32);
},
Err(e) => {
let e :Result<()> = Err(e);
e.unwrap();
},
Ok(_) => {},
}
remove_file(&upload_filename).await.unwrap();
web::block(move || finalize(pool, image)).await.unwrap();
}
async fn read_at( f :&mut File
, pos :SeekFrom
, buf :&mut [u8]) -> std::io::Result<()> {
f.seek(pos).await?;
f.read_exact(buf).await
}
async fn get_sample( f :&mut File
, buf :&mut [u8]) -> std::io::Result<()> {
let file_len = f.metadata().await?.len();
let chunk_size = buf.len() / 3;
read_at(f, SeekFrom::Start(0), &mut buf[0..chunk_size]).await?;
if file_len >= 2 * chunk_size as u64 {
read_at( f
, SeekFrom::End(-(chunk_size as i64))
, &mut buf[2*chunk_size..]).await?;
}
if file_len >= 3 * chunk_size as u64 {
read_at( f
, SeekFrom::Start((file_len-chunk_size as u64) / 2)
, &mut buf[chunk_size..2*chunk_size]).await?;
}
Ok(())
}