// upload_worker.rs (4.77 KB)
use std::{ io::{SeekFrom, ErrorKind}
         , sync::Arc
         , convert::TryFrom };
use actix_web::web;
use async_std::{ fs::{File, DirBuilder, copy, metadata, remove_file}
               , channel::{Sender, Receiver, bounded}
               , path::PathBuf
               , io::Result, task::spawn_blocking };
use futures::{ AsyncSeekExt, AsyncReadExt, FutureExt, StreamExt, select
             , stream::FuturesUnordered};

use crate::{ models::image::{Image, finalize}
           , upload_filename
           , config::CONFIG
           , Pool
           , routes::image::Size
           , uuid::Uuid };

use image::{ io::Reader as ImageReader
           , GenericImageView
           , imageops::FilterType::Lanczos3
           , ImageFormat::Jpeg };

/// Starts the background upload dispatcher and returns the sending half of its queue.
///
/// A bounded channel with capacity 32 is created and a task is spawned on the
/// actix runtime. That task receives `(pool, image)` pairs and starts one
/// `worker` future per pair, driving all of them concurrently. Once the channel
/// reports an error on receive, no new jobs are accepted and the task waits for
/// the workers that are still running before it finishes.
pub fn launch() -> Sender<(Arc<Pool>, Image)> {
    type Job = (Arc<Pool>, Image);

    let (tx_upload_worker, rx_upload_worker): (Sender<Job>, Receiver<Job>) = bounded(32);

    actix_rt::spawn(async move {
        let mut in_flight = FuturesUnordered::new();

        loop {
            select! {
                received = rx_upload_worker.recv().fuse() => {
                    match received {
                        Ok((pool, image)) => in_flight.push(worker(pool, image)),
                        // The channel can no longer deliver jobs: stop accepting work.
                        Err(_) => break,
                    }
                },
                // A worker finished (or the set is empty); nothing to collect.
                _finished = in_flight.next() => {},
            }
        }

        // Let the workers that are still running complete before the task ends.
        while in_flight.next().await.is_some() {}
    });

    tx_upload_worker
}


async fn worker(pool :Arc<Pool>, mut image :Image) {
    let upload_filename = upload_filename!(image).unwrap();
    let mut f = File::open(&upload_filename).await.unwrap();

    let mut buf = vec!['.' as u8; 3 * 3 * 4096];
    get_sample(&mut f, buf.as_mut()).await.unwrap();
    let uuid = Uuid::get(CONFIG.namespace(), buf.as_mut());
    let uuid_string = format!("{}", uuid);

    let image_path = PathBuf::from(CONFIG.images_dir());
    let mut orig_path = image_path.clone();
    let mut large_path = image_path.clone();
    let mut medium_path = image_path.clone();
    let mut small_path = image_path.clone();

    macro_rules! prepare {
        ($p:expr, $n:expr) => (
            $p.push(&uuid_string.as_str()[..1]);
            $p.push(&uuid_string.as_str()[..2]);
            $p.push(&uuid_string.as_str()[..3]);

            DirBuilder::new() . recursive(true)
                              . create(&$p)
                              . await
                              . unwrap();

            $p.push(&format!("{}_{}", &uuid_string, $n));
        )
    }

    prepare!(orig_path, Size::Original);
    prepare!(large_path, Size::Large);
    prepare!(medium_path, Size::Medium);
    prepare!(small_path, Size::Small);

    image.upload_uuid = None;
    image.uuid = Some(uuid.0.as_bytes().to_vec());

    match metadata(&orig_path).await {
        Err(e) if e.kind() == ErrorKind::NotFound => {
            copy(&upload_filename, &orig_path).await.unwrap();

            let (dim_x, dim_y) = spawn_blocking(move || {
                let img = ImageReader::open(&orig_path).unwrap()
                        . with_guessed_format().unwrap()
                        . decode().unwrap();
                let (dim_x, dim_y) = img.dimensions();

                img . resize(1280, 1280, Lanczos3)
                    . save_with_format(&large_path, Jpeg)
                    . unwrap();
                img . resize(800, 800, Lanczos3)
                    . save_with_format(&medium_path, Jpeg)
                    . unwrap();
                img . resize(400, 400, Lanczos3)
                    . save_with_format(&small_path, Jpeg)
                    . unwrap();

                (dim_x as i32, dim_y as i32)
            }).await;

            image.dim_x = Some(dim_x);
            image.dim_y = Some(dim_y);
        },
        Err(e) => {
            let e :Result<()> = Err(e);
            e.unwrap();
        },
        Ok(_) => {},
    }

    remove_file(&upload_filename).await.unwrap();
    web::block(move || finalize(pool, image)).await.unwrap();
}

/// Moves the cursor of `f` to `pos` and then fills `buf` completely from there.
///
/// Returns the first error reported by either the seek or the read.
async fn read_at(f: &mut File, pos: SeekFrom, buf: &mut [u8]) -> std::io::Result<()> {
    // The resulting absolute offset is not needed by any caller.
    let _offset = f.seek(pos).await?;
    f.read_exact(buf).await?;
    Ok(())
}

/// Fills `buf` with up to three samples of `f`: its head, middle and tail.
///
/// `buf` is treated as three consecutive chunks of `buf.len() / 3` bytes each:
///
/// * chunk 0 receives the start of the file, truncated to the file length when
///   the file is shorter than one chunk;
/// * chunk 2 receives the last chunk of the file, only when the file holds at
///   least two chunks;
/// * chunk 1 receives a chunk centred in the file, only when the file holds at
///   least three chunks.
///
/// Bytes of `buf` that are not covered by a read keep whatever value the caller
/// put there.
///
/// # Errors
///
/// Returns any error from querying the file metadata, seeking or reading.
async fn get_sample(f: &mut File, buf: &mut [u8]) -> std::io::Result<()> {
    let file_len = f.metadata().await?.len();
    let chunk_size = buf.len() / 3;
    let chunk_len = chunk_size as u64;

    // Clamp the head read to the file size: `read_exact` over a full chunk
    // would fail with `UnexpectedEof` on files shorter than one chunk.
    // The cast is lossless because the value never exceeds `chunk_size`.
    let head_len = file_len.min(chunk_len) as usize;
    read_at(f, SeekFrom::Start(0), &mut buf[..head_len]).await?;

    if file_len >= 2 * chunk_len {
        // Bound the slice to exactly one chunk; when `buf.len()` is not a
        // multiple of three an open-ended slice would be longer than the
        // bytes available after the seek.
        read_at(
            f,
            SeekFrom::End(-(chunk_size as i64)),
            &mut buf[2 * chunk_size..3 * chunk_size],
        )
        .await?;
    }
    if file_len >= 3 * chunk_len {
        read_at(
            f,
            SeekFrom::Start((file_len - chunk_len) / 2),
            &mut buf[chunk_size..2 * chunk_size],
        )
        .await?;
    }

    Ok(())
}