Skip to content

Class peagen.tui.app.QueueDashboardApp

peagen.tui.app.QueueDashboardApp

QueueDashboardApp(gateway_url='http://localhost:8000')

Bases: App

Source code in peagen/tui/app.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
def __init__(self, gateway_url: str = "http://localhost:8000") -> None:
    super().__init__()
    ws_url = gateway_url.replace("http", "ws").rstrip("/") + "/ws/tasks"
    self.client = TaskStreamClient(ws_url)
    self.backend = RemoteBackend(gateway_url)
    self.sort_key = "time"
    self.sort_reverse = False
    self.filter_id: str | None = None
    self.filter_pool: str | None = None
    self.filter_status: str | None = None
    self.filter_action: str | None = None
    self.filter_label: str | None = None
    self.collapsed: set[str] = set()
    self._seen_parents: set[str] = set()
    self._reconnect_screen: ReconnectScreen | None = None
    self._filter_debounce_timer = None
    self._current_file: str | None = None
    self._remote_info: tuple | None = None
    self.ws_connected = False  # Track WebSocket connection status
    self.limit = 50
    self.offset = 0

CSS class-attribute instance-attribute

CSS = '\n    TabPane#editor { height: 1fr; width:  1fr; }\n    TextArea#code_editor { height: 1fr; width:  1fr; }\n\n    /* Styles for the filter section to ensure visibility */\n    #filter-section-container {\n        padding: 0 1; /* Horizontal padding */\n        margin-bottom: 1; /* Gap below the entire filter section */\n        /* Fixed height: Label (1) + Label\'s padding-bottom (1) + FilterBar (1, assuming compact Selects) */\n        height: 3; \n    }\n\n    #filter-title-label {\n        padding-bottom: 1; /* Space between "Filter" title and the FilterBar */\n        text-style: bold;\n        /* The Label widget itself is typically 1 cell high */\n    }\n\n    /* Ensure the main TabbedContent (holding Pools, Tasks, etc.) takes up the remaining space */\n    /* This targets the TabbedContent that is a direct child of the main Vertical layout container */\n    Vertical > TabbedContent {\n        height: 1fr; /* Takes up the remaining flexible vertical space */\n    }\n    '

TITLE class-attribute instance-attribute

TITLE = 'Peagen'

BINDINGS class-attribute instance-attribute

BINDINGS = [
    ("1", "switch('pools')", "Pools"),
    ("2", "switch('tasks')", "Tasks"),
    ("3", "switch('errors')", "Errors"),
    ("4", "switch('artifacts')", "Artifacts"),
    ("5", "switch('templates')", "Templates"),
    ("ctrl+s", "save_file", "Save"),
    ("c", "toggle_children", "Collapse"),
    ("space", "toggle_children", "Collapse"),
    ("ctrl+c", "copy_selection", "Copy"),
    ("ctrl+p", "paste_clipboard", "Paste"),
    ("s", "cycle_sort", "Sort"),
    ("escape", "clear_filters", "Clear Filters"),
    ("n", "next_page", "Next Page"),
    ("p", "prev_page", "Prev Page"),
    ("l", "set_limit", "Limit"),
    ("j", "jump_page", "Jump Page"),
    ("q", "quit", "Quit"),
]

SORT_KEYS class-attribute instance-attribute

SORT_KEYS = [
    "time",
    "pool",
    "status",
    "action",
    "label",
    "duration",
    "id",
    "date_created",
    "last_modified",
    "error",
]

LIMIT_OPTIONS class-attribute instance-attribute

LIMIT_OPTIONS = [10, 20, 50, 100]

COLUMN_LABEL_TO_SORT_KEY class-attribute instance-attribute

COLUMN_LABEL_TO_SORT_KEY = {
    "ID": "id",
    "Pool": "pool",
    "Status": "status",
    "Action": "action",
    "Labels": "label",
    "Started": "date_created",
    "Finished": "last_modified",
    "Duration (s)": "duration",
    "Error": "error",
}

