Skip to content

Class swarmauri_gitfilter_s3fs.s3fs_filter.S3FSFilter

swarmauri_gitfilter_s3fs.s3fs_filter.S3FSFilter

S3FSFilter(
    bucket,
    *,
    key="",
    secret="",
    endpoint_url=None,
    region_name=None,
    prefix="",
    **kwargs,
)

Bases: StorageAdapterBase, GitFilterBase

Git filter that stores artifacts in S3 via :mod:`s3fs`.

Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def __init__(
    self,
    bucket: str,
    *,
    key: SecretStr | str = "",
    secret: SecretStr | str = "",
    endpoint_url: str | None = None,
    region_name: str | None = None,
    prefix: str = "",
    **kwargs,
) -> None:
    """Record the bucket/prefix and open an ``s3fs`` filesystem handle.

    Args:
        bucket: Bucket name, stored unchanged.
        key: Access key. A ``SecretStr`` is unwrapped; an empty value is
            handed to ``s3fs`` as ``None``.
        secret: Secret key, treated the same way as ``key``.
        endpoint_url: Added to the client options only when truthy.
        region_name: Added to the client options only when truthy.
        prefix: Key prefix; leading slashes are removed before storing.
        **kwargs: Forwarded to the parent initialiser *and* to
            ``s3fs.S3FileSystem``.
    """
    super().__init__(**kwargs)
    self._bucket = bucket
    self._prefix = prefix.lstrip("/")

    # Unwrap secrets so s3fs receives plain strings.
    access_key = key
    if isinstance(access_key, SecretStr):
        access_key = access_key.get_secret_value()
    secret_key = secret
    if isinstance(secret_key, SecretStr):
        secret_key = secret_key.get_secret_value()

    # Keep only the client options that were actually supplied.
    client_opts: dict[str, str] = {
        name: value
        for name, value in (
            ("endpoint_url", endpoint_url),
            ("region_name", region_name),
        )
        if value
    }

    self._fs = s3fs.S3FileSystem(
        key=access_key or None,
        secret=secret_key or None,
        client_kwargs=client_opts or None,
        **kwargs,
    )

root_uri property

root_uri

upload

upload(key, data)
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
63
64
65
66
67
def upload(self, key: str, data: BinaryIO) -> str:
    """Stream *data* into the object addressed by *key* and return its URI.

    The destination path comes from ``self._full_key(key)``; the returned
    URI is ``self.root_uri`` followed by *key* without leading slashes.
    """
    relative = key.lstrip("/")
    target = self._full_key(key)
    with self._fs.open(target, "wb") as sink:
        shutil.copyfileobj(data, sink)
    return f"{self.root_uri}{relative}"

download

download(key)
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
69
70
71
def download(self, key: str) -> BinaryIO:
    """Open the object addressed by *key* for binary reading.

    The handle returned by the filesystem's ``open`` is passed back as-is;
    closing it is left to the caller.
    """
    return self._fs.open(self._full_key(key), "rb")

upload_dir

upload_dir(src, *, prefix='')
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
73
74
75
76
77
78
79
80
def upload_dir(self, src: str | os.PathLike, *, prefix: str = "") -> None:
    base = Path(src)
    for path in base.rglob("*"):
        if path.is_file():
            rel = path.relative_to(base).as_posix()
            key = f"{prefix.rstrip('/')}/{rel}" if prefix else rel
            with path.open("rb") as fh:
                self.upload(key, fh)

iter_prefix

iter_prefix(prefix)
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
82
83
84
85
86
87
88
def iter_prefix(self, prefix: str):
    """Yield the keys found under *prefix*.

    Each path returned by the filesystem's ``find`` has its leading
    ``"<bucket>/"`` portion sliced off. If a prefix is configured on the
    filter and the remainder starts with it, that prefix is removed too.
    """
    search_root = self._full_key(prefix)
    bucket_len = len(f"{self._bucket}/")
    # "<configured prefix>/" when a prefix is set, otherwise empty.
    marker = self._prefix.rstrip("/") + "/" if self._prefix else ""
    for found in self._fs.find(search_root):
        tail = found[bucket_len:]
        if marker and tail.startswith(marker):
            tail = tail[len(marker):]
        yield tail

download_prefix

download_prefix(prefix, dest_dir)
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
90
91
92
93
94
95
96
97
def download_prefix(self, prefix: str, dest_dir: str | os.PathLike) -> None:
    dest = Path(dest_dir)
    for rel_key in self.iter_prefix(prefix):
        target = dest / rel_key
        target.parent.mkdir(parents=True, exist_ok=True)
        data = self.download(rel_key)
        with target.open("wb") as fh:
            shutil.copyfileobj(data, fh)

from_uri classmethod

from_uri(uri)
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@classmethod
def from_uri(cls, uri: str) -> "S3FSFilter":
    """Build a filter from an ``s3://<bucket>/<prefix>`` URI.

    The bucket is the URI's network location and the prefix is its path
    without leading slashes. Credentials, endpoint and region are read from
    the ``storage.filters.s3fs`` table returned by ``load_peagen_toml()``.
    Missing or empty values fall back to ``AWS_ACCESS_KEY_ID``,
    ``AWS_SECRET_ACCESS_KEY``, ``AWS_ENDPOINT_URL`` and ``AWS_REGION``.

    Raises:
        ValueError: If the scheme is not ``s3`` or no bucket is given.
    """
    from urllib.parse import urlparse

    p = urlparse(uri)
    if p.scheme != "s3":
        raise ValueError("URI must start with s3://")

    bucket = p.netloc
    # "s3:///path" or "s3://" parses to an empty bucket; fail here instead
    # of building a filter that cannot address any object.
    if not bucket:
        raise ValueError("URI must include a bucket name (s3://<bucket>/<prefix>)")
    prefix = p.path.lstrip("/")

    cfg = load_peagen_toml()
    s3_cfg = cfg.get("storage", {}).get("filters", {}).get("s3fs", {})

    key = s3_cfg.get("key") or os.getenv("AWS_ACCESS_KEY_ID", "")
    secret = s3_cfg.get("secret") or os.getenv("AWS_SECRET_ACCESS_KEY", "")
    endpoint_url = s3_cfg.get("endpoint_url") or os.getenv("AWS_ENDPOINT_URL")
    region = s3_cfg.get("region") or os.getenv("AWS_REGION")

    return cls(
        bucket=bucket,
        prefix=prefix,
        key=key,
        secret=secret,
        endpoint_url=endpoint_url,
        region_name=region,
    )