Line data Source code
1 : use crate::config::DatabaseConfig;
2 : use sqlx_core::migrate::Migrator;
3 : use sqlx_postgres::{PgPool, PgPoolOptions};
4 : use std::path::{Path, PathBuf};
5 : use std::time::{Duration, Instant};
6 : use tokio::time::sleep;
7 : use tracing::{info, warn};
8 :
9 : /// Connect to the database and run migrations.
10 : ///
11 : /// This function implements exponential backoff retry logic to handle
12 : /// startup race conditions when the database container is still initializing.
13 : ///
14 : /// # Errors
15 : /// Returns an error if the database connection cannot be established or
16 : /// migrations fail to run after exhausting retries.
17 0 : pub async fn setup_database(config: &DatabaseConfig) -> Result<PgPool, anyhow::Error> {
18 0 : let retry_deadline = Duration::from_secs(60); // overall retry budget
19 0 : let max_interval = Duration::from_secs(30); // cap single waits
20 0 : let mut delay = Duration::from_millis(500);
21 0 : let start = Instant::now();
22 :
23 0 : let pool = loop {
24 0 : info!("Attempting to connect to Postgres...");
25 :
26 0 : match PgPoolOptions::new()
27 0 : .max_connections(config.max_connections)
28 : // Allow extra time to acquire a connection during startup bursts
29 0 : .acquire_timeout(Duration::from_secs(30))
30 0 : .connect(&config.url)
31 0 : .await
32 : {
33 0 : Ok(pool) => break pool,
34 0 : Err(err) => {
35 0 : if start.elapsed() >= retry_deadline {
36 0 : warn!(error = %err, "Postgres not ready; retries exhausted");
37 0 : return Err(err.into());
38 0 : }
39 :
40 0 : warn!(error = %err, "Postgres not ready yet; retrying");
41 0 : sleep(delay).await;
42 0 : delay = (delay.saturating_mul(2)).min(max_interval);
43 : }
44 : }
45 : };
46 :
47 : // Resolve the migrations directory in a way that works in release images too.
48 : // Preference order:
49 : // 1. config.migrations_dir (from config file or TC_DATABASE__MIGRATIONS_DIR env)
50 : // 2. ./migrations relative to the running binary
51 : // 3. The compile-time manifest directory for local `cargo run`
52 0 : let candidate_dirs = [
53 0 : config.migrations_dir.as_ref().map(PathBuf::from),
54 0 : Some(PathBuf::from("./migrations")),
55 0 : Some(PathBuf::from(concat!(
56 0 : env!("CARGO_MANIFEST_DIR"),
57 0 : "/migrations"
58 0 : ))),
59 0 : ];
60 :
61 0 : let mut last_error = None;
62 0 : let mut migrator = None;
63 :
64 0 : for dir in candidate_dirs.into_iter().flatten() {
65 0 : match Migrator::new(Path::new(&dir)).await {
66 0 : Ok(found) => {
67 0 : info!("Using migrations from {}", dir.display());
68 0 : migrator = Some(found);
69 0 : break;
70 : }
71 0 : Err(err) => {
72 0 : last_error = Some((dir, err));
73 0 : }
74 : }
75 : }
76 :
77 0 : let migrator = migrator.ok_or_else(|| match last_error {
78 0 : Some((dir, err)) => {
79 0 : anyhow::anyhow!("failed to load migrations from {}: {}", dir.display(), err)
80 : }
81 0 : None => anyhow::anyhow!("failed to resolve migrations directory"),
82 0 : })?;
83 :
84 0 : migrator.run(&pool).await?;
85 0 : info!("Migrations applied");
86 0 : Ok(pool)
87 0 : }
|