Bases: TigrblSessionBase
Delegating session that wraps an underlying native session object
(sync or async) and exposes the Tigrbl Session ABC.
No third-party imports: we rely on duck-typed methods on the underlying
object (begin/commit/rollback, add/delete/flush/refresh/get/execute/close).
Source code in tigrbl/session/default.py
| def __init__(self, underlying: Any, spec: Optional[SessionSpec] = None) -> None:
super().__init__(spec)
self._u = underlying
|
run_sync
async
Source code in tigrbl/session/default.py
| async def run_sync(self, fn: Callable[[Any], Any]) -> Any:
rv = fn(self._u)
if inspect.isawaitable(rv):
return await rv
return rv
|
in_transaction
Source code in tigrbl/session/default.py
| def in_transaction(self) -> bool:
it = getattr(self._u, "in_transaction", None)
if callable(it):
try:
return bool(it())
except Exception:
pass
return super().in_transaction()
|
begin
async
Source code in tigrbl/session/base.py
| async def begin(self) -> None:
await self._tx_begin_impl()
self._open = True
|
commit
async
Source code in tigrbl/session/base.py
| async def commit(self) -> None:
# late guard
if self._spec and self._spec.read_only and self._dirty:
raise RuntimeError("read-only session: writes detected before commit")
await self.flush()
await self._tx_commit_impl()
self._open = False
self._dirty = False
|
rollback
async
Source code in tigrbl/session/base.py
57
58
59
60
61
62
63
64
65
66
67 | async def rollback(self) -> None:
# cancel queued add() tasks
for t in self._pending:
try:
t.cancel()
except Exception:
pass
self._pending.clear()
await self._tx_rollback_impl()
self._open = False
self._dirty = False
|
get
async
Source code in tigrbl/session/base.py
| async def get(self, model: type, ident: Any) -> Any | None:
return await self._get_impl(model, ident)
|
add
Source code in tigrbl/session/base.py
73
74
75
76
77
78
79
80
81
82
83
84 | def add(self, obj: Any) -> None:
if self._spec and self._spec.read_only:
raise RuntimeError("write attempted in read-only session (add)")
self._dirty = True
rv = self._add_impl(obj)
if inspect.isawaitable(rv):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
asyncio.run(rv)
else:
self._pending.append(loop.create_task(rv))
|
delete
async
Source code in tigrbl/session/base.py
| async def delete(self, obj: Any) -> None:
if self._spec and self._spec.read_only:
raise RuntimeError("write attempted in read-only session (delete)")
self._dirty = True
await self._delete_impl(obj)
|
flush
async
Source code in tigrbl/session/base.py
| async def flush(self) -> None:
if self._pending:
done, _ = await asyncio.wait(self._pending, return_when=asyncio.ALL_COMPLETED)
self._pending = []
# surface any exception
for t in done:
_ = t.result()
await self._flush_impl()
|
refresh
async
Source code in tigrbl/session/base.py
| async def refresh(self, obj: Any) -> None:
await self._refresh_impl(obj)
|
execute
async
Source code in tigrbl/session/base.py
| async def execute(self, stmt: Any) -> Any:
return await self._execute_impl(stmt)
|
close
async
Source code in tigrbl/session/base.py
110
111
112
113
114
115
116
117 | async def close(self) -> None:
for t in self._pending:
try:
t.cancel()
except Exception:
pass
self._pending = []
await self._close_impl()
|
apply_spec
Source code in tigrbl/session/base.py
| def apply_spec(self, spec: SessionSpec | None) -> None:
self._spec = spec
|