Bases: Base, GUIDPk, Timestamped, UserColumn, TenantColumn
client_id
class-attribute
instance-attribute
# Non-nullable UUID column carrying a foreign key that targets
# "authn.clients.id". `exchange` compares this value's hex form with the
# `client_id` presented in the request payload.
client_id = acol(
storage=S(
PgUUID(as_uuid=True),
fk=ForeignKeySpec(target="authn.clients.id"),
nullable=False,
)
)
redirect_uri
class-attribute
instance-attribute
# Non-nullable string column declared as String(1000). `exchange` requires
# an exact match between this value and the payload's `redirect_uri`.
redirect_uri = acol(
storage=S(String(1000), nullable=False)
)
code_challenge
class-attribute
instance-attribute
# Nullable string column.
# NOTE(review): presumably the PKCE code challenge; it is not read by
# `exchange` in this view, so confirm where it is verified.
code_challenge = acol(storage=S(String, nullable=True))
nonce
class-attribute
instance-attribute
# Nullable string column. When it is empty, `exchange` substitutes a random
# `secrets.token_urlsafe(8)` value before minting the ID token.
nonce = acol(storage=S(String, nullable=True))
scope
class-attribute
instance-attribute
# Nullable string column. `exchange` forwards it as the `scope` keyword to
# the token-pair signer only when it is set.
scope = acol(storage=S(String, nullable=True))
expires_at
class-attribute
instance-attribute
# Non-nullable TZDateTime column. `exchange` rejects the code once the
# current UTC time is later than this value (naive values are read as UTC).
expires_at = acol(storage=S(TZDateTime, nullable=False))
claims
class-attribute
instance-attribute
# Nullable JSON column. `exchange` looks for an "id_token" entry here and
# uses its "email" / "name" / "preferred_username" keys to decide which
# user attributes are added to the ID token.
claims = acol(storage=S(JSON, nullable=True))
created_at
class-attribute
instance-attribute
# Non-nullable TZDateTime column whose default is the `tzutcnow` callable;
# exposed to the field layer as a `datetime`.
# NOTE(review): io=RO_IO presumably marks the column read-only for API
# input — confirm against the RO_IO definition.
created_at = acol(
spec=ColumnSpec(
storage=S(
type_=TZDateTime,
default=tzutcnow,
nullable=False,
),
field=F(py_type=datetime),
io=RO_IO,
)
)
updated_at
class-attribute
instance-attribute
# Non-nullable TZDateTime column; `tzutcnow` is used both as the default
# and as the `onupdate` callable. Exposed to the field layer as a `datetime`.
# NOTE(review): io=RO_IO presumably marks the column read-only for API
# input — confirm against the RO_IO definition.
updated_at = acol(
spec=ColumnSpec(
storage=S(
type_=TZDateTime,
default=tzutcnow,
onupdate=tzutcnow,
nullable=False,
),
field=F(py_type=datetime),
io=RO_IO,
)
)
id
class-attribute
instance-attribute
# UUID primary-key column whose default is the `uuid4` callable. The field
# layer types it as `UUID` and attaches `uuid_example` as an example value.
# NOTE(review): io=RO_IO presumably marks the column read-only for API
# input — confirm against the RO_IO definition.
id = acol(
spec=ColumnSpec(
storage=S(
type_=PgUUID(as_uuid=True),
primary_key=True,
default=uuid4,
),
field=F(
py_type=UUID,
constraints={"examples": [uuid_example]},
),
io=RO_IO,
)
)
# MetaData configured with name templates for primary keys ("pk"), foreign
# keys ("fk"), indexes ("ix"), unique constraints ("uq") and check
# constraints ("ck"), so generated constraint names are deterministic.
metadata = MetaData(
naming_convention={
"pk": "pk_%(table_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"ix": "ix_%(table_name)s_%(column_0_name)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(column_0_name)s_%(constraint_type)s",
}
)
exchange
async
Source code in tigrbl_auth/orm/auth_code.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130 | @op_ctx(alias="exchange", target="delete", arity="member")
async def exchange(cls, ctx, obj):
    """Redeem a stored authorization code for access, refresh and ID tokens.

    The request is first passed to ``_require_tls``; anything that helper
    raises propagates unchanged. The stored code is then validated against
    the ``client_id`` and ``redirect_uri`` found in the context payload and
    against its expiry. On success the tokens are minted, the code row is
    deleted, and the token set is returned.

    Args:
        cls: Model class; ``cls.handlers.delete.core`` is used to remove the
            redeemed code.
        ctx: Operation context; ``ctx.get("request")`` and
            ``ctx.get("payload")`` are read.
        obj: The stored authorization-code row, or ``None`` when no row was
            found.

    Returns:
        A dict with the keys ``access_token``, ``refresh_token`` and
        ``id_token``.

    Raises:
        HTTPException: 400 with ``{"error": "invalid_grant"}`` when the code
            is missing, the client or redirect URI does not match, the
            presented ``client_id`` is not a valid UUID, or the code has
            expired.
    """
    import secrets
    from datetime import datetime, timezone

    request = ctx.get("request")
    _require_tls(request)
    payload = ctx.get("payload") or {}
    client_id = payload.get("client_id")
    redirect_uri = payload.get("redirect_uri")

    def _normalize(val: str | None) -> str | None:
        """Return the 32-character hex form of *val*, or ``None`` if unusable."""
        if val is None:
            return None
        try:
            # ``UUID`` accepts dashed, braced, URN and bare-hex spellings, so
            # a single parse covers every form a client may send. ``str()``
            # also tolerates a payload that was already parsed into a UUID.
            return UUID(str(val)).hex
        except ValueError:
            # A malformed identifier can never match the stored client, so
            # report "no match" instead of letting the parse error escape
            # as an unhandled exception.
            return None

    expires_at = obj.expires_at if obj else None
    if expires_at and expires_at.tzinfo is None:
        # Naive timestamps are interpreted as UTC before comparison.
        expires_at = expires_at.replace(tzinfo=timezone.utc)

    # Take one clock reading so the expiry check uses a consistent instant.
    now = datetime.now(timezone.utc)
    if (
        obj is None
        or obj.client_id.hex != _normalize(client_id)
        or obj.redirect_uri != redirect_uri
        or (expires_at is not None and now > expires_at)
    ):
        raise HTTPException(status.HTTP_400_BAD_REQUEST, {"error": "invalid_grant"})

    jwt_kwargs: dict[str, str] = {}
    if obj.scope:
        jwt_kwargs["scope"] = obj.scope
    access, refresh = await _jwt.async_sign_pair(
        sub=str(obj.user_id), tid=str(obj.tenant_id), **jwt_kwargs
    )

    # Fall back to a random nonce when none was stored with the code.
    nonce = obj.nonce or secrets.token_urlsafe(8)
    extra_claims: dict[str, str] = {
        "tid": str(obj.tenant_id),
        "typ": "id",
        "at_hash": oidc_hash(access),
    }
    if obj.claims and "id_token" in obj.claims:
        # Only load the user when specific ID-token claims were requested.
        user_obj = await User.handlers.read.core({"payload": {"id": obj.user_id}})
        idc = obj.claims["id_token"]
        if "email" in idc:
            extra_claims["email"] = user_obj.email if user_obj else ""
        if any(k in idc for k in ("name", "preferred_username")):
            extra_claims["name"] = user_obj.username if user_obj else ""

    id_token = await mint_id_token(
        sub=str(obj.user_id),
        aud=client_id,
        nonce=nonce,
        issuer=ISSUER,
        **extra_claims,
    )

    # The code is removed only after every token has been minted.
    await cls.handlers.delete.core({"obj": obj})
    return {
        "access_token": access,
        "refresh_token": refresh,
        "id_token": id_token,
    }
