Commit 149cddde3eb6fb3e454fda6424ccdda8c2eedf31

Authored by Georg Hopp
1 parent 76eebfe8

Concurrent upload processing

Showing 1 changed file with 34 additions and 21 deletions
@@ -7,23 +7,20 @@ mod routes; @@ -7,23 +7,20 @@ mod routes;
7 mod schema; 7 mod schema;
8 mod uuid; 8 mod uuid;
9 9
10 -use crate::routes::markdown::*;  
11 -use crate::routes::other::*;  
12 -use crate::routes::user::*; 10 +use routes::markdown::*;
  11 +use routes::other::*;
  12 +use routes::user::*;
  13 +use routes::upload::*;
13 use crate::uuid::Uuid; 14 use crate::uuid::Uuid;
14 15
15 use actix_web::{guard, web, App, HttpResponse, HttpServer}; 16 use actix_web::{guard, web, App, HttpResponse, HttpServer};
16 -use async_std::channel::Receiver;  
17 -use async_std::channel::Sender;  
18 -use async_std::channel::bounded;  
19 -use async_std::fs::File; 17 +use async_std::{ channel::{ Sender, bounded }
  18 + , fs::File };
20 use diesel::r2d2::{self, ConnectionManager}; 19 use diesel::r2d2::{self, ConnectionManager};
21 use diesel::SqliteConnection; 20 use diesel::SqliteConnection;
22 -use futures::AsyncReadExt;  
23 -use futures::AsyncSeekExt; 21 +use futures::{ AsyncReadExt, AsyncSeekExt, FutureExt, StreamExt, select
  22 + , stream::FuturesUnordered };
24 use listenfd::ListenFd; 23 use listenfd::ListenFd;
25 -use routes::markdown::get_markdown;  
26 -use routes::upload::upload;  
27 use std::io::SeekFrom; 24 use std::io::SeekFrom;
28 use std::sync::Arc; 25 use std::sync::Arc;
29 26
@@ -68,18 +65,34 @@ async fn main() -> std::io::Result<()> { @@ -68,18 +65,34 @@ async fn main() -> std::io::Result<()> {
68 65
69 dotenv::dotenv().ok(); 66 dotenv::dotenv().ok();
70 67
71 - let ( tx_upload_worker  
72 - , rx_upload_worker ) :( Sender<String>  
73 - , Receiver<String> ) = bounded(32); 68 + let (tx_upload_worker, rx_upload_worker) = bounded(32);
74 69
75 let _upload_worker = actix_rt::spawn(async move { 70 let _upload_worker = actix_rt::spawn(async move {
76 - while let Ok(filename) = rx_upload_worker.recv().await {  
77 - let mut f = File::open(&filename).await.unwrap();  
78 - let mut buf = vec!['.' as u8; 3 * 4096];  
79 - get_sample(&mut f, buf.as_mut()).await.unwrap();  
80 - println!("[UPLOAD WORKER] filename: {}", filename);  
81 - println!( "[UPLOAD WORKER] uuid: {}"  
82 - , Uuid::get("some.unique.namespace", buf.as_mut()) ); 71 + let mut workers = FuturesUnordered::new();
  72 +
  73 + loop {
  74 + select! {
  75 + filename = rx_upload_worker.recv().fuse() => {
  76 + match filename {
  77 + Err(_) => break,
  78 + Ok(filename) => workers.push(async move {
  79 + let mut f = File::open(&filename).await.unwrap();
  80 + let mut buf = vec!['.' as u8; 3 * 4096];
  81 + get_sample(&mut f, buf.as_mut()).await.unwrap();
  82 + println!( "[UPLOAD WORKER] filename: {}"
  83 + , filename );
  84 + println!( "[UPLOAD WORKER] uuid: {}"
  85 + , Uuid::get( "some.unique.namespace"
  86 + , buf.as_mut() ) );
  87 + })
  88 + }
  89 + },
  90 + _result = workers.next() => {},
  91 + }
  92 + }
  93 +
  94 + while workers.len() > 0 {
  95 + workers.next().await;
83 } 96 }
84 }); 97 });
85 98
Please register or login to post a comment