-
Notifications
You must be signed in to change notification settings - Fork 647
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dealer: Rework context retrieval #1414
base: dev
Are you sure you want to change the base?
Changes from 15 commits
2f24606
e0f11f1
c8131be
29cd9b3
d8969da
28588c4
c73871c
d2f1dee
673ded3
d3ae5c0
11040d9
cd9f822
b13ab3a
932c6fa
0752b8d
08624dd
710e2fc
e6fe2ac
f59b43b
dfbac73
0dfced8
1873e2c
fbe1657
67ceecd
7d7e786
309ca8d
3cf4197
eb14e46
acc2b39
771f9e9
0749aa6
ba7fed3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,363 @@ | ||
use crate::state::context::ContextType; | ||
use crate::{ | ||
core::{Error, Session}, | ||
protocol::{ | ||
autoplay_context_request::AutoplayContextRequest, | ||
player::{Context, TransferState}, | ||
}, | ||
state::{context::UpdateContext, ConnectState}, | ||
}; | ||
use std::cmp::PartialEq; | ||
use std::{ | ||
collections::{HashMap, VecDeque}, | ||
fmt::{Display, Formatter}, | ||
hash::{Hash, Hasher}, | ||
time::Duration, | ||
}; | ||
use thiserror::Error as ThisError; | ||
use tokio::time::Instant; | ||
|
||
#[derive(Debug, Clone)] | ||
enum Resolve { | ||
Uri(String), | ||
Context(Context), | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub(super) enum ContextAction { | ||
Append, | ||
Replace, | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub(super) struct ResolveContext { | ||
resolve: Resolve, | ||
fallback: Option<String>, | ||
update: UpdateContext, | ||
action: ContextAction, | ||
} | ||
|
||
impl ResolveContext { | ||
fn append_context(uri: impl Into<String>) -> Self { | ||
Self { | ||
resolve: Resolve::Uri(uri.into()), | ||
fallback: None, | ||
update: UpdateContext::Default, | ||
action: ContextAction::Append, | ||
} | ||
} | ||
|
||
pub fn from_uri( | ||
uri: impl Into<String>, | ||
fallback: impl Into<String>, | ||
update: UpdateContext, | ||
action: ContextAction, | ||
) -> Self { | ||
let fallback_uri = fallback.into(); | ||
Self { | ||
resolve: Resolve::Uri(uri.into()), | ||
fallback: (!fallback_uri.is_empty()).then_some(fallback_uri), | ||
update, | ||
action, | ||
} | ||
} | ||
|
||
pub fn from_context(context: Context, update: UpdateContext, action: ContextAction) -> Self { | ||
Self { | ||
resolve: Resolve::Context(context), | ||
fallback: None, | ||
update, | ||
action, | ||
} | ||
} | ||
|
||
/// the uri which should be used to resolve the context, might not be the context uri | ||
fn resolve_uri(&self) -> Option<&str> { | ||
// it's important to call this always, or at least for every ResolveContext | ||
// otherwise we might not even check if we need to fallback and just use the fallback uri | ||
match self.resolve { | ||
Resolve::Uri(ref uri) => ConnectState::valid_resolve_uri(uri), | ||
Resolve::Context(ref ctx) => ConnectState::get_context_uri_from_context(ctx), | ||
} | ||
.or(self.fallback.as_deref()) | ||
} | ||
|
||
/// the actual context uri | ||
fn context_uri(&self) -> &str { | ||
match self.resolve { | ||
Resolve::Uri(ref uri) => uri, | ||
Resolve::Context(ref ctx) => &ctx.uri, | ||
} | ||
} | ||
} | ||
|
||
impl Display for ResolveContext { | ||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||
write!( | ||
f, | ||
"resolve_uri: <{:?}>, context_uri: <{}>, update: <{:?}>", | ||
self.resolve_uri(), | ||
self.context_uri(), | ||
self.update, | ||
) | ||
} | ||
} | ||
|
||
impl PartialEq for ResolveContext { | ||
fn eq(&self, other: &Self) -> bool { | ||
let eq_context = self.context_uri() == other.context_uri(); | ||
let eq_resolve = self.resolve_uri() == other.resolve_uri(); | ||
let eq_autoplay = self.update == other.update; | ||
|
||
eq_context && eq_resolve && eq_autoplay | ||
} | ||
} | ||
|
||
impl Eq for ResolveContext {} | ||
|
||
impl Hash for ResolveContext { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess deriving There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not quit easily as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No problem. Certainly if you need to patch up protobuf generated code, I'd say don't bother. |
||
fn hash<H: Hasher>(&self, state: &mut H) { | ||
self.context_uri().hash(state); | ||
self.resolve_uri().hash(state); | ||
self.update.hash(state); | ||
} | ||
} | ||
|
||
#[derive(Debug, ThisError)] | ||
enum ContextResolverError { | ||
#[error("no next context to resolve")] | ||
NoNext, | ||
#[error("tried appending context with {0} pages")] | ||
UnexpectedPagesSize(usize), | ||
#[error("tried resolving not allowed context: {0:?}")] | ||
NotAllowedContext(String), | ||
} | ||
|
||
impl From<ContextResolverError> for Error { | ||
fn from(value: ContextResolverError) -> Self { | ||
Error::failed_precondition(value) | ||
} | ||
} | ||
|
||
pub struct ContextResolver { | ||
session: Session, | ||
queue: VecDeque<ResolveContext>, | ||
unavailable_contexts: HashMap<ResolveContext, Instant>, | ||
} | ||
|
||
// time after which an unavailable context is retried | ||
const RETRY_UNAVAILABLE: Duration = Duration::from_secs(3600); | ||
|
||
impl ContextResolver { | ||
pub fn new(session: Session) -> Self { | ||
Self { | ||
session, | ||
queue: VecDeque::new(), | ||
unavailable_contexts: HashMap::new(), | ||
} | ||
} | ||
|
||
pub fn add(&mut self, resolve: ResolveContext) { | ||
let last_try = self | ||
.unavailable_contexts | ||
.get(&resolve) | ||
.map(|i| i.duration_since(Instant::now())); | ||
|
||
let last_try = if matches!(last_try, Some(last_try) if last_try > RETRY_UNAVAILABLE) { | ||
let _ = self.unavailable_contexts.remove(&resolve); | ||
debug!( | ||
"context was requested {}s ago, trying again to resolve the requested context", | ||
last_try.expect("checked by condition").as_secs() | ||
); | ||
None | ||
} else { | ||
last_try | ||
}; | ||
|
||
if last_try.is_some() { | ||
debug!("tried loading unavailable context: {resolve}"); | ||
return; | ||
} else if self.queue.contains(&resolve) { | ||
debug!("update for {resolve} is already added"); | ||
return; | ||
} | ||
|
||
self.queue.push_back(resolve) | ||
} | ||
|
||
pub fn add_list(&mut self, resolve: Vec<ResolveContext>) { | ||
for resolve in resolve { | ||
self.add(resolve) | ||
} | ||
} | ||
|
||
pub fn remove_used_and_invalid(&mut self) { | ||
if let Some((_, _, remove)) = self.find_next() { | ||
for _ in 0..remove { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe nicer to use |
||
let _ = self.queue.pop_front(); | ||
} | ||
} | ||
self.queue.pop_front(); | ||
photovoltex marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
pub fn clear(&mut self) { | ||
self.queue = VecDeque::new() | ||
} | ||
|
||
fn find_next(&self) -> Option<(&ResolveContext, &str, usize)> { | ||
let mut idx = 0; | ||
loop { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not change into a |
||
let next = self.queue.front()?; | ||
match next.resolve_uri() { | ||
None if idx < self.queue.len() => { | ||
warn!("skipped {idx} because of no valid resolve_uri: {next}"); | ||
idx += 1; | ||
continue; | ||
} | ||
value => break value.map(|uri| (next, uri, idx)), | ||
} | ||
} | ||
} | ||
|
||
pub fn has_next(&self) -> bool { | ||
self.find_next().is_some() | ||
} | ||
|
||
pub async fn get_next_context( | ||
&self, | ||
recent_track_uri: impl Fn() -> Vec<String>, | ||
) -> Result<Context, Error> { | ||
let (next, resolve_uri, _) = self.find_next().ok_or(ContextResolverError::NoNext)?; | ||
|
||
match next.update { | ||
UpdateContext::Default => { | ||
let mut ctx = self.session.spclient().get_context(resolve_uri).await; | ||
if let Ok(ctx) = ctx.as_mut() { | ||
ctx.uri = next.context_uri().to_string(); | ||
ctx.url = format!("context://{}", ctx.uri); | ||
} | ||
|
||
ctx | ||
} | ||
UpdateContext::Autoplay => { | ||
if resolve_uri.contains("spotify:show:") || resolve_uri.contains("spotify:episode:") | ||
{ | ||
// autoplay is not supported for podcasts | ||
Err(ContextResolverError::NotAllowedContext( | ||
resolve_uri.to_string(), | ||
))? | ||
} | ||
|
||
let request = AutoplayContextRequest { | ||
context_uri: Some(resolve_uri.to_string()), | ||
recent_track_uri: recent_track_uri(), | ||
..Default::default() | ||
}; | ||
self.session.spclient().get_autoplay_context(&request).await | ||
} | ||
} | ||
} | ||
|
||
pub fn mark_next_unavailable(&mut self) { | ||
if let Some((next, _, _)) = self.find_next() { | ||
self.unavailable_contexts | ||
.insert(next.clone(), Instant::now()); | ||
} | ||
} | ||
|
||
pub fn handle_next_context( | ||
&self, | ||
state: &mut ConnectState, | ||
mut context: Context, | ||
) -> Result<Option<Vec<ResolveContext>>, Error> { | ||
let (next, _, _) = self.find_next().ok_or(ContextResolverError::NoNext)?; | ||
|
||
let remaining = match next.action { | ||
ContextAction::Append if context.pages.len() == 1 => state | ||
.fill_context_from_page(context.pages.remove(0)) | ||
.map(|_| None), | ||
ContextAction::Replace => { | ||
let remaining = state.update_context(context, next.update); | ||
if let Resolve::Context(ref ctx) = next.resolve { | ||
state.merge_context(Some(ctx.clone())); | ||
} | ||
|
||
remaining | ||
} | ||
ContextAction::Append => { | ||
warn!("unexpected page size: {context:#?}"); | ||
Err(ContextResolverError::UnexpectedPagesSize(context.pages.len()).into()) | ||
} | ||
}?; | ||
|
||
Ok(remaining.map(|remaining| { | ||
remaining | ||
.into_iter() | ||
.map(ResolveContext::append_context) | ||
.collect::<Vec<_>>() | ||
})) | ||
} | ||
|
||
pub fn try_finish( | ||
&self, | ||
state: &mut ConnectState, | ||
transfer_state: &mut Option<TransferState>, | ||
) -> bool { | ||
let (next, _, _) = match self.find_next() { | ||
None => return false, | ||
Some(next) => next, | ||
}; | ||
|
||
// when there is only one update type, we are the last of our kind, so we should update the state | ||
if self | ||
.queue | ||
.iter() | ||
.filter(|resolve| resolve.update == next.update) | ||
.count() | ||
!= 1 | ||
{ | ||
return false; | ||
} | ||
|
||
debug!("last item of type <{:?}>, finishing state", next.update); | ||
|
||
match (next.update, state.active_context) { | ||
(UpdateContext::Default, ContextType::Default) => {} | ||
(UpdateContext::Default, _) => { | ||
debug!("skipped finishing default, because it isn't the active context"); | ||
return false; | ||
} | ||
(UpdateContext::Autoplay, _) => {} | ||
} | ||
|
||
if let Some(transfer_state) = transfer_state.take() { | ||
if let Err(why) = state.finish_transfer(transfer_state) { | ||
error!("finishing setup of transfer failed: {why}") | ||
} | ||
} | ||
|
||
let res = if state.shuffling_context() { | ||
state.shuffle() | ||
} else if let Ok(ctx) = state.get_context(state.active_context) { | ||
let idx = ConnectState::find_index_in_context(ctx, |t| { | ||
state.current_track(|c| t.uri == c.uri) | ||
}) | ||
.ok(); | ||
|
||
state | ||
.reset_playback_to_position(idx) | ||
.and_then(|_| state.fill_up_next_tracks()) | ||
} else { | ||
state.fill_up_next_tracks() | ||
}; | ||
|
||
if let Err(why) = res { | ||
error!("setting up state failed after updating contexts: {why}") | ||
} | ||
|
||
state.update_restrictions(); | ||
state.update_queue_revision(); | ||
|
||
true | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this work correctly? I believe
or
is eagerly evaluated.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As it doesn't evaluated any values it shouldn't matter if it's eagerly or not. Unless there isn't any benefit to it, I would just keep it as is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just thinking that this will probably always evaluate to
None
because of the eager evaluation at compile time, not sure if that's the behavior you intend?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But why would it always evaluate to
None
, I think eagerly evaluated just means always/early evaluated? Asself.fallback
is anOption<String>
it should only provide the value in place. I think the only difference would be the evaluation ofas_deref()
. But this is only used here to convertOption<String>
intoOption<&str>
.Just want to understand where you are coming from, as from my perspective it shouldn't make a difference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right.