1use std::{collections::HashSet, time::Duration};
8
9use anyhow::{Context, bail};
10use error::SynapseResponseExt;
11use http::{Method, StatusCode};
12use mas_http::RequestBuilderExt as _;
13use mas_matrix::{HomeserverConnection, MatrixUser, ProvisionRequest};
14use serde::{Deserialize, Serialize};
15use tracing::debug;
16use url::Url;
17
/// Auth provider name recorded in the `external_ids` entry that
/// `provision_user` sends to Synapse alongside the request subject.
static SYNAPSE_AUTH_PROVIDER: &str = "oauth-delegated";

/// Synapse error code which `is_localpart_available` treats as "localpart not
/// available" rather than as a failure.
const M_USER_IN_USE: &str = "M_USER_IN_USE";
/// Synapse error code which `is_localpart_available` treats as "localpart not
/// available" rather than as a failure.
const M_INVALID_USERNAME: &str = "M_INVALID_USERNAME";
26
27mod error;
28
/// Handle used to talk to a Synapse homeserver over HTTP.
///
/// Every request built from this handle is resolved against `endpoint` and
/// carries `access_token` as a bearer token.
#[derive(Clone)]
pub struct SynapseConnection {
    /// Value returned by `HomeserverConnection::homeserver`.
    homeserver: String,
    /// Base URL that request paths are joined onto.
    endpoint: Url,
    /// Token sent as bearer authentication on every request.
    access_token: String,
    /// HTTP client used to issue the requests.
    http_client: reqwest::Client,
}
36
37impl SynapseConnection {
38 #[must_use]
39 pub fn new(
40 homeserver: String,
41 endpoint: Url,
42 access_token: String,
43 http_client: reqwest::Client,
44 ) -> Self {
45 Self {
46 homeserver,
47 endpoint,
48 access_token,
49 http_client,
50 }
51 }
52
53 fn builder(&self, method: Method, url: &str) -> reqwest::RequestBuilder {
54 self.http_client
55 .request(
56 method,
57 self.endpoint
58 .join(url)
59 .map(String::from)
60 .unwrap_or_default(),
61 )
62 .bearer_auth(&self.access_token)
63 }
64
65 fn post(&self, url: &str) -> reqwest::RequestBuilder {
66 self.builder(Method::POST, url)
67 }
68
69 fn get(&self, url: &str) -> reqwest::RequestBuilder {
70 self.builder(Method::GET, url)
71 }
72
73 fn put(&self, url: &str) -> reqwest::RequestBuilder {
74 self.builder(Method::PUT, url)
75 }
76
77 fn delete(&self, url: &str) -> reqwest::RequestBuilder {
78 self.builder(Method::DELETE, url)
79 }
80}
81
/// One entry of the `external_ids` list of a [`SynapseUser`].
///
/// `provision_user` builds a single entry from `SYNAPSE_AUTH_PROVIDER` and the
/// subject of the provisioning request.
#[derive(Serialize, Deserialize)]
struct ExternalID {
    /// Name of the auth provider the identifier belongs to.
    auth_provider: String,
    /// Identifier of the user at that provider.
    external_id: String,
}
87
/// Medium of a [`ThreePID`], (de)serialized using lowercase variant names.
///
/// Only `Email` is constructed in this file (by `provision_user`).
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
enum ThreePIDMedium {
    Email,
    Msisdn,
}
94
/// One entry of the `threepids` list of a [`SynapseUser`].
#[derive(Serialize, Deserialize)]
struct ThreePID {
    /// Kind of address stored in `address`.
    medium: ThreePIDMedium,
    /// The address itself; `provision_user` stores e-mail addresses here.
    address: String,
}
100
/// User representation exchanged with the `_synapse/admin/v2/users/{mxid}`
/// endpoint: parsed from the response in `query_user`, sent as the request
/// body in `provision_user` and `reactivate_user`.
///
/// Every field is optional: a `None` field is left out when serializing and a
/// missing field deserializes to `None`.
#[derive(Default, Serialize, Deserialize)]
struct SynapseUser {
    /// Display name, carried on the wire as `displayname`.
    #[serde(
        default,
        rename = "displayname",
        skip_serializing_if = "Option::is_none"
    )]
    display_name: Option<String>,

    /// Avatar URL of the user.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    avatar_url: Option<String>,

    /// Third-party identifiers, carried on the wire as `threepids`.
    #[serde(default, rename = "threepids", skip_serializing_if = "Option::is_none")]
    three_pids: Option<Vec<ThreePID>>,

    /// External identities linked to the user.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    external_ids: Option<Vec<ExternalID>>,

    /// Deactivation flag; `query_user` reads a missing value as `false`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    deactivated: Option<bool>,
}
122
/// Response body parsed by `sync_devices` when listing a user's devices.
#[derive(Deserialize)]
struct SynapseDeviceListResponse {
    /// Devices reported for the user.
    devices: Vec<SynapseDevice>,
}
127
/// A device, used both as an element of [`SynapseDeviceListResponse`] and as
/// the request body sent by `create_device`.
#[derive(Serialize, Deserialize)]
struct SynapseDevice {
    /// Identifier of the device.
    device_id: String,

