Commit 1667225a0db723c110603ec3f5f822584acdff52

Authored by Georg Hopp
1 parent 149cddde

Create database entries on file upload

@@ -20,3 +20,15 @@ pub struct MarkdownDiffJson { @@ -20,3 +20,15 @@ pub struct MarkdownDiffJson {
20 pub id: i32, 20 pub id: i32,
21 pub date_created: String, 21 pub date_created: String,
22 } 22 }
  23 +
  24 +#[derive(Clone, Debug, Serialize, Deserialize)]
  25 +pub struct ImageJson {
  26 + pub upload_uuid :Option<Vec<u8>>,
  27 + pub uuid :Option<Vec<u8>>,
  28 + pub size :i32,
  29 + pub dim_x :Option<i32>,
  30 + pub dim_y :Option<i32>,
  31 + pub mime_type :String,
  32 + pub date_created :String,
  33 + pub date_updated :String
  34 +}
No preview for this file type
  1 +-- This file should undo anything in `up.sql`
  2 +DROP TABLE "images";
  1 +CREATE TABLE "images" (
  2 + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
  3 + -- identical uuid means identical file.
  4 + upload_uuid BLOB(16) UNIQUE,
  5 + uuid BLOB(16) UNIQUE,
  6 + size INTEGER NOT NULL,
  7 + dim_x INTEGER,
  8 + dim_y INTEGER,
  9 + mime_type VARCHAR(256) NOT NULL,
  10 + date_created TEXT NOT NULL,
  11 + date_updated TEXT NOT NULL
  12 +);
@@ -7,7 +7,7 @@ use r2d2; @@ -7,7 +7,7 @@ use r2d2;
7 type ParentError = Option<Pin<Box<dyn std::error::Error>>>; 7 type ParentError = Option<Pin<Box<dyn std::error::Error>>>;
8 8
9 #[derive(Debug)] 9 #[derive(Debug)]
10 -pub(crate) struct Error { 10 +pub struct Error {
11 source: ParentError, 11 source: ParentError,
12 message: String, 12 message: String,
13 } 13 }
@@ -77,3 +77,11 @@ impl From<ParsePatchError> for Error { @@ -77,3 +77,11 @@ impl From<ParsePatchError> for Error {
77 } 77 }
78 } 78 }
79 } 79 }
  80 +
  81 +impl From<uuid::Error> for Error {
  82 + fn from(source: uuid::Error) -> Self {
  83 + Self { source: Some(Box::pin(source))
  84 + , message: String::from("UUID error")
  85 + }
  86 + }
  87 +}
@@ -6,11 +6,15 @@ mod models; @@ -6,11 +6,15 @@ mod models;
6 mod routes; 6 mod routes;
7 mod schema; 7 mod schema;
8 mod uuid; 8 mod uuid;
  9 +mod upload;
9 10
  11 +use async_std::channel::Receiver;
  12 +use models::image::Image;
10 use routes::markdown::*; 13 use routes::markdown::*;
11 use routes::other::*; 14 use routes::other::*;
12 use routes::user::*; 15 use routes::user::*;
13 use routes::upload::*; 16 use routes::upload::*;
  17 +use crate::upload::get_sample;