|
tenant_id
Source code in tigrbl/orm/mixins/principals.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44 | @declared_attr
def tenant_id(cls) -> Mapped[UUID]:
    """Build the ``tenant_id`` column for the class this mixin is applied to.

    The foreign key targets ``<schema>.tenants.id``. The schema comes from
    the class attribute ``__tenant_table_schema__`` when that is set to a
    truthy value, otherwise from ``_infer_schema(cls)``.
    """
    schema_name = getattr(cls, "__tenant_table_schema__", None)
    if not schema_name:
        schema_name = _infer_schema(cls)

    # Storage: indexed, non-nullable UUID with the tenant foreign key.
    storage = S(
        type_=PgUUID(as_uuid=True),
        fk=ForeignKeySpec(target=f"{schema_name}.tenants.id"),
        nullable=False,
        index=True,
    )
    # Field: typed as UUID with an example value attached.
    field = F(py_type=UUID, constraints={"examples": [uuid_example]})
    return acol(spec=ColumnSpec(storage=storage, field=field, io=CRUD_IO))
|
user_id
Source code in tigrbl/orm/mixins/principals.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64 | @declared_attr
def user_id(cls) -> Mapped[UUID]:
    """Build the ``user_id`` column for the class this mixin is applied to.

    The foreign key targets ``<schema>.users.id``. The schema comes from the
    class attribute ``__user_table_schema__`` when that is set to a truthy
    value, otherwise from ``_infer_schema(cls)``.
    """
    schema_name = getattr(cls, "__user_table_schema__", None)
    if not schema_name:
        schema_name = _infer_schema(cls)

    # Storage: indexed, non-nullable UUID with the user foreign key.
    storage = S(
        type_=PgUUID(as_uuid=True),
        fk=ForeignKeySpec(target=f"{schema_name}.users.id"),
        nullable=False,
        index=True,
    )
    # Field: typed as UUID with an example value attached.
    field = F(py_type=UUID, constraints={"examples": [uuid_example]})
    return acol(spec=ColumnSpec(storage=storage, field=field, io=CRUD_IO))
|