S3FSFilter(
bucket,
*,
key="",
secret="",
endpoint_url=None,
region_name=None,
prefix="",
**kwargs,
)
Bases: StorageAdapterBase, GitFilterBase
Git filter that stores artifacts in S3 via the ``s3fs`` library.
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def __init__(
    self,
    bucket: str,
    *,
    key: SecretStr | str = "",
    secret: SecretStr | str = "",
    endpoint_url: str | None = None,
    region_name: str | None = None,
    prefix: str = "",
    **kwargs,
) -> None:
    """Create a git filter backed by an S3 bucket via ``s3fs``.

    Args:
        bucket: Name of the target S3 bucket.
        key: AWS access key id, optionally wrapped in a ``SecretStr``.
        secret: AWS secret access key, optionally wrapped in a ``SecretStr``.
        endpoint_url: Custom S3 endpoint (e.g. for MinIO); omitted if falsy.
        region_name: AWS region; omitted if falsy.
        prefix: Key prefix under which all artifacts are stored.
        kwargs: Forwarded to the base class and to ``s3fs.S3FileSystem``.
    """
    super().__init__(**kwargs)
    self._bucket = bucket
    self._prefix = prefix.lstrip("/")
    # Unwrap pydantic SecretStr values into plain strings for s3fs.
    access_key = key.get_secret_value() if isinstance(key, SecretStr) else key
    secret_key = (
        secret.get_secret_value() if isinstance(secret, SecretStr) else secret
    )
    extra: dict[str, str] = {}
    if endpoint_url:
        extra["endpoint_url"] = endpoint_url
    if region_name:
        extra["region_name"] = region_name
    # Empty credentials fall back to None so s3fs uses its default
    # credential resolution chain.
    self._fs = s3fs.S3FileSystem(
        key=access_key or None,
        secret=secret_key or None,
        client_kwargs=extra or None,
        **kwargs,
    )
upload
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
def upload(self, key: str, data: BinaryIO) -> str:
    """Write *data* to S3 under *key* and return the artifact URI."""
    target = self._full_key(key)
    with self._fs.open(target, "wb") as out:
        shutil.copyfileobj(data, out)
    # The returned URI is built from the caller-supplied key, not the
    # bucket-qualified one.
    return f"{self.root_uri}{key.lstrip('/')}"
download
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
def download(self, key: str) -> BinaryIO:
    """Open the object stored under *key* for binary reading.

    The caller is responsible for closing the returned handle.
    """
    return self._fs.open(self._full_key(key), "rb")
upload_dir
upload_dir(src, *, prefix='')
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
def upload_dir(self, src: str | os.PathLike, *, prefix: str = "") -> None:
    """Recursively upload every file under *src*, optionally under *prefix*."""
    root = Path(src)
    for file_path in (p for p in root.rglob("*") if p.is_file()):
        rel = file_path.relative_to(root).as_posix()
        dest_key = f"{prefix.rstrip('/')}/{rel}" if prefix else rel
        with file_path.open("rb") as stream:
            self.upload(dest_key, stream)
iter_prefix
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
def iter_prefix(self, prefix: str):
    """Yield object keys under *prefix*, relative to the configured prefix."""
    bucket_marker = f"{self._bucket}/"
    stored_prefix = self._prefix.rstrip("/")
    for full_path in self._fs.find(self._full_key(prefix)):
        # Drop the leading "<bucket>/" portion of the returned path.
        rel = full_path[len(bucket_marker):]
        # Then drop the configured prefix, if present, so keys are relative.
        if self._prefix and rel.startswith(stored_prefix + "/"):
            rel = rel[len(stored_prefix) + 1:]
        yield rel
download_prefix
download_prefix(prefix, dest_dir)
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
def download_prefix(self, prefix: str, dest_dir: str | os.PathLike) -> None:
    """Mirror every object under *prefix* into *dest_dir* on local disk."""
    out_root = Path(dest_dir)
    for rel_key in self.iter_prefix(prefix):
        local_path = out_root / rel_key
        local_path.parent.mkdir(parents=True, exist_ok=True)
        with local_path.open("wb") as sink:
            shutil.copyfileobj(self.download(rel_key), sink)
from_uri
classmethod
Source code in swarmauri_gitfilter_s3fs/s3fs_filter.py
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@classmethod
def from_uri(cls, uri: str) -> "S3FSFilter":
    """Build an :class:`S3FSFilter` from an ``s3://bucket/prefix`` URI.

    Credentials and endpoint settings are resolved from the peagen TOML
    config first, falling back to the standard AWS environment variables.

    Args:
        uri: An ``s3://`` URI naming the bucket and optional key prefix.

    Raises:
        ValueError: If *uri* does not use the ``s3`` scheme or names no
            bucket.
    """
    from urllib.parse import urlparse

    p = urlparse(uri)
    if p.scheme != "s3":
        raise ValueError("URI must start with s3://")
    bucket = p.netloc
    if not bucket:
        # A bare "s3://" would previously slip through and silently build
        # a filter bound to an empty bucket name; fail fast instead.
        raise ValueError("URI must include a bucket name")
    prefix = p.path.lstrip("/")
    cfg = load_peagen_toml()
    s3_cfg = cfg.get("storage", {}).get("filters", {}).get("s3fs", {})
    key = s3_cfg.get("key") or os.getenv("AWS_ACCESS_KEY_ID", "")
    secret = s3_cfg.get("secret") or os.getenv("AWS_SECRET_ACCESS_KEY", "")
    endpoint_url = s3_cfg.get("endpoint_url") or os.getenv("AWS_ENDPOINT_URL")
    region = s3_cfg.get("region") or os.getenv("AWS_REGION")
    return cls(
        bucket=bucket,
        prefix=prefix,
        key=key,
        secret=secret,
        endpoint_url=endpoint_url,
        region_name=region,
    )