upload_worker.rs 3.55 KB
use std::{io::{SeekFrom, ErrorKind}, sync::Arc};
use actix_web::web;
use async_std::{ fs::{File, DirBuilder, copy, metadata, remove_file}
               , channel::{Sender, Receiver, bounded}
               , path::PathBuf
               , io::Result };
use futures::{ AsyncSeekExt, AsyncReadExt, FutureExt, StreamExt, select
             , stream::FuturesUnordered};

use crate::{models::image::{Image, finalize}, upload_filename, config::CONFIG, Pool};
use crate::uuid::Uuid;

use std::convert::TryFrom;
use image::{io::Reader as ImageReader, GenericImageView};

pub fn launch() -> Sender<(Arc<Pool>, Image)> {
    let (tx_upload_worker, rx_upload_worker)
        : (Sender<(Arc<Pool>, Image)>, Receiver<(Arc<Pool>, Image)>) = bounded(32);

    actix_rt::spawn(async move {
        let mut workers = FuturesUnordered::new();

        loop {
            select! {
                image = rx_upload_worker.recv().fuse() => {
                    match image {
                        Err(_) => break,
                        Ok((pool, image)) => workers.push(worker(pool, image)),
                    }
                },
                _result = workers.next() => {},
            }
        }

        while workers.len() > 0 {
            workers.next().await;
        }
    });

    tx_upload_worker
}


async fn worker(pool :Arc<Pool>, mut image :Image) {
    let upload_filename = upload_filename!(image).unwrap();
    let mut f = File::open(&upload_filename).await.unwrap();

    let mut buf = vec!['.' as u8; 3 * 3 * 4096];
    get_sample(&mut f, buf.as_mut()).await.unwrap();
    let uuid = Uuid::get(CONFIG.namespace(), buf.as_mut());
    let uuid_string = format!("{}", uuid);

    let mut image_path = PathBuf::from(CONFIG.images_dir());
    image_path.push(&uuid_string.as_str()[..2]);
    image_path.push(&uuid_string.as_str()[..5]);

    DirBuilder::new() . recursive(true)
                      . create(&image_path)
                      . await
                      . unwrap();

    image_path.push(&uuid_string);

    image.upload_uuid = None;
    image.uuid = Some(uuid.0.as_bytes().to_vec());

    match metadata(&image_path).await {
        Err(e) if e.kind() == ErrorKind::NotFound => {
            copy(&upload_filename, &image_path).await.unwrap();

            let img = ImageReader::open(&image_path).unwrap()
                    . with_guessed_format().unwrap()
                    . decode().unwrap();
            let (dim_x, dim_y) = img.dimensions();

            image.dim_x = Some(dim_x as i32);
            image.dim_y = Some(dim_y as i32);
        },
        Err(e) => {
            let e :Result<()> = Err(e);
            e.unwrap();
        },
        Ok(_) => {},
    }

    remove_file(&upload_filename).await.unwrap();
    web::block(move || finalize(pool, image)).await.unwrap();
}

async fn read_at( f   :&mut File
                , pos :SeekFrom
                , buf :&mut [u8]) -> std::io::Result<()> {
    f.seek(pos).await?;
    f.read_exact(buf).await
}

async fn get_sample( f   :&mut File
                       , buf :&mut [u8]) -> std::io::Result<()> {
    let file_len = f.metadata().await?.len();
    let chunk_size = buf.len() / 3;

    read_at(f, SeekFrom::Start(0), &mut buf[0..chunk_size]).await?;
    if file_len >= 2 * chunk_size as u64 {
        read_at( f
               , SeekFrom::End(-(chunk_size as i64))
               , &mut buf[2*chunk_size..]).await?;
    }
    if file_len >= 3 * chunk_size as u64 {
        read_at( f
               , SeekFrom::Start((file_len-chunk_size as u64) / 2)
               , &mut buf[chunk_size..2*chunk_size]).await?;
    }

    Ok(())
}