Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tokio migration #665

Merged
merged 92 commits into from
Apr 13, 2021
Merged
Changes from 1 commit
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
6f5607d
[Core] Bump hyper to ~0.12
ashthespy Jan 23, 2021
931c820
[Connect] Migrate to hyper ~v12
ashthespy Jan 23, 2021
962d7af
Clean up hyper from binary
ashthespy Jan 23, 2021
9bbf8c3
WIP tokio-core -> tokio migration
ashthespy Jan 23, 2021
53b4ab0
Migrate to `tokio` 0.1
ashthespy Jan 23, 2021
c69ccf7
[Connect] Migrate to `tokio` 0.1
ashthespy Jan 23, 2021
47a1575
WIP Futures
ashthespy Jan 23, 2021
94fc0a1
[Core/connection] Refactor to async/await
ashthespy Jan 23, 2021
c273d51
[AudioKeyManager] Convert to async
ashthespy Jan 23, 2021
20dd94f
Fix tokio dependency in main
ashthespy Jan 23, 2021
0892587
[Core] WIP: Sessions
ashthespy Jan 23, 2021
40e6355
Migrate core to tokio 1.0
Johannesd3 Jan 21, 2021
6867ad0
Added test
Johannesd3 Jan 21, 2021
424ba3a
Migrated metadata crate to futures 0.3
Johannesd3 Jan 21, 2021
80d384e
Migrated audio crate to futures 0.3
Johannesd3 Jan 21, 2021
90905b8
Improved RangeSet implementation
Johannesd3 Jan 21, 2021
0895f17
Migrated playback crate to futures 0.3
Johannesd3 Jan 21, 2021
6c9d8c8
Replace pin_project and updated dependencies
Johannesd3 Jan 22, 2021
fe37186
Make librespot_playback work
Johannesd3 Jan 22, 2021
91d7d04
Preparing main crate for testing
Johannesd3 Jan 22, 2021
9546fb6
Merge branch 'futures_migration' of https://github.com/Johannesd3/lib…
ashthespy Jan 25, 2021
07514c9
Add proxy support to apresolve
Johannesd3 Jan 25, 2021
c97fdeb
Replaced .fold(0, add) by .sum()
Johannesd3 Jan 25, 2021
aa90278
Merge pull request #581 from ashthespy/tokio_migration
ashthespy Jan 26, 2021
a45fe85
Enable logging in test
Johannesd3 Jan 30, 2021
c1d62d7
Fixed ProxyTunnel
Johannesd3 Jan 30, 2021
bb44b99
Use proxytunnel in apresolve
Johannesd3 Jan 30, 2021
d662e03
Merge pull request #583 from Johannesd3/tokio_migration_proxy
ashthespy Feb 10, 2021
872fab6
Merge branch 'dev' into tokio_migration
Johannesd3 Feb 10, 2021
1f40aff
Merge pull request #599 from Johannesd3/tokio_migration
ashthespy Feb 11, 2021
2f05ddf
Fix bugs in player
Johannesd3 Feb 12, 2021
b2f1be4
Make `RodioSink` `Send` and improve error handling
Johannesd3 Feb 12, 2021
689415a
Improved error handling in rodio backend
Johannesd3 Feb 12, 2021
b77f0a1
Fix formatting
Johannesd3 Feb 13, 2021
afacaea
Merge pull request #601 from Johannesd3/tokio_migration
ashthespy Feb 20, 2021
daf7ecd
Migrate librespot-connect to tokio 1.0
Johannesd3 Feb 19, 2021
2c81aaa
Implement MercurySender not as sink
Johannesd3 Feb 20, 2021
1c4d57c
Add shutdown to discovery server
Johannesd3 Feb 20, 2021
007e653
Restore original blocking player behaviour
Johannesd3 Feb 20, 2021
220061e
Migrate application to tokio 1.0
Johannesd3 Feb 21, 2021
c9b3b95
Merge pull request #606 from Johannesd3/tokio_connect_migration
ashthespy Feb 23, 2021
1fc5267
Revert "Merge pull request #548 from Lcchy/rodiojack-backend"
Johannesd3 Feb 23, 2021
678d177
Merge branch 'dev' into tokio_migration
Johannesd3 Feb 23, 2021
c0942f1
Restore rodiojack support
Johannesd3 Feb 23, 2021
9253be7
Small refactor of librespot-core
Johannesd3 Feb 10, 2021
8cff10e
Put apresolve behind feature flag
Johannesd3 Feb 10, 2021
10827bd
Clean up dependencies of librespot-core
Johannesd3 Feb 10, 2021
a6ed685
Clean up dependencies in librespot-metadata
Johannesd3 Feb 10, 2021
746e6c8
Put lewton behind feature flag
Johannesd3 Feb 10, 2021
b83976a
Remove "extern crate"s from librespot-audio
Johannesd3 Feb 10, 2021
5c42d2e
Clean up dependencies in librespot-audio
Johannesd3 Feb 10, 2021
5aeb733
Clean up dependencies in librespot-playback
Johannesd3 Feb 22, 2021
45f42ac
Refactor 'find_available_alternatives'
Johannesd3 Feb 22, 2021
27f308b
Replace error_chain by thiserror
Johannesd3 Feb 13, 2021
f9c0e26
Simplify code
Johannesd3 Feb 13, 2021
d064ffc
Use tokio channels and fix compilation errors
Johannesd3 Feb 21, 2021
59c5566
Clean up librespot-connect dependencies
Johannesd3 Feb 21, 2021
18179e7
Remove unused dependencies and fix feature flags
Johannesd3 Feb 21, 2021
b606d8c
Replace "extern crate"s
Johannesd3 Feb 21, 2021
f22b419
Update url crate to 2.1
Johannesd3 Feb 22, 2021
9d77fef
Merge pull request #649 from Johannesd3/tokio-migration-refactor-deps
ashthespy Feb 26, 2021
6a33eb4
minor cleanup
leshow Mar 1, 2021
3388508
use current_thread
leshow Mar 1, 2021
3876139
Merge pull request #652 from leshow/cleanup
ashthespy Mar 9, 2021
5616004
Fix many clippy lints
Johannesd3 Mar 10, 2021
059b902
Remove redundant field names
Johannesd3 Mar 10, 2021
173a363
Merge pull request #667 from Johannesd3/small-fixes
ashthespy Mar 11, 2021
e71a004
Refactor AudioFileFetch using async/await
Johannesd3 Feb 28, 2021
ca255c1
Split file fetch.rs
Johannesd3 Feb 28, 2021
963d50e
Merge pull request #658 from Johannesd3/refactor-audio-file-fetch
sashahilton00 Mar 17, 2021
ec1ec59
update examples
ThouCheese Mar 6, 2021
1051f98
Merge pull request #663 from ThouCheese/tokio_migration
ashthespy Mar 24, 2021
95fedf5
Add back hyper-proxy
Johannesd3 Mar 17, 2021
d4dfd48
Merge pull request #674 from Johannesd3/proxy-support
ashthespy Mar 29, 2021
f5274f5
Merge branch 'dev' into tokio_migration
Johannesd3 Apr 9, 2021
7c3d891
Fix clippy warnings
Johannesd3 Mar 31, 2021
11ce290
Fix formatting
Johannesd3 Apr 1, 2021
7ddb1a2
Reuse librespot-core's Diffie Hellman in discovery
Johannesd3 Mar 10, 2021
9378ae5
Bump num-bigint dependency
Johannesd3 Mar 18, 2021
e688e7e
Almost eliminate util module
Johannesd3 Mar 18, 2021
cb8c9c2
Enable apresolve always in binary
Johannesd3 Apr 1, 2021
b7350b7
Restore previous feature flags
Johannesd3 Apr 9, 2021
9a3a666
Bump MSRV to 1.45
Johannesd3 Apr 9, 2021
5435ab3
Fix compile errors in backends
Johannesd3 Apr 1, 2021
690e0d2
Add simple tests to librespot-core
Johannesd3 Apr 2, 2021
ff49982
Add missing feature flag to tokio
Johannesd3 Apr 9, 2021
317e586
Improve CI
Johannesd3 Apr 9, 2021
26c127c
Merge branch 'dev' into tokio_migration
Johannesd3 Apr 10, 2021
a576194
Fix bug in rodio backend
Johannesd3 Apr 10, 2021
b4f9ae3
Fix clippy warnings
Johannesd3 Apr 10, 2021
3e9aee1
Renamed variable
Johannesd3 Apr 10, 2021
f158d23
Merge pull request #687 from Johannesd3/tokio_migration
sashahilton00 Apr 11, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP Futures
Fix apresolve

WIP session

[Core] More migration

Playing with `ReadExact` and `WriteAll`

Add some simple checks

Take little steps
  • Loading branch information
ashthespy committed Jan 23, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 47a1575c00ee1b0abee98599e3e2d478638e18ca
3 changes: 2 additions & 1 deletion audio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -14,7 +14,8 @@ version = "0.1.3"
bit-set = "0.5"
byteorder = "1.3"
bytes = "0.4"
futures = "0.1"
futures = "0.3"
tokio = { version = "0.2", features = ["full"] } # Temp "rt-core", "sync"
lewton = "0.9"
log = "0.4"
num-bigint = "0.3"
122 changes: 59 additions & 63 deletions audio/src/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,30 @@
use crate::range_set::{Range, RangeSet};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes;
use futures::sync::{mpsc, oneshot};
use futures::Stream;
use futures::{Async, Future, Poll};
use std::cmp::{max, min};
use std::fs;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use tempfile::NamedTempFile;

use futures::sync::mpsc::unbounded;
use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
use librespot_core::session::Session;
use librespot_core::spotify_id::FileId;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;

use futures::{
channel::{mpsc, mpsc::unbounded, oneshot},
ready, Future, Stream,
};
use std::{
pin::Pin,
task::{Context, Poll},
};

use tokio::task;

const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16;
// The minimum size of a block that is requested from the Spotify servers in one request.
// This is the block size that is typically requested while doing a seek() on a file.
@@ -329,6 +336,7 @@ impl AudioFileOpenStreaming {
complete_tx,
);
self.session.spawn(fetcher);
// tokio::spawn(move |_| fetcher);

AudioFileStreaming {
read_file: read_file,
@@ -343,36 +351,37 @@ impl AudioFileOpenStreaming {
}

impl Future for AudioFileOpen {
type Item = AudioFile;
type Error = ChannelError;
type Output = Result<AudioFile, ChannelError>;

fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<AudioFile, ChannelError>> {
match *self {
AudioFileOpen::Streaming(ref mut open) => {
let file = try_ready!(open.poll());
Ok(Async::Ready(AudioFile::Streaming(file)))
let file = ready!(open.poll());
Poll::Ready(Ok(AudioFile::Streaming(file)))
}
AudioFileOpen::Cached(ref mut file) => {
let file = file.take().unwrap();
Ok(Async::Ready(AudioFile::Cached(file)))
Poll::Ready(Ok(AudioFile::Cached(file)))
}
}
}
}

impl Future for AudioFileOpenStreaming {
type Item = AudioFileStreaming;
type Error = ChannelError;
type Output = Result<AudioFileStreaming, ChannelError>;

fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
fn poll(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<AudioFileStreaming, ChannelError>> {
loop {
let (id, data) = try_ready!(self.headers.poll()).unwrap();
let (id, data) = ready!(self.headers.poll()).unwrap();

if id == 0x3 {
let size = BigEndian::read_u32(&data) as usize * 4;
let file = self.finish(size);

return Ok(Async::Ready(file));
return Poll::Ready(Ok(file));
}
}
}
@@ -563,13 +572,12 @@ impl AudioFileFetchDataReceiver {
}

impl Future for AudioFileFetchDataReceiver {
type Item = ();
type Error = ();
type Output = ();

fn poll(&mut self) -> Poll<(), ()> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
loop {
match self.data_rx.poll() {
Ok(Async::Ready(Some(data))) => {
Poll::Ready(Some(data)) => {
if self.measure_ping_time {
if let Some(request_sent_time) = self.request_sent_time {
let duration = Instant::now() - request_sent_time;
@@ -603,26 +611,24 @@ impl Future for AudioFileFetchDataReceiver {
}
if self.request_length == 0 {
self.finish();
return Ok(Async::Ready(()));
return Poll::Ready(());
}
}
Ok(Async::Ready(None)) => {
Poll::Ready(None) => {
if self.request_length > 0 {
warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
}
self.finish();
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
return Poll::Ready(());
}
Poll::Pending => return Poll::Pending,
Err(ChannelError) => {
warn!(
"Error from channel for data receiver for range {} (+{}).",
self.initial_data_offset, self.initial_request_length
);
self.finish();
return Ok(Async::Ready(()));
return Poll::Ready(());
}
}
}
@@ -672,6 +678,7 @@ impl AudioFileFetch {
);

session.spawn(initial_data_receiver);
// tokio::spawn(move |_| initial_data_receiver);

AudioFileFetch {
session: session,
@@ -747,6 +754,7 @@ impl AudioFileFetch {
);

self.session.spawn(receiver);
// tokio::spawn(move |_| receiver);
}
}

@@ -794,13 +802,11 @@ impl AudioFileFetch {
}
}

fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
fn poll_file_data_rx(&mut self) -> Poll<()> {
loop {
match self.file_data_rx.poll() {
Ok(Async::Ready(None)) => {
return Ok(Async::Ready(()));
}
Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms))) => {
trace!("Ping time estimated as: {} ms.", response_time_ms);

// record the response time
@@ -832,7 +838,7 @@ impl AudioFileFetch {
.ping_time_ms
.store(ping_time_ms, atomic::Ordering::Relaxed);
}
Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
Poll::Ready(Some(ReceivedData::Data(data))) => {
self.output
.as_mut()
.unwrap()
@@ -864,39 +870,34 @@ impl AudioFileFetch {

if full {
self.finish();
return Ok(Async::Ready(()));
return Poll::Ready(());
}
}
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Err(()) => unreachable!(),
Poll::Pending => return Poll::Pending,
// Err(()) => unreachable!(),
}
}
}

fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
fn poll_stream_loader_command_rx(&mut self) -> Poll<()> {
loop {
match self.stream_loader_command_rx.poll() {
Ok(Async::Ready(None)) => {
return Ok(Async::Ready(()));
}
Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
Poll::Ready(None) => return Poll::Ready(()),

Poll::Ready(Some(StreamLoaderCommand::Fetch(request))) => {
self.download_range(request.start, request.length);
}
Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
Poll::Ready(Some(StreamLoaderCommand::RandomAccessMode())) => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::RandomAccess();
}
Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
Poll::Ready(Some(StreamLoaderCommand::StreamMode())) => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::Streaming();
}
Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(()) => unreachable!(),
Poll::Ready(Some(StreamLoaderCommand::Close())) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
// Err(()) => unreachable!(),
}
}
}
@@ -911,24 +912,19 @@ impl AudioFileFetch {
}

impl Future for AudioFileFetch {
type Item = ();
type Error = ();
type Output = ();

fn poll(&mut self) -> Poll<(), ()> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
match self.poll_stream_loader_command_rx() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => {
return Ok(Async::Ready(()));
}
Err(()) => unreachable!(),
Poll::Pending => (),
Poll::Ready(_) => return Poll::Ready(()),
// Err(()) => unreachable!(),
}

match self.poll_file_data_rx() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => {
return Ok(Async::Ready(()));
}
Err(()) => unreachable!(),
Poll::Pending => (),
Poll::Ready(_) => return Poll::Ready(()),
// Err(()) => unreachable!(),
}

if let DownloadStrategy::Streaming() = self.get_download_strategy() {
@@ -969,7 +965,7 @@ impl Future for AudioFileFetch {
}
}

return Ok(Async::NotReady);
return Poll::Pending;
}
}

13 changes: 7 additions & 6 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -17,10 +17,10 @@ base64 = "0.13"
byteorder = "1.3"
bytes = "0.4"
error-chain = { version = "0.12", default_features = false }
futures = "0.1"
futures = {version = "0.3",features =["unstable","bilock"]}
httparse = "1.3"
hyper = "0.12"
hyper-proxy = { version = "0.5", default_features = false }
hyper = "0.13"
hyper-proxy = { version = "0.6", default_features = false }
lazy_static = "1.3"
log = "0.4"
num-bigint = "0.3"
@@ -32,9 +32,10 @@ serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
shannon = "0.2.0"
tokio-codec = "0.1"
tokio = "0.1"
tokio-io = "0.1"
tokio = {version = "0.2", features = ["full","io-util","tcp"]} # io-util
tokio-util = {version = "0.3", features = ["compat","codec"]}
# tokio-codec = "0.1"
# tokio-io = "0.1"
url = "1.7"
uuid = { version = "0.8", features = ["v4"] }
sha-1 = "0.8"
Loading