// main.rs 5.77 KB
#[macro_use]
extern crate diesel;

mod error;
mod models;
mod routes;
mod schema;
mod uuid;

use routes::markdown::*;
use routes::other::*;
use routes::user::*;
use routes::upload::*;
use crate::uuid::Uuid;

use actix_web::{guard, web, App, HttpResponse, HttpServer};
use async_std::{ channel::{ Sender, bounded }
               , fs::File };
use diesel::r2d2::{self, ConnectionManager};
use diesel::SqliteConnection;
use futures::{ AsyncReadExt, AsyncSeekExt, FutureExt, StreamExt, select
             , stream::FuturesUnordered };
use listenfd::ListenFd;
use std::io::SeekFrom;
use std::sync::Arc;

/// Crate-wide alias for the r2d2 connection pool that hands out
/// `SqliteConnection`s managed by diesel's `ConnectionManager`.
pub(crate) type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>;

/// Shared application state registered with the actix `App`.
///
/// The value is cloned inside the `HttpServer` factory closure in `main`,
/// so both fields are handles that refer to the same underlying resource
/// after cloning.
#[derive(Clone)]
pub struct AppData {
    /// Reference-counted handle to the SQLite connection pool.
    pub database_pool: Arc<Pool>,
    /// Sending half of the bounded channel read by the upload worker task
    /// spawned in `main`; each message is the name of a file the worker
    /// opens and samples.
    pub tx_upload_worker: Sender<String>,
}

/// Repositions `f` to `pos` and then fills the whole of `buf` from there.
///
/// # Errors
///
/// Returns whatever I/O error the seek or the subsequent `read_exact`
/// reports; on error the contents of `buf` are unspecified.
async fn read_at(f: &mut File, pos: SeekFrom, buf: &mut [u8]) -> std::io::Result<()> {
    // The resulting absolute offset is not needed by any caller.
    let _offset = f.seek(pos).await?;
    f.read_exact(buf).await?;
    Ok(())
}

/// Fills `buf` with a head / middle / tail sample of the file `f`.
///
/// `buf` is treated as three consecutive chunks of `buf.len() / 3` bytes:
///
/// * chunk 0 receives the first bytes of the file (fewer than a full chunk
///   when the file itself is shorter than one chunk),
/// * chunk 1 receives a chunk centred in the file, only when the file holds
///   at least three chunks,
/// * chunk 2 receives the last chunk of the file, only when the file holds
///   at least two chunks.
///
/// Regions that are not read (including up to two trailing bytes when
/// `buf.len()` is not a multiple of three) keep whatever the caller put
/// there, so callers wanting a deterministic sample should pre-fill `buf`.
///
/// # Errors
///
/// Returns any I/O error raised while querying metadata, seeking or reading.
async fn get_sample(f: &mut File, buf: &mut [u8]) -> std::io::Result<()> {
    let file_len = f.metadata().await?.len();
    let chunk_size = buf.len() / 3;
    // usize -> u64 is lossless on every supported target.
    let chunk_len = chunk_size as u64;

    // Head: never ask for more bytes than the file contains, otherwise
    // `read_exact` fails on files shorter than one chunk.
    let head_len = if file_len < chunk_len {
        // Fits in usize because it is strictly smaller than `chunk_size`.
        file_len as usize
    } else {
        chunk_size
    };
    read_at(f, SeekFrom::Start(0), &mut buf[..head_len]).await?;

    // Tail: bound the slice to exactly one chunk so the read length matches
    // the distance we seek back from the end of the file.
    if file_len >= 2 * chunk_len {
        // A slice never exceeds isize::MAX bytes, so the cast cannot wrap.
        let back = -(chunk_size as i64);
        read_at(
            f,
            SeekFrom::End(back),
            &mut buf[2 * chunk_size..3 * chunk_size],
        )
        .await?;
    }

    // Middle: a chunk centred in the file; the guard guarantees the
    // subtraction below cannot underflow.
    if file_len >= 3 * chunk_len {
        let middle = (file_len - chunk_len) / 2;
        read_at(
            f,
            SeekFrom::Start(middle),
            &mut buf[chunk_size..2 * chunk_size],
        )
        .await?;
    }

    Ok(())
}

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    let mut listenfd = ListenFd::from_env();

    dotenv::dotenv().ok();

    let (tx_upload_worker, rx_upload_worker) = bounded(32);

    let _upload_worker = actix_rt::spawn(async move {
        let mut workers = FuturesUnordered::new();

        loop {
            select! {
                filename = rx_upload_worker.recv().fuse() => {
                    match filename {
                        Err(_) => break,
                        Ok(filename) => workers.push(async move {
                            let mut f = File::open(&filename).await.unwrap();
                            let mut buf = vec!['.' as u8; 3 * 4096];
                            get_sample(&mut f, buf.as_mut()).await.unwrap();
                            println!( "[UPLOAD WORKER] filename: {}"
                                    , filename );
                            println!( "[UPLOAD WORKER] uuid: {}"
                                    , Uuid::get( "some.unique.namespace"
                                               , buf.as_mut() ) );
                        })
                    }
                },
                _result = workers.next() => {},
            }
        }

        while workers.len() > 0 {
            workers.next().await;
        }
    });

    let database_url = std::env::var("DATABASE_URL").expect("NOT FOUND");
    let database_pool = Pool::builder()
        .build(ConnectionManager::new(database_url))
        .unwrap();

    let database_pool = Arc::new(database_pool);
    let app_data = AppData { database_pool, tx_upload_worker };

    let server = HttpServer::new(move || {
        App::new() . data(app_data.clone())
                   . service(actix_files::Files::new("/static", "./static"))
                   . service( web::scope("/api/v0")
                            . service( web::resource("/upload")
                                     . route(web::post().to(upload))
                                     )
                            . service( web::resource("/markdowns")
                                     . route(web::get().to(get_markdowns))
                                     )
                            . service( web::resource("/markdowns/{id}")
                                     . route(web::get().to(get_markdown))
                                     . route(web::put().to(update_markdown))
                                     )
                            . service( web::resource("/markdowns/{id}/patches")
                                     . route(web::get().to(get_patches))
                                     )
                            . service( web::resource("/users")
                                     . route(web::get().to(get_users))
                                     . route(web::put().to(create_user))
                                     )
                            . service( web::resource("/users/{id}")
                                     . route(web::delete().to(delete_user))
                                     . route(web::get().to(get_user))
                                     . route(web::put().to(update_user))
                                     )
                            )
                   . service( web::scope("")
                            . route("/", web::get().to(root))
                            . route("/api.html", web::get().to(apidoc))
                            . route("/index", web::get().to(root))
                            . route("/index.html", web::get().to(root))
                            . route("/favicon", web::get().to(favicon))
                            . route("/favicon.ico", web::get().to(favicon))
                            )
                   . default_service( web::resource("")
                            . route( web::get().to(p404) )
                            . route( web::route()
                                   . guard( guard::Not(guard::Get()) )
                                   . to(HttpResponse::MethodNotAllowed)
                                   )
                            )
    });

    let server = match listenfd.take_tcp_listener(0).unwrap() {
        Some(l) => server.listen(l)?,
        None => server.bind("localhost:8080")?,
    };

    server.run().await
}