upload_worker.rs 5.2 KB
use std::{ io::ErrorKind
         , sync::Arc };
use actix_web::{web, http::StatusCode};
use async_std::{ fs::{DirBuilder, copy, metadata, remove_file}
               , channel::{Sender, Receiver, bounded}
               , task::spawn_blocking };
use futures::{ FutureExt, StreamExt, select
             , stream::FuturesUnordered};
use rexiv2::Metadata;
use steganography::encoder::Encoder;

use crate::{ models::image::{Image, finalize, ImageContext}
           , config::CONFIG
           , Pool
           , routes::image::Size
           , error::Error };

use image::{ io::Reader as ImageReader
           , GenericImageView
           , imageops::{FilterType::Lanczos3, overlay}
           , ImageFormat::Jpeg, DynamicImage };

pub fn launch() -> Sender<(Arc<Pool>, Image)> {
    let (tx_upload_worker, rx_upload_worker)
        : (Sender<(Arc<Pool>, Image)>, Receiver<(Arc<Pool>, Image)>) = bounded(32);

    actix_rt::spawn(async move {
        let mut workers = FuturesUnordered::new();

        loop {
            if workers.len() <= 3 {
                select! {
                    image = rx_upload_worker.recv().fuse() => {
                        match image {
                            Err(_) => break,
                            Ok((pool, image)) => workers.push(worker(pool, image)),
                        }
                    },
                    _result = workers.next() => {},
                }
            } else {
                workers.next().await;
            }
        }

        while workers.len() > 0 {
            workers.next().await;
        }
    });

    tx_upload_worker
}

async fn store_original(context :&mut ImageContext) -> Result<(), Error> {
    let upload_path = context
                    . upload_path().await
                    . ok_or(Error::new( "Can't retreive upload path"
                                      , StatusCode::INTERNAL_SERVER_ERROR ))?
                    . to_owned();
    let original_path = context
                      . path(Size::Original).await
                      . ok_or(Error::new( "No path for given size"
                                        , StatusCode::INTERNAL_SERVER_ERROR ))?;

    match metadata(&original_path).await {
        Err(e) if e.kind() == ErrorKind::NotFound => {
            copy(&upload_path, &original_path).await?;
            remove_file(&upload_path).await?;
            Ok(())
        },
        Err(e) => Err(e)?,
        Ok(_) => Ok(()),
    }
}

async fn load_original(context :&mut ImageContext) -> Result<DynamicImage, Error> {
    let original_path = context
                      . path(Size::Original).await
                      . ok_or(Error::new( "Unable to load original image"
                                        , StatusCode::INTERNAL_SERVER_ERROR ))?;

    spawn_blocking(move || -> Result<DynamicImage, Error> {
        Ok(ImageReader::open(&original_path)? . with_guessed_format()?
                                              . decode()?)
    }).await
}

async fn save_resized( original :&DynamicImage
                     , context :&mut ImageContext
                     , size :Size ) -> Result<(), Error> {
    let width = CONFIG.width(size)
              . ok_or(Error::new( "Can't get width for size"
                                , StatusCode::INTERNAL_SERVER_ERROR ))?;
    let height = CONFIG.height(size)
               . ok_or(Error::new( "Can't get height for size"
                                 , StatusCode::INTERNAL_SERVER_ERROR ))?;

    let path = context.path(size).await
             . ok_or(Error::new( "Can't get path for size"
                               , StatusCode::INTERNAL_SERVER_ERROR ))?;

    let original = original.to_owned();

    spawn_blocking(move || -> Result<(), Error> {
        let mut scaled = original.resize(width, height, Lanczos3);
        overlay(&mut scaled, CONFIG.copyright_image(), 0_u32, 0_u32);

        if let Size::Thumbnail = size {
            scaled.save_with_format(&path, Jpeg)?;
        } else {
            let stegonography = CONFIG.copyright_steganography().as_bytes();
            let encoder = Encoder::new(stegonography, scaled);
            let scaled = encoder.encode_alpha();
            scaled.save_with_format(&path, Jpeg)?;
        }

        let exiv = Metadata::new_from_path(&path)?;
        exiv.set_tag_string("Exif.Image.Copyright", CONFIG.copyright_exiv())?;
        exiv.save_to_file(&path)?;

        Ok(())
    }).await
}

async fn worker(pool :Arc<Pool>, image :Image) -> Result<(), Error> {
    let mut context = image.context();

    DirBuilder::new() . recursive(true)
                      . create(context.base_path().await.unwrap())
                      . await
                      . unwrap();

    store_original(&mut context).await.unwrap();

    let original = load_original(&mut context).await.unwrap();
    let (dim_x, dim_y) = original.dimensions();

    context.image.dim_x = Some(dim_x as i32);
    context.image.dim_y = Some(dim_y as i32);

    save_resized(&original, &mut context, Size::Large).await?;
    save_resized(&original, &mut context, Size::Medium).await?;
    save_resized(&original, &mut context, Size::Small).await?;
    save_resized(&original, &mut context, Size::Thumbnail).await?;

    web::block(move || finalize(pool, context.image)).await?;

    Ok(())
}