upload_worker.rs 6.08 KB
use std::{ io::ErrorKind
         , sync::Arc };
use actix_web::{web, http::StatusCode};
use async_std::{ fs::{DirBuilder, copy, metadata, remove_file}
               , channel::{Sender, Receiver, bounded}
               , task::spawn_blocking };
use futures::{ FutureExt, StreamExt, select
             , stream::FuturesUnordered};
use rexiv2::Metadata;
use steganography::encoder::Encoder;

use crate::{ models::image::{Image, finalize, ImageContext}
           , config::CONFIG
           , Pool
           , routes::image::Size
           , error::Error };

use image::{ io::Reader as ImageReader
           , GenericImageView
           , imageops::{FilterType::Lanczos3, overlay}
           , ImageFormat::Jpeg, DynamicImage };

pub fn launch() -> Sender<(Arc<Pool>, Image)> {
    let (tx_upload_worker, rx_upload_worker)
        : (Sender<(Arc<Pool>, Image)>, Receiver<(Arc<Pool>, Image)>) = bounded(100);

    actix_rt::spawn(async move {
        let mut workers = FuturesUnordered::new();

        loop {
            if workers.len() <= 3 {
                select! {
                    image = rx_upload_worker.recv().fuse() => {
                        match image {
                            Err(_) => break,
                            Ok((pool, image)) => workers.push(worker(pool, image)),
                        }
                    },
                    _result = workers.next() => {},
                }
            } else {
                workers.next().await;
            }
        }

        while workers.len() > 0 {
            workers.next().await;
        }
    });

    tx_upload_worker
}

async fn store_original(context :&mut ImageContext) -> Result<u64, Error> {
    let upload_path = context
                    . upload_path().await
                    . ok_or(Error::new( "Can't retreive upload path"
                                      , StatusCode::INTERNAL_SERVER_ERROR ))?
                    . to_owned();
    let original_path = context
                      . path(Size::Original).await
                      . ok_or(Error::new( "No path for given size"
                                        , StatusCode::INTERNAL_SERVER_ERROR ))?;

    let result = match metadata(&original_path).await {
        Err(e) if e.kind() == ErrorKind::NotFound => {
            match copy(&upload_path, &original_path).await {
                Ok(size) => Ok(size),
                Err(e) => Err(Error::from(e)),
            }
        },
        Err(e) => Err(Error::from(e)),
        Ok(_) => Err(Error::new( "File already exists"
                               , StatusCode::CONFLICT )),
    };

    remove_file(&upload_path).await?;

    result
}

async fn load_original(context :&mut ImageContext) -> Result<DynamicImage, Error> {
    let original_path = context
                      . path(Size::Original).await
                      . ok_or(Error::new( "Unable to load original image"
                                        , StatusCode::INTERNAL_SERVER_ERROR ))?;

    spawn_blocking(move || -> Result<DynamicImage, Error> {
        Ok(ImageReader::open(&original_path)? . with_guessed_format()?
                                              . decode()?)
    }).await
}

async fn save_resized( original :&DynamicImage
                     , context :&mut ImageContext
                     , size :Size ) -> Result<(), Error> {
    let width = CONFIG.width(size)
              . ok_or(Error::new( "Can't get width for size"
                                , StatusCode::INTERNAL_SERVER_ERROR ))?;
    let height = CONFIG.height(size)
               . ok_or(Error::new( "Can't get height for size"
                                 , StatusCode::INTERNAL_SERVER_ERROR ))?;

    let path = context.path(size).await
             . ok_or(Error::new( "Can't get path for size"
                               , StatusCode::INTERNAL_SERVER_ERROR ))?;

    let original = original.to_owned();

    match metadata(&path).await {
        Err(e) if e.kind() == ErrorKind::NotFound =>
            spawn_blocking(move || -> Result<(), Error> {
                let mut scaled = original.resize(width, height, Lanczos3);

                if let Size::Thumbnail = size {
                } else {
                    overlay(&mut scaled, CONFIG.copyright_image(), 0_u32, 0_u32);
                }

                let stegonography = CONFIG.copyright_steganography().as_bytes();
                let encoder = Encoder::new(stegonography, scaled);
                let scaled = encoder.encode_alpha();
                scaled.save_with_format(&path, Jpeg)?;

                let exiv = Metadata::new_from_path(&path)?;
                exiv.set_tag_string("Exif.Image.Copyright", CONFIG.copyright_exiv())?;
                exiv.save_to_file(&path)?;

                Ok(())
            }).await,
        Err(e) => Err(e)?,
        Ok(_) => Err(Error::new( "File already exists"
                               , StatusCode::CONFLICT )),
    }
}

async fn worker(pool :Arc<Pool>, image :Image) -> Result<(), Error> {
    let mut context = image.context();
    let base_path = context.base_path().await
                  . ok_or(Error::new( "Missing base_path"
                                    , StatusCode::INTERNAL_SERVER_ERROR ))?;

    DirBuilder::new() . recursive(true)
                      . create(base_path)
                      . await?;

    store_original(&mut context).await.unwrap_or(0);

    if let Ok(original) = load_original(&mut context).await {
        let (dim_x, dim_y) = original.dimensions();

        context.image.dim_x = Some(dim_x as i32);
        context.image.dim_y = Some(dim_y as i32);

        macro_rules! save_resized{
            ($s:expr) => { save_resized(&original, &mut context, $s) }
        }

        save_resized!(Size::Large).await.unwrap_or(());
        save_resized!(Size::Medium).await.unwrap_or(());
        save_resized!(Size::Small).await.unwrap_or(());
        save_resized!(Size::Thumbnail).await.unwrap_or(());
    }

    // TODO Think about two simpler functions than finanlize...
    // One to update one do remove the new entry depending if the
    // entry exists already...
    web::block(move || finalize(pool, context.image)).await?;

    Ok(())
}