Skip to content

Class tigrbl.engine._engine.Engine

tigrbl.engine._engine.Engine dataclass

Engine(spec)

spec instance-attribute

spec

provider class-attribute instance-attribute

provider = field(init=False)

is_async property

is_async

get_db property

get_db

supports

supports()
Source code in tigrbl/engine/_engine.py
 98
 99
100
101
102
def supports(self) -> dict:
    try:
        return self.provider.supports()
    except Exception:
        return {"engine": self.spec.kind or "unknown"}

raw

raw()
Source code in tigrbl/engine/_engine.py
116
117
def raw(self) -> Tuple[Any, SessionFactory]:
    return self.provider.ensure()

session

session()
Source code in tigrbl/engine/_engine.py
123
124
125
126
127
128
129
130
131
@contextmanager
def session(self) -> Session:
    db = self.provider.session()
    try:
        yield db  # type: ignore[return-value]
    finally:
        close = getattr(db, "close", None)
        if callable(close):
            close()

asession async

asession()
Source code in tigrbl/engine/_engine.py
133
134
135
136
137
138
139
140
141
142
143
144
@asynccontextmanager
async def asession(self) -> AsyncSession:
    db = self.provider.session()
    try:
        yield db  # type: ignore[return-value]
    finally:
        close = getattr(db, "close", None)
        if callable(close):
            # AsyncSession.close() is sync; close() may exist as async in some impls
            res = close()
            if hasattr(res, "__await__"):
                await res