upload_worker.rs 2.26 KB
use std::io::SeekFrom;
use async_std::{fs::File, channel::{Sender, Receiver, bounded}};
use futures::{ AsyncSeekExt, AsyncReadExt, FutureExt, StreamExt, select
             , stream::FuturesUnordered};

use crate::{models::image::Image, upload_filename, upload_uuid};
use crate::uuid::Uuid;

use std::convert::TryFrom;
use crate::config::CONFIG;

pub fn launch() -> Sender<Image> {
    let (tx_upload_worker, rx_upload_worker)
        : (Sender<Image>, Receiver<Image>) = bounded(32);

    actix_rt::spawn(async move {
        let mut workers = FuturesUnordered::new();

        loop {
            select! {
                image = rx_upload_worker.recv().fuse() => {
                    match image {
                        Err(_) => break,
                        Ok(image) => workers.push(worker(image)),
                    }
                },
                _result = workers.next() => {},
            }
        }

        while workers.len() > 0 {
            workers.next().await;
        }
    });

    tx_upload_worker
}


/// Handles one uploaded image: samples its file and prints the filename and
/// the UUID computed from that sample.
///
/// The sample buffer holds three 4096-byte chunks and is pre-filled with
/// `'.'` bytes; [`get_sample`] overwrites the parts it can read.
///
/// I/O failures (file cannot be opened or sampled) are reported on stderr and
/// end only this worker. Worker futures are polled inside the single
/// dispatcher task created by `launch`, so panicking here would unwind that
/// task and stop the processing of every later upload.
async fn worker(image :Image) {
    // NOTE(review): the result type of `upload_filename!` is not visible from
    // here, so this `unwrap` is kept unchanged — confirm it cannot fail for an
    // image that reached the queue.
    let upload_filename = upload_filename!(image).unwrap();

    let mut f = match File::open(&upload_filename).await {
        Ok(f) => f,
        Err(err) => {
            eprintln!( "[upload worker] cannot open {}: {}"
                     , upload_filename, err );
            return;
        }
    };

    let mut buf = vec![b'.'; 3 * 4096];
    if let Err(err) = get_sample(&mut f, &mut buf).await {
        eprintln!( "[upload worker] cannot sample {}: {}"
                 , upload_filename, err );
        return;
    }

    println!( "[upload worker] filename: {}"
            , upload_filename );
    println!( "[upload worker] uuid: {}"
            , Uuid::get( "some.unique.namespace"
                       , buf.as_mut() ) );
}

/// Seeks `f` to `pos` and then fills `buf` completely from that position.
///
/// # Errors
///
/// Returns the error of the seek or of `read_exact`; the latter fails when
/// the file ends before `buf` has been filled.
async fn read_at(f: &mut File, pos: SeekFrom, buf: &mut [u8]) -> std::io::Result<()> {
    // The resulting absolute offset is not needed by any caller.
    let _offset = f.seek(pos).await?;
    f.read_exact(buf).await?;
    Ok(())
}

/// Fills `buf` with up to three equally sized samples of the file: its head,
/// its middle and its tail.
///
/// `buf` is treated as three consecutive chunks of `buf.len() / 3` bytes:
///
/// * chunk 0 receives the start of the file; for a file shorter than one
///   chunk only the available bytes are read,
/// * chunk 2 receives the last chunk-sized piece of the file, if the file is
///   at least two chunks long,
/// * chunk 1 receives a chunk centred in the file, if the file is at least
///   three chunks long.
///
/// Parts of `buf` that are not read keep whatever the caller put there.
///
/// # Errors
///
/// Returns any error from querying the metadata, seeking or reading.
async fn get_sample( f   :&mut File
                   , buf :&mut [u8]) -> std::io::Result<()> {
    let file_len = f.metadata().await?.len();
    let chunk_size = buf.len() / 3;
    let chunk_len = chunk_size as u64;

    // Clamp the head read to the file size: `read_exact` on a full chunk
    // would fail with `UnexpectedEof` for files shorter than one chunk.
    // The result never exceeds `chunk_size`, so the cast back is lossless.
    let head_len = chunk_len.min(file_len) as usize;
    read_at(f, SeekFrom::Start(0), &mut buf[..head_len]).await?;

    if file_len >= 2 * chunk_len {
        // Bound the slice to exactly one chunk; an open-ended slice would be
        // longer than the bytes left after the seek whenever `buf.len()` is
        // not a multiple of three.
        read_at( f
               , SeekFrom::End(-(chunk_size as i64))
               , &mut buf[2*chunk_size..3*chunk_size]).await?;
    }
    if file_len >= 3 * chunk_len {
        read_at( f
               , SeekFrom::Start((file_len - chunk_len) / 2)
               , &mut buf[chunk_size..2*chunk_size]).await?;
    }

    Ok(())
}