Skip to content

Class tigrbl_auth.orm.auth_code.AuthCode

tigrbl_auth.orm.auth_code.AuthCode

Bases: Base, GUIDPk, Timestamped, UserColumn, TenantColumn

client_id class-attribute instance-attribute

client_id = acol(
    storage=S(
        PgUUID(as_uuid=True),
        fk=ForeignKeySpec(target="authn.clients.id"),
        nullable=False,
    )
)

redirect_uri class-attribute instance-attribute

redirect_uri = acol(
    storage=S(String(1000), nullable=False)
)

code_challenge class-attribute instance-attribute

code_challenge = acol(storage=S(String, nullable=True))

nonce class-attribute instance-attribute

nonce = acol(storage=S(String, nullable=True))

scope class-attribute instance-attribute

scope = acol(storage=S(String, nullable=True))

expires_at class-attribute instance-attribute

expires_at = acol(storage=S(TZDateTime, nullable=False))

claims class-attribute instance-attribute

claims = acol(storage=S(JSON, nullable=True))

created_at class-attribute instance-attribute

created_at = acol(
    spec=ColumnSpec(
        storage=S(
            type_=TZDateTime,
            default=tzutcnow,
            nullable=False,
        ),
        field=F(py_type=datetime),
        io=RO_IO,
    )
)

updated_at class-attribute instance-attribute

updated_at = acol(
    spec=ColumnSpec(
        storage=S(
            type_=TZDateTime,
            default=tzutcnow,
            onupdate=tzutcnow,
            nullable=False,
        ),
        field=F(py_type=datetime),
        io=RO_IO,
    )
)

id class-attribute instance-attribute

id = acol(
    spec=ColumnSpec(
        storage=S(
            type_=PgUUID(as_uuid=True),
            primary_key=True,
            default=uuid4,
        ),
        field=F(
            py_type=UUID,
            constraints={"examples": [uuid_example]},
        ),
        io=RO_IO,
    )
)

metadata class-attribute instance-attribute

metadata = MetaData(
    naming_convention={
        "pk": "pk_%(table_name)s",
        "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
        "ix": "ix_%(table_name)s_%(column_0_name)s",
        "uq": "uq_%(table_name)s_%(column_0_name)s",
        "ck": "ck_%(table_name)s_%(column_0_name)s_%(constraint_type)s",
    }
)

exchange async

exchange(ctx, obj)
Source code in tigrbl_auth/orm/auth_code.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@op_ctx(alias="exchange", target="delete", arity="member")
async def exchange(cls, ctx, obj):
    import secrets
    from datetime import datetime, timezone

    request = ctx.get("request")
    _require_tls(request)
    payload = ctx.get("payload") or {}
    client_id = payload.get("client_id")
    redirect_uri = payload.get("redirect_uri")

    def _normalize(val: str | None) -> str | None:
        if val is None:
            return None
        try:
            u = UUID(val)
        except ValueError:
            u = UUID(hex=val)
        pg = PgUUID(as_uuid=True)
        pg.as_uuid = u
        return pg.hex

    expires_at = obj.expires_at if obj else None
    if expires_at and expires_at.tzinfo is None:
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    if (
        obj is None
        or obj.client_id.hex != _normalize(client_id)
        or obj.redirect_uri != redirect_uri
        or datetime.now(timezone.utc) > (expires_at or datetime.now(timezone.utc))
    ):
        raise HTTPException(status.HTTP_400_BAD_REQUEST, {"error": "invalid_grant"})
    jwt_kwargs: dict[str, str] = {}
    if obj.scope:
        jwt_kwargs["scope"] = obj.scope
    access, refresh = await _jwt.async_sign_pair(
        sub=str(obj.user_id), tid=str(obj.tenant_id), **jwt_kwargs
    )
    nonce = obj.nonce or secrets.token_urlsafe(8)
    extra_claims: dict[str, str] = {
        "tid": str(obj.tenant_id),
        "typ": "id",
        "at_hash": oidc_hash(access),
    }
    if obj.claims and "id_token" in obj.claims:
        user_obj = await User.handlers.read.core({"payload": {"id": obj.user_id}})
        idc = obj.claims["id_token"]
        if "email" in idc:
            extra_claims["email"] = user_obj.email if user_obj else ""
        if any(k in idc for k in ("name", "preferred_username")):
            extra_claims["name"] = user_obj.username if user_obj else ""
    id_token = await mint_id_token(
        sub=str(obj.user_id),
        aud=client_id,
        nonce=nonce,
        issuer=ISSUER,
        **extra_claims,
    )
    await cls.handlers.delete.core({"obj": obj})
    return {
        "access_token": access,
        "refresh_token": refresh,
        "id_token": id_token,
    }

tenant_id

tenant_id()
Source code in tigrbl/orm/mixins/principals.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@declared_attr
def tenant_id(cls) -> Mapped[UUID]:
    schema = getattr(cls, "__tenant_table_schema__", None) or _infer_schema(cls)
    spec = ColumnSpec(
        storage=S(
            type_=PgUUID(as_uuid=True),
            fk=ForeignKeySpec(target=f"{schema}.tenants.id"),
            nullable=False,
            index=True,
        ),
        field=F(py_type=UUID, constraints={"examples": [uuid_example]}),
        io=CRUD_IO,
    )
    return acol(spec=spec)

user_id

user_id()
Source code in tigrbl/orm/mixins/principals.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@declared_attr
def user_id(cls) -> Mapped[UUID]:
    schema = getattr(cls, "__user_table_schema__", None) or _infer_schema(cls)
    spec = ColumnSpec(
        storage=S(
            type_=PgUUID(as_uuid=True),
            fk=ForeignKeySpec(target=f"{schema}.users.id"),
            nullable=False,
            index=True,
        ),
        field=F(py_type=UUID, constraints={"examples": [uuid_example]}),
        io=CRUD_IO,
    )
    return acol(spec=spec)