Skip to content

Class swarmauri_gitfilter_minio.minio_filter.MinioFilter

swarmauri_gitfilter_minio.minio_filter.MinioFilter

MinioFilter(
    endpoint,
    access_key,
    secret_key,
    bucket,
    *,
    secure=True,
    prefix="",
    **kwargs,
)

Bases: StorageAdapterBase, GitFilterBase

Interact with MinIO for storing Git objects.

Provides helpers to upload and download individual files or directory trees, allowing Peagen to offload repository data to object storage.

Create a new MinioFilter instance.

- endpoint (str): Host and port of the MinIO service.
- access_key (SecretStr): Access key credential.
- secret_key (SecretStr): Secret key credential.
- bucket (str): Bucket name to store objects in.
- secure (bool): Use HTTPS when True.
- prefix (str): Optional path prefix inside the bucket.
- **kwargs: Additional options forwarded to StorageAdapterBase.

RETURNS (None): This method does not return anything.

Source code in swarmauri_gitfilter_minio/minio_filter.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def __init__(
    self,
    endpoint: str,
    access_key: SecretStr,
    secret_key: SecretStr,
    bucket: str,
    *,
    secure: bool = True,
    prefix: str = "",
    **kwargs,
) -> None:
    """Create a new MinioFilter instance.

    endpoint (str): Host and port of the MinIO service.
    access_key (SecretStr | str): Access key credential. Values exposing
        ``get_secret_value()`` are unwrapped before being handed to the
        client; plain strings are passed through unchanged.
    secret_key (SecretStr | str): Secret key credential, handled the same
        way as ``access_key``.
    bucket (str): Bucket name to store objects in. It is created when it
        does not exist yet.
    secure (bool): Use HTTPS when ``True``.
    prefix (str): Optional path prefix inside the bucket. Leading slashes
        are stripped.
    **kwargs: Additional options forwarded to ``StorageAdapterBase``.

    RETURNS (None): This method does not return anything.
    """
    super().__init__(**kwargs)

    def _plain(value):
        # Secret wrappers hide their payload behind get_secret_value(); the
        # client needs the underlying string, so unwrap when the accessor
        # exists and leave ordinary strings alone.
        reveal = getattr(value, "get_secret_value", None)
        return reveal() if callable(reveal) else value

    self._client = Minio(
        endpoint,
        access_key=_plain(access_key),
        secret_key=_plain(secret_key),
        secure=secure,
    )
    self._endpoint = endpoint
    self._secure = secure
    self._bucket = bucket
    self._prefix = prefix.lstrip("/")

    # Make sure the target bucket is available before any upload happens.
    if not self._client.bucket_exists(bucket):
        self._client.make_bucket(bucket)

root_uri property

root_uri

Return the root URI for the configured bucket and prefix.

RETURNS (str): Base MinIO URI ending with a trailing slash.

upload

upload(key, data)

Upload a binary stream to the bucket.

- key (str): Destination object key.
- data (BinaryIO): File-like object containing the data.

RETURNS (str): URI of the stored object.

RAISES (S3Error): If the upload fails.

Source code in swarmauri_gitfilter_minio/minio_filter.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def upload(self, key: str, data: BinaryIO) -> str:
    """Upload a binary stream to the bucket.

    The payload size is taken from the stream's file descriptor when it has
    one. Streams without a usable descriptor are measured in memory: an
    ``io.BytesIO`` is used as is, anything else is read into a fresh
    ``io.BytesIO`` first. The stream is rewound to offset 0 before sending.

    key (str): Destination object key.
    data (BinaryIO): File-like object containing the data.

    RETURNS (str): URI of the stored object.
    RAISES (S3Error): If the upload fails.
    """
    try:
        size = os.fstat(data.fileno()).st_size  # type: ignore[attr-defined]
    except (AttributeError, OSError, TypeError, ValueError):
        # No real file descriptor behind this stream (io.UnsupportedOperation
        # is both an OSError and a ValueError). Only BytesIO offers
        # getbuffer(), so every other stream type -- including a
        # BufferedReader wrapping an in-memory object -- is buffered first.
        if not isinstance(data, io.BytesIO):
            data = io.BytesIO(data.read())  # type: ignore[arg-type]
        size = data.getbuffer().nbytes
    data.seek(0)
    self._client.put_object(
        self._bucket,
        self._full_key(key),
        data,
        # A non-positive size is sent as -1 (unknown length) together with
        # an explicit part size.
        length=size if size > 0 else -1,
        part_size=10 * 1024 * 1024,
    )

    return f"{self.root_uri}{key.lstrip('/')}"

download

download(key)

Retrieve an object from the bucket.

key (str): Object key to fetch.

RETURNS (BinaryIO): Buffer containing the object data.

RAISES (FileNotFoundError): If the object cannot be found.

Source code in swarmauri_gitfilter_minio/minio_filter.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def download(self, key: str) -> BinaryIO:
    """Retrieve an object from the bucket.

    The whole object is read into memory and returned as an ``io.BytesIO``
    positioned at offset 0. The underlying response is closed and its
    connection released even when reading the body fails.

    key (str): Object key to fetch.

    RETURNS (BinaryIO): Buffer containing the object data.
    RAISES (FileNotFoundError): If the client reports an ``S3Error`` while
        fetching the object.
    """
    try:
        resp = self._client.get_object(self._bucket, self._full_key(key))
        try:
            payload = resp.read()
        finally:
            # Always hand the connection back, otherwise a failed read
            # leaves it checked out.
            try:
                resp.close()
            finally:
                resp.release_conn()
        return io.BytesIO(payload)
    except S3Error as exc:
        raise FileNotFoundError(f"{self._bucket}/{key}: {exc}") from exc

upload_dir

upload_dir(src, *, prefix='')

Upload all files under a directory to the bucket.

- src (str | os.PathLike): Local directory to walk.
- prefix (str): Optional key prefix to prepend to uploaded objects.

RETURNS (None): This method does not return anything.

Source code in swarmauri_gitfilter_minio/minio_filter.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def upload_dir(self, src: str | os.PathLike, *, prefix: str = "") -> None:
    """Upload all files under a directory to the bucket.

    src (str | os.PathLike): Local directory to walk.
    prefix (str): Optional key prefix to prepend to uploaded objects.

    RETURNS (None): This method does not return anything.
    """
    base = Path(src)
    for path in base.rglob("*"):
        if path.is_file():
            rel = path.relative_to(base).as_posix()
            key = f"{prefix.rstrip('/')}/{rel}" if prefix else rel
            with path.open("rb") as fh:
                self.upload(key, fh)

iter_prefix

iter_prefix(prefix)

Iterate over object keys matching a prefix.

prefix (str): Key prefix to match.

RETURNS (Iterator[str]): Relative keys under the configured prefix.

Source code in swarmauri_gitfilter_minio/minio_filter.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def iter_prefix(self, prefix: str):
    """Yield the names of objects in the bucket that start with ``prefix``.

    ``prefix`` is handed to the client's recursive listing unchanged. Names
    that begin with the configured prefix followed by ``/`` are yielded with
    that leading part removed; all other names are yielded as listed.

    prefix (str): Key prefix to match.

    RETURNS (Iterator[str]): Relative keys under the configured prefix.
    """
    listing = self._client.list_objects(
        self._bucket, prefix=prefix, recursive=True
    )
    for entry in listing:
        name = entry.object_name
        if self._prefix:
            # Drop "<configured prefix>/" from the front when present.
            name = name.removeprefix(self._prefix.rstrip("/") + "/")
        yield name

download_prefix

download_prefix(prefix, dest_dir)

Download all objects beneath a prefix into a directory.

- prefix (str): Key prefix to copy from the bucket.
- dest_dir (str | os.PathLike): Local directory to populate.

RETURNS (None): This method does not return anything.

Source code in swarmauri_gitfilter_minio/minio_filter.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def download_prefix(self, prefix: str, dest_dir: str | os.PathLike) -> None:
    """Download all objects beneath a prefix into a directory.

    prefix (str): Key prefix to copy from the bucket.
    dest_dir (str | os.PathLike): Local directory to populate.

    RETURNS (None): This method does not return anything.
    """
    dest = Path(dest_dir)
    for rel_key in self.iter_prefix(prefix):
        target = dest / rel_key
        target.parent.mkdir(parents=True, exist_ok=True)
        data = self.download(rel_key)
        with target.open("wb") as fh:
            shutil.copyfileobj(data, fh)

from_uri classmethod

from_uri(uri)

Create a MinioFilter from a connection string.

uri (str): A URI like minio://host:port/bucket/prefix.

RETURNS (MinioFilter): Configured MinioFilter instance.

Source code in swarmauri_gitfilter_minio/minio_filter.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
@classmethod
def from_uri(cls, uri: str) -> "MinioFilter":
    """Build a MinioFilter from a connection URI.

    The network location becomes the endpoint, the first path segment the
    bucket, and the rest of the path the prefix. HTTPS is enabled only for
    the ``minios`` scheme. Each credential is read from the
    ``storage.filters.minio`` table returned by ``load_peagen_toml()``,
    falling back to the ``MINIO_ACCESS_KEY`` / ``MINIO_SECRET_KEY``
    environment variables and finally to an empty string.

    uri (str): A URI like ``minio://host:port/bucket/prefix``.

    RETURNS (MinioFilter): Configured ``MinioFilter`` instance.
    """
    from urllib.parse import urlparse

    parsed = urlparse(uri)
    # "bucket/some/prefix" -> ("bucket", "some/prefix"); prefix may be empty.
    bucket, _, prefix = parsed.path.lstrip("/").partition("/")

    settings = (
        load_peagen_toml().get("storage", {}).get("filters", {}).get("minio", {})
    )
    credentials = {}
    for field, env_var in (
        ("access_key", "MINIO_ACCESS_KEY"),
        ("secret_key", "MINIO_SECRET_KEY"),
    ):
        # A configured value wins; the environment is consulted only when
        # the setting is missing or empty.
        credentials[field] = settings.get(field) or os.getenv(env_var, "")

    return cls(
        endpoint=parsed.netloc,
        bucket=bucket,
        secure=parsed.scheme == "minios",
        prefix=prefix,
        **credentials,
    )