14 use crate::uuid::Uuid; 18 use crate::uuid::Uuid;
15 19
16 use actix_web::{guard, web, App, HttpResponse, HttpServer}; 20 use actix_web::{guard, web, App, HttpResponse, HttpServer};
@@ -18,10 +22,10 @@ use async_std::{ channel::{ Sender, bounded } @@ -18,10 +22,10 @@ use async_std::{ channel::{ Sender, bounded }
18 , fs::File }; 22 , fs::File };
19 use diesel::r2d2::{self, ConnectionManager}; 23 use diesel::r2d2::{self, ConnectionManager};
20 use diesel::SqliteConnection; 24 use diesel::SqliteConnection;
21 -use futures::{ AsyncReadExt, AsyncSeekExt, FutureExt, StreamExt, select 25 +use futures::{ FutureExt, StreamExt, select
22 , stream::FuturesUnordered }; 26 , stream::FuturesUnordered };
23 use listenfd::ListenFd; 27 use listenfd::ListenFd;
24 -use std::io::SeekFrom; 28 +use std::convert::TryFrom;
25 use std::sync::Arc; 29 use std::sync::Arc;
26 30
27 pub(crate) type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>; 31 pub(crate) type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
@@ -29,34 +33,7 @@ pub(crate) type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>; @@ -29,34 +33,7 @@ pub(crate) type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
29 #[derive(Clone)] 33 #[derive(Clone)]
30 pub struct AppData { 34 pub struct AppData {
31 pub database_pool: Arc<Pool>, 35 pub database_pool: Arc<Pool>,
32 - pub tx_upload_worker: Sender<String>,  
33 -}  
34 -  
35 -async fn read_at( f :&mut File  
36 - , pos :SeekFrom  
37 - , buf :&mut [u8]) -> std::io::Result<()> {  
38 - f.seek(pos).await?;  
39 - f.read_exact(buf).await  
40 -}  
41 -  
42 -async fn get_sample( f :&mut File  
43 - , buf :&mut [u8]) -> std::io::Result<()> {  
44 - let file_len = f.metadata().await?.len();  
45 - let chunk_size = buf.len() / 3;  
46 -  
47 - read_at(f, SeekFrom::Start(0), &mut buf[0..chunk_size]).await?;  
48 - if file_len >= 2 * chunk_size as u64 {  
49 - read_at( f  
50 - , SeekFrom::End(-(chunk_size as i64))  
51 - , &mut buf[2*chunk_size..]).await?;  
52 - }  
53 - if file_len >= 3 * chunk_size as u64 {  
54 - read_at( f  
55 - , SeekFrom::Start((file_len-chunk_size as u64) / 2)  
56 - , &mut buf[chunk_size..2*chunk_size]).await?;  
57 - }  
58 -  
59 - Ok(()) 36 + pub tx_upload_worker: Sender<Image>,
60 } 37 }
61 38
62 #[actix_rt::main] 39 #[actix_rt::main]
@@ -65,7 +42,8 @@ async fn main() -> std::io::Result<()> { @@ -65,7 +42,8 @@ async fn main() -> std::io::Result<()> {
65 42
66 dotenv::dotenv().ok(); 43 dotenv::dotenv().ok();
67 44
68 - let (tx_upload_worker, rx_upload_worker) = bounded(32); 45 + let (tx_upload_worker, rx_upload_worker)
  46 + : (Sender<Image>, Receiver<Image>) = bounded(32);
69 47
70 let _upload_worker = actix_rt::spawn(async move { 48 let _upload_worker = actix_rt::spawn(async move {
71 let mut workers = FuturesUnordered::new(); 49 let mut workers = FuturesUnordered::new();
@@ -75,12 +53,14 @@ async fn main() -> std::io::Result<()> { @@ -75,12 +53,14 @@ async fn main() -> std::io::Result<()> {
75 filename = rx_upload_worker.recv().fuse() => { 53 filename = rx_upload_worker.recv().fuse() => {
76 match filename { 54 match filename {
77 Err(_) => break, 55 Err(_) => break,
78 - Ok(filename) => workers.push(async move {  
79 - let mut f = File::open(&filename).await.unwrap(); 56 + Ok(upload) => workers.push(async move {
  57 + let upload_uuid = Uuid::try_from(upload.upload_uuid.unwrap()).unwrap();
  58 + let upload_filename = format!("/tmp/upload_{}", upload_uuid);
  59 + let mut f = File::open(&upload_filename).await.unwrap();
80 let mut buf = vec!['.' as u8; 3 * 4096]; 60 let mut buf = vec!['.' as u8; 3 * 4096];
81 get_sample(&mut f, buf.as_mut()).await.unwrap(); 61 get_sample(&mut f, buf.as_mut()).await.unwrap();
82 println!( "[UPLOAD WORKER] filename: {}" 62 println!( "[UPLOAD WORKER] filename: {}"
83 - , filename ); 63 + , upload_filename );
84 println!( "[UPLOAD WORKER] uuid: {}" 64 println!( "[UPLOAD WORKER] uuid: {}"
85 , Uuid::get( "some.unique.namespace" 65 , Uuid::get( "some.unique.namespace"
86 , buf.as_mut() ) ); 66 , buf.as_mut() ) );
  1 +use std::sync::Arc;
  2 +
  3 +use crate::error::*;
  4 +use crate::{schema::*, Pool};
  5 +use diesel::{Connection, insert_into};
  6 +use diesel::prelude::*;
  7 +use serde::{Deserialize, Serialize};
  8 +
  9 +#[derive(Clone, Debug, Serialize, Deserialize, Queryable, Identifiable)]
  10 +pub struct Image {
  11 + pub id :i32,
  12 + pub upload_uuid :Option<Vec<u8>>,
  13 + pub uuid :Option<Vec<u8>>,
  14 + pub size :i32,
  15 + pub dim_x :Option<i32>,
  16 + pub dim_y :Option<i32>,
  17 + pub mime_type :String,
  18 + pub date_created :String,
  19 + pub date_updated :String
  20 +}
  21 +
  22 +#[derive(Debug, Insertable)]
  23 +#[table_name = "images"]
  24 +pub struct ImageNew<'a> {
  25 + pub upload_uuid :Option<&'a [u8]>,
  26 + pub size :i32,
  27 + pub mime_type :&'a str,
  28 + pub date_created :&'a str,
  29 + pub date_updated :&'a str
  30 +}
  31 +
  32 +#[derive(Clone, Debug, Serialize, Deserialize, AsChangeset)]
  33 +#[table_name = "images"]
  34 +pub struct Upload {
  35 + pub upload_uuid :Option<Vec<u8>>,
  36 + pub size :i32,
  37 + pub mime_type :String,
  38 +}
  39 +
  40 +pub(crate) fn upload( pool: Arc<Pool>
  41 + , item: Upload ) -> Result<Image> {
  42 + use crate::schema::images::dsl::*;
  43 + let db_connection = pool.get()?;
  44 +
  45 + let now = chrono::Local::now().naive_local();
  46 + let new_image = ImageNew {
  47 + upload_uuid : item.upload_uuid.as_deref(),
  48 + size : item.size,
  49 + mime_type : &item.mime_type,
  50 + date_created : &format!("{}", now),
  51 + date_updated : &format!("{}", now)
  52 + };
  53 +
  54 + Ok(db_connection.transaction(|| {
  55 + insert_into(images) . values(&new_image)
  56 + . execute(&db_connection)?;
  57 + images . order(id.desc())
  58 + . first::<Image>(&db_connection)
  59 + })?)
  60 +}
1 -pub(crate) mod user; 1 +pub(crate) mod image;
2 pub(crate) mod markdown; 2 pub(crate) mod markdown;
  3 +pub(crate) mod user;
@@ -4,25 +4,47 @@ use futures::stream::StreamExt; @@ -4,25 +4,47 @@ use futures::stream::StreamExt;
4 use async_std::{fs::OpenOptions, io::WriteExt}; 4 use async_std::{fs::OpenOptions, io::WriteExt};
5 use uuid::Uuid; 5 use uuid::Uuid;
6 6
7 -use crate::AppData; 7 +use crate::{AppData, models::image::{Upload, self}};
8 8
9 -pub async fn upload( app_data: web::Data<AppData>  
10 - , mut body: web::Payload) -> Result<HttpResponse, Error> 9 +pub async fn upload( app_data :web::Data<AppData>
  10 + , mut body :web::Payload
  11 + , request :web::HttpRequest ) -> Result<HttpResponse, Error>
11 { 12 {
  13 + let pool = app_data.database_pool.clone();
12 let worker = app_data.tx_upload_worker.clone(); 14 let worker = app_data.tx_upload_worker.clone();
13 15
14 - let upload_filename = format!("/tmp/upload_{}", Uuid::new_v4()); 16 + let random_uuid = Uuid::new_v4();
  17 + let upload_uuid = Some(random_uuid.as_bytes().to_vec());
  18 + let size = request.headers().get("content-length")
  19 + . and_then(|h| Some(h.to_str().unwrap().parse::<i32>()))
  20 + . unwrap().unwrap();
  21 + let mime_type = String::from( request.headers().get("content-type")
  22 + . and_then(|h| Some(h.to_str().unwrap()))
  23 + . unwrap() );
  24 +
  25 + let upload_filename = format!("/tmp/upload_{}", random_uuid);
15 let mut output = OpenOptions::new(); 26 let mut output = OpenOptions::new();
16 - output . create(true)  
17 - . write(true);  
18 - let mut output = output.open(&upload_filename).await?; 27 + let mut output = output
  28 + . create(true)
  29 + . write(true)
  30 + . open(&upload_filename).await?;
19 31
20 while let Some(item) = body.next().await { 32 while let Some(item) = body.next().await {
21 output.write_all(&item?).await?; 33 output.write_all(&item?).await?;
22 } 34 }
23 35
24 - // TODO handle this as error response...  
25 - worker.send(upload_filename).await.unwrap(); 36 + let upload = Upload {
  37 + upload_uuid,
  38 + size,
  39 + mime_type
  40 + };
26 41
27 - Ok(HttpResponse::Ok().finish()) 42 + Ok( match web::block(move || image::upload(pool, upload)).await {
  43 + Ok(image) => {
  44 + // TODO handle this as error response...
  45 + worker.send(image.clone()).await.unwrap();
  46 + HttpResponse::Ok().json(image)
  47 + },
  48 + Err(_) => HttpResponse::InternalServerError().finish()
  49 + } )
28 } 50 }
1 table! { 1 table! {
  2 + images (id) {
  3 + id -> Integer,
  4 + upload_uuid -> Nullable<Binary>,
  5 + uuid -> Nullable<Binary>,
  6 + size -> Integer,
  7 + dim_x -> Nullable<Integer>,
  8 + dim_y -> Nullable<Integer>,
  9 + mime_type -> Text,
  10 + date_created -> Text,
  11 + date_updated -> Text,
  12 + }
  13 +}
  14 +
  15 +table! {
2 markdown_diffs (markdown_id, diff_id) { 16 markdown_diffs (markdown_id, diff_id) {
3 markdown_id -> Integer, 17 markdown_id -> Integer,
4 diff_id -> Integer, 18 diff_id -> Integer,
@@ -28,6 +42,7 @@ table! { @@ -28,6 +42,7 @@ table! {
28 } 42 }
29 43
30 allow_tables_to_appear_in_same_query!( 44 allow_tables_to_appear_in_same_query!(
  45 + images,
31 markdown_diffs, 46 markdown_diffs,
32 markdowns, 47 markdowns,
33 users, 48 users,
  1 +use std::io::SeekFrom;
  2 +use async_std::fs::File;
  3 +use futures::{AsyncSeekExt, AsyncReadExt};
  4 +
  5 +
  6 +async fn read_at( f :&mut File
  7 + , pos :SeekFrom
  8 + , buf :&mut [u8]) -> std::io::Result<()> {
  9 + f.seek(pos).await?;
  10 + f.read_exact(buf).await
  11 +}
  12 +
  13 +pub async fn get_sample( f :&mut File
  14 + , buf :&mut [u8]) -> std::io::Result<()> {
  15 + let file_len = f.metadata().await?.len();
  16 + let chunk_size = buf.len() / 3;
  17 +
  18 + read_at(f, SeekFrom::Start(0), &mut buf[0..chunk_size]).await?;
  19 + if file_len >= 2 * chunk_size as u64 {
  20 + read_at( f
  21 + , SeekFrom::End(-(chunk_size as i64))
  22 + , &mut buf[2*chunk_size..]).await?;
  23 + }
  24 + if file_len >= 3 * chunk_size as u64 {
  25 + read_at( f
  26 + , SeekFrom::Start((file_len-chunk_size as u64) / 2)
  27 + , &mut buf[chunk_size..2*chunk_size]).await?;
  28 + }
  29 +
  30 + Ok(())
  31 +}
1 -use std::fmt::Display; 1 +use std::{fmt::Display, convert::TryFrom};
  2 +
  3 +use crate::error::Error;
2 4
3 #[derive(Clone,Copy,Debug)] 5 #[derive(Clone,Copy,Debug)]
4 pub struct Uuid(pub uuid::Uuid); 6 pub struct Uuid(pub uuid::Uuid);
@@ -20,3 +22,11 @@ impl Uuid { @@ -20,3 +22,11 @@ impl Uuid {
20 Self(uuid::Uuid::new_v5(&ns!(ns), buf)) 22 Self(uuid::Uuid::new_v5(&ns!(ns), buf))
21 } 23 }
22 } 24 }
  25 +
  26 +impl TryFrom<Vec<u8>> for Uuid {
  27 + type Error = Error;
  28 +
  29 + fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
  30 + Ok(Self(uuid::Uuid::from_slice(value.as_slice())?))
  31 + }
  32 +}
Please register or login to post a comment