Commit ecfa39ac23e21b5d630bc689747747f503f942ef

Authored by Georg Hopp
1 parent 1667225a

improbe config and upload informations handling

  1 +CONFIG=artshop.toml
1 DATABASE_URL=crud.db 2 DATABASE_URL=crud.db
  1 +[database]
  2 +# url = "./var/lib/artshop/database"
  3 +
  4 +[locations]
  5 +upload = "/tmp/artshop/uploads"
  6 +images = "./var/lib/artshop/images"
@@ -22,8 +22,10 @@ flate2 = "^1.0" @@ -22,8 +22,10 @@ flate2 = "^1.0"
22 futures = "^0.3" 22 futures = "^0.3"
23 futures-util = { version = "0", features = ["std"] } 23 futures-util = { version = "0", features = ["std"] }
24 listenfd = "0.3" 24 listenfd = "0.3"
  25 +once_cell = "^1.9"
25 r2d2 = "0.8.9" 26 r2d2 = "0.8.9"
26 serde = { version = "^1.0", features = ["derive"] } 27 serde = { version = "^1.0", features = ["derive"] }
27 serde_derive = "1.0" 28 serde_derive = "1.0"
28 serde_json = "1.0" 29 serde_json = "1.0"
  30 +toml = "^0.5"
29 uuid = { version = "^0.8", features = ["v4", "v5"] } 31 uuid = { version = "^0.8", features = ["v4", "v5"] }
  1 +use std::fs::File;
  2 +use std::io::Read;
  3 +use once_cell::sync::Lazy;
  4 +use serde::Deserialize;
  5 +
  6 +#[derive(Debug, Deserialize)]
  7 +struct Database { url :Option<String> }
  8 +
  9 +#[derive(Debug, Deserialize)]
  10 +struct Locations { upload :String
  11 + , images :String }
  12 +
  13 +#[derive(Debug, Deserialize)]
  14 +pub(crate) struct Config { database :Database
  15 + , locations :Locations }
  16 +
  17 +pub(crate) static CONFIG :Lazy<Config> = Lazy::new(|| Config::load());
  18 +
  19 +impl Config {
  20 + pub fn load() -> Config {
  21 + let filename = std::env::var("CONFIG").unwrap();
  22 +
  23 + let mut buffer = vec![];
  24 + let mut file = File::open(filename).unwrap();
  25 +
  26 + file.read_to_end(&mut buffer).unwrap();
  27 + let mut config :Config = toml::from_slice(&buffer).unwrap();
  28 +
  29 + config.database.url = match config.database.url {
  30 + Some(url) => Some(url),
  31 + None => std::env::var("DATABASE_URL").ok()
  32 + };
  33 +
  34 + config
  35 + }
  36 +
  37 + pub fn upload_dir(&self) -> &str {
  38 + self.locations.upload.as_str()
  39 + }
  40 +
  41 + pub fn images_dir(&self) -> &str {
  42 + self.locations.images.as_str()
  43 + }
  44 +}
1 #[macro_use] 1 #[macro_use]
2 extern crate diesel; 2 extern crate diesel;
3 3
  4 +mod config;
4 mod error; 5 mod error;
5 mod models; 6 mod models;
6 mod routes; 7 mod routes;
7 mod schema; 8 mod schema;
8 mod uuid; 9 mod uuid;
9 -mod upload; 10 +mod upload_worker;
10 11
11 -use async_std::channel::Receiver;  
12 use models::image::Image; 12 use models::image::Image;
13 use routes::markdown::*; 13 use routes::markdown::*;
14 use routes::other::*; 14 use routes::other::*;
15 use routes::user::*; 15 use routes::user::*;
16 use routes::upload::*; 16 use routes::upload::*;
17 -use crate::upload::get_sample;  
18 -use crate::uuid::Uuid;  
19 17
20 use actix_web::{guard, web, App, HttpResponse, HttpServer}; 18 use actix_web::{guard, web, App, HttpResponse, HttpServer};
21 -use async_std::{ channel::{ Sender, bounded }  
22 - , fs::File }; 19 +use async_std::channel::Sender;
23 use diesel::r2d2::{self, ConnectionManager}; 20 use diesel::r2d2::{self, ConnectionManager};
24 use diesel::SqliteConnection; 21 use diesel::SqliteConnection;
25 -use futures::{ FutureExt, StreamExt, select  
26 - , stream::FuturesUnordered };  
27 use listenfd::ListenFd; 22 use listenfd::ListenFd;
28 -use std::convert::TryFrom;  
29 use std::sync::Arc; 23 use std::sync::Arc;
  24 +use std::ops::Deref;
