Skip to content

Class tigrbl.session.base.TigrblSessionBase

tigrbl.session.base.TigrblSessionBase dataclass

TigrblSessionBase(_spec=None)

Bases: SessionABC

Common session behavior
  • Tracks SessionSpec
  • Tracks transaction state (_open) and write intent (_dirty)
  • Queues accidentally-async add() work and resolves on flush/commit
  • Enforces read-only both on write calls and at commit

apply_spec

apply_spec(spec)
Source code in tigrbl/session/base.py
30
31
def apply_spec(self, spec: SessionSpec | None) -> None:
    self._spec = spec

run_sync async

run_sync(fn)

Default async marker: run the callback against this session. Subclasses may override to pass the native handle.

Source code in tigrbl/session/base.py
33
34
35
36
37
38
39
40
41
async def run_sync(self, fn: Callable[[Any], Any]) -> Any:
    """
    Default async marker: run the callback against *this* session.
    Subclasses may override to pass the native handle.
    """
    rv = fn(self)
    if inspect.isawaitable(rv):
        return await rv
    return rv

begin async

begin()
Source code in tigrbl/session/base.py
44
45
46
async def begin(self) -> None:
    await self._tx_begin_impl()
    self._open = True

commit async

commit()
Source code in tigrbl/session/base.py
48
49
50
51
52
53
54
55
async def commit(self) -> None:
    # late guard
    if self._spec and self._spec.read_only and self._dirty:
        raise RuntimeError("read-only session: writes detected before commit")
    await self.flush()
    await self._tx_commit_impl()
    self._open = False
    self._dirty = False

rollback async

rollback()
Source code in tigrbl/session/base.py
57
58
59
60
61
62
63
64
65
66
67
async def rollback(self) -> None:
    # cancel queued add() tasks
    for t in self._pending:
        try:
            t.cancel()
        except Exception:
            pass
    self._pending.clear()
    await self._tx_rollback_impl()
    self._open = False
    self._dirty = False

in_transaction

in_transaction()
Source code in tigrbl/session/base.py
69
70
def in_transaction(self) -> bool:
    return bool(self._open)

add

add(obj)
Source code in tigrbl/session/base.py
73
74
75
76
77
78
79
80
81
82
83
84
def add(self, obj: Any) -> None:
    if self._spec and self._spec.read_only:
        raise RuntimeError("write attempted in read-only session (add)")
    self._dirty = True
    rv = self._add_impl(obj)
    if inspect.isawaitable(rv):
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(rv)
        else:
            self._pending.append(loop.create_task(rv))

delete async

delete(obj)
Source code in tigrbl/session/base.py
86
87
88
89
90
async def delete(self, obj: Any) -> None:
    if self._spec and self._spec.read_only:
        raise RuntimeError("write attempted in read-only session (delete)")
    self._dirty = True
    await self._delete_impl(obj)

flush async

flush()
Source code in tigrbl/session/base.py
92
93
94
95
96
97
98
99
async def flush(self) -> None:
    if self._pending:
        done, _ = await asyncio.wait(self._pending, return_when=asyncio.ALL_COMPLETED)
        self._pending = []
        # surface any exception
        for t in done:
            _ = t.result()
    await self._flush_impl()

refresh async

refresh(obj)
Source code in tigrbl/session/base.py
101
102
async def refresh(self, obj: Any) -> None:
    await self._refresh_impl(obj)

get async

get(model, ident)
Source code in tigrbl/session/base.py
104
105
async def get(self, model: type, ident: Any) -> Any | None:
    return await self._get_impl(model, ident)

execute async

execute(stmt)
Source code in tigrbl/session/base.py
107
108
async def execute(self, stmt: Any) -> Any:
    return await self._execute_impl(stmt)

close async

close()
Source code in tigrbl/session/base.py
110
111
112
113
114
115
116
117
async def close(self) -> None:
    for t in self._pending:
        try:
            t.cancel()
        except Exception:
            pass
    self._pending = []
    await self._close_impl()