Commit 76eebfe80dd18350eacccc38c45e10fe2b441952

Authored by Georg Hopp
1 parent abe77b68

Added first image processing decoupled from the upload

@@ -20,9 +20,10 @@ diffy = "0.2" @@ -20,9 +20,10 @@ diffy = "0.2"
20 dotenv = "0.15.0" 20 dotenv = "0.15.0"
21 flate2 = "^1.0" 21 flate2 = "^1.0"
22 futures = "^0.3" 22 futures = "^0.3"
  23 +futures-util = { version = "0", features = ["std"] }
23 listenfd = "0.3" 24 listenfd = "0.3"
24 r2d2 = "0.8.9" 25 r2d2 = "0.8.9"
25 serde = { version = "^1.0", features = ["derive"] } 26 serde = { version = "^1.0", features = ["derive"] }
26 serde_derive = "1.0" 27 serde_derive = "1.0"
27 serde_json = "1.0" 28 serde_json = "1.0"
28 -uuid = { version = "^0.8", features = ["v4"] } 29 +uuid = { version = "^0.8", features = ["v4", "v5"] }
@@ -5,33 +5,94 @@ mod error; @@ -5,33 +5,94 @@ mod error;
5 mod models; 5 mod models;
6 mod routes; 6 mod routes;
7 mod schema; 7 mod schema;
  8 +mod uuid;
8 9
9 use crate::routes::markdown::*; 10 use crate::routes::markdown::*;
10 use crate::routes::other::*; 11 use crate::routes::other::*;
11 use crate::routes::user::*; 12 use crate::routes::user::*;
  13 +use crate::uuid::Uuid;
12 14
13 use actix_web::{guard, web, App, HttpResponse, HttpServer}; 15 use actix_web::{guard, web, App, HttpResponse, HttpServer};
  16 +use async_std::channel::Receiver;
  17 +use async_std::channel::Sender;
  18 +use async_std::channel::bounded;
  19 +use async_std::fs::File;
14 use diesel::r2d2::{self, ConnectionManager}; 20 use diesel::r2d2::{self, ConnectionManager};
15 use diesel::SqliteConnection; 21 use diesel::SqliteConnection;
  22 +use futures::AsyncReadExt;
  23 +use futures::AsyncSeekExt;
16 use listenfd::ListenFd; 24 use listenfd::ListenFd;
17 use routes::markdown::get_markdown; 25 use routes::markdown::get_markdown;
18 use routes::upload::upload; 26 use routes::upload::upload;
  27 +use std::io::SeekFrom;
  28 +use std::sync::Arc;
19 29
20 pub(crate) type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>; 30 pub(crate) type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
21 31
  32 +#[derive(Clone)]
  33 +pub struct AppData {
  34 + pub database_pool: Arc<Pool>,
  35 + pub tx_upload_worker: Sender<String>,
  36 +}
  37 +
  38 +async fn read_at( f :&mut File
  39 + , pos :SeekFrom
  40 + , buf :&mut [u8]) -> std::io::Result<()> {
  41 + f.seek(pos).await?;
  42 + f.read_exact(buf).await
  43 +}
  44 +
  45 +async fn get_sample( f :&mut File
  46 + , buf :&mut [u8]) -> std::io::Result<()> {
  47 + let file_len = f.metadata().await?.len();
  48 + let chunk_size = buf.len() / 3;
  49 +
  50 + read_at(f, SeekFrom::Start(0), &mut buf[0..chunk_size]).await?;
  51 + if file_len >= 2 * chunk_size as u64 {
  52 + read_at( f
  53 + , SeekFrom::End(-(chunk_size as i64))
  54 + , &mut buf[2*chunk_size..]).await?;
  55 + }
  56 + if file_len >= 3 * chunk_size as u64 {
  57 + read_at( f
  58 + , SeekFrom::Start((file_len-chunk_size as u64) / 2)
  59 + , &mut buf[chunk_size..2*chunk_size]).await?;
  60 + }
  61 +
  62 + Ok(())
  63 +}
  64 +