30 25
31 pub(crate) type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>; 26 pub(crate) type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
32 27
@@ -42,39 +37,9 @@ async fn main() -> std::io::Result<()> { @@ -42,39 +37,9 @@ async fn main() -> std::io::Result<()> {
42 37
43 dotenv::dotenv().ok(); 38 dotenv::dotenv().ok();
44 39
45 - let (tx_upload_worker, rx_upload_worker)  
46 - : (Sender<Image>, Receiver<Image>) = bounded(32); 40 + println!("CONFIG: {:?}", config::CONFIG.deref());
47 41
48 - let _upload_worker = actix_rt::spawn(async move {  
49 - let mut workers = FuturesUnordered::new();  
50 -  
51 - loop {  
52 - select! {  
53 - filename = rx_upload_worker.recv().fuse() => {  
54 - match filename {  
55 - Err(_) => break,  
56 - Ok(upload) => workers.push(async move {  
57 - let upload_uuid = Uuid::try_from(upload.upload_uuid.unwrap()).unwrap();  
58 - let upload_filename = format!("/tmp/upload_{}", upload_uuid);  
59 - let mut f = File::open(&upload_filename).await.unwrap();  
60 - let mut buf = vec!['.' as u8; 3 * 4096];  
61 - get_sample(&mut f, buf.as_mut()).await.unwrap();  
62 - println!( "[UPLOAD WORKER] filename: {}"  
63 - , upload_filename );  
64 - println!( "[UPLOAD WORKER] uuid: {}"  
65 - , Uuid::get( "some.unique.namespace"  
66 - , buf.as_mut() ) );  
67 - })  
68 - }  
69 - },  
70 - _result = workers.next() => {},  
71 - }  
72 - }  
73 -  
74 - while workers.len() > 0 {  
75 - workers.next().await;  
76 - }  
77 - }); 42 + let tx_upload_worker = upload_worker::launch();
78 43
79 let database_url = std::env::var("DATABASE_URL").expect("NOT FOUND"); 44 let database_url = std::env::var("DATABASE_URL").expect("NOT FOUND");
80 let database_pool = Pool::builder() 45 let database_pool = Pool::builder()
@@ -37,6 +37,27 @@ pub struct Upload { @@ -37,6 +37,27 @@ pub struct Upload {
37 pub mime_type :String, 37 pub mime_type :String,
38 } 38 }
39 39
  40 +#[macro_export]
  41 +macro_rules! upload_uuid {
  42 + ($u:expr) => {
  43 + match &$u.upload_uuid {
  44 + Some(uuid) => Uuid::try_from(uuid.as_slice()).ok(),
  45 + None => None,
  46 + }
  47 + };
  48 +}
  49 +
  50 +#[macro_export]
  51 +macro_rules! upload_filename {
  52 + ($u:expr) => {
  53 + upload_uuid!($u).and_then(|uuid|
  54 + Some(format!( "{}/upload_{}"
  55 + , CONFIG.upload_dir()
  56 + , uuid)))
  57 + };
  58 +}
  59 +
  60 +
40 pub(crate) fn upload( pool: Arc<Pool> 61 pub(crate) fn upload( pool: Arc<Pool>
41 , item: Upload ) -> Result<Image> { 62 , item: Upload ) -> Result<Image> {
42 use crate::schema::images::dsl::*; 63 use crate::schema::images::dsl::*;
1 use actix_web::{Error, HttpResponse, web}; 1 use actix_web::{Error, HttpResponse, web};
2 use anyhow::Result; 2 use anyhow::Result;
  3 +use async_std::fs::DirBuilder;
3 use futures::stream::StreamExt; 4 use futures::stream::StreamExt;
4 use async_std::{fs::OpenOptions, io::WriteExt}; 5 use async_std::{fs::OpenOptions, io::WriteExt};
5 -use uuid::Uuid; 6 +use crate::uuid::Uuid;
6 7
7 -use crate::{AppData, models::image::{Upload, self}}; 8 +use crate::{AppData, models::image::{Upload, self}, upload_filename, upload_uuid};
  9 +use crate::config::CONFIG;
  10 +use std::convert::TryFrom;
8 11
9 pub async fn upload( app_data :web::Data<AppData> 12 pub async fn upload( app_data :web::Data<AppData>
10 , mut body :web::Payload 13 , mut body :web::Payload
@@ -13,8 +16,7 @@ pub async fn upload( app_data :web::Data<AppData> @@ -13,8 +16,7 @@ pub async fn upload( app_data :web::Data<AppData>
13 let pool = app_data.database_pool.clone(); 16 let pool = app_data.database_pool.clone();
14 let worker = app_data.tx_upload_worker.clone(); 17 let worker = app_data.tx_upload_worker.clone();
15 18
16 - let random_uuid = Uuid::new_v4();  
17 - let upload_uuid = Some(random_uuid.as_bytes().to_vec()); 19 + let upload_uuid = Some(uuid::Uuid::new_v4().as_bytes().to_vec());
18 let size = request.headers().get("content-length") 20 let size = request.headers().get("content-length")
19 . and_then(|h| Some(h.to_str().unwrap().parse::<i32>())) 21 . and_then(|h| Some(h.to_str().unwrap().parse::<i32>()))
20 . unwrap().unwrap(); 22 . unwrap().unwrap();
@@ -22,23 +24,26 @@ pub async fn upload( app_data :web::Data<AppData> @@ -22,23 +24,26 @@ pub async fn upload( app_data :web::Data<AppData>
22 . and_then(|h| Some(h.to_str().unwrap())) 24 . and_then(|h| Some(h.to_str().unwrap()))
23 . unwrap() ); 25 . unwrap() );
24 26
25 - let upload_filename = format!("/tmp/upload_{}", random_uuid); 27 + let upload = Upload {
  28 + upload_uuid,
  29 + size,
  30 + mime_type
  31 + };
  32 +
  33 + DirBuilder::new() . recursive(true)
  34 + . create(CONFIG.upload_dir())
  35 + . await?;
  36 +
  37 + let upload_filename = upload_filename!(upload).unwrap();
26 let mut output = OpenOptions::new(); 38 let mut output = OpenOptions::new();
27 let mut output = output 39 let mut output = output
28 . create(true) 40 . create(true)
29 . write(true) 41 . write(true)
30 . open(&upload_filename).await?; 42 . open(&upload_filename).await?;
31 -  
32 while let Some(item) = body.next().await { 43 while let Some(item) = body.next().await {
33 output.write_all(&item?).await?; 44 output.write_all(&item?).await?;
34 } 45 }
35 46
36 - let upload = Upload {  
37 - upload_uuid,  
38 - size,  
39 - mime_type  
40 - };  
41 -  
42 Ok( match web::block(move || image::upload(pool, upload)).await { 47 Ok( match web::block(move || image::upload(pool, upload)).await {
43 Ok(image) => { 48 Ok(image) => {
44 // TODO handle this as error response... 49 // TODO handle this as error response...
1 use std::io::SeekFrom; 1 use std::io::SeekFrom;
2 -use async_std::fs::File;  
3 -use futures::{AsyncSeekExt, AsyncReadExt}; 2 +use async_std::{fs::File, channel::{Sender, Receiver, bounded}};
  3 +use futures::{ AsyncSeekExt, AsyncReadExt, FutureExt, StreamExt, select
  4 + , stream::FuturesUnordered};
4 5
  6 +use crate::{models::image::Image, upload_filename, upload_uuid};
  7 +use crate::uuid::Uuid;
  8 +
  9 +use std::convert::TryFrom;
  10 +use crate::config::CONFIG;
  11 +
  12 +pub fn launch() -> Sender<Image> {
  13 + let (tx_upload_worker, rx_upload_worker)
  14 + : (Sender<Image>, Receiver<Image>) = bounded(32);
  15 +
  16 + actix_rt::spawn(async move {
  17 + let mut workers = FuturesUnordered::new();
  18 +
  19 + loop {
  20 + select! {
  21 + image = rx_upload_worker.recv().fuse() => {
  22 + match image {
  23 + Err(_) => break,
  24 + Ok(image) => workers.push(worker(image)),
  25 + }
  26 + },
  27 + _result = workers.next() => {},
  28 + }
  29 + }
  30 +
  31 + while workers.len() > 0 {
  32 + workers.next().await;
  33 + }
  34 + });
  35 +
  36 + tx_upload_worker
  37 +}
  38 +
  39 +
  40 +async fn worker(image :Image) {
  41 + let upload_filename = upload_filename!(image).unwrap();
  42 + let mut f = File::open(&upload_filename).await.unwrap();
  43 + let mut buf = vec!['.' as u8; 3 * 4096];
  44 + get_sample(&mut f, buf.as_mut()).await.unwrap();
  45 + println!( "[upload worker] filename: {}"
  46 + , upload_filename );
  47 + println!( "[upload worker] uuid: {}"
  48 + , Uuid::get( "some.unique.namespace"
  49 + , buf.as_mut() ) );
  50 +}
5 51
6 async fn read_at( f :&mut File 52 async fn read_at( f :&mut File
7 - , pos :SeekFrom  
8 - , buf :&mut [u8]) -> std::io::Result<()> { 53 + , pos :SeekFrom
  54 + , buf :&mut [u8]) -> std::io::Result<()> {
9 f.seek(pos).await?; 55 f.seek(pos).await?;
10 f.read_exact(buf).await 56 f.read_exact(buf).await
11 } 57 }
12 58
13 -pub async fn get_sample( f :&mut File 59 +async fn get_sample( f :&mut File
14 , buf :&mut [u8]) -> std::io::Result<()> { 60 , buf :&mut [u8]) -> std::io::Result<()> {
15 let file_len = f.metadata().await?.len(); 61 let file_len = f.metadata().await?.len();
16 let chunk_size = buf.len() / 3; 62 let chunk_size = buf.len() / 3;
@@ -23,10 +23,10 @@ impl Uuid { @@ -23,10 +23,10 @@ impl Uuid {
23 } 23 }
24 } 24 }
25 25
26 -impl TryFrom<Vec<u8>> for Uuid { 26 +impl TryFrom<&[u8]> for Uuid {
27 type Error = Error; 27 type Error = Error;
28 28
29 - fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {  
30 - Ok(Self(uuid::Uuid::from_slice(value.as_slice())?)) 29 + fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
  30 + Ok(Self(uuid::Uuid::from_slice(value)?))
31 } 31 }
32 } 32 }
Please register or login to post a comment