Source code for pytest_databases.docker.rustfs

from __future__ import annotations

import contextlib
import os
import shlex
from dataclasses import dataclass
from typing import TYPE_CHECKING
from urllib.error import URLError
from urllib.request import Request, urlopen

import pytest
from docker.errors import ImageNotFound

from pytest_databases.docker import TRUE_VALUES
from pytest_databases.helpers import get_xdist_worker_num
from pytest_databases.types import ServiceContainer, XdistIsolationLevel

if TYPE_CHECKING:
    from collections.abc import Generator

    from pytest_databases._service import DockerService


@dataclass
class RustfsService(ServiceContainer):
    """Connection details for a running RustFS container.

    Adds the RustFS-specific connection fields to ``ServiceContainer``.
    Instances are produced by the ``rustfs_service`` fixture.
    """

    # "<host>:<port>" of the server, without a URL scheme.
    endpoint: str
    # Credentials the container was started with.
    access_key: str
    secret_key: str
    # True when the service should be addressed over https instead of http.
    secure: bool
@pytest.fixture(scope="session")
def rustfs_access_key() -> str:
    """Return the RustFS access key.

    Read from the ``RUSTFS_ACCESS_KEY`` environment variable, falling back
    to ``"rustfsadmin"`` when it is not set.
    """
    return os.environ.get("RUSTFS_ACCESS_KEY", "rustfsadmin")
@pytest.fixture(scope="session")
def rustfs_secret_key() -> str:
    """Return the RustFS secret key.

    Read from the ``RUSTFS_SECRET_KEY`` environment variable, falling back
    to ``"rustfsadmin"`` when it is not set.
    """
    return os.environ.get("RUSTFS_SECRET_KEY", "rustfsadmin")
@pytest.fixture(scope="session")
def rustfs_secure() -> bool:
    """Return whether RustFS should be reached over https.

    The ``RUSTFS_SECURE`` environment variable (default ``"false"``) is
    lower-cased and looked up in ``TRUE_VALUES``.
    """
    raw_flag = os.environ.get("RUSTFS_SECURE", "false")
    return raw_flag.lower() in TRUE_VALUES
@pytest.fixture(scope="session")
def xdist_rustfs_isolation_level() -> XdistIsolationLevel:
    """Return the pytest-xdist isolation level used for RustFS.

    The default is ``"database"``. Override this fixture to return
    ``"server"`` to have ``rustfs_service`` start a separately named,
    transient container for each xdist worker.
    """
    default_level: XdistIsolationLevel = "database"
    return default_level
@pytest.fixture(scope="session")
def rustfs_default_bucket_name(xdist_rustfs_isolation_level: XdistIsolationLevel) -> str:
    """Return the name of the bucket created when the service starts.

    Resolution order:

    1. A non-empty ``RUSTFS_DEFAULT_BUCKET_NAME`` environment variable wins.
    2. Under pytest-xdist with ``"server"`` isolation, the xdist worker
       number is appended to ``pytest-databases``.
    3. Otherwise the plain name ``pytest-databases`` is used.
    """
    configured = os.getenv("RUSTFS_DEFAULT_BUCKET_NAME")
    if configured:
        return configured

    worker = get_xdist_worker_num()
    # NOTE(review): the per-worker suffix is applied only for "server"
    # isolation (where each worker already gets its own container); confirm
    # this is intended rather than "database".
    if worker is None or xdist_rustfs_isolation_level != "server":
        return "pytest-databases"
    return f"pytest-databases-{worker}"
@pytest.fixture(scope="session")
def rustfs_service(
    docker_service: "DockerService",
    rustfs_access_key: str,
    rustfs_secret_key: str,
    rustfs_secure: bool,
    rustfs_default_bucket_name: str,
    xdist_rustfs_isolation_level: XdistIsolationLevel,
) -> "Generator[RustfsService, None, None]":
    """Start a RustFS container, create the default bucket and yield its details.

    Args:
        docker_service: Service used to run the container; its Docker client
            is also used to run the one-shot ``rustfs/rc`` container.
        rustfs_access_key: Passed to the container as ``RUSTFS_ROOT_USER``.
        rustfs_secret_key: Passed to the container as ``RUSTFS_ROOT_PASSWORD``.
        rustfs_secure: Selects ``https`` instead of ``http`` for the health
            probe and for the bucket-creation endpoint.
        rustfs_default_bucket_name: Bucket created once the server is healthy.
        xdist_rustfs_isolation_level: With ``"server"`` and an xdist worker
            number, the container name gets a per-worker suffix and the
            container is run as transient.

    Yields:
        A ``RustfsService`` describing the running container.
    """
    scheme = "https" if rustfs_secure else "http"

    def check(_service: ServiceContainer) -> bool:
        """Return True once ``/health`` answers with HTTP 200."""
        url = f"{scheme}://{_service.host}:{_service.port}/health"
        if not url.startswith(("http:", "https:")):
            msg = "URL must start with 'http:' or 'https:'"
            raise ValueError(msg)
        try:
            with urlopen(url=Request(url, method="GET"), timeout=10) as response:  # noqa: S310
                return response.status == 200
        except (URLError, OSError):
            # URLError and ConnectionError are both OSError subclasses. OSError
            # is caught as well so that a socket timeout while reading the
            # response counts as "not ready yet" instead of aborting the fixture.
            return False

    worker_num = get_xdist_worker_num()
    name = "rustfs"
    transient = False
    if worker_num is not None and xdist_rustfs_isolation_level == "server":
        name = f"{name}_{worker_num}"
        transient = True

    env = {
        "RUSTFS_ROOT_USER": rustfs_access_key,
        "RUSTFS_ROOT_PASSWORD": rustfs_secret_key,
    }

    with docker_service.run(
        image="rustfs/rustfs:latest",
        name=name,
        container_port=9000,
        timeout=30,
        pause=0.5,
        env=env,
        check=check,
        transient=transient,
    ) as service:
        # The rc container runs on the host network, so it reaches the server
        # through the host address and mapped port.
        endpoint_url = f"{scheme}://{service.host}:{service.port}"

        # The Docker client is needed to run the one-shot rc container.
        client = docker_service._client

        # Pull the rc image only when it is not already available locally.
        try:
            client.images.get("rustfs/rc:latest")
        except ImageNotFound:
            client.images.pull("rustfs/rc:latest")

        # rc alias set local <url> <access> <secret> && rc mb local/<bucket>
        # Every interpolated value is shell-quoted so that credentials or
        # bucket names containing whitespace or shell metacharacters are
        # passed through literally; shell-safe values are left unchanged.
        alias_cmd = " ".join(
            [
                "rc alias set local",
                shlex.quote(endpoint_url),
                shlex.quote(rustfs_access_key),
                shlex.quote(rustfs_secret_key),
            ]
        )
        bucket_target = f"local/{rustfs_default_bucket_name}"
        make_bucket_cmd = f"rc mb {shlex.quote(bucket_target)}"
        command = ["sh", "-c", f"{alias_cmd} && {make_bucket_cmd}"]

        # Bucket creation is deliberately best-effort: any failure of the
        # one-shot container is ignored and the service is still yielded.
        with contextlib.suppress(Exception):
            client.containers.run(
                image="rustfs/rc:latest",
                command=command,
                remove=True,
                network_mode="host",  # Use host network to connect to the mapped port
            )

        yield RustfsService(
            host=service.host,
            port=service.port,
            container=service.container,
            endpoint=f"{service.host}:{service.port}",
            access_key=rustfs_access_key,
            secret_key=rustfs_secret_key,
            secure=rustfs_secure,
        )