From d44b8e167e657cf088c2c28f15159e7de31d5886 Mon Sep 17 00:00:00 2001 From: Nathan Perry Date: Wed, 14 Feb 2018 15:51:46 -0500 Subject: fix lock contention --- src/commands.rs | 128 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 75 insertions(+), 53 deletions(-) (limited to 'src/commands.rs') diff --git a/src/commands.rs b/src/commands.rs index dce373c..61621f4 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex as SMutex}; +use std::sync::{Arc, RwLock}; use std::collections::VecDeque; use std::thread; use std::time::Duration; @@ -46,7 +46,7 @@ pub struct PlayQueue { } impl Key for PlayQueue { - type Value = Arc>; + type Value = Arc>; } impl PlayQueue { @@ -61,46 +61,48 @@ impl PlayQueue { let voice_manager = Arc::clone(&c.voice_manager); let mut data = c.data.lock(); - let queue = Arc::new(SMutex::new(PlayQueue::new())); + let queue = Arc::new(RwLock::new(PlayQueue::new())); data.insert::(Arc::clone(&queue)); thread::spawn(move || { let queue_lck = Arc::clone(&queue); - let sleep = || { - trace!("poll thread sleep"); - thread::sleep(Duration::from_millis(250)); - trace!("poll thread awake"); - }; let voice_manager = voice_manager; loop { - let mut queue = queue_lck.lock().unwrap(); + thread::sleep(Duration::from_millis(250)); + let (queue_is_empty, queue_has_playing) = { + let queue = queue_lck.read().unwrap(); - let allow_continue = queue.playing.clone().map_or(false, |x| !x.audio.lock().finished); + let allow_continue = queue.playing.clone().map_or(false, |x| !x.audio.lock().finished); - if allow_continue { - sleep(); - continue; - } + if allow_continue { + continue; + } + + (queue.queue.is_empty(), queue.playing.is_some()) + }; + + if queue_is_empty { + if queue_has_playing { + let mut queue = queue_lck.write().unwrap(); - if queue.queue.is_empty() { - if queue.playing.is_some() { // must be finished assert!({ let audio_lck = queue.playing.clone().unwrap().audio; let audio = audio_lck.lock(); audio.finished }); + queue.playing = None; + let mut manager = voice_manager.lock(); manager.leave(*TARGET_GUILD_ID); debug!("disconnected due to inactivity"); } - - sleep(); continue; } + let mut queue = queue_lck.write().unwrap(); let item = queue.queue.pop_front().unwrap(); trace!("checking ytdl for: {}", item.url); @@ -110,7 +112,6 @@ impl PlayQueue { Err(e) => { error!("bad link: {}; {:?}", &item.url, e); let _ = send(item.sender_channel, &format!("what the fuck"), false); - sleep(); continue; } }; @@ -134,8 +135,6 @@ impl PlayQueue { let _ = send(item.sender_channel, "something happened somewhere somehow.", false); } } - - sleep(); } }); } @@ -206,8 +205,12 @@ command!(play(ctx, msg, args) { return Ok(()); } + trace!("acquiring queue lock"); + let mut queue_lock = ctx.data.lock().get::().cloned().unwrap(); - let mut play_queue = queue_lock.lock().unwrap(); + let mut play_queue = queue_lock.write().unwrap(); + + trace!("queue lock acquired"); play_queue.queue.push_back(PlayArgs{ initiator: msg.author.name.clone(), @@ -218,50 +221,64 @@ command!(play(ctx, msg, args) { command!(pause(ctx, msg) { let mut queue_lock = ctx.data.lock().get::().cloned().unwrap(); - let mut play_queue = queue_lock.lock().unwrap(); let done = || send(msg.channel_id, "r u srs", msg.tts); - - let current_item = match play_queue.playing { - Some(ref x) => x, - None => { - done()?; - return Ok(()); - }, + let playing = { + let play_queue = queue_lock.read().unwrap(); + + let current_item = match play_queue.playing { + Some(ref x) => x, + None => { + done()?; + return Ok(()); + }, + }; + + let audio = current_item.audio.lock(); + audio.playing }; - let mut audio = current_item.audio.lock(); - - if !audio.playing { + if !playing { done()?; return Ok(()); } - audio.pause(); + { + let queue = queue_lock.write().unwrap(); + let ref audio = queue.playing.clone().unwrap().audio; + audio.lock().pause(); + } }); command!(resume(ctx, msg) { let mut queue_lock = ctx.data.lock().get::().cloned().unwrap(); - let mut play_queue = queue_lock.lock().unwrap(); let done = || send(msg.channel_id, "r u srs", msg.tts); - - let current_item = match play_queue.playing { - Some(ref x) => x, - None => { - done()?; - return Ok(()); - }, + let playing = { + let play_queue = queue_lock.read().unwrap(); + + let current_item = match play_queue.playing { + Some(ref x) => x, + None => { + done()?; + return Ok(()); + }, + }; + + let audio = current_item.audio.lock(); + audio.playing }; - let mut audio = current_item.audio.lock(); - - if audio.playing { + if playing { done()?; return Ok(()); } - audio.play(); + { + let queue = queue_lock.write().unwrap(); + let ref audio = queue.playing.clone().unwrap().audio; + audio.lock().play(); + } }); command!(skip(ctx, _msg) { @@ -271,10 +288,10 @@ command!(skip(ctx, _msg) { let mut manager = mgr_lock.lock(); let mut queue_lock = data.get::().cloned().unwrap(); - let mut play_queue = queue_lock.lock().unwrap(); - + if let Some(handler) = manager.get_mut(*TARGET_GUILD_ID) { handler.stop(); + let mut play_queue = queue_lock.write().unwrap(); play_queue.playing = None; } else { debug!("got skip with no handler attached"); @@ -288,10 +305,13 @@ command!(die(ctx, msg) { let mut manager = mgr_lock.lock(); let mut queue_lock = data.get::().cloned().unwrap(); - let mut play_queue = queue_lock.lock().unwrap(); - play_queue.playing = None; - play_queue.queue.clear(); + { + let mut play_queue = queue_lock.write().unwrap(); + + play_queue.playing = None; + play_queue.queue.clear(); + } if let Some(handler) = manager.get_mut(*TARGET_GUILD_ID) { handler.stop(); @@ -304,14 +324,16 @@ command!(die(ctx, msg) { command!(list(ctx, msg) { let mut queue_lock = ctx.data.lock().get::().cloned().unwrap(); - let mut play_queue = queue_lock.lock().unwrap(); + let mut play_queue = queue_lock.read().unwrap(); let channel_tmp = msg.channel().unwrap().guild().unwrap(); let channel = channel_tmp.read(); match play_queue.playing { Some(ref info) => { - send(msg.channel_id, &format!("Currently playing `{}` ({})", info.init_args.url, info.init_args.initiator), msg.tts)?; + let audio = info.audio.lock(); + let status = if audio.playing { "playing" } else { "paused:" }; + send(msg.channel_id, &format!("Currently {} `{}` ({})", status, info.init_args.url, info.init_args.initiator), msg.tts)?; }, None => { debug!("`list` called with no items in queue"); -- cgit v1.3.1