mas_matrix_synapse/
lib.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7use std::{collections::HashSet, time::Duration};
8
9use anyhow::{Context, bail};
10use error::SynapseResponseExt;
11use http::{Method, StatusCode};
12use mas_http::RequestBuilderExt as _;
13use mas_matrix::{HomeserverConnection, MatrixUser, ProvisionRequest};
14use serde::{Deserialize, Serialize};
15use tracing::debug;
16use url::Url;
17
18static SYNAPSE_AUTH_PROVIDER: &str = "oauth-delegated";
19
20/// Encountered when trying to register a user ID which has been taken.
21/// — <https://spec.matrix.org/v1.10/client-server-api/#other-error-codes>
22const M_USER_IN_USE: &str = "M_USER_IN_USE";
23/// Encountered when trying to register a user ID which is not valid.
24/// — <https://spec.matrix.org/v1.10/client-server-api/#other-error-codes>
25const M_INVALID_USERNAME: &str = "M_INVALID_USERNAME";
26
27mod error;
28
29#[derive(Clone)]
30pub struct SynapseConnection {
31    homeserver: String,
32    endpoint: Url,
33    access_token: String,
34    http_client: reqwest::Client,
35}
36
37impl SynapseConnection {
38    #[must_use]
39    pub fn new(
40        homeserver: String,
41        endpoint: Url,
42        access_token: String,
43        http_client: reqwest::Client,
44    ) -> Self {
45        Self {
46            homeserver,
47            endpoint,
48            access_token,
49            http_client,
50        }
51    }
52
53    fn builder(&self, method: Method, url: &str) -> reqwest::RequestBuilder {
54        self.http_client
55            .request(
56                method,
57                self.endpoint
58                    .join(url)
59                    .map(String::from)
60                    .unwrap_or_default(),
61            )
62            .bearer_auth(&self.access_token)
63    }
64
65    fn post(&self, url: &str) -> reqwest::RequestBuilder {
66        self.builder(Method::POST, url)
67    }
68
69    fn get(&self, url: &str) -> reqwest::RequestBuilder {
70        self.builder(Method::GET, url)
71    }
72
73    fn put(&self, url: &str) -> reqwest::RequestBuilder {
74        self.builder(Method::PUT, url)
75    }
76
77    fn delete(&self, url: &str) -> reqwest::RequestBuilder {
78        self.builder(Method::DELETE, url)
79    }
80}
81
82#[derive(Serialize, Deserialize)]
83struct ExternalID {
84    auth_provider: String,
85    external_id: String,
86}
87
88#[derive(Serialize, Deserialize)]
89#[serde(rename_all = "lowercase")]
90enum ThreePIDMedium {
91    Email,
92    Msisdn,
93}
94
95#[derive(Serialize, Deserialize)]
96struct ThreePID {
97    medium: ThreePIDMedium,
98    address: String,
99}
100
101#[derive(Default, Serialize, Deserialize)]
102struct SynapseUser {
103    #[serde(
104        default,
105        rename = "displayname",
106        skip_serializing_if = "Option::is_none"
107    )]
108    display_name: Option<String>,
109
110    #[serde(default, skip_serializing_if = "Option::is_none")]
111    avatar_url: Option<String>,
112
113    #[serde(default, rename = "threepids", skip_serializing_if = "Option::is_none")]
114    three_pids: Option<Vec<ThreePID>>,
115
116    #[serde(default, skip_serializing_if = "Option::is_none")]
117    external_ids: Option<Vec<ExternalID>>,
118
119    #[serde(default, skip_serializing_if = "Option::is_none")]
120    deactivated: Option<bool>,
121}
122
123#[derive(Deserialize)]
124struct SynapseDeviceListResponse {
125    devices: Vec<SynapseDevice>,
126}
127
128#[derive(Serialize, Deserialize)]
129struct SynapseDevice {
130    device_id: String,
131
132    #[serde(default, skip_serializing_if = "Option::is_none")]
133    dehydrated: Option<bool>,
134}
135
136#[derive(Serialize)]
137struct SynapseUpdateDeviceRequest<'a> {
138    display_name: Option<&'a str>,
139}
140
141#[derive(Serialize)]
142struct SynapseDeleteDevicesRequest {
143    devices: Vec<String>,
144}
145
146#[derive(Serialize)]
147struct SetDisplayNameRequest<'a> {
148    displayname: &'a str,
149}
150
151#[derive(Serialize)]
152struct SynapseDeactivateUserRequest {
153    erase: bool,
154}
155
156#[derive(Serialize)]
157struct SynapseAllowCrossSigningResetRequest {}
158
159/// Response body of
160/// `/_synapse/admin/v1/username_available?username={localpart}`
161#[derive(Deserialize)]
162struct UsernameAvailableResponse {
163    available: bool,
164}
165
166#[async_trait::async_trait]
167impl HomeserverConnection for SynapseConnection {
168    fn homeserver(&self) -> &str {
169        &self.homeserver
170    }
171
172    #[tracing::instrument(
173        name = "homeserver.query_user",
174        skip_all,
175        fields(
176            matrix.homeserver = self.homeserver,
177            matrix.mxid = mxid,
178        ),
179        err(Debug),
180    )]
181    async fn query_user(&self, mxid: &str) -> Result<MatrixUser, anyhow::Error> {
182        let encoded_mxid = urlencoding::encode(mxid);
183
184        let response = self
185            .get(&format!("_synapse/admin/v2/users/{encoded_mxid}"))
186            .send_traced()
187            .await
188            .context("Failed to query user from Synapse")?;
189
190        let response = response
191            .error_for_synapse_error()
192            .await
193            .context("Unexpected HTTP response while querying user from Synapse")?;
194
195        let body: SynapseUser = response
196            .json()
197            .await
198            .context("Failed to deserialize response while querying user from Synapse")?;
199
200        Ok(MatrixUser {
201            displayname: body.display_name,
202            avatar_url: body.avatar_url,
203            deactivated: body.deactivated.unwrap_or(false),
204        })
205    }
206
207    #[tracing::instrument(
208        name = "homeserver.is_localpart_available",
209        skip_all,
210        fields(
211            matrix.homeserver = self.homeserver,
212            matrix.localpart = localpart,
213        ),
214        err(Debug),
215    )]
216    async fn is_localpart_available(&self, localpart: &str) -> Result<bool, anyhow::Error> {
217        // Synapse will give us a M_UNKNOWN error if the localpart is not ASCII,
218        // so we bail out early
219        if !localpart.is_ascii() {
220            return Ok(false);
221        }
222
223        let localpart = urlencoding::encode(localpart);
224
225        let response = self
226            .get(&format!(
227                "_synapse/admin/v1/username_available?username={localpart}"
228            ))
229            .send_traced()
230            .await
231            .context("Failed to query localpart availability from Synapse")?;
232
233        match response.error_for_synapse_error().await {
234            Ok(resp) => {
235                let response: UsernameAvailableResponse = resp.json().await.context(
236                    "Unexpected response while querying localpart availability from Synapse",
237                )?;
238
239                Ok(response.available)
240            }
241
242            Err(err)
243                if err.errcode() == Some(M_INVALID_USERNAME)
244                    || err.errcode() == Some(M_USER_IN_USE) =>
245            {
246                debug!(
247                    error = &err as &dyn std::error::Error,
248                    "Localpart is not available"
249                );
250                Ok(false)
251            }
252
253            Err(err) => Err(err).context("Failed to query localpart availability from Synapse"),
254        }
255    }
256
257    #[tracing::instrument(
258        name = "homeserver.provision_user",
259        skip_all,
260        fields(
261            matrix.homeserver = self.homeserver,
262            matrix.mxid = request.mxid(),
263            user.id = request.sub(),
264        ),
265        err(Debug),
266    )]
267    async fn provision_user(&self, request: &ProvisionRequest) -> Result<bool, anyhow::Error> {
268        let mut body = SynapseUser {
269            external_ids: Some(vec![ExternalID {
270                auth_provider: SYNAPSE_AUTH_PROVIDER.to_owned(),
271                external_id: request.sub().to_owned(),
272            }]),
273            ..SynapseUser::default()
274        };
275
276        request
277            .on_displayname(|displayname| {
278                body.display_name = Some(displayname.unwrap_or_default().to_owned());
279            })
280            .on_avatar_url(|avatar_url| {
281                body.avatar_url = Some(avatar_url.unwrap_or_default().to_owned());
282            })
283            .on_emails(|emails| {
284                body.three_pids = Some(
285                    emails
286                        .unwrap_or_default()
287                        .iter()
288                        .map(|email| ThreePID {
289                            medium: ThreePIDMedium::Email,
290                            address: email.clone(),
291                        })
292                        .collect(),
293                );
294            });
295
296        let encoded_mxid = urlencoding::encode(request.mxid());
297        let response = self
298            .put(&format!("_synapse/admin/v2/users/{encoded_mxid}"))
299            .json(&body)
300            .send_traced()
301            .await
302            .context("Failed to provision user in Synapse")?;
303
304        let response = response
305            .error_for_synapse_error()
306            .await
307            .context("Unexpected HTTP response while provisioning user in Synapse")?;
308
309        match response.status() {
310            StatusCode::CREATED => Ok(true),
311            StatusCode::OK => Ok(false),
312            code => bail!("Unexpected HTTP code while provisioning user in Synapse: {code}"),
313        }
314    }
315
316    #[tracing::instrument(
317        name = "homeserver.create_device",
318        skip_all,
319        fields(
320            matrix.homeserver = self.homeserver,
321            matrix.mxid = mxid,
322            matrix.device_id = device_id,
323        ),
324        err(Debug),
325    )]
326    async fn create_device(
327        &self,
328        mxid: &str,
329        device_id: &str,
330        initial_display_name: Option<&str>,
331    ) -> Result<(), anyhow::Error> {
332        let encoded_mxid = urlencoding::encode(mxid);
333
334        let response = self
335            .post(&format!("_synapse/admin/v2/users/{encoded_mxid}/devices"))
336            .json(&SynapseDevice {
337                device_id: device_id.to_owned(),
338                dehydrated: None,
339            })
340            .send_traced()
341            .await
342            .context("Failed to create device in Synapse")?;
343
344        let response = response
345            .error_for_synapse_error()
346            .await
347            .context("Unexpected HTTP response while creating device in Synapse")?;
348
349        if response.status() != StatusCode::CREATED {
350            bail!(
351                "Unexpected HTTP code while creating device in Synapse: {}",
352                response.status()
353            );
354        }
355
356        // It's annoying, but the POST endpoint doesn't let us set the display name
357        // of the device, so we have to do it manually.
358        if let Some(display_name) = initial_display_name {
359            self.update_device_display_name(mxid, device_id, display_name)
360                .await?;
361        }
362
363        Ok(())
364    }
365
366    #[tracing::instrument(
367        name = "homeserver.update_device_display_name",
368        skip_all,
369        fields(
370            matrix.homeserver = self.homeserver,
371            matrix.mxid = mxid,
372            matrix.device_id = device_id,
373        ),
374        err(Debug),
375    )]
376    async fn update_device_display_name(
377        &self,
378        mxid: &str,
379        device_id: &str,
380        display_name: &str,
381    ) -> Result<(), anyhow::Error> {
382        let encoded_mxid = urlencoding::encode(mxid);
383        let device_id = urlencoding::encode(device_id);
384        let response = self
385            .put(&format!(
386                "_synapse/admin/v2/users/{encoded_mxid}/devices/{device_id}"
387            ))
388            .json(&SynapseUpdateDeviceRequest {
389                display_name: Some(display_name),
390            })
391            .send_traced()
392            .await
393            .context("Failed to update device display name in Synapse")?;
394
395        let response = response
396            .error_for_synapse_error()
397            .await
398            .context("Unexpected HTTP response while updating device display name in Synapse")?;
399
400        if response.status() != StatusCode::OK {
401            bail!(
402                "Unexpected HTTP code while updating device display name in Synapse: {}",
403                response.status()
404            );
405        }
406
407        Ok(())
408    }
409
410    #[tracing::instrument(
411        name = "homeserver.delete_device",
412        skip_all,
413        fields(
414            matrix.homeserver = self.homeserver,
415            matrix.mxid = mxid,
416            matrix.device_id = device_id,
417        ),
418        err(Debug),
419    )]
420    async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), anyhow::Error> {
421        let encoded_mxid = urlencoding::encode(mxid);
422        let encoded_device_id = urlencoding::encode(device_id);
423
424        let response = self
425            .delete(&format!(
426                "_synapse/admin/v2/users/{encoded_mxid}/devices/{encoded_device_id}"
427            ))
428            .send_traced()
429            .await
430            .context("Failed to delete device in Synapse")?;
431
432        let response = response
433            .error_for_synapse_error()
434            .await
435            .context("Unexpected HTTP response while deleting device in Synapse")?;
436
437        if response.status() != StatusCode::OK {
438            bail!(
439                "Unexpected HTTP code while deleting device in Synapse: {}",
440                response.status()
441            );
442        }
443
444        Ok(())
445    }
446
447    #[tracing::instrument(
448        name = "homeserver.sync_devices",
449        skip_all,
450        fields(
451            matrix.homeserver = self.homeserver,
452            matrix.mxid = mxid,
453        ),
454        err(Debug),
455    )]
456    async fn sync_devices(
457        &self,
458        mxid: &str,
459        devices: HashSet<String>,
460    ) -> Result<(), anyhow::Error> {
461        // Get the list of current devices
462        let encoded_mxid = urlencoding::encode(mxid);
463
464        let response = self
465            .get(&format!("_synapse/admin/v2/users/{encoded_mxid}/devices"))
466            .send_traced()
467            .await
468            .context("Failed to query devices from Synapse")?;
469
470        let response = response.error_for_synapse_error().await?;
471
472        if response.status() != StatusCode::OK {
473            bail!(
474                "Unexpected HTTP code while querying devices from Synapse: {}",
475                response.status()
476            );
477        }
478
479        let body: SynapseDeviceListResponse = response
480            .json()
481            .await
482            .context("Failed to parse response while querying devices from Synapse")?;
483
484        let existing_devices: HashSet<String> = body
485            .devices
486            .into_iter()
487            .filter(|d| d.dehydrated != Some(true))
488            .map(|d| d.device_id)
489            .collect();
490
491        // First, delete all the devices that are not needed anymore
492        let to_delete = existing_devices.difference(&devices).cloned().collect();
493
494        let response = self
495            .post(&format!(
496                "_synapse/admin/v2/users/{encoded_mxid}/delete_devices"
497            ))
498            .json(&SynapseDeleteDevicesRequest { devices: to_delete })
499            .send_traced()
500            .await
501            .context("Failed to delete devices from Synapse")?;
502
503        let response = response
504            .error_for_synapse_error()
505            .await
506            .context("Unexpected HTTP response while deleting devices from Synapse")?;
507
508        if response.status() != StatusCode::OK {
509            bail!(
510                "Unexpected HTTP code while deleting devices from Synapse: {}",
511                response.status()
512            );
513        }
514
515        // Then, create the devices that are missing. There is no batching API to do
516        // this, so we do this sequentially, which is fine as the API is idempotent.
517        for device_id in devices.difference(&existing_devices) {
518            self.create_device(mxid, device_id, None).await?;
519        }
520
521        Ok(())
522    }
523
524    #[tracing::instrument(
525        name = "homeserver.delete_user",
526        skip_all,
527        fields(
528            matrix.homeserver = self.homeserver,
529            matrix.mxid = mxid,
530            erase = erase,
531        ),
532        err(Debug),
533    )]
534    async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), anyhow::Error> {
535        let encoded_mxid = urlencoding::encode(mxid);
536
537        let response = self
538            .post(&format!("_synapse/admin/v1/deactivate/{encoded_mxid}"))
539            .json(&SynapseDeactivateUserRequest { erase })
540            // Deactivation can take a while, so we set a longer timeout
541            .timeout(Duration::from_secs(60 * 5))
542            .send_traced()
543            .await
544            .context("Failed to deactivate user in Synapse")?;
545
546        let response = response
547            .error_for_synapse_error()
548            .await
549            .context("Unexpected HTTP response while deactivating user in Synapse")?;
550
551        if response.status() != StatusCode::OK {
552            bail!(
553                "Unexpected HTTP code while deactivating user in Synapse: {}",
554                response.status()
555            );
556        }
557
558        Ok(())
559    }
560
561    #[tracing::instrument(
562        name = "homeserver.reactivate_user",
563        skip_all,
564        fields(
565            matrix.homeserver = self.homeserver,
566            matrix.mxid = mxid,
567        ),
568        err(Debug),
569    )]
570    async fn reactivate_user(&self, mxid: &str) -> Result<(), anyhow::Error> {
571        let encoded_mxid = urlencoding::encode(mxid);
572        let response = self
573            .put(&format!("_synapse/admin/v2/users/{encoded_mxid}"))
574            .json(&SynapseUser {
575                deactivated: Some(false),
576                ..SynapseUser::default()
577            })
578            .send_traced()
579            .await
580            .context("Failed to reactivate user in Synapse")?;
581
582        let response = response
583            .error_for_synapse_error()
584            .await
585            .context("Unexpected HTTP response while reactivating user in Synapse")?;
586
587        match response.status() {
588            StatusCode::CREATED | StatusCode::OK => Ok(()),
589            code => bail!("Unexpected HTTP code while reactivating user in Synapse: {code}",),
590        }
591    }
592
593    #[tracing::instrument(
594        name = "homeserver.set_displayname",
595        skip_all,
596        fields(
597            matrix.homeserver = self.homeserver,
598            matrix.mxid = mxid,
599            matrix.displayname = displayname,
600        ),
601        err(Debug),
602    )]
603    async fn set_displayname(&self, mxid: &str, displayname: &str) -> Result<(), anyhow::Error> {
604        let encoded_mxid = urlencoding::encode(mxid);
605        let response = self
606            .put(&format!(
607                "_matrix/client/v3/profile/{encoded_mxid}/displayname"
608            ))
609            .json(&SetDisplayNameRequest { displayname })
610            .send_traced()
611            .await
612            .context("Failed to set displayname in Synapse")?;
613
614        let response = response
615            .error_for_synapse_error()
616            .await
617            .context("Unexpected HTTP response while setting displayname in Synapse")?;
618
619        if response.status() != StatusCode::OK {
620            bail!(
621                "Unexpected HTTP code while setting displayname in Synapse: {}",
622                response.status()
623            );
624        }
625
626        Ok(())
627    }
628
629    #[tracing::instrument(
630        name = "homeserver.unset_displayname",
631        skip_all,
632        fields(
633            matrix.homeserver = self.homeserver,
634            matrix.mxid = mxid,
635        ),
636        err(Display),
637    )]
638    async fn unset_displayname(&self, mxid: &str) -> Result<(), anyhow::Error> {
639        self.set_displayname(mxid, "").await
640    }
641
642    #[tracing::instrument(
643        name = "homeserver.allow_cross_signing_reset",
644        skip_all,
645        fields(
646            matrix.homeserver = self.homeserver,
647            matrix.mxid = mxid,
648        ),
649        err(Debug),
650    )]
651    async fn allow_cross_signing_reset(&self, mxid: &str) -> Result<(), anyhow::Error> {
652        let encoded_mxid = urlencoding::encode(mxid);
653
654        let response = self
655            .post(&format!(
656                "_synapse/admin/v1/users/{encoded_mxid}/_allow_cross_signing_replacement_without_uia"
657            ))
658            .json(&SynapseAllowCrossSigningResetRequest {})
659            .send_traced()
660            .await
661            .context("Failed to allow cross-signing reset in Synapse")?;
662
663        let response = response
664            .error_for_synapse_error()
665            .await
666            .context("Unexpected HTTP response while allowing cross-signing reset in Synapse")?;
667
668        if response.status() != StatusCode::OK {
669            bail!(
670                "Unexpected HTTP code while allowing cross-signing reset in Synapse: {}",
671                response.status(),
672            );
673        }
674
675        Ok(())
676    }
677}