22 #[actix_rt::main] 65 #[actix_rt::main]
23 async fn main() -> std::io::Result<()> { 66 async fn main() -> std::io::Result<()> {
24 let mut listenfd = ListenFd::from_env(); 67 let mut listenfd = ListenFd::from_env();
25 68
26 dotenv::dotenv().ok(); 69 dotenv::dotenv().ok();
27 70
  71 + let ( tx_upload_worker
  72 + , rx_upload_worker ) :( Sender<String>
  73 + , Receiver<String> ) = bounded(32);
  74 +
  75 + let _upload_worker = actix_rt::spawn(async move {
  76 + while let Ok(filename) = rx_upload_worker.recv().await {
  77 + let mut f = File::open(&filename).await.unwrap();
  78 + let mut buf = vec!['.' as u8; 3 * 4096];
  79 + get_sample(&mut f, buf.as_mut()).await.unwrap();
  80 + println!("[UPLOAD WORKER] filename: {}", filename);
  81 + println!( "[UPLOAD WORKER] uuid: {}"
  82 + , Uuid::get("some.unique.namespace", buf.as_mut()) );
  83 + }
  84 + });
  85 +
28 let database_url = std::env::var("DATABASE_URL").expect("NOT FOUND"); 86 let database_url = std::env::var("DATABASE_URL").expect("NOT FOUND");
29 let database_pool = Pool::builder() 87 let database_pool = Pool::builder()
30 .build(ConnectionManager::new(database_url)) 88 .build(ConnectionManager::new(database_url))
31 .unwrap(); 89 .unwrap();
32 90
  91 + let database_pool = Arc::new(database_pool);
  92 + let app_data = AppData { database_pool, tx_upload_worker };
  93 +
33 let server = HttpServer::new(move || { 94 let server = HttpServer::new(move || {
34 - App::new() . data(database_pool.clone()) 95 + App::new() . data(app_data.clone())
35 . service(actix_files::Files::new("/static", "./static")) 96 . service(actix_files::Files::new("/static", "./static"))
36 . service( web::scope("/api/v0") 97 . service( web::scope("/api/v0")
37 . service( web::resource("/upload") 98 . service( web::resource("/upload")
1 -use crate::models::markdown;  
2 -use crate::Pool; 1 +use crate::{models::markdown, AppData};
3 2
4 use actix_web::{Error, HttpResponse, web}; 3 use actix_web::{Error, HttpResponse, web};
5 use anyhow::Result; 4 use anyhow::Result;
@@ -11,36 +10,40 @@ pub struct Patchset { @@ -11,36 +10,40 @@ pub struct Patchset {
11 patch: Option<i32>, 10 patch: Option<i32>,
12 } 11 }
13 12
14 -pub async fn get_markdowns(pool: web::Data<Pool>) 13 +pub async fn get_markdowns(app_data: web::Data<AppData>)
15 -> Result<HttpResponse, Error> 14 -> Result<HttpResponse, Error>
16 { 15 {
17 - Ok( web::block(move || markdown::get_markdowns(pool.into_inner())) 16 + let pool = app_data.database_pool.clone();
  17 +
  18 + Ok( web::block(move || markdown::get_markdowns(pool))
18 . await 19 . await
19 . map(|markdowns| HttpResponse::Ok().json(markdowns)) 20 . map(|markdowns| HttpResponse::Ok().json(markdowns))
20 . map_err(|_| HttpResponse::InternalServerError())? 21 . map_err(|_| HttpResponse::InternalServerError())?
21 ) 22 )
22 } 23 }
23 24
24 -pub async fn get_markdown( pool: web::Data<Pool> 25 +pub async fn get_markdown( app_data: web::Data<AppData>
25 , name: web::Path<String> 26 , name: web::Path<String>
26 , patch: web::Query<Patchset> 27 , patch: web::Query<Patchset>
27 ) -> Result<HttpResponse, Error> 28 ) -> Result<HttpResponse, Error>
28 { 29 {
29 - let pool = pool.into_inner(); 30 + let pool = app_data.database_pool.clone();
30 let name = name.into_inner(); 31 let name = name.into_inner();
31 let patch = patch.into_inner(); 32 let patch = patch.into_inner();
32 33
33 - Ok( web::block(move || markdown::get_markdown(pool, name.as_str(), patch.patch)) 34 + Ok( web::block(move || markdown::get_markdown( pool
  35 + , name.as_str()
  36 + , patch.patch) )
34 . await 37 . await
35 . map(|markdowns| HttpResponse::Ok().json(markdowns)) 38 . map(|markdowns| HttpResponse::Ok().json(markdowns))
36 . map_err(|_| HttpResponse::InternalServerError())? 39 . map_err(|_| HttpResponse::InternalServerError())?
37 ) 40 )
38 } 41 }
39 42
40 -pub async fn get_patches( pool: web::Data<Pool> 43 +pub async fn get_patches( app_data: web::Data<AppData>
41 , name: web::Path<String> 44 , name: web::Path<String>
42 ) -> Result<HttpResponse, Error> { 45 ) -> Result<HttpResponse, Error> {
43 - let pool = pool.into_inner(); 46 + let pool = app_data.database_pool.clone();
44 let name = name.into_inner(); 47 let name = name.into_inner();
45 48
46 Ok( web::block(move || markdown::get_patches(pool, name.as_str())) 49 Ok( web::block(move || markdown::get_patches(pool, name.as_str()))
@@ -50,12 +53,12 @@ pub async fn get_patches( pool: web::Data<Pool> @@ -50,12 +53,12 @@ pub async fn get_patches( pool: web::Data<Pool>
50 ) 53 )
51 } 54 }
52 55
53 -pub async fn update_markdown( pool: web::Data<Pool> 56 +pub async fn update_markdown( app_data: web::Data<AppData>
54 , name: web::Path<String> 57 , name: web::Path<String>
55 , item: web::Json<MarkdownJson> ) 58 , item: web::Json<MarkdownJson> )
56 -> Result<HttpResponse, Error> 59 -> Result<HttpResponse, Error>
57 { 60 {
58 - let pool = pool.into_inner(); 61 + let pool = app_data.database_pool.clone();
59 let name = name.into_inner(); 62 let name = name.into_inner();
60 let item = item.into_inner(); 63 let item = item.into_inner();
61 64
@@ -4,19 +4,25 @@ use futures::stream::StreamExt; @@ -4,19 +4,25 @@ use futures::stream::StreamExt;
4 use async_std::{fs::OpenOptions, io::WriteExt}; 4 use async_std::{fs::OpenOptions, io::WriteExt};
5 use uuid::Uuid; 5 use uuid::Uuid;
6 6
7 -pub async fn upload(mut body: web::Payload) -> Result<HttpResponse, Error> 7 +use crate::AppData;
  8 +
  9 +pub async fn upload( app_data: web::Data<AppData>
  10 + , mut body: web::Payload) -> Result<HttpResponse, Error>
8 { 11 {
  12 + let worker = app_data.tx_upload_worker.clone();
  13 +
  14 + let upload_filename = format!("/tmp/upload_{}", Uuid::new_v4());
9 let mut output = OpenOptions::new(); 15 let mut output = OpenOptions::new();
10 output . create(true) 16 output . create(true)
11 . write(true); 17 . write(true);
12 - let mut output = output  
13 - . open(format!("/tmp/upload_{}", Uuid::new_v4()))  
14 - . await  
15 - . unwrap(); 18 + let mut output = output.open(&upload_filename).await?;
16 19
17 while let Some(item) = body.next().await { 20 while let Some(item) = body.next().await {
18 - output.write_all(&item?).await.unwrap(); 21 + output.write_all(&item?).await?;
19 } 22 }
20 23
  24 + // TODO handle this as error response...
  25 + worker.send(upload_filename).await.unwrap();
  26 +
21 Ok(HttpResponse::Ok().finish()) 27 Ok(HttpResponse::Ok().finish())
22 } 28 }
1 use crate::models::user::{self, Action}; 1 use crate::models::user::{self, Action};
2 -use crate::Pool; 2 +use crate::AppData;
3 3
4 use actix_web::{Error, HttpResponse, web}; 4 use actix_web::{Error, HttpResponse, web};
5 use anyhow::Result; 5 use anyhow::Result;
6 6
7 -pub async fn create_user( pool: web::Data<Pool> 7 +pub async fn create_user( app_data: web::Data<AppData>
8 , item: web::Json<user::UserJson> ) 8 , item: web::Json<user::UserJson> )
9 -> Result<HttpResponse, Error> 9 -> Result<HttpResponse, Error>
10 { 10 {
11 - let pool = pool.into_inner(); 11 + let pool = app_data.database_pool.clone();
12 let item = item.into_inner(); 12 let item = item.into_inner();
13 13
14 Ok(web::block(move || user::create_user(pool, item)) 14 Ok(web::block(move || user::create_user(pool, item))
@@ -21,19 +21,21 @@ pub async fn create_user( pool: web::Data<Pool> @@ -21,19 +21,21 @@ pub async fn create_user( pool: web::Data<Pool>
21 . map_err(|_| HttpResponse::InternalServerError())?) 21 . map_err(|_| HttpResponse::InternalServerError())?)
22 } 22 }
23 23
24 -pub async fn get_users(pool: web::Data<Pool>) 24 +pub async fn get_users(app_data: web::Data<AppData>)
25 -> Result<HttpResponse, Error> 25 -> Result<HttpResponse, Error>
26 { 26 {
27 - Ok(web::block(move || user::get_users(pool.into_inner())) 27 + let pool = app_data.database_pool.clone();
  28 +
  29 + Ok(web::block(move || user::get_users(pool))
28 . await 30 . await
29 . map(|users| HttpResponse::Ok().json(users)) 31 . map(|users| HttpResponse::Ok().json(users))
30 . map_err(|_| HttpResponse::InternalServerError())?) 32 . map_err(|_| HttpResponse::InternalServerError())?)
31 } 33 }
32 34
33 -pub async fn get_user(pool: web::Data<Pool>, id: web::Path<i32>) 35 +pub async fn get_user(app_data: web::Data<AppData>, id: web::Path<i32>)
34 -> Result<HttpResponse, Error> 36 -> Result<HttpResponse, Error>
35 { 37 {
36 - let pool = pool.into_inner(); 38 + let pool = app_data.database_pool.clone();
37 let id = id.into_inner(); 39 let id = id.into_inner();
38 40
39 Ok(web::block(move || user::get_user(pool, id)) 41 Ok(web::block(move || user::get_user(pool, id))
@@ -42,10 +44,10 @@ pub async fn get_user(pool: web::Data<Pool>, id: web::Path<i32>) @@ -42,10 +44,10 @@ pub async fn get_user(pool: web::Data<Pool>, id: web::Path<i32>)
42 . map_err(|_| HttpResponse::InternalServerError())?) 44 . map_err(|_| HttpResponse::InternalServerError())?)
43 } 45 }
44 46
45 -pub async fn delete_user(pool: web::Data<Pool>, id: web::Path<i32>) 47 +pub async fn delete_user(app_data: web::Data<AppData>, id: web::Path<i32>)
46 -> Result<HttpResponse, Error> 48 -> Result<HttpResponse, Error>
47 { 49 {
48 - let pool = pool.into_inner(); 50 + let pool = app_data.database_pool.clone();
49 let id = id.into_inner(); 51 let id = id.into_inner();
50 52
51 Ok(web::block(move || user::delete_user(pool, id)) 53 Ok(web::block(move || user::delete_user(pool, id))
@@ -54,12 +56,12 @@ pub async fn delete_user(pool: web::Data<Pool>, id: web::Path<i32>) @@ -54,12 +56,12 @@ pub async fn delete_user(pool: web::Data<Pool>, id: web::Path<i32>)
54 . map_err(|_| HttpResponse::InternalServerError())?) 56 . map_err(|_| HttpResponse::InternalServerError())?)
55 } 57 }
56 58
57 -pub async fn update_user( pool: web::Data<Pool> 59 +pub async fn update_user( app_data: web::Data<AppData>
58 , id: web::Path<i32> 60 , id: web::Path<i32>
59 , item: web::Json<user::UserJson> ) 61 , item: web::Json<user::UserJson> )
60 -> Result<HttpResponse, Error> 62 -> Result<HttpResponse, Error>
61 { 63 {
62 - let pool = pool.into_inner(); 64 + let pool = app_data.database_pool.clone();
63 let id = id.into_inner(); 65 let id = id.into_inner();
64 let item = item.into_inner(); 66 let item = item.into_inner();
65 67
  1 +use std::fmt::Display;
  2 +
  3 +#[derive(Clone,Copy,Debug)]
  4 +pub struct Uuid(pub uuid::Uuid);
  5 +
  6 +impl Display for Uuid {
  7 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  8 + write!(f, "{}", self.0)
  9 + }
  10 +}
  11 +
  12 +macro_rules! ns {
  13 + ($n:expr) => {
  14 + uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_DNS, $n.as_bytes())
  15 + }
  16 +}
  17 +
  18 +impl Uuid {
  19 + pub fn get(ns: &str, buf: &mut [u8]) -> Self {
  20 + Self(uuid::Uuid::new_v5(&ns!(ns), buf))
  21 + }
  22 +}
Please register or login to post a comment