Source code for pytest_databases.docker.postgres

from __future__ import annotations

import dataclasses
import os
from contextlib import contextmanager
from typing import TYPE_CHECKING

import psycopg
import pytest

from pytest_databases.helpers import get_xdist_worker_num
from pytest_databases.types import ServiceContainer, XdistIsolationLevel

if TYPE_CHECKING:
    from collections.abc import Generator

    from pytest_databases._service import DockerService


def _make_connection_string(host: str, port: int, user: str, password: str, database: str) -> str:
    return f"dbname={database} user={user} host={host} port={port} password={password}"


[docs] @pytest.fixture(scope="session") def xdist_postgres_isolation_level() -> XdistIsolationLevel: return "database"
[docs] @dataclasses.dataclass class PostgresService(ServiceContainer): database: str password: str user: str
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_host() -> str: return os.environ.get("POSTGRES_HOST", "127.0.0.1")
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_password() -> str: return "super-secret"
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_user() -> str: return "postgres"
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_port() -> int | None: value = os.environ.get("POSTGRES_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_11_port() -> int | None: value = os.environ.get("POSTGRES_11_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_12_port() -> int | None: value = os.environ.get("POSTGRES_12_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_13_port() -> int | None: value = os.environ.get("POSTGRES_13_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_14_port() -> int | None: value = os.environ.get("POSTGRES_14_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_15_port() -> int | None: value = os.environ.get("POSTGRES_15_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_16_port() -> int | None: value = os.environ.get("POSTGRES_16_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_17_port() -> int | None: value = os.environ.get("POSTGRES_17_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_18_port() -> int | None: value = os.environ.get("POSTGRES_18_PORT") return int(value) if value else None
@contextmanager def _provide_postgres_service( docker_service: DockerService, image: str, name: str, host: str, user: str, password: str, xdist_postgres_isolate: XdistIsolationLevel, host_port: int | None = None, ) -> Generator[PostgresService, None, None]: def check(_service: ServiceContainer) -> bool: try: with psycopg.connect( _make_connection_string( host=_service.host, port=_service.port, user=user, password=password, database="postgres", ) ) as conn: db_open = conn.execute("SELECT 1").fetchone() return bool(db_open is not None and db_open[0] == 1) except Exception: # noqa: BLE001 return False worker_num = get_xdist_worker_num() db_name = "pytest_databases" if worker_num is not None: suffix = f"_{worker_num}" if xdist_postgres_isolate == "server": name += suffix else: db_name += suffix with docker_service.run( image=image, check=check, container_host=host, container_port=5432, name=name, env={ "POSTGRES_PASSWORD": password, }, exec_after_start=f"psql -U postgres -d postgres -c 'CREATE DATABASE {db_name};'", transient=xdist_postgres_isolate == "server", host_port=host_port, ) as service: yield PostgresService( host=service.host, port=service.port, container=service.container, database=db_name, user=user, password=password, )
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_11_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, postgres_11_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="postgres:11", name="postgres-11", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=postgres_11_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_12_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, postgres_12_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="postgres:12", name="postgres-12", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=postgres_12_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_13_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, postgres_13_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="postgres:13", name="postgres-13", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=postgres_13_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_14_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, postgres_14_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="postgres:14", name="postgres-14", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=postgres_14_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_15_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, postgres_15_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="postgres:15", name="postgres-15", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=postgres_15_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_16_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, postgres_16_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="postgres:16", name="postgres-16", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=postgres_16_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_17_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, postgres_17_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="postgres:17", name="postgres-17", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=postgres_17_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_18_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, postgres_18_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="postgres:18", name="postgres-18", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=postgres_18_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_11_connection( postgres_11_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=postgres_11_service.host, port=postgres_11_service.port, user=postgres_11_service.user, password=postgres_11_service.password, database=postgres_11_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_12_connection( postgres_12_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=postgres_12_service.host, port=postgres_12_service.port, user=postgres_12_service.user, password=postgres_12_service.password, database=postgres_12_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_13_connection( postgres_13_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=postgres_13_service.host, port=postgres_13_service.port, user=postgres_13_service.user, password=postgres_13_service.password, database=postgres_13_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_14_connection( postgres_14_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=postgres_14_service.host, port=postgres_14_service.port, user=postgres_14_service.user, password=postgres_14_service.password, database=postgres_14_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_15_connection( postgres_15_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=postgres_15_service.host, port=postgres_15_service.port, user=postgres_15_service.user, password=postgres_15_service.password, database=postgres_15_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_16_connection( postgres_16_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=postgres_16_service.host, port=postgres_16_service.port, user=postgres_16_service.user, password=postgres_16_service.password, database=postgres_16_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_17_connection( postgres_17_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=postgres_17_service.host, port=postgres_17_service.port, user=postgres_17_service.user, password=postgres_17_service.password, database=postgres_17_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_18_connection( postgres_18_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=postgres_18_service.host, port=postgres_18_service.port, user=postgres_18_service.user, password=postgres_18_service.password, database=postgres_18_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_image() -> str: return "postgres:18"
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_service( docker_service: DockerService, postgres_image: str, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, postgres_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image=postgres_image, name="postgres", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=postgres_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def postgres_connection( postgres_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=postgres_service.host, port=postgres_service.port, user=postgres_service.user, password=postgres_service.password, database=postgres_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_image() -> str: return "pgvector/pgvector:pg18"
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_port() -> int | None: value = os.environ.get("PGVECTOR_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_13_port() -> int | None: value = os.environ.get("PGVECTOR_13_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_14_port() -> int | None: value = os.environ.get("PGVECTOR_14_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_15_port() -> int | None: value = os.environ.get("PGVECTOR_15_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_16_port() -> int | None: value = os.environ.get("PGVECTOR_16_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_17_port() -> int | None: value = os.environ.get("PGVECTOR_17_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_18_port() -> int | None: value = os.environ.get("PGVECTOR_18_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_service( docker_service: DockerService, pgvector_image: str, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, pgvector_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image=pgvector_image, name="pgvector", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=pgvector_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_13_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, pgvector_13_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="pgvector/pgvector:pg13", name="pgvector-13", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=pgvector_13_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_14_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, pgvector_14_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="pgvector/pgvector:pg14", name="pgvector-14", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=pgvector_14_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_15_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, pgvector_15_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="pgvector/pgvector:pg15", name="pgvector-15", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=pgvector_15_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_16_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, pgvector_16_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="pgvector/pgvector:pg16", name="pgvector-16", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=pgvector_16_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_17_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, pgvector_17_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="pgvector/pgvector:pg17", name="pgvector-17", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=pgvector_17_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_18_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, pgvector_18_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="pgvector/pgvector:pg18", name="pgvector-18", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=pgvector_18_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_connection( pgvector_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=pgvector_service.host, port=pgvector_service.port, user=pgvector_service.user, password=pgvector_service.password, database=pgvector_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_13_connection( pgvector_13_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=pgvector_13_service.host, port=pgvector_13_service.port, user=pgvector_13_service.user, password=pgvector_13_service.password, database=pgvector_13_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_14_connection( pgvector_14_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=pgvector_14_service.host, port=pgvector_14_service.port, user=pgvector_14_service.user, password=pgvector_14_service.password, database=pgvector_14_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_15_connection( pgvector_15_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=pgvector_15_service.host, port=pgvector_15_service.port, user=pgvector_15_service.user, password=pgvector_15_service.password, database=pgvector_15_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_16_connection( pgvector_16_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=pgvector_16_service.host, port=pgvector_16_service.port, user=pgvector_16_service.user, password=pgvector_16_service.password, database=pgvector_16_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_17_connection( pgvector_17_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=pgvector_17_service.host, port=pgvector_17_service.port, user=pgvector_17_service.user, password=pgvector_17_service.password, database=pgvector_17_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def pgvector_18_connection( pgvector_18_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=pgvector_18_service.host, port=pgvector_18_service.port, user=pgvector_18_service.user, password=pgvector_18_service.password, database=pgvector_18_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_image() -> str: return "paradedb/paradedb:latest-pg18"
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_port() -> int | None: value = os.environ.get("PARADEDB_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_15_port() -> int | None: value = os.environ.get("PARADEDB_15_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_16_port() -> int | None: value = os.environ.get("PARADEDB_16_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_17_port() -> int | None: value = os.environ.get("PARADEDB_17_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_18_port() -> int | None: value = os.environ.get("PARADEDB_18_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_service( docker_service: DockerService, paradedb_image: str, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, paradedb_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image=paradedb_image, name="paradedb", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=paradedb_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_15_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, paradedb_15_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="paradedb/paradedb:latest-pg15", name="paradedb-15", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=paradedb_15_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_16_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, paradedb_16_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="paradedb/paradedb:latest-pg16", name="paradedb-16", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=paradedb_16_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_17_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, paradedb_17_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="paradedb/paradedb:latest-pg17", name="paradedb-17", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=paradedb_17_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_18_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, paradedb_18_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="paradedb/paradedb:latest-pg18", name="paradedb-18", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=paradedb_18_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_connection( paradedb_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=paradedb_service.host, port=paradedb_service.port, user=paradedb_service.user, password=paradedb_service.password, database=paradedb_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_15_connection( paradedb_15_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=paradedb_15_service.host, port=paradedb_15_service.port, user=paradedb_15_service.user, password=paradedb_15_service.password, database=paradedb_15_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_16_connection( paradedb_16_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=paradedb_16_service.host, port=paradedb_16_service.port, user=paradedb_16_service.user, password=paradedb_16_service.password, database=paradedb_16_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_17_connection( paradedb_17_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=paradedb_17_service.host, port=paradedb_17_service.port, user=paradedb_17_service.user, password=paradedb_17_service.password, database=paradedb_17_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def paradedb_18_connection( paradedb_18_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=paradedb_18_service.host, port=paradedb_18_service.port, user=paradedb_18_service.user, password=paradedb_18_service.password, database=paradedb_18_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_image() -> str: return "google/alloydbomni:17"
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_port() -> int | None: value = os.environ.get("ALLOYDB_OMNI_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_15_port() -> int | None: value = os.environ.get("ALLOYDB_OMNI_15_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_16_port() -> int | None: value = os.environ.get("ALLOYDB_OMNI_16_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_17_port() -> int | None: value = os.environ.get("ALLOYDB_OMNI_17_PORT") return int(value) if value else None
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_service( docker_service: DockerService, alloydb_omni_image: str, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, alloydb_omni_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image=alloydb_omni_image, name="alloydb-omni", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=alloydb_omni_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_15_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, alloydb_omni_15_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="google/alloydbomni:15", name="alloydb-omni-15", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=alloydb_omni_15_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_16_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, alloydb_omni_16_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="google/alloydbomni:16", name="alloydb-omni-16", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=alloydb_omni_16_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_17_service( docker_service: DockerService, xdist_postgres_isolation_level: XdistIsolationLevel, postgres_host: str, postgres_user: str, postgres_password: str, alloydb_omni_17_port: int | None, ) -> Generator[PostgresService, None, None]: with _provide_postgres_service( docker_service, image="google/alloydbomni:17", name="alloydb-omni-17", xdist_postgres_isolate=xdist_postgres_isolation_level, host=postgres_host, user=postgres_user, password=postgres_password, host_port=alloydb_omni_17_port, ) as service: yield service
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_connection( alloydb_omni_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=alloydb_omni_service.host, port=alloydb_omni_service.port, user=alloydb_omni_service.user, password=alloydb_omni_service.password, database=alloydb_omni_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_15_connection( alloydb_omni_15_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=alloydb_omni_15_service.host, port=alloydb_omni_15_service.port, user=alloydb_omni_15_service.user, password=alloydb_omni_15_service.password, database=alloydb_omni_15_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_16_connection( alloydb_omni_16_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=alloydb_omni_16_service.host, port=alloydb_omni_16_service.port, user=alloydb_omni_16_service.user, password=alloydb_omni_16_service.password, database=alloydb_omni_16_service.database, ), ) as conn: yield conn
[docs] @pytest.fixture(autouse=False, scope="session") def alloydb_omni_17_connection( alloydb_omni_17_service: PostgresService, ) -> Generator[psycopg.Connection, None, None]: with psycopg.connect( _make_connection_string( host=alloydb_omni_17_service.host, port=alloydb_omni_17_service.port, user=alloydb_omni_17_service.user, password=alloydb_omni_17_service.password, database=alloydb_omni_17_service.database, ), ) as conn: yield conn