    /// Dehydration flag; omitted from the serialized form when `None`.
    /// `sync_devices` ignores devices for which this is `Some(true)`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    dehydrated: Option<bool>,
}
135
/// Request body sent by `update_device_display_name`.
#[derive(Serialize)]
struct SynapseUpdateDeviceRequest<'a> {
    /// New display name of the device; the only caller in this file always
    /// passes `Some`.
    display_name: Option<&'a str>,
}
140
/// Request body sent by `sync_devices` to delete several devices at once.
#[derive(Serialize)]
struct SynapseDeleteDevicesRequest {
    /// Identifiers of the devices to delete.
    devices: Vec<String>,
}
145
/// Request body sent by `set_displayname`.
#[derive(Serialize)]
struct SetDisplayNameRequest<'a> {
    /// Display name to set; `unset_displayname` passes an empty string.
    displayname: &'a str,
}
150
/// Request body sent by `delete_user` to the deactivation endpoint.
#[derive(Serialize)]
struct SynapseDeactivateUserRequest {
    /// Forwarded unchanged from the `erase` argument of `delete_user`.
    erase: bool,
}
155
/// Field-less request body sent by `allow_cross_signing_reset`.
#[derive(Serialize)]
struct SynapseAllowCrossSigningResetRequest {}
158
/// Response body parsed by `is_localpart_available` on a successful reply.
#[derive(Deserialize)]
struct UsernameAvailableResponse {
    /// Returned as-is to the caller of `is_localpart_available`.
    available: bool,
}
165
166#[async_trait::async_trait]
167impl HomeserverConnection for SynapseConnection {
168 fn homeserver(&self) -> &str {
169 &self.homeserver
170 }
171
172 #[tracing::instrument(
173 name = "homeserver.query_user",
174 skip_all,
175 fields(
176 matrix.homeserver = self.homeserver,
177 matrix.mxid = mxid,
178 ),
179 err(Debug),
180 )]
181 async fn query_user(&self, mxid: &str) -> Result<MatrixUser, anyhow::Error> {
182 let encoded_mxid = urlencoding::encode(mxid);
183
184 let response = self
185 .get(&format!("_synapse/admin/v2/users/{encoded_mxid}"))
186 .send_traced()
187 .await
188 .context("Failed to query user from Synapse")?;
189
190 let response = response
191 .error_for_synapse_error()
192 .await
193 .context("Unexpected HTTP response while querying user from Synapse")?;
194
195 let body: SynapseUser = response
196 .json()
197 .await
198 .context("Failed to deserialize response while querying user from Synapse")?;
199
200 Ok(MatrixUser {
201 displayname: body.display_name,
202 avatar_url: body.avatar_url,
203 deactivated: body.deactivated.unwrap_or(false),
204 })
205 }
206
207 #[tracing::instrument(
208 name = "homeserver.is_localpart_available",
209 skip_all,
210 fields(
211 matrix.homeserver = self.homeserver,
212 matrix.localpart = localpart,
213 ),
214 err(Debug),
215 )]
216 async fn is_localpart_available(&self, localpart: &str) -> Result<bool, anyhow::Error> {
217 if !localpart.is_ascii() {
220 return Ok(false);
221 }
222
223 let localpart = urlencoding::encode(localpart);
224
225 let response = self
226 .get(&format!(
227 "_synapse/admin/v1/username_available?username={localpart}"
228 ))
229 .send_traced()
230 .await
231 .context("Failed to query localpart availability from Synapse")?;
232
233 match response.error_for_synapse_error().await {
234 Ok(resp) => {
235 let response: UsernameAvailableResponse = resp.json().await.context(
236 "Unexpected response while querying localpart availability from Synapse",
237 )?;
238
239 Ok(response.available)
240 }
241
242 Err(err)
243 if err.errcode() == Some(M_INVALID_USERNAME)
244 || err.errcode() == Some(M_USER_IN_USE) =>
245 {
246 debug!(
247 error = &err as &dyn std::error::Error,
248 "Localpart is not available"
249 );
250 Ok(false)
251 }
252
253 Err(err) => Err(err).context("Failed to query localpart availability from Synapse"),
254 }
255 }
256
257 #[tracing::instrument(
258 name = "homeserver.provision_user",
259 skip_all,
260 fields(
261 matrix.homeserver = self.homeserver,
262 matrix.mxid = request.mxid(),
263 user.id = request.sub(),
264 ),
265 err(Debug),
266 )]
267 async fn provision_user(&self, request: &ProvisionRequest) -> Result<bool, anyhow::Error> {
268 let mut body = SynapseUser {
269 external_ids: Some(vec![ExternalID {
270 auth_provider: SYNAPSE_AUTH_PROVIDER.to_owned(),
271 external_id: request.sub().to_owned(),
272 }]),
273 ..SynapseUser::default()
274 };
275
276 request
277 .on_displayname(|displayname| {
278 body.display_name = Some(displayname.unwrap_or_default().to_owned());
279 })
280 .on_avatar_url(|avatar_url| {
281 body.avatar_url = Some(avatar_url.unwrap_or_default().to_owned());
282 })
283 .on_emails(|emails| {
284 body.three_pids = Some(
285 emails
286 .unwrap_or_default()
287 .iter()
288 .map(|email| ThreePID {
289 medium: ThreePIDMedium::Email,
290 address: email.clone(),
291 })
292 .collect(),
293 );
294 });
295
296 let encoded_mxid = urlencoding::encode(request.mxid());
297 let response = self
298 .put(&format!("_synapse/admin/v2/users/{encoded_mxid}"))
299 .json(&body)
300 .send_traced()
301 .await
302 .context("Failed to provision user in Synapse")?;
303
304 let response = response
305 .error_for_synapse_error()
306 .await
307 .context("Unexpected HTTP response while provisioning user in Synapse")?;
308
309 match response.status() {
310 StatusCode::CREATED => Ok(true),
311 StatusCode::OK => Ok(false),
312 code => bail!("Unexpected HTTP code while provisioning user in Synapse: {code}"),
313 }
314 }
315
316 #[tracing::instrument(
317 name = "homeserver.create_device",
318 skip_all,
319 fields(
320 matrix.homeserver = self.homeserver,
321 matrix.mxid = mxid,
322 matrix.device_id = device_id,
323 ),
324 err(Debug),
325 )]
326 async fn create_device(
327 &self,
328 mxid: &str,
329 device_id: &str,
330 initial_display_name: Option<&str>,
331 ) -> Result<(), anyhow::Error> {
332 let encoded_mxid = urlencoding::encode(mxid);
333
334 let response = self
335 .post(&format!("_synapse/admin/v2/users/{encoded_mxid}/devices"))
336 .json(&SynapseDevice {
337 device_id: device_id.to_owned(),
338 dehydrated: None,
339 })
340 .send_traced()
341 .await
342 .context("Failed to create device in Synapse")?;
343
344 let response = response
345 .error_for_synapse_error()
346 .await
347 .context("Unexpected HTTP response while creating device in Synapse")?;
348
349 if response.status() != StatusCode::CREATED {
350 bail!(
351 "Unexpected HTTP code while creating device in Synapse: {}",
352 response.status()
353 );
354 }
355
356 if let Some(display_name) = initial_display_name {
359 self.update_device_display_name(mxid, device_id, display_name)
360 .await?;
361 }
362
363 Ok(())
364 }
365
366 #[tracing::instrument(
367 name = "homeserver.update_device_display_name",
368 skip_all,
369 fields(
370 matrix.homeserver = self.homeserver,
371 matrix.mxid = mxid,
372 matrix.device_id = device_id,
373 ),
374 err(Debug),
375 )]
376 async fn update_device_display_name(
377 &self,
378 mxid: &str,
379 device_id: &str,
380 display_name: &str,
381 ) -> Result<(), anyhow::Error> {
382 let encoded_mxid = urlencoding::encode(mxid);
383 let device_id = urlencoding::encode(device_id);
384 let response = self
385 .put(&format!(
386 "_synapse/admin/v2/users/{encoded_mxid}/devices/{device_id}"
387 ))
388 .json(&SynapseUpdateDeviceRequest {
389 display_name: Some(display_name),
390 })
391 .send_traced()
392 .await
393 .context("Failed to update device display name in Synapse")?;
394
395 let response = response
396 .error_for_synapse_error()
397 .await
398 .context("Unexpected HTTP response while updating device display name in Synapse")?;
399
400 if response.status() != StatusCode::OK {
401 bail!(
402 "Unexpected HTTP code while updating device display name in Synapse: {}",
403 response.status()
404 );
405 }
406
407 Ok(())
408 }
409
410 #[tracing::instrument(
411 name = "homeserver.delete_device",
412 skip_all,
413 fields(
414 matrix.homeserver = self.homeserver,
415 matrix.mxid = mxid,
416 matrix.device_id = device_id,
417 ),
418 err(Debug),
419 )]
420 async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), anyhow::Error> {
421 let encoded_mxid = urlencoding::encode(mxid);
422 let encoded_device_id = urlencoding::encode(device_id);
423
424 let response = self
425 .delete(&format!(
426 "_synapse/admin/v2/users/{encoded_mxid}/devices/{encoded_device_id}"
427 ))
428 .send_traced()
429 .await
430 .context("Failed to delete device in Synapse")?;
431
432 let response = response
433 .error_for_synapse_error()
434 .await
435 .context("Unexpected HTTP response while deleting device in Synapse")?;
436
437 if response.status() != StatusCode::OK {
438 bail!(
439 "Unexpected HTTP code while deleting device in Synapse: {}",
440 response.status()
441 );
442 }
443
444 Ok(())
445 }
446
447 #[tracing::instrument(
448 name = "homeserver.sync_devices",
449 skip_all,
450 fields(
451 matrix.homeserver = self.homeserver,
452 matrix.mxid = mxid,
453 ),
454 err(Debug),
455 )]
456 async fn sync_devices(
457 &self,
458 mxid: &str,
459 devices: HashSet<String>,
460 ) -> Result<(), anyhow::Error> {
461 let encoded_mxid = urlencoding::encode(mxid);
463
464 let response = self
465 .get(&format!("_synapse/admin/v2/users/{encoded_mxid}/devices"))
466 .send_traced()
467 .await
468 .context("Failed to query devices from Synapse")?;
469
470 let response = response.error_for_synapse_error().await?;
471
472 if response.status() != StatusCode::OK {
473 bail!(
474 "Unexpected HTTP code while querying devices from Synapse: {}",
475 response.status()
476 );
477 }
478
479 let body: SynapseDeviceListResponse = response
480 .json()
481 .await
482 .context("Failed to parse response while querying devices from Synapse")?;
483
484 let existing_devices: HashSet<String> = body
485 .devices
486 .into_iter()
487 .filter(|d| d.dehydrated != Some(true))
488 .map(|d| d.device_id)
489 .collect();
490
491 let to_delete = existing_devices.difference(&devices).cloned().collect();
493
494 let response = self
495 .post(&format!(
496 "_synapse/admin/v2/users/{encoded_mxid}/delete_devices"
497 ))
498 .json(&SynapseDeleteDevicesRequest { devices: to_delete })
499 .send_traced()
500 .await
501 .context("Failed to delete devices from Synapse")?;
502
503 let response = response
504 .error_for_synapse_error()
505 .await
506 .context("Unexpected HTTP response while deleting devices from Synapse")?;
507
508 if response.status() != StatusCode::OK {
509 bail!(
510 "Unexpected HTTP code while deleting devices from Synapse: {}",
511 response.status()
512 );
513 }
514
515 for device_id in devices.difference(&existing_devices) {
518 self.create_device(mxid, device_id, None).await?;
519 }
520
521 Ok(())
522 }
523
524 #[tracing::instrument(
525 name = "homeserver.delete_user",
526 skip_all,
527 fields(
528 matrix.homeserver = self.homeserver,
529 matrix.mxid = mxid,
530 erase = erase,
531 ),
532 err(Debug),
533 )]
534 async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), anyhow::Error> {
535 let encoded_mxid = urlencoding::encode(mxid);
536
537 let response = self
538 .post(&format!("_synapse/admin/v1/deactivate/{encoded_mxid}"))
539 .json(&SynapseDeactivateUserRequest { erase })
540 .timeout(Duration::from_secs(60 * 5))
542 .send_traced()
543 .await
544 .context("Failed to deactivate user in Synapse")?;
545
546 let response = response
547 .error_for_synapse_error()
548 .await
549 .context("Unexpected HTTP response while deactivating user in Synapse")?;
550
551 if response.status() != StatusCode::OK {
552 bail!(
553 "Unexpected HTTP code while deactivating user in Synapse: {}",
554 response.status()
555 );
556 }
557
558 Ok(())
559 }
560
561 #[tracing::instrument(
562 name = "homeserver.reactivate_user",
563 skip_all,
564 fields(
565 matrix.homeserver = self.homeserver,
566 matrix.mxid = mxid,
567 ),
568 err(Debug),
569 )]
570 async fn reactivate_user(&self, mxid: &str) -> Result<(), anyhow::Error> {
571 let encoded_mxid = urlencoding::encode(mxid);
572 let response = self
573 .put(&format!("_synapse/admin/v2/users/{encoded_mxid}"))
574 .json(&SynapseUser {
575 deactivated: Some(false),
576 ..SynapseUser::default()
577 })
578 .send_traced()
579 .await
580 .context("Failed to reactivate user in Synapse")?;
581
582 let response = response
583 .error_for_synapse_error()
584 .await
585 .context("Unexpected HTTP response while reactivating user in Synapse")?;
586
587 match response.status() {
588 StatusCode::CREATED | StatusCode::OK => Ok(()),
589 code => bail!("Unexpected HTTP code while reactivating user in Synapse: {code}",),
590 }
591 }
592
593 #[tracing::instrument(
594 name = "homeserver.set_displayname",
595 skip_all,
596 fields(
597 matrix.homeserver = self.homeserver,
598 matrix.mxid = mxid,
599 matrix.displayname = displayname,
600 ),
601 err(Debug),
602 )]
603 async fn set_displayname(&self, mxid: &str, displayname: &str) -> Result<(), anyhow::Error> {
604 let encoded_mxid = urlencoding::encode(mxid);
605 let response = self
606 .put(&format!(
607 "_matrix/client/v3/profile/{encoded_mxid}/displayname"
608 ))
609 .json(&SetDisplayNameRequest { displayname })
610 .send_traced()
611 .await
612 .context("Failed to set displayname in Synapse")?;
613
614 let response = response
615 .error_for_synapse_error()
616 .await
617 .context("Unexpected HTTP response while setting displayname in Synapse")?;
618
619 if response.status() != StatusCode::OK {
620 bail!(
621 "Unexpected HTTP code while setting displayname in Synapse: {}",
622 response.status()
623 );
624 }
625
626 Ok(())
627 }
628
629 #[tracing::instrument(
630 name = "homeserver.unset_displayname",
631 skip_all,
632 fields(
633 matrix.homeserver = self.homeserver,
634 matrix.mxid = mxid,
635 ),
636 err(Display),
637 )]
638 async fn unset_displayname(&self, mxid: &str) -> Result<(), anyhow::Error> {
639 self.set_displayname(mxid, "").await
640 }
641
642 #[tracing::instrument(
643 name = "homeserver.allow_cross_signing_reset",
644 skip_all,
645 fields(
646 matrix.homeserver = self.homeserver,
647 matrix.mxid = mxid,
648 ),
649 err(Debug),
650 )]
651 async fn allow_cross_signing_reset(&self, mxid: &str) -> Result<(), anyhow::Error> {
652 let encoded_mxid = urlencoding::encode(mxid);
653
654 let response = self
655 .post(&format!(
656 "_synapse/admin/v1/users/{encoded_mxid}/_allow_cross_signing_replacement_without_uia"
657 ))
658 .json(&SynapseAllowCrossSigningResetRequest {})
659 .send_traced()
660 .await
661 .context("Failed to allow cross-signing reset in Synapse")?;
662
663 let response = response
664 .error_for_synapse_error()
665 .await
666 .context("Unexpected HTTP response while allowing cross-signing reset in Synapse")?;
667
668 if response.status() != StatusCode::OK {
669 bail!(
670 "Unexpected HTTP code while allowing cross-signing reset in Synapse: {}",
671 response.status(),
672 );
673 }
674
675 Ok(())
676 }
677}