queue_len class-attribute instance-attribute

queue_len = reactive(0)

done_len class-attribute instance-attribute

done_len = reactive(0)

fail_len class-attribute instance-attribute

fail_len = reactive(0)

worker_len class-attribute instance-attribute

worker_len = reactive(0)

client instance-attribute

client = TaskStreamClient(ws_url)

backend instance-attribute

backend = RemoteBackend(gateway_url)

sort_key instance-attribute

sort_key = 'time'

sort_reverse instance-attribute

sort_reverse = False

filter_id instance-attribute

filter_id = None

filter_pool instance-attribute

filter_pool = None

filter_status instance-attribute

filter_status = None

filter_action instance-attribute

filter_action = None

filter_label instance-attribute

filter_label = None

collapsed instance-attribute

collapsed = set()

ws_connected instance-attribute

ws_connected = False

limit instance-attribute

limit = 50

offset instance-attribute

offset = 0

on_mount async

on_mount()
Source code in peagen/tui/app.py
299
300
301
302
303
304
305
306
307
async def on_mount(self):
    # Register for connection status changes
    self.client.on_connection_change(self.on_websocket_connection_change)

    self.run_worker(
        self.client.listen(), exclusive=True, group="websocket_listener"
    )
    self.set_interval(1.0, self._refresh_backend_and_ui)
    self.trigger_data_processing()

on_websocket_connection_change async

on_websocket_connection_change(is_connected, error_msg)

Handle websocket connection status changes.

Source code in peagen/tui/app.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
async def on_websocket_connection_change(
    self, is_connected: bool, error_msg: str
) -> None:
    """Handle websocket connection status changes."""
    self.ws_connected = is_connected

    if not is_connected:
        # Show reconnect screen for websocket disconnection
        await self._show_reconnect(f"WebSocket disconnected: {error_msg}")
    elif is_connected and self._reconnect_screen:
        # Only dismiss if the backend is also connected
        ok = await self.backend.refresh(limit=self.limit, offset=self.offset)
        if ok:
            await self._dismiss_reconnect()
            self.toast("Connection re-established", duration=2.0)
            self.trigger_data_processing(debounce=False)

retry_connection async

retry_connection()
Source code in peagen/tui/app.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
async def retry_connection(self) -> None:
    # Restart the websocket listener
    self.run_worker(
        self.client.listen(), exclusive=True, group="websocket_listener_retry"
    )

    # Try to refresh backend data
    ok = await self.backend.refresh(limit=self.limit, offset=self.offset)

    # Only dismiss if both backend and websocket are connected
    if ok and self.ws_connected:
        await self._dismiss_reconnect()
    else:
        # Update the message with the current error state
        error_msg = self.backend.last_error or "Connection failed"
        if not self.ws_connected:
            error_msg = f"WebSocket disconnected. {error_msg}"

        if self._reconnect_screen:
            self._reconnect_screen.message = error_msg
        else:
            await self._show_reconnect(error_msg)

trigger_data_processing

trigger_data_processing(debounce=True)
Source code in peagen/tui/app.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
def trigger_data_processing(self, debounce: bool = True) -> None:
    if debounce:
        if self._filter_debounce_timer is not None:
            self._filter_debounce_timer.stop()
        self._filter_debounce_timer = self.set_timer(
            0.3,
            lambda: self.run_worker(
                self.async_process_and_update_data(),
                exclusive=True,
                group="data_refresh_worker",
            ),
            name="filter_debounce_timer",
        )
    else:
        self.run_worker(
            self.async_process_and_update_data(),
            exclusive=True,
            group="data_refresh_worker",
        )

on_select_changed async

