spire_api/agent/
delegated_identity.rs

//! This module provides an API surface to interact with the SPIRE Delegated Identity API.
//! The protobuf definition can be found [here](https://github.com/spiffe/spire-api-sdk/blob/main/proto/spire/api/agent/delegatedidentity/v1/delegatedidentity.proto).
//!
//! More information on its usage can be found in the [SPIFFE docs](https://spiffe.io/docs/latest/deploying/spire_agent/#delegated-identity-api).
//!
//! Most importantly, this API cannot be used over the standard endpoint; it must be used over the admin socket.
//! The admin socket can be configured in the SPIRE agent configuration document.
8
9use crate::proto::spire::api::agent::delegatedidentity::v1::delegated_identity_client::DelegatedIdentityClient as DelegatedIdentityApiClient;
10use crate::proto::spire::api::agent::delegatedidentity::v1::{
11    FetchJwtsviDsRequest, SubscribeToJwtBundlesRequest, SubscribeToJwtBundlesResponse,
12    SubscribeToX509BundlesRequest, SubscribeToX509BundlesResponse, SubscribeToX509sviDsRequest,
13    SubscribeToX509sviDsResponse,
14};
15use crate::proto::spire::api::types::Jwtsvid as ProtoJwtSvid;
16use spiffe::endpoint::validate_socket_path;
17use spiffe::{JwtBundle, JwtBundleSet, JwtSvid, TrustDomain, X509Bundle, X509BundleSet, X509Svid};
18use tokio_stream::{Stream, StreamExt};
19
20use crate::selectors::Selector;
21use hyper_util::rt::TokioIo;
22use spiffe::constants::DEFAULT_SVID;
23use spiffe::error::GrpcClientError;
24use std::convert::{Into, TryFrom};
25use std::str::FromStr;
26use tokio::net::UnixStream;
27use tonic::transport::{Endpoint, Uri};
28use tower::service_fn;
29
/// Name of the environment variable that holds the path of the SPIRE agent admin socket.
///
/// It is read by [`get_admin_socket_path`].
pub const ADMIN_SOCKET_ENV: &str = "SPIRE_ADMIN_ENDPOINT_SOCKET";
32
33/// Gets the endpoint socket endpoint path from the environment variable `ADMIN_SOCKET_ENV`,
34/// as described in [SPIFFE standard](https://github.com/spiffe/spiffe/blob/main/standards/SPIFFE_Workload_Endpoint.md#4-locating-the-endpoint).
35pub fn get_admin_socket_path() -> Option<String> {
36    std::env::var(ADMIN_SOCKET_ENV).ok()
37}
38
/// Client for the SPIRE agent Delegated Identity API.
///
/// Wraps the generated gRPC client running over a [`tonic::transport::Channel`].
#[derive(Debug, Clone)]
pub struct DelegatedIdentityClient {
    /// Generated gRPC client through which every Delegated Identity API call is issued.
    client: DelegatedIdentityApiClient<tonic::transport::Channel>,
}
44
/// Identifies the workload on whose behalf a delegated request is made.
///
/// A request carries exactly one of two things: a PID, in which case the agent attests the
/// PID to obtain selectors, or an explicit list of selectors, in which case the delegate has
/// already attested the workload itself.
#[derive(Debug, Clone)]
pub enum DelegateAttestationRequest {
    /// PID of the workload (the agent attests PID -> selectors).
    Pid(i32),
    /// Selectors of the workload (the delegate has already attested a PID and generated the
    /// full set of selectors).
    Selectors(Vec<Selector>),
}
54
55/// Constructors
56impl DelegatedIdentityClient {
    /// Optional scheme prefix that `new_from_path` strips from a socket path before connecting.
    const UNIX_PREFIX: &'static str = "unix:";
    /// URI used to build the tonic `Endpoint` in `new_from_path`; the connector installed
    /// there ignores the URI it is given and connects to the UNIX socket instead.
    const TONIC_DEFAULT_URI: &'static str = "http://[::]:50051";
59
60    /// Creates a new instance of `DelegatedIdentityClient` by connecting to the specified socket path.
61    ///
62    /// # Arguments
63    ///
64    /// * `path` - The path to the UNIX domain socket, which can optionally start with "unix:".
65    ///
66    /// # Returns
67    ///
68    /// * `Result<Self, ClientError>` - Returns an instance of `DelegatedIdentityClient` if successful, otherwise returns an error.
69    ///
70    /// # Errors
71    ///
72    /// This function will return an error if the provided socket path is invalid or if there are issues connecting.
73    pub async fn new_from_path(path: &str) -> Result<Self, GrpcClientError> {
74        validate_socket_path(path)?;
75
76        // Strip the 'unix:' prefix for tonic compatibility.
77        let stripped_path = path
78            .strip_prefix(Self::UNIX_PREFIX)
79            .unwrap_or(path)
80            .to_string();
81
82        let channel = Endpoint::try_from(Self::TONIC_DEFAULT_URI)?
83            .connect_with_connector(service_fn(move |_: Uri| {
84                let stripped_path = stripped_path.clone();
85                async {
86                    // Connect to the UDS socket using the modified path.
87                    UnixStream::connect(stripped_path).await.map(TokioIo::new)
88                }
89            }))
90            .await?;
91
92        Ok(DelegatedIdentityClient {
93            client: DelegatedIdentityApiClient::new(channel),
94        })
95    }
96
97    /// Creates a new `DelegatedIdentityClient` using the default socket endpoint address.
98    ///
99    /// Requires that the environment variable `SPIFFE_ENDPOINT_SOCKET` be set with
100    /// the path to the Workload API endpoint socket.
101    ///
102    /// # Errors
103    ///
104    /// The function returns a variant of [`GrpcClientError`] if environment variable is not set or if
105    /// the provided socket path is not valid.
106    pub async fn default() -> Result<Self, GrpcClientError> {
107        let socket_path = match get_admin_socket_path() {
108            None => return Err(GrpcClientError::MissingEndpointSocketPath),
109            Some(s) => s,
110        };
111        Self::new_from_path(socket_path.as_str()).await
112    }
113
114    /// Constructs a new `DelegatedIdentityClient` using the provided Tonic transport channel.
115    ///
116    /// # Arguments
117    ///
118    /// * `conn`: A `tonic::transport::Channel` used for gRPC communication.
119    ///
120    /// # Returns
121    ///
122    /// A `Result` containing a `DelegatedIdentityClient` if successful, or a `ClientError` if an error occurs.
123    pub fn new(conn: tonic::transport::Channel) -> Result<Self, GrpcClientError> {
124        Ok(DelegatedIdentityClient {
125            client: DelegatedIdentityApiClient::new(conn),
126        })
127    }
128}
129
130impl DelegatedIdentityClient {
131    /// Fetches a single X509 SPIFFE Verifiable Identity Document (SVID).
132    ///
133    /// This method connects to the SPIFFE Workload API and returns the first X509 SVID in the response.
134    ///
135    /// # Arguments
136    ///
137    /// * `selectors` - A list of selectors to filter the stream of [`X509Svid`] updates.
138    ///
139    /// # Returns
140    ///
141    /// On success, it returns a valid [`X509Svid`] which represents the parsed SVID.
142    /// If the fetch operation or the parsing fails, it returns a [`GrpcClientError`].
143    ///
144    /// # Errors
145    ///
146    /// Returns [`GrpcClientError`] if the gRPC call fails or if the SVID could not be parsed from the gRPC response.
147    pub async fn fetch_x509_svid(
148        &mut self,
149        attest_type: DelegateAttestationRequest,
150    ) -> Result<X509Svid, GrpcClientError> {
151        let request = match attest_type {
152            DelegateAttestationRequest::Selectors(selectors) => SubscribeToX509sviDsRequest {
153                selectors: selectors.into_iter().map(|s| s.into()).collect(),
154                pid: 0,
155            },
156            DelegateAttestationRequest::Pid(pid) => SubscribeToX509sviDsRequest {
157                selectors: Vec::default(),
158                pid,
159            },
160        };
161
162        self.client
163            .subscribe_to_x509svi_ds(request)
164            .await?
165            .into_inner()
166            .message()
167            .await?
168            .ok_or(GrpcClientError::EmptyResponse)
169            .and_then(DelegatedIdentityClient::parse_x509_svid_from_grpc_response)
170    }
171
172    /// Watches the stream of [`X509Svid`] updates.
173    ///
174    /// This function establishes a stream with the Workload API to continuously receive updates for the [`X509Svid`].
175    /// The returned stream can be used to asynchronously yield new `X509Svid` updates as they become available.
176    ///
177    /// # Arguments
178    ///
179    /// * `selectors` - A list of selectors to filter the stream of [`X509Svid`] updates.
180    ///
181    /// # Returns
182    ///
183    /// Returns a stream of `Result<X509Svid, ClientError>`. Each item represents an updated [`X509Svid`] or an error if
184    /// there was a problem processing an update from the stream.
185    ///
186    /// # Errors
187    ///
188    /// The function can return an error variant of [`GrpcClientError`] in the following scenarios:
189    ///
190    /// * There's an issue connecting to the Workload API.
191    /// * An error occurs while setting up the stream.
192    ///
193    /// Individual stream items might also be errors if there's an issue processing the response for a specific update.
194    pub async fn stream_x509_svids(
195        &mut self,
196        attest_type: DelegateAttestationRequest,
197    ) -> Result<impl Stream<Item = Result<X509Svid, GrpcClientError>>, GrpcClientError> {
198        let request = match attest_type {
199            DelegateAttestationRequest::Selectors(selectors) => SubscribeToX509sviDsRequest {
200                selectors: selectors.into_iter().map(|s| s.into()).collect(),
201                pid: 0,
202            },
203            DelegateAttestationRequest::Pid(pid) => SubscribeToX509sviDsRequest {
204                selectors: Vec::default(),
205                pid,
206            },
207        };
208
209        let response: tonic::Response<tonic::Streaming<SubscribeToX509sviDsResponse>> =
210            self.client.subscribe_to_x509svi_ds(request).await?;
211
212        let stream = response.into_inner().map(|message| {
213            message
214                .map_err(GrpcClientError::from)
215                .and_then(DelegatedIdentityClient::parse_x509_svid_from_grpc_response)
216        });
217
218        Ok(stream)
219    }
220
221    /// Fetches [`X509BundleSet`], that is a set of [`X509Bundle`] keyed by the trust domain to which they belong.
222    ///
223    /// # Errors
224    ///
225    /// The function returns a variant of [`GrpcClientError`] if there is en error connecting to the Workload API or
226    /// there is a problem processing the response.
227    pub async fn fetch_x509_bundles(&mut self) -> Result<X509BundleSet, GrpcClientError> {
228        let request = SubscribeToX509BundlesRequest::default();
229
230        let response: tonic::Response<tonic::Streaming<SubscribeToX509BundlesResponse>> =
231            self.client.subscribe_to_x509_bundles(request).await?;
232        let initial = response.into_inner().message().await?;
233        DelegatedIdentityClient::parse_x509_bundle_set_from_grpc_response(
234            initial.unwrap_or_default(),
235        )
236    }
237
238    /// Watches the stream of [`X509Bundle`] updates.
239    ///
240    /// This function establishes a stream with the Workload API to continuously receive updates for the [`X509Bundle`].
241    /// The returned stream can be used to asynchronously yield new `X509Bundle` updates as they become available.
242    ///
243    /// # Returns
244    ///
245    /// Returns a stream of `Result<X509Bundle, ClientError>`. Each item represents an updated [`X509Bundle`] or an error if
246    /// there was a problem processing an update from the stream.
247    ///
248    /// # Errors
249    ///
250    /// The function can return an error variant of [`GrpcClientError`] in the following scenarios:
251    ///
252    /// * There's an issue connecting to the Admin API.
253    /// * An error occurs while setting up the stream.
254    ///
255    /// Individual stream items might also be errors if there's an issue processing the response for a specific update.
256    pub async fn stream_x509_bundles(
257        &mut self,
258    ) -> Result<impl Stream<Item = Result<X509BundleSet, GrpcClientError>>, GrpcClientError> {
259        let request = SubscribeToX509BundlesRequest::default();
260
261        let response: tonic::Response<tonic::Streaming<SubscribeToX509BundlesResponse>> =
262            self.client.subscribe_to_x509_bundles(request).await?;
263
264        let stream = response.into_inner().map(|message| {
265            message
266                .map_err(GrpcClientError::from)
267                .and_then(DelegatedIdentityClient::parse_x509_bundle_set_from_grpc_response)
268        });
269
270        Ok(stream)
271    }
272
273    /// Fetches a list of [`JwtSvid`] parsing the JWT token in the Workload API response, for the given audience and selectors.
274    ///
275    /// # Arguments
276    ///
277    /// * `audience`  - A list of audiences to include in the JWT token. Cannot be empty nor contain only empty strings.
278    /// * `selectors` - A list of selectors to filter the list of [`JwtSvid`].
279    ///
280    /// # Errors
281    ///
282    /// The function returns a variant of [`GrpcClientError`] if there is en error connecting to the Workload API or
283    /// there is a problem processing the response.
284    pub async fn fetch_jwt_svids<T: AsRef<str> + ToString>(
285        &mut self,
286        audience: &[T],
287        attest_type: DelegateAttestationRequest,
288    ) -> Result<Vec<JwtSvid>, GrpcClientError> {
289        let request = match attest_type {
290            DelegateAttestationRequest::Selectors(selectors) => FetchJwtsviDsRequest {
291                audience: audience.iter().map(|s| s.to_string()).collect(),
292                selectors: selectors.into_iter().map(|s| s.into()).collect(),
293                pid: 0,
294            },
295            DelegateAttestationRequest::Pid(pid) => FetchJwtsviDsRequest {
296                audience: audience.iter().map(|s| s.to_string()).collect(),
297                selectors: Vec::default(),
298                pid,
299            },
300        };
301
302        DelegatedIdentityClient::parse_jwt_svid_from_grpc_response(
303            self.client
304                .fetch_jwtsvi_ds(request)
305                .await?
306                .into_inner()
307                .svids,
308        )
309    }
310
311    /// Watches the stream of [`JwtBundleSet`] updates.
312    ///
313    /// This function establishes a stream with the Workload API to continuously receive updates for the [`JwtBundleSet`].
314    /// The returned stream can be used to asynchronously yield new `JwtBundleSet` updates as they become available.
315    ///
316    /// # Returns
317    ///
318    /// Returns a stream of `Result<JwtBundleSet, ClientError>`. Each item represents an updated [`JwtBundleSet`] or an error if
319    /// there was a problem processing an update from the stream.
320    ///
321    /// # Errors
322    ///
323    /// The function can return an error variant of [`GrpcClientError`] in the following scenarios:
324    ///
325    /// * There's an issue connecting to the Workload API.
326    /// * An error occurs while setting up the stream.
327    ///
328    /// Individual stream items might also be errors if there's an issue processing the response for a specific update.
329    pub async fn stream_jwt_bundles(
330        &mut self,
331    ) -> Result<impl Stream<Item = Result<JwtBundleSet, GrpcClientError>>, GrpcClientError> {
332        let request = SubscribeToJwtBundlesRequest::default();
333        let response = self.client.subscribe_to_jwt_bundles(request).await?;
334        Ok(response.into_inner().map(|message| {
335            message
336                .map_err(GrpcClientError::from)
337                .and_then(DelegatedIdentityClient::parse_jwt_bundle_set_from_grpc_response)
338        }))
339    }
340
341    /// Fetches [`JwtBundleSet`] that is a set of [`JwtBundle`] keyed by the trust domain to which they belong.
342    ///
343    /// # Errors
344    ///
345    /// The function returns a variant of [`GrpcClientError`] if there is en error connecting to the Workload API or
346    /// there is a problem processing the response.
347    pub async fn fetch_jwt_bundles(&mut self) -> Result<JwtBundleSet, GrpcClientError> {
348        let request = SubscribeToJwtBundlesRequest::default();
349        let response = self.client.subscribe_to_jwt_bundles(request).await?;
350        let initial = response.into_inner().message().await?;
351        DelegatedIdentityClient::parse_jwt_bundle_set_from_grpc_response(
352            initial.ok_or(GrpcClientError::EmptyResponse)?,
353        )
354    }
355}
356
357impl DelegatedIdentityClient {
358    fn parse_x509_svid_from_grpc_response(
359        response: SubscribeToX509sviDsResponse,
360    ) -> Result<X509Svid, GrpcClientError> {
361        let svid = response
362            .x509_svids
363            .get(DEFAULT_SVID)
364            .ok_or(GrpcClientError::EmptyResponse)?;
365
366        let x509_svid = svid
367            .x509_svid
368            .as_ref()
369            .ok_or(GrpcClientError::EmptyResponse)?;
370
371        let total_length = x509_svid.cert_chain.iter().map(|c| c.len()).sum();
372        let mut cert_chain_bytes = Vec::with_capacity(total_length);
373
374        for c in &x509_svid.cert_chain {
375            cert_chain_bytes.extend_from_slice(c);
376        }
377
378        X509Svid::parse_from_der(&cert_chain_bytes, svid.x509_svid_key.as_ref())
379            .map_err(|e| e.into())
380    }
381
382    fn parse_jwt_svid_from_grpc_response(
383        svids: Vec<ProtoJwtSvid>,
384    ) -> Result<Vec<JwtSvid>, GrpcClientError> {
385        let result: Result<Vec<JwtSvid>, GrpcClientError> = svids
386            .iter()
387            .map(|r| JwtSvid::from_str(&r.token).map_err(GrpcClientError::InvalidJwtSvid))
388            .collect();
389        result
390    }
391
392    fn parse_jwt_bundle_set_from_grpc_response(
393        response: SubscribeToJwtBundlesResponse,
394    ) -> Result<JwtBundleSet, GrpcClientError> {
395        let mut bundle_set = JwtBundleSet::new();
396
397        for (td, bundle_data) in response.bundles.into_iter() {
398            let trust_domain = TrustDomain::try_from(td)?;
399            let bundle = JwtBundle::from_jwt_authorities(trust_domain, &bundle_data)
400                .map_err(GrpcClientError::from)?;
401
402            bundle_set.add_bundle(bundle);
403        }
404
405        Ok(bundle_set)
406    }
407
408    fn parse_x509_bundle_set_from_grpc_response(
409        response: SubscribeToX509BundlesResponse,
410    ) -> Result<X509BundleSet, GrpcClientError> {
411        let mut bundle_set = X509BundleSet::new();
412
413        for (td, bundle) in response.ca_certificates.into_iter() {
414            let trust_domain = TrustDomain::try_from(td)?;
415
416            bundle_set.add_bundle(
417                X509Bundle::parse_from_der(trust_domain, &bundle)
418                    .map_err(GrpcClientError::InvalidX509Bundle)?,
419            );
420        }
421        Ok(bundle_set)
422    }
423}