Skip to content

Class swarmauri_storage_minio.minio_storage_adapter.MinioStorageAdapter

swarmauri_storage_minio.minio_storage_adapter.MinioStorageAdapter

MinioStorageAdapter(
    endpoint,
    access_key,
    secret_key,
    bucket,
    *,
    secure=True,
    prefix="",
    **kwargs,
)

Bases: StorageAdapterBase

Simple wrapper around the MinIO client for use with Peagen.

Source code in swarmauri_storage_minio/minio_storage_adapter.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(
    self,
    endpoint: str,
    access_key: SecretStr | str,
    secret_key: SecretStr | str,
    bucket: str,
    *,
    secure: bool = True,
    prefix: str = "",
    **kwargs,
) -> None:
    """Create the MinIO client and make sure *bucket* exists.

    Args:
        endpoint: Host (and optional port) handed to the ``Minio`` client.
        access_key: Access key, either a plain string or a secret wrapper
            exposing ``get_secret_value()``.
        secret_key: Secret key, same accepted forms as *access_key*.
        bucket: Bucket used for every operation; it is created when
            ``bucket_exists`` reports that it is missing.
        secure: Forwarded to the ``Minio`` client as ``secure``.
        prefix: Key prefix stored on the adapter; leading slashes are removed.
        **kwargs: Forwarded unchanged to the base-class initializer.
    """
    super().__init__(**kwargs)

    def _plain(value):
        # The client needs real strings to sign requests. Secret wrappers
        # only reveal their content through get_secret_value(); plain
        # strings (as passed by from_uri) go through untouched.
        reveal = getattr(value, "get_secret_value", None)
        return reveal() if callable(reveal) else value

    self._client = Minio(
        endpoint,
        access_key=_plain(access_key),
        secret_key=_plain(secret_key),
        secure=secure,
    )
    self._endpoint = endpoint
    self._secure = secure
    self._bucket = bucket
    self._prefix = prefix.lstrip("/")

    if not self._client.bucket_exists(bucket):
        self._client.make_bucket(bucket)

root_uri property

root_uri

Return the base URI as minio[s]://endpoint/bucket/prefix/.

upload

upload(key, data)

Upload data to bucket/prefix/key and return the artifact URI.

Source code in swarmauri_storage_minio/minio_storage_adapter.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def upload(self, key: str, data: BinaryIO) -> str:
    """Upload *data* to ``bucket/prefix/key`` and return the artifact URI.

    Args:
        key: Object key; it is mapped to the stored object name by
            ``self._full_key``.
        data: Binary stream to upload. Streams backed by a file descriptor
            are sized with ``os.fstat``; any other stream is sized from an
            in-memory ``BytesIO`` (the stream itself if it already is one,
            otherwise a copy of what ``data.read()`` returns).

    Returns:
        ``self.root_uri`` followed by *key* with leading slashes removed.
    """
    try:
        size = os.fstat(data.fileno()).st_size
    except Exception:
        # Best-effort fallback for streams without a usable descriptor.
        # Only BytesIO offers getbuffer(), so every other stream type
        # (including a BufferedReader over an in-memory source) is copied
        # into a BytesIO before its length is measured.
        if not isinstance(data, io.BytesIO):
            data = io.BytesIO(data.read())
        size = len(data.getbuffer())
    data.seek(0)
    self._client.put_object(
        self._bucket,
        self._full_key(key),
        data,
        # A non-positive size is reported as -1 (unknown length), which is
        # why part_size is always supplied.
        length=size if size > 0 else -1,
        part_size=10 * 1024 * 1024,
    )

    return f"{self.root_uri}{key.lstrip('/')}"

download

download(key)

Return a BytesIO for the object prefix/key.

Source code in swarmauri_storage_minio/minio_storage_adapter.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def download(self, key: str) -> BinaryIO:
    """Return a ``BytesIO`` holding the object stored under ``prefix/key``.

    Args:
        key: Object key; it is mapped to the stored object name by
            ``self._full_key``.

    Returns:
        An in-memory stream positioned at offset 0 with the full payload.

    Raises:
        FileNotFoundError: When the client raises ``S3Error``; the original
            error is chained as the cause.
    """
    try:
        resp = self._client.get_object(self._bucket, self._full_key(key))
        try:
            # A freshly built BytesIO already starts at position 0.
            return io.BytesIO(resp.read())
        finally:
            # Always hand the connection back, even when read() fails.
            resp.close()
            resp.release_conn()
    except S3Error as exc:
        raise FileNotFoundError(f"{self._bucket}/{key}: {exc}") from exc

upload_dir

upload_dir(src, *, prefix='')

Recursively upload a directory under prefix.

Source code in swarmauri_storage_minio/minio_storage_adapter.py
106
107
108
109
110
111
112
113
114
def upload_dir(self, src: str | os.PathLike, *, prefix: str = "") -> None:
    """Recursively upload a directory under ``prefix``."""
    base = Path(src)
    for path in base.rglob("*"):
        if path.is_file():
            rel = path.relative_to(base).as_posix()
            key = f"{prefix.rstrip('/')}/{rel}" if prefix else rel
            with path.open("rb") as fh:
                self.upload(key, fh)

iter_prefix

iter_prefix(prefix)

Yield keys under prefix relative to the run root.

Source code in swarmauri_storage_minio/minio_storage_adapter.py
117
118
119
120
121
122
123
124
125
def iter_prefix(self, prefix: str):
    """Yield the names of all objects listed under *prefix*.

    The bucket is listed recursively. Names that start with the adapter's
    own prefix followed by ``/`` are yielded with that leading part removed;
    every other name is yielded unchanged.

    Args:
        prefix: Value passed as ``prefix`` to the client's ``list_objects``.
    """
    # NOTE(review): *prefix* reaches list_objects as given, without going
    # through self._full_key -- confirm callers pass the full bucket prefix.
    listing = self._client.list_objects(
        self._bucket, prefix=prefix, recursive=True
    )
    root = self._prefix.rstrip("/") + "/" if self._prefix else ""
    for entry in listing:
        name = entry.object_name
        if root and name.startswith(root):
            name = name[len(root):]
        yield name

download_dir

download_dir(prefix, dest_dir)

Download everything under prefix into dest_dir.

Source code in swarmauri_storage_minio/minio_storage_adapter.py
128
129
130
131
132
133
134
135
136
def download_dir(self, prefix: str, dest_dir: str | os.PathLike) -> None:
    """Download everything under ``prefix`` into ``dest_dir``."""
    dest = Path(dest_dir)
    for rel_key in self.iter_prefix(prefix):
        target = dest / rel_key
        target.parent.mkdir(parents=True, exist_ok=True)
        data = self.download(rel_key)
        with target.open("wb") as fh:
            shutil.copyfileobj(data, fh)

from_uri classmethod

from_uri(uri)

Create an adapter from a minio[s]:// URI and env/TOML creds.

Source code in swarmauri_storage_minio/minio_storage_adapter.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@classmethod
def from_uri(cls, uri: str) -> "MinioStorageAdapter":
    """Build an adapter from a ``minio[s]://endpoint/bucket/prefix`` URI.

    The scheme ``minios`` enables ``secure``; any other scheme leaves it
    off. The network location becomes the endpoint, the first path segment
    the bucket, and the remainder of the path the prefix.

    Credentials come from the ``storage.adapters.minio`` table returned by
    ``load_peagen_toml()``. When a value is missing or empty there, the
    ``MINIO_ACCESS_KEY`` / ``MINIO_SECRET_KEY`` environment variable is used,
    defaulting to an empty string.

    Args:
        uri: Location of the bucket (and optional prefix).
    """
    from urllib.parse import urlparse

    parsed = urlparse(uri)
    # partition() yields an empty prefix when the path holds only a bucket.
    bucket, _, prefix = parsed.path.lstrip("/").partition("/")

    settings = (
        load_peagen_toml().get("storage", {}).get("adapters", {}).get("minio", {})
    )
    access_key = settings.get("access_key") or os.getenv("MINIO_ACCESS_KEY", "")
    secret_key = settings.get("secret_key") or os.getenv("MINIO_SECRET_KEY", "")

    return cls(
        endpoint=parsed.netloc,
        bucket=bucket,
        access_key=access_key,
        secret_key=secret_key,
        secure=parsed.scheme == "minios",
        prefix=prefix,
    )