on_select_changed(event)
Source code in peagen/tui/app.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
async def on_select_changed(self, event: Select.Changed) -> None:
    event.stop()
    value = str(event.value) if event.value != Select.BLANK else None
    filter_changed = False

    if event.control.id == "filter_id":
        if self.filter_id != value:
            self.filter_id = value
            filter_changed = True
    elif event.control.id == "filter_pool":
        if self.filter_pool != value:
            self.filter_pool = value
            filter_changed = True
    elif event.control.id == "filter_status":
        if self.filter_status != value:
            self.filter_status = value
            filter_changed = True
    elif event.control.id == "filter_action":
        if self.filter_action != value:
            self.filter_action = value
            filter_changed = True
    elif event.control.id == "filter_label":
        if self.filter_label != value:
            self.filter_label = value
            filter_changed = True

    if filter_changed:
        self.trigger_data_processing()

on_input_changed async

on_input_changed(event)
Source code in peagen/tui/app.py
414
415
416
417
418
419
420
421
422
423
424
425
async def on_input_changed(self, event: Input.Changed) -> None:
    event.stop()
    value = event.value.strip() if event.value else None
    filter_changed = False

    if event.input.id == "filter_id":
        if self.filter_id != value:
            self.filter_id = value
            filter_changed = True

    if filter_changed:
        self.trigger_data_processing()

async_process_and_update_data async

async_process_and_update_data()
Source code in peagen/tui/app.py
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
async def async_process_and_update_data(self) -> None:
    all_tasks_from_client = list(self.client.tasks.values())
    all_tasks_from_backend = list(self.backend.tasks)

    combined_tasks_dict: Dict[str, Any] = {}
    for task in all_tasks_from_backend + all_tasks_from_client:
        tid = task.get("id")
        if tid is not None:
            combined_tasks_dict[tid] = task
    all_tasks = list(combined_tasks_dict.values())

    for task in all_tasks:
        result_data = task.get("result") or {}
        if result_data.get("children"):
            tid_str = str(task.get("id"))
            if tid_str not in self._seen_parents:
                self._seen_parents.add(tid_str)
                self.collapsed.add(tid_str)

    current_filter_criteria = {
        "id": self.filter_id,
        "pool": self.filter_pool,
        "status": self.filter_status,
        "action": self.filter_action,
        "label": self.filter_label,
        "sort_key": self.sort_key,
        "sort_reverse": self.sort_reverse,
        "collapsed": self.collapsed.copy(),
    }
    processed_data = self._perform_filtering_and_sorting(
        all_tasks,
        current_filter_criteria,
        limit=self.limit,
        offset=self.offset,
    )
    self.call_later(self._update_ui_with_processed_data, processed_data, all_tasks)

on_open_url async

on_open_url(event)
Source code in peagen/tui/app.py
725
726
727
728
729
730
731
732
733
async def on_open_url(self, event: events.OpenURL) -> None:
    if event.url.startswith("file://"):
        event.prevent_default()
        event.stop()
        await self.open_editor(event.url.removeprefix("file://"))
    if event.url.startswith("oid:"):
        event.prevent_default()
        event.stop()
        await self.open_git_oid(event.url.removeprefix("oid:"))

on_file_tree_file_selected async

on_file_tree_file_selected(message)
Source code in peagen/tui/app.py
735
736
737
async def on_file_tree_file_selected(self, message: FileTree.FileSelected) -> None:
    message.stop()
    await self.open_editor(message.path.as_posix())

toast

toast(message, *, style='information', duration=2.0)
Source code in peagen/tui/app.py
739
740
741
742
743
744
745
def toast(
    self, message: str, *, style: str = "information", duration: float | None = 2.0
) -> None:
    if hasattr(self, "notify"):
        self.notify(message, severity=style, timeout=duration)
    else:
        self.log(f"[{style.upper()}] {message}")

compose

compose()
Source code in peagen/tui/app.py
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
def compose(self) -> ComposeResult:
    yield Header()
    self.workers_view = WorkersView(id="workers_view")
    self.file_tree = FileTree("tree", id="file_tree")
    self.templates_tree = TemplatesView(id="templates_tree")
    self.tasks_table = TaskTable(self.open_task_detail, id="tasks_table")
    self.tasks_table.add_columns(
        "ID",
        "Pool",
        "Status",
        "Action",
        "Labels",
        "Started",
        "Finished",
        "Duration (s)",
    )
    self.tasks_table.cursor_type = "cell"

    self.err_table = TaskTable(self.open_task_detail, id="err_table")
    self.err_table.add_columns(
        "ID",
        "Pool",
        "Status",
        "Action",
        "Labels",
        "Started",
        "Finished",
        "Duration (s)",
        "Error",
    )
    self.err_table.cursor_type = "cell"

    self.file_tabs = TabbedContent(id="file_tabs")
    self.file_tabs.display = False
    self.filter_bar = FilterBar()

    with Vertical():
        with Vertical(id="filter-section-container"):
            yield Label("Filter", id="filter-title-label")
            yield self.filter_bar
        with TabbedContent(initial="pools"):
            yield TabPane("Pools", self.workers_view, id="pools")
            yield TabPane("Tasks", self.tasks_table, id="tasks")
            yield TabPane("Errors", self.err_table, id="errors")
            yield TabPane("Artifacts", self.file_tree, id="artifacts")
            yield TabPane("Templates", self.templates_tree, id="templates")

    yield self.file_tabs
    self.footer = DashboardFooter()
    yield self.footer
    self.call_later(self.tasks_table.focus)

action_switch

action_switch(tab_id)
Source code in peagen/tui/app.py
799
800
801
802
803
804
805
806
807
def action_switch(self, tab_id: str) -> None:
    try:
        main_tab_content = self.query(TabbedContent).first()
        main_tab_content.active = tab_id
    except NoMatches:
        self.toast(
            f"Could not switch to tab ID '{tab_id}'. Main TabbedContent not found.",
            style="error",
        )

action_save_file

action_save_file()
Source code in peagen/tui/app.py
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
def action_save_file(self) -> None:
    if not self._current_file:
        self.toast("No file loaded.", style="yellow")
        return

    active_pane = self.file_tabs.active_pane
    if not active_pane:
        self.toast("No active file tab to save.", style="yellow")
        return

    try:
        editor = active_pane.query_one(TextArea)
    except NoMatches:
        self.toast("Could not find editor in the active tab.", style="error")
        return

    text_to_save = editor.text
    if self._remote_info:
        adapter, key, tmp_path_obj = self._remote_info
        Path(tmp_path_obj).write_text(text_to_save, encoding="utf-8")
        try:
            upload_remote(adapter, key, Path(tmp_path_obj))
            self.toast("Uploaded remote file", style="success")
        except Exception as exc:
            self.toast(f"Upload failed: {exc}", style="error")
    else:
        try:
            Path(self._current_file).write_text(text_to_save, encoding="utf-8")
            self.toast(f"Saved {self._current_file}", style="success")
        except Exception as exc:
            self.toast(f"Save failed: {exc}", style="error")

action_toggle_children

action_toggle_children()
Source code in peagen/tui/app.py
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
def action_toggle_children(self) -> None:
    row = self.tasks_table.cursor_row
    if row is None:
        return
    if hasattr(self.tasks_table, "get_row_key"):
        row_key = self.tasks_table.get_row_key(row)
    else:
        row_obj = (
            self.tasks_table.get_row_at(row)
            if hasattr(self.tasks_table, "get_row_at")
            else None
        )
        row_key = getattr(row_obj, "key", None) if row_obj else None
    if row_key is None:
        return
    row_key_str = str(row_key)
    if row_key_str in self.collapsed:
        self.collapsed.remove(row_key_str)
    else:
        self.collapsed.add(row_key_str)
    self.trigger_data_processing()

action_cycle_sort

action_cycle_sort()
Source code in peagen/tui/app.py
863
864
865
866
867
868
869
870
871
def action_cycle_sort(self) -> None:
    try:
        idx = self.SORT_KEYS.index(self.sort_key)
    except ValueError:
        idx = 0
    self.sort_key = self.SORT_KEYS[(idx + 1) % len(self.SORT_KEYS)]
    self.sort_reverse = False
    self.toast(f"Sorting by {self.sort_key}", duration=1.0)
    self.trigger_data_processing()

action_filter_by_cell

action_filter_by_cell()
Source code in peagen/tui/app.py
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
def action_filter_by_cell(self) -> None:
    row = self.tasks_table.cursor_row
    col = self.tasks_table.cursor_column
    if row is None or col is None:
        return
    value = (
        self.tasks_table.get_cell_at(Coordinate(row, col))
        if hasattr(self.tasks_table, "get_cell_at")
        else self.tasks_table.get_cell(row, col)
    )

    col_label_widget = self.tasks_table.columns[col].label
    col_label = str(col_label_widget)

    filter_changed = False
    str_value = str(value)

    if col_label == "Pool":
        if self.filter_pool != str_value:
            self.filter_pool = None if self.filter_pool == str_value else str_value
            filter_changed = True
    elif col_label == "Status":
        if self.filter_status != str_value:
            self.filter_status = (
                None if self.filter_status == str_value else str_value
            )
            filter_changed = True
    elif col_label == "Action":
        if self.filter_action != str_value:
            self.filter_action = (
                None if self.filter_action == str_value else str_value
            )
            filter_changed = True
    elif col_label == "Labels":
        lbl = str_value.split(",")[0] if str_value else ""
        if self.filter_label != lbl:
            self.filter_label = None if self.filter_label == lbl else lbl
            filter_changed = True

    if filter_changed:
        if hasattr(self, "filter_bar"):
            if col_label == "Pool":
                self.filter_bar.pool_select.value = (
                    self.filter_pool
                    if self.filter_pool is not None
                    else Select.BLANK
                )
            elif col_label == "Status":
                self.filter_bar.status_select.value = (
                    self.filter_status
                    if self.filter_status is not None
                    else Select.BLANK
                )
            elif col_label == "Action":
                self.filter_bar.action_select.value = (
                    self.filter_action
                    if self.filter_action is not None
                    else Select.BLANK
                )
            elif col_label == "Labels":
                self.filter_bar.label_select.value = (
                    self.filter_label
                    if self.filter_label is not None
                    else Select.BLANK
                )
        self.trigger_data_processing()

action_clear_filters

action_clear_filters()
Source code in peagen/tui/app.py
940
941
942
943
944
945
946
947
948
def action_clear_filters(self) -> None:
    self.filter_id = None
    self.filter_pool = None
    self.filter_status = None
    self.filter_action = None
    self.filter_label = None
    if hasattr(self, "filter_bar"):
        self.filter_bar.clear()
    self.trigger_data_processing()

action_copy_selection

action_copy_selection()

Copy highlighted or focused text to the clipboard.

Source code in peagen/tui/app.py
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
def action_copy_selection(self) -> None:
    """Copy highlighted or focused text to the clipboard."""

    widget = self.focused
    text = ""
    if isinstance(widget, DataTable):
        row = widget.cursor_row
        key_value = None
        if row is not None:
            if hasattr(widget, "ordered_rows"):
                try:
                    key_value = widget.ordered_rows[row].key
                except Exception:
                    key_value = None
            if key_value is None and hasattr(widget, "get_row_key"):
                try:
                    key_value = widget.get_row_key(row)  # type: ignore[attr-defined]
                except Exception:
                    key_value = None
        if key_value is not None:
            text = str(getattr(key_value, "value", key_value))
        else:
            row, col = widget.cursor_row, widget.cursor_column
            if row is not None and col is not None:
                value = (
                    widget.get_cell_at(Coordinate(row, col))
                    if hasattr(widget, "get_cell_at")
                    else widget.get_cell(row, col)
                )
                text = str(value)
    elif isinstance(widget, TextArea):
        text = widget.selected_text or widget.text
    elif isinstance(widget, Tree):
        node = getattr(widget, "cursor_node", None)
        if node is not None:
            text = str(getattr(node, "label", ""))
    elif hasattr(widget, "selected_text"):
        text = widget.selected_text
    elif hasattr(widget, "text"):
        text = widget.text
    elif hasattr(widget, "value"):
        text = str(widget.value)
    if text:
        clipboard_copy(text)

action_paste_clipboard

action_paste_clipboard()
Source code in peagen/tui/app.py
 995
 996
 997
 998
 999
1000
1001
1002
1003
def action_paste_clipboard(self) -> None:
    widget = self.focused
    text_to_paste = clipboard_paste()
    if isinstance(widget, TextArea):
        widget.insert_text_at_cursor(text_to_paste)
    elif hasattr(widget, "insert_text_at_cursor"):
        widget.insert_text_at_cursor(text_to_paste)
    elif hasattr(widget, "insert"):
        widget.insert(text_to_paste)

action_next_page

action_next_page()
Source code in peagen/tui/app.py
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
def action_next_page(self) -> None:
    self.offset += self.limit
    coro = self.backend.refresh(limit=self.limit, offset=self.offset)
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        asyncio.run(coro)
    else:
        self.run_worker(coro, exclusive=True, group="data_refresh_worker")
    self.trigger_data_processing(debounce=False)

action_prev_page

action_prev_page()
Source code in peagen/tui/app.py
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
def action_prev_page(self) -> None:
    if self.offset >= self.limit:
        self.offset -= self.limit
        coro = self.backend.refresh(limit=self.limit, offset=self.offset)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(coro)
        else:
            self.run_worker(coro, exclusive=True, group="data_refresh_worker")
        self.trigger_data_processing(debounce=False)

action_set_limit

action_set_limit(limit=None)

Set the number of tasks shown per page.

Source code in peagen/tui/app.py
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
def action_set_limit(self, limit: int | None = None) -> None:
    """Set the number of tasks shown per page."""

    if limit is None:
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(self._prompt_and_set_limit())
        else:
            self.run_worker(self._prompt_and_set_limit(), exclusive=True)
        return

    if limit <= 0:
        limit = 1
    self.limit = limit
    self.offset = 0
    coro = self.backend.refresh(limit=self.limit, offset=self.offset)
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        asyncio.run(coro)
    else:
        self.run_worker(coro, exclusive=True, group="data_refresh_worker")
    self.trigger_data_processing(debounce=False)

action_jump_page

action_jump_page(page=None)

Jump directly to page if provided or prompt the user.

Source code in peagen/tui/app.py
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
def action_jump_page(self, page: int | None = None) -> None:
    """Jump directly to *page* if provided or prompt the user."""
    if page is None:
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(self._prompt_and_jump())
        else:
            self.run_worker(self._prompt_and_jump(), exclusive=True)
        return
    self._apply_jump_page(page)

on_data_table_cell_selected async

on_data_table_cell_selected(event)
Source code in peagen/tui/app.py
1095
1096
1097
1098
1099
1100
1101
1102
async def on_data_table_cell_selected(self, event: DataTable.CellSelected) -> None:
    if isinstance(event.value, str) and event.value.startswith("[link="):
        path_str = event.value.split("=", 1)[1].split("]", 1)[0]
        await self.open_editor(Path(path_str).as_posix())
        return

    # Selection events no longer open task details automatically.
    return

on_data_table_header_selected async

on_data_table_header_selected(event)

Sort a table when the user clicks a column header.

Source code in peagen/tui/app.py
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
async def on_data_table_header_selected(
    self, event: DataTable.HeaderSelected
) -> None:
    """Sort a table when the user clicks a column header."""
    table = event.control
    column_key = event.column_key

    reverse = False
    last_key = getattr(table, "_last_sort_key", None)
    last_reverse = getattr(table, "_last_sort_reverse", False)
    if last_key == column_key:
        reverse = not last_reverse

    table.sort(column_key, reverse=reverse)
    table._last_sort_key = column_key
    table._last_sort_reverse = reverse

    label_plain = event.label.plain
    sort_key = self.COLUMN_LABEL_TO_SORT_KEY.get(label_plain)
    if sort_key:
        self.sort_key = sort_key
    self.sort_reverse = reverse
    self.trigger_data_processing(debounce=False)
    event.stop()

open_task_detail async

open_task_detail(task_id)
Source code in peagen/tui/app.py
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
async def open_task_detail(self, task_id: str) -> None:
    task = self.client.tasks.get(task_id)
    if not task:
        task = next(
            (t for t in self.backend.tasks if str(t.get("id")) == task_id), None
        )
    if task:
        await self.push_screen(TaskDetailScreen(task_data=task))
    else:
        self.toast(f"Task details not found for ID: {task_id}", style="warning")

open_editor async

open_editor(file_path)
Source code in peagen/tui/app.py
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
async def open_editor(self, file_path: str) -> None:
    parsed = urlparse(file_path)
    text = ""
    self._remote_info = None
    self._current_file = None
    actual_file_path = file_path

    if parsed.scheme and parsed.scheme != "file":
        try:
            tmp_path_obj, adapter, key = download_remote(file_path)
            self._remote_info = (adapter, key, tmp_path_obj)
            text = tmp_path_obj.read_text(encoding="utf-8")
            self._current_file = tmp_path_obj.as_posix()
            actual_file_path = file_path
        except Exception as exc:
            self.toast(f"Cannot download {file_path}: {exc}", style="error")
            return
    else:
        local_path = Path(file_path)
        try:
            text = local_path.read_text(encoding="utf-8")
            self._current_file = local_path.as_posix()
            actual_file_path = self._current_file
        except Exception as exc:
            self.toast(f"Cannot open {file_path}: {exc}", style="error")
            return

    pane_id = actual_file_path
    lang_map = {
        ".py": "python",
        ".js": "javascript",
        ".json": "json",
        ".yaml": "yaml",
        ".yml": "yaml",
        ".md": "markdown",
        ".html": "html",
        ".css": "css",
        ".toml": "toml",
        ".txt": "text",
        "": "text",
    }
    extension = Path(file_path).suffix.lower()
    language = lang_map.get(extension, "text")

    try:
        existing_pane = self.file_tabs.get_pane(pane_id)
        editor = existing_pane.query_one(TextArea)
        editor.load_text(text)
        editor.language = language
    except NoMatches:
        editor_id = f"editor_{self.file_tabs.tab_count}"
        editor = TextArea(text, id=editor_id, language=language)
        new_pane_widget = TabPane(Path(file_path).name, editor, id=pane_id)
        if not self.file_tabs.display:
            self.file_tabs.display = True
        await self.file_tabs.add_pane(new_pane_widget)

    self.file_tabs.active = pane_id
    self.toast(f"Editing {Path(file_path).name}", style="success", duration=1.5)

open_text async

open_text(name, text, language='text')
Source code in peagen/tui/app.py
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
async def open_text(self, name: str, text: str, language: str = "text") -> None:
    pane_id = f"text:{name}"
    try:
        existing_pane = self.file_tabs.get_pane(pane_id)
        editor = existing_pane.query_one(TextArea)
        editor.load_text(text)
        editor.language = language
    except NoMatches:
        editor = TextArea(text, language=language)
        new_pane = TabPane(name, editor, id=pane_id)
        if not self.file_tabs.display:
            self.file_tabs.display = True
        await self.file_tabs.add_pane(new_pane)
    self.file_tabs.active = pane_id

open_git_oid async

open_git_oid(oid)
Source code in peagen/tui/app.py
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
async def open_git_oid(self, oid: str) -> None:
    try:
        from peagen.core.mirror_core import open_repo

        vcs = open_repo(".")
        content = vcs.object_pretty(oid)
    except Exception as exc:
        self.toast(f"Cannot load OID {oid}: {exc}", style="error")
        return
    await self.open_text(oid, content)