// azure_core/pageable.rs
use futures::stream::unfold;
use futures::Stream;
/// Unwraps a `Result` inside the step closure handed to `futures::stream::unfold`.
///
/// On `Ok` the macro evaluates to the inner value. On `Err` it returns early from
/// the enclosing function/async block with the converted error as the yielded item
/// and `State::Done` as the next state.
macro_rules! r#try {
    ($expr:expr $(,)?) => {
        match $expr {
            Result::Err(error) => return Some((Err(error.into()), State::Done)),
            Result::Ok(value) => value,
        }
    };
}
/// Declares the `Pageable` type, its constructor, and the `Continuable` trait.
///
/// Whatever tokens are passed to the macro are appended to the relevant bounds,
/// so the same declarations can be emitted with or without a `Send` constraint.
macro_rules! declare {
    ($($extra:tt)*) => {
        // `pin_project` generates a `project_ref` method that is never used and
        // therefore produces a warning. Wrapping the struct in its own module lets
        // us allow `dead_code` for the `Pageable` type only.
        mod pageable {
            #![allow(dead_code)]
            use super::*;

            /// A pageable stream that yields items of type `T`
            ///
            /// Internally uses the Azure specific continuation header to
            /// make repeated requests to Azure yielding a new page each time.
            #[pin_project::pin_project]
            pub struct Pageable<T, E> {
                #[pin]
                pub(crate) stream:
                    std::pin::Pin<Box<dyn Stream<Item = Result<T, E>> $($extra)*>>,
            }
        }
        pub use pageable::Pageable;

        impl<T: Continuable, E> Pageable<T, E> {
            /// Builds a `Pageable` from a request factory.
            ///
            /// `make_request` is called with `None` for the first page and with
            /// `Some(token)` for each following page, where `token` is what the
            /// previous page returned from `Continuable::continuation`. The stream
            /// stops after a page without a continuation, or after yielding the
            /// first error.
            pub fn new<F>(
                make_request: impl Fn(Option<T::Continuation>) -> F + Clone $($extra)* + 'static,
            ) -> Self
            where
                F: std::future::Future<Output = Result<T, E>> $($extra)* + 'static,
            {
                let pages = unfold(State::Init, move |state: State<T::Continuation>| {
                    // The future below takes ownership, so every step gets its own clone.
                    let make_request = make_request.clone();
                    async move {
                        // Pick the request for this step; a finished state ends the stream.
                        let pending = match state {
                            State::Init => make_request(None),
                            State::Continuation(token) => make_request(Some(token)),
                            State::Done => return None,
                        };
                        let page = r#try!(pending.await);
                        let next_state = match page.continuation() {
                            Some(token) => State::Continuation(token),
                            None => State::Done,
                        };
                        Some((Ok(page), next_state))
                    }
                });
                Self {
                    stream: Box::pin(pages),
                }
            }
        }

        /// A type that can yield an optional continuation token
        pub trait Continuable {
            /// The token handed back to the request factory to fetch the next page.
            type Continuation: 'static $($extra)*;

            /// Returns the token for the next page, or `None` when there is none.
            fn continuation(&self) -> Option<Self::Continuation>;
        }
    };
}
// On every target except wasm32, emit the declarations with `+ Send` appended
// to the stream, request factory, future, and continuation bounds.
#[cfg(not(target_arch = "wasm32"))]
declare!(+ Send);
// On wasm32, emit the same declarations with no extra bound.
#[cfg(target_arch = "wasm32")]
declare!();
/// Polling a `Pageable` simply polls the boxed stream it wraps.
impl<T, E> Stream for Pageable<T, E> {
    type Item = Result<T, E>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // `project` gives pinned access to the `stream` field.
        self.project().stream.poll_next(cx)
    }
}
/// Opaque `Debug` output: prints the type name and marks the fields as omitted.
impl<T, E> std::fmt::Debug for Pageable<T, E> {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("Pageable");
        builder.finish_non_exhaustive()
    }
}
/// Progress of the paging loop that drives a `Pageable`.
#[derive(Clone, Debug, Eq, PartialEq)]
enum State<C> {
    /// No request has been made yet; the first request carries no token.
    Init,
    /// The previous page supplied this continuation token for the next request.
    Continuation(C),
    /// Paging has ended (no continuation, or an error was yielded).
    Done,
}