Skip to content

Class tigrbl.session.default.DefaultSession

tigrbl.session.default.DefaultSession

DefaultSession(underlying, spec=None)

Bases: TigrblSessionBase

Delegating session that wraps an underlying native session object (sync or async) and exposes the Tigrbl Session ABC.

No third-party imports: we rely on duck-typed methods on the underlying object (begin/commit/rollback, add/delete/flush/refresh/get/execute/close).

Source code in tigrbl/session/default.py
20
21
22
def __init__(self, underlying: Any, spec: Optional[SessionSpec] = None) -> None:
    """Wrap ``underlying`` and pass ``spec`` to the base-class initializer.

    Args:
        underlying: The native session object to delegate to. It is stored
            as-is on ``self._u``; nothing is validated here.
        spec: Optional session spec, forwarded unchanged to the base class.
    """
    super().__init__(spec)
    # Stored under a short private name; run_sync() and in_transaction()
    # read the wrapped object from here.
    self._u = underlying

run_sync async

run_sync(fn)
Source code in tigrbl/session/default.py
25
26
27
28
29
async def run_sync(self, fn: Callable[[Any], Any]) -> Any:
    """Call ``fn`` with the wrapped native session and return its result.

    Args:
        fn: Callable that receives the underlying session object. It may
            return a plain value or an awaitable.

    Returns:
        The value produced by ``fn``; if that value is awaitable it is
        awaited first and the awaited result is returned instead.
    """
    outcome = fn(self._u)
    # Accept both sync and async callables without the caller having to care.
    return (await outcome) if inspect.isawaitable(outcome) else outcome

in_transaction

in_transaction()
Source code in tigrbl/session/default.py
56
57
58
59
60
61
62
63
def in_transaction(self) -> bool:
    """Report whether a transaction is active.

    The wrapped object's own ``in_transaction()`` is preferred when it
    exists and is callable. If it is missing, not callable, or raises, the
    answer comes from the base-class implementation instead.

    Returns:
        ``True`` when a transaction is considered active, else ``False``.
    """
    probe = getattr(self._u, "in_transaction", None)
    if callable(probe):
        try:
            active = bool(probe())
        except Exception:
            # Best-effort: a failing native probe falls through to the base class.
            pass
        else:
            return active
    return super().in_transaction()

begin async

begin()
Source code in tigrbl/session/base.py
44
45
46
async def begin(self) -> None:
    """Start a transaction through ``_tx_begin_impl`` and mark the session open."""
    await self._tx_begin_impl()
    # Only flagged open once the begin hook has returned without raising.
    self._open = True

commit async

commit()
Source code in tigrbl/session/base.py
48
49
50
51
52
53
54
55
async def commit(self) -> None:
    """Flush outstanding work, commit the transaction and reset state flags.

    Raises:
        RuntimeError: If the session spec is read-only and the session has
            been marked dirty. Nothing is flushed or committed in that case.
    """
    spec = self._spec
    # Late guard: catches writes that reached a read-only session some other
    # way than through the add()/delete() checks.
    wrote_while_read_only = spec and spec.read_only and self._dirty
    if wrote_while_read_only:
        raise RuntimeError("read-only session: writes detected before commit")

    await self.flush()
    await self._tx_commit_impl()
    # Flags are cleared only after the commit hook has succeeded.
    self._open = self._dirty = False

rollback async

rollback()
Source code in tigrbl/session/base.py
57
58
59
60
61
62
63
64
65
66
67
async def rollback(self) -> None:
    """Cancel queued add() work, roll the transaction back and reset flags."""
    queued = self._pending
    for task in queued:
        try:
            task.cancel()
        except Exception:
            # Best-effort cancellation; one failing task must not block the rest.
            continue
    # Emptied in place, so the same list object stays attached to the session.
    queued.clear()

    await self._tx_rollback_impl()
    self._open = self._dirty = False

get async

get(model, ident)
Source code in tigrbl/session/base.py
104
105
async def get(self, model: type, ident: Any) -> Any | None:
    return await self._get_impl(model, ident)

add

add(obj)
Source code in tigrbl/session/base.py
73
74
75
76
77
78
79
80
81
82
83
84
def add(self, obj: Any) -> None:
    """Stage ``obj`` for insertion through the ``_add_impl`` hook.

    This method is synchronous even when the hook is not. If the hook returns
    an awaitable it is either run to completion right away (no event loop is
    running in this thread) or scheduled on the running loop and tracked in
    ``self._pending`` so that flush() can wait for it and rollback()/close()
    can cancel it.

    Args:
        obj: The object to add; passed to ``_add_impl`` unchanged.

    Raises:
        RuntimeError: If the session spec is read-only. The session is not
            marked dirty in that case.
    """
    if self._spec and self._spec.read_only:
        raise RuntimeError("write attempted in read-only session (add)")
    self._dirty = True
    rv = self._add_impl(obj)
    if not inspect.isawaitable(rv):
        return

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread. asyncio.run() insists on a coroutine, while
        # isawaitable() also admits Futures and __await__ objects, so wrap
        # the awaitable instead of handing it over directly.
        async def _drive() -> None:
            await rv

        asyncio.run(_drive())
    else:
        # ensure_future() accepts any awaitable; create_task() only takes
        # coroutines. For a coroutine the result is still a Task on this loop.
        self._pending.append(asyncio.ensure_future(rv, loop=loop))

delete async

delete(obj)
Source code in tigrbl/session/base.py
86
87
88
89
90
async def delete(self, obj: Any) -> None:
    """Delete ``obj`` through the ``_delete_impl`` hook and mark the session dirty.

    Args:
        obj: The object to delete; passed to the hook unchanged.

    Raises:
        RuntimeError: If the session spec is read-only. The hook is not
            called and the session is not marked dirty in that case.
    """
    spec = self._spec
    writable = not (spec and spec.read_only)
    if not writable:
        raise RuntimeError("write attempted in read-only session (delete)")
    # Marked dirty before the hook runs, so a failing delete still counts as a write.
    self._dirty = True
    await self._delete_impl(obj)

flush async

flush()
Source code in tigrbl/session/base.py
92
93
94
95
96
97
98
99
async def flush(self) -> None:
    """Wait for all work queued by add(), then call the ``_flush_impl`` hook.

    Tasks stay in ``self._pending`` while they are being awaited, so a
    concurrent rollback() or close() can still see and cancel them. Anything
    add() queues while this coroutine is suspended is picked up by the next
    pass of the loop rather than being discarded.

    Raises:
        Exception: The first failure, in queue order, of any awaited task is
            re-raised, and ``_flush_impl`` is not called in that case.
    """
    while self._pending:
        # Snapshot: add() may append to self._pending while we are suspended.
        batch = list(self._pending)
        done, _ = await asyncio.wait(batch, return_when=asyncio.ALL_COMPLETED)
        # Drop only what was actually awaited; later arrivals stay queued.
        self._pending = [t for t in self._pending if t not in done]
        # Surface any exception, in the order the work was queued.
        for task in batch:
            task.result()
    await self._flush_impl()

refresh async

refresh(obj)
Source code in tigrbl/session/base.py
101
102
async def refresh(self, obj: Any) -> None:
    """Refresh ``obj`` by delegating to the ``_refresh_impl`` hook.

    Args:
        obj: The object to refresh; passed to the hook unchanged.
    """
    await self._refresh_impl(obj)

execute async

execute(stmt)
Source code in tigrbl/session/base.py
107
108
async def execute(self, stmt: Any) -> Any:
    """Run ``stmt`` through the ``_execute_impl`` hook.

    Args:
        stmt: The statement to execute; passed to the hook unchanged.

    Returns:
        Whatever ``_execute_impl`` resolves to.
    """
    result = await self._execute_impl(stmt)
    return result

close async

close()
Source code in tigrbl/session/base.py
110
111
112
113
114
115
116
117
async def close(self) -> None:
    """Cancel any still-queued add() work, then call the ``_close_impl`` hook."""
    for queued in self._pending:
        try:
            queued.cancel()
        except Exception:
            # Best-effort cancellation; keep going so the session still closes.
            continue
    # A fresh list is attached; the previous list object is left as it was.
    self._pending = []
    await self._close_impl()

apply_spec

apply_spec(spec)
Source code in tigrbl/session/base.py
30
31
def apply_spec(self, spec: SessionSpec | None) -> None:
    """Replace the session's spec.

    Args:
        spec: The new spec, or ``None`` to clear it. The value is stored
            as-is; the read-only checks in add(), delete() and commit()
            read it from ``self._spec``.
    """
    self._spec = spec