spire_api/agent/delegated_identity.rs
1//! This module provides an API surface to interact with the DelegateIdentity API.
2//! The protobuf definition can be found [here](https://github.com/spiffe/spire-api-sdk/blob/main/proto/spire/api/agent/delegatedidentity/v1/delegatedidentity.proto)
3//!
4//! More information on it's usage can be found in the [SPIFFE docs](https://spiffe.io/docs/latest/deploying/spire_agent/#delegated-identity-api)
5//!
6//! Most importantly, this API cannot be used over the standard endpoint, it must be used over the admin socket.
7//! The admin socket can be configured in the SPIRE agent configuration document.
8
9use crate::proto::spire::api::agent::delegatedidentity::v1::delegated_identity_client::DelegatedIdentityClient as DelegatedIdentityApiClient;
10use crate::proto::spire::api::agent::delegatedidentity::v1::{
11 FetchJwtsviDsRequest, SubscribeToJwtBundlesRequest, SubscribeToJwtBundlesResponse,
12 SubscribeToX509BundlesRequest, SubscribeToX509BundlesResponse, SubscribeToX509sviDsRequest,
13 SubscribeToX509sviDsResponse,
14};
15use crate::proto::spire::api::types::Jwtsvid as ProtoJwtSvid;
16use spiffe::endpoint::validate_socket_path;
17use spiffe::{JwtBundle, JwtBundleSet, JwtSvid, TrustDomain, X509Bundle, X509BundleSet, X509Svid};
18use tokio_stream::{Stream, StreamExt};
19
20use crate::selectors::Selector;
21use hyper_util::rt::TokioIo;
22use spiffe::constants::DEFAULT_SVID;
23use spiffe::error::GrpcClientError;
24use std::convert::{Into, TryFrom};
25use std::str::FromStr;
26use tokio::net::UnixStream;
27use tonic::transport::{Endpoint, Uri};
28use tower::service_fn;
29
30/// Name of the environment variable that holds the default socket endpoint path.
31pub const ADMIN_SOCKET_ENV: &str = "SPIRE_ADMIN_ENDPOINT_SOCKET";
32
33/// Gets the endpoint socket endpoint path from the environment variable `ADMIN_SOCKET_ENV`,
34/// as described in [SPIFFE standard](https://github.com/spiffe/spiffe/blob/main/standards/SPIFFE_Workload_Endpoint.md#4-locating-the-endpoint).
35pub fn get_admin_socket_path() -> Option<String> {
36 std::env::var(ADMIN_SOCKET_ENV).ok()
37}
38
39/// Impl for DelegatedIdentity API
40#[derive(Debug, Clone)]
41pub struct DelegatedIdentityClient {
42 client: DelegatedIdentityApiClient<tonic::transport::Channel>,
43}
44
45/// Represents that a delegate attestation request can have one-of
46/// PID (let agent attest PID->selectors) or selectors (delegate has already attested a PID)
47#[derive(Debug, Clone)]
48pub enum DelegateAttestationRequest {
49 /// PID (let agent attest PID->selectors)
50 Pid(i32),
51 /// selectors (delegate has already attested a PID and generated full set of selectors)
52 Selectors(Vec<Selector>),
53}
54
55/// Constructors
56impl DelegatedIdentityClient {
57 const UNIX_PREFIX: &'static str = "unix:";
58 const TONIC_DEFAULT_URI: &'static str = "http://[::]:50051";
59
60 /// Creates a new instance of `DelegatedIdentityClient` by connecting to the specified socket path.
61 ///
62 /// # Arguments
63 ///
64 /// * `path` - The path to the UNIX domain socket, which can optionally start with "unix:".
65 ///
66 /// # Returns
67 ///
68 /// * `Result<Self, ClientError>` - Returns an instance of `DelegatedIdentityClient` if successful, otherwise returns an error.
69 ///
70 /// # Errors
71 ///
72 /// This function will return an error if the provided socket path is invalid or if there are issues connecting.
73 pub async fn new_from_path(path: &str) -> Result<Self, GrpcClientError> {
74 validate_socket_path(path)?;
75
76 // Strip the 'unix:' prefix for tonic compatibility.
77 let stripped_path = path
78 .strip_prefix(Self::UNIX_PREFIX)
79 .unwrap_or(path)
80 .to_string();
81
82 let channel = Endpoint::try_from(Self::TONIC_DEFAULT_URI)?
83 .connect_with_connector(service_fn(move |_: Uri| {
84 let stripped_path = stripped_path.clone();
85 async {
86 // Connect to the UDS socket using the modified path.
87 UnixStream::connect(stripped_path).await.map(TokioIo::new)
88 }
89 }))
90 .await?;
91
92 Ok(DelegatedIdentityClient {
93 client: DelegatedIdentityApiClient::new(channel),
94 })
95 }
96
97 /// Creates a new `DelegatedIdentityClient` using the default socket endpoint address.
98 ///
99 /// Requires that the environment variable `SPIFFE_ENDPOINT_SOCKET` be set with
100 /// the path to the Workload API endpoint socket.
101 ///
102 /// # Errors
103 ///
104 /// The function returns a variant of [`GrpcClientError`] if environment variable is not set or if
105 /// the provided socket path is not valid.
106 pub async fn default() -> Result<Self, GrpcClientError> {
107 let socket_path = match get_admin_socket_path() {
108 None => return Err(GrpcClientError::MissingEndpointSocketPath),
109 Some(s) => s,
110 };
111 Self::new_from_path(socket_path.as_str()).await
112 }
113
114 /// Constructs a new `DelegatedIdentityClient` using the provided Tonic transport channel.
115 ///
116 /// # Arguments
117 ///
118 /// * `conn`: A `tonic::transport::Channel` used for gRPC communication.
119 ///
120 /// # Returns
121 ///
122 /// A `Result` containing a `DelegatedIdentityClient` if successful, or a `ClientError` if an error occurs.
123 pub fn new(conn: tonic::transport::Channel) -> Result<Self, GrpcClientError> {
124 Ok(DelegatedIdentityClient {
125 client: DelegatedIdentityApiClient::new(conn),
126 })
127 }
128}
129
130impl DelegatedIdentityClient {
131 /// Fetches a single X509 SPIFFE Verifiable Identity Document (SVID).
132 ///
133 /// This method connects to the SPIFFE Workload API and returns the first X509 SVID in the response.
134 ///
135 /// # Arguments
136 ///
137 /// * `selectors` - A list of selectors to filter the stream of [`X509Svid`] updates.
138 ///
139 /// # Returns
140 ///
141 /// On success, it returns a valid [`X509Svid`] which represents the parsed SVID.
142 /// If the fetch operation or the parsing fails, it returns a [`GrpcClientError`].
143 ///
144 /// # Errors
145 ///
146 /// Returns [`GrpcClientError`] if the gRPC call fails or if the SVID could not be parsed from the gRPC response.
147 pub async fn fetch_x509_svid(
148 &mut self,
149 attest_type: DelegateAttestationRequest,
150 ) -> Result<X509Svid, GrpcClientError> {
151 let request = match attest_type {
152 DelegateAttestationRequest::Selectors(selectors) => SubscribeToX509sviDsRequest {
153 selectors: selectors.into_iter().map(|s| s.into()).collect(),
154 pid: 0,
155 },
156 DelegateAttestationRequest::Pid(pid) => SubscribeToX509sviDsRequest {
157 selectors: Vec::default(),
158 pid,
159 },
160 };
161
162 self.client
163 .subscribe_to_x509svi_ds(request)
164 .await?
165 .into_inner()
166 .message()
167 .await?
168 .ok_or(GrpcClientError::EmptyResponse)
169 .and_then(DelegatedIdentityClient::parse_x509_svid_from_grpc_response)
170 }
171
172 /// Watches the stream of [`X509Svid`] updates.
173 ///
174 /// This function establishes a stream with the Workload API to continuously receive updates for the [`X509Svid`].
175 /// The returned stream can be used to asynchronously yield new `X509Svid` updates as they become available.
176 ///
177 /// # Arguments
178 ///
179 /// * `selectors` - A list of selectors to filter the stream of [`X509Svid`] updates.
180 ///
181 /// # Returns
182 ///
183 /// Returns a stream of `Result<X509Svid, ClientError>`. Each item represents an updated [`X509Svid`] or an error if
184 /// there was a problem processing an update from the stream.
185 ///
186 /// # Errors
187 ///
188 /// The function can return an error variant of [`GrpcClientError`] in the following scenarios:
189 ///
190 /// * There's an issue connecting to the Workload API.
191 /// * An error occurs while setting up the stream.
192 ///
193 /// Individual stream items might also be errors if there's an issue processing the response for a specific update.
194 pub async fn stream_x509_svids(
195 &mut self,
196 attest_type: DelegateAttestationRequest,
197 ) -> Result<impl Stream<Item = Result<X509Svid, GrpcClientError>>, GrpcClientError> {
198 let request = match attest_type {
199 DelegateAttestationRequest::Selectors(selectors) => SubscribeToX509sviDsRequest {
200 selectors: selectors.into_iter().map(|s| s.into()).collect(),
201 pid: 0,
202 },
203 DelegateAttestationRequest::Pid(pid) => SubscribeToX509sviDsRequest {
204 selectors: Vec::default(),
205 pid,
206 },
207 };
208
209 let response: tonic::Response<tonic::Streaming<SubscribeToX509sviDsResponse>> =
210 self.client.subscribe_to_x509svi_ds(request).await?;
211
212 let stream = response.into_inner().map(|message| {
213 message
214 .map_err(GrpcClientError::from)
215 .and_then(DelegatedIdentityClient::parse_x509_svid_from_grpc_response)
216 });
217
218 Ok(stream)
219 }
220
221 /// Fetches [`X509BundleSet`], that is a set of [`X509Bundle`] keyed by the trust domain to which they belong.
222 ///
223 /// # Errors
224 ///
225 /// The function returns a variant of [`GrpcClientError`] if there is en error connecting to the Workload API or
226 /// there is a problem processing the response.
227 pub async fn fetch_x509_bundles(&mut self) -> Result<X509BundleSet, GrpcClientError> {
228 let request = SubscribeToX509BundlesRequest::default();
229
230 let response: tonic::Response<tonic::Streaming<SubscribeToX509BundlesResponse>> =
231 self.client.subscribe_to_x509_bundles(request).await?;
232 let initial = response.into_inner().message().await?;
233 DelegatedIdentityClient::parse_x509_bundle_set_from_grpc_response(
234 initial.unwrap_or_default(),
235 )
236 }
237
238 /// Watches the stream of [`X509Bundle`] updates.
239 ///
240 /// This function establishes a stream with the Workload API to continuously receive updates for the [`X509Bundle`].
241 /// The returned stream can be used to asynchronously yield new `X509Bundle` updates as they become available.
242 ///
243 /// # Returns
244 ///
245 /// Returns a stream of `Result<X509Bundle, ClientError>`. Each item represents an updated [`X509Bundle`] or an error if
246 /// there was a problem processing an update from the stream.
247 ///
248 /// # Errors
249 ///
250 /// The function can return an error variant of [`GrpcClientError`] in the following scenarios:
251 ///
252 /// * There's an issue connecting to the Admin API.
253 /// * An error occurs while setting up the stream.
254 ///
255 /// Individual stream items might also be errors if there's an issue processing the response for a specific update.
256 pub async fn stream_x509_bundles(
257 &mut self,
258 ) -> Result<impl Stream<Item = Result<X509BundleSet, GrpcClientError>>, GrpcClientError> {
259 let request = SubscribeToX509BundlesRequest::default();
260
261 let response: tonic::Response<tonic::Streaming<SubscribeToX509BundlesResponse>> =
262 self.client.subscribe_to_x509_bundles(request).await?;
263
264 let stream = response.into_inner().map(|message| {
265 message
266 .map_err(GrpcClientError::from)
267 .and_then(DelegatedIdentityClient::parse_x509_bundle_set_from_grpc_response)
268 });
269
270 Ok(stream)
271 }
272
273 /// Fetches a list of [`JwtSvid`] parsing the JWT token in the Workload API response, for the given audience and selectors.
274 ///
275 /// # Arguments
276 ///
277 /// * `audience` - A list of audiences to include in the JWT token. Cannot be empty nor contain only empty strings.
278 /// * `selectors` - A list of selectors to filter the list of [`JwtSvid`].
279 ///
280 /// # Errors
281 ///
282 /// The function returns a variant of [`GrpcClientError`] if there is en error connecting to the Workload API or
283 /// there is a problem processing the response.
284 pub async fn fetch_jwt_svids<T: AsRef<str> + ToString>(
285 &mut self,
286 audience: &[T],
287 attest_type: DelegateAttestationRequest,
288 ) -> Result<Vec<JwtSvid>, GrpcClientError> {
289 let request = match attest_type {
290 DelegateAttestationRequest::Selectors(selectors) => FetchJwtsviDsRequest {
291 audience: audience.iter().map(|s| s.to_string()).collect(),
292 selectors: selectors.into_iter().map(|s| s.into()).collect(),
293 pid: 0,
294 },
295 DelegateAttestationRequest::Pid(pid) => FetchJwtsviDsRequest {
296 audience: audience.iter().map(|s| s.to_string()).collect(),
297 selectors: Vec::default(),
298 pid,
299 },
300 };
301
302 DelegatedIdentityClient::parse_jwt_svid_from_grpc_response(
303 self.client
304 .fetch_jwtsvi_ds(request)
305 .await?
306 .into_inner()
307 .svids,
308 )
309 }
310
311 /// Watches the stream of [`JwtBundleSet`] updates.
312 ///
313 /// This function establishes a stream with the Workload API to continuously receive updates for the [`JwtBundleSet`].
314 /// The returned stream can be used to asynchronously yield new `JwtBundleSet` updates as they become available.
315 ///
316 /// # Returns
317 ///
318 /// Returns a stream of `Result<JwtBundleSet, ClientError>`. Each item represents an updated [`JwtBundleSet`] or an error if
319 /// there was a problem processing an update from the stream.
320 ///
321 /// # Errors
322 ///
323 /// The function can return an error variant of [`GrpcClientError`] in the following scenarios:
324 ///
325 /// * There's an issue connecting to the Workload API.
326 /// * An error occurs while setting up the stream.
327 ///
328 /// Individual stream items might also be errors if there's an issue processing the response for a specific update.
329 pub async fn stream_jwt_bundles(
330 &mut self,
331 ) -> Result<impl Stream<Item = Result<JwtBundleSet, GrpcClientError>>, GrpcClientError> {
332 let request = SubscribeToJwtBundlesRequest::default();
333 let response = self.client.subscribe_to_jwt_bundles(request).await?;
334 Ok(response.into_inner().map(|message| {
335 message
336 .map_err(GrpcClientError::from)
337 .and_then(DelegatedIdentityClient::parse_jwt_bundle_set_from_grpc_response)
338 }))
339 }
340
341 /// Fetches [`JwtBundleSet`] that is a set of [`JwtBundle`] keyed by the trust domain to which they belong.
342 ///
343 /// # Errors
344 ///
345 /// The function returns a variant of [`GrpcClientError`] if there is en error connecting to the Workload API or
346 /// there is a problem processing the response.
347 pub async fn fetch_jwt_bundles(&mut self) -> Result<JwtBundleSet, GrpcClientError> {
348 let request = SubscribeToJwtBundlesRequest::default();
349 let response = self.client.subscribe_to_jwt_bundles(request).await?;
350 let initial = response.into_inner().message().await?;
351 DelegatedIdentityClient::parse_jwt_bundle_set_from_grpc_response(
352 initial.ok_or(GrpcClientError::EmptyResponse)?,
353 )
354 }
355}
356
357impl DelegatedIdentityClient {
358 fn parse_x509_svid_from_grpc_response(
359 response: SubscribeToX509sviDsResponse,
360 ) -> Result<X509Svid, GrpcClientError> {
361 let svid = response
362 .x509_svids
363 .get(DEFAULT_SVID)
364 .ok_or(GrpcClientError::EmptyResponse)?;
365
366 let x509_svid = svid
367 .x509_svid
368 .as_ref()
369 .ok_or(GrpcClientError::EmptyResponse)?;
370
371 let total_length = x509_svid.cert_chain.iter().map(|c| c.len()).sum();
372 let mut cert_chain_bytes = Vec::with_capacity(total_length);
373
374 for c in &x509_svid.cert_chain {
375 cert_chain_bytes.extend_from_slice(c);
376 }
377
378 X509Svid::parse_from_der(&cert_chain_bytes, svid.x509_svid_key.as_ref())
379 .map_err(|e| e.into())
380 }
381
382 fn parse_jwt_svid_from_grpc_response(
383 svids: Vec<ProtoJwtSvid>,
384 ) -> Result<Vec<JwtSvid>, GrpcClientError> {
385 let result: Result<Vec<JwtSvid>, GrpcClientError> = svids
386 .iter()
387 .map(|r| JwtSvid::from_str(&r.token).map_err(GrpcClientError::InvalidJwtSvid))
388 .collect();
389 result
390 }
391
392 fn parse_jwt_bundle_set_from_grpc_response(
393 response: SubscribeToJwtBundlesResponse,
394 ) -> Result<JwtBundleSet, GrpcClientError> {
395 let mut bundle_set = JwtBundleSet::new();
396
397 for (td, bundle_data) in response.bundles.into_iter() {
398 let trust_domain = TrustDomain::try_from(td)?;
399 let bundle = JwtBundle::from_jwt_authorities(trust_domain, &bundle_data)
400 .map_err(GrpcClientError::from)?;
401
402 bundle_set.add_bundle(bundle);
403 }
404
405 Ok(bundle_set)
406 }
407
408 fn parse_x509_bundle_set_from_grpc_response(
409 response: SubscribeToX509BundlesResponse,
410 ) -> Result<X509BundleSet, GrpcClientError> {
411 let mut bundle_set = X509BundleSet::new();
412
413 for (td, bundle) in response.ca_certificates.into_iter() {
414 let trust_domain = TrustDomain::try_from(td)?;
415
416 bundle_set.add_bundle(
417 X509Bundle::parse_from_der(trust_domain, &bundle)
418 .map_err(GrpcClientError::InvalidX509Bundle)?,
419 );
420 }
421 Ok(bundle_set)
422 }
423}