fixed subscription table

This commit is contained in:
2025-02-02 00:02:31 -05:00
parent a1ab31acfe
commit ef5f57e678
5389 changed files with 686710 additions and 28 deletions

View File

@@ -0,0 +1,638 @@
"""
Module to define and register Terminal IPython shortcuts with
:mod:`prompt_toolkit`
"""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import os
import signal
import sys
import warnings
from dataclasses import dataclass
from typing import Callable, Any, Optional, List
from prompt_toolkit.application.current import get_app
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.key_binding.key_processor import KeyPressEvent
from prompt_toolkit.key_binding.bindings import named_commands as nc
from prompt_toolkit.key_binding.bindings.completion import (
display_completions_like_readline,
)
from prompt_toolkit.key_binding.vi_state import InputMode, ViState
from prompt_toolkit.filters import Condition
from IPython.core.getipython import get_ipython
from IPython.terminal.shortcuts import auto_match as match
from IPython.terminal.shortcuts import auto_suggest
from IPython.terminal.shortcuts.filters import filter_from_string
from IPython.utils.decorators import undoc
from prompt_toolkit.enums import DEFAULT_BUFFER
__all__ = ["create_ipython_shortcuts"]
@dataclass
class BaseBinding:
command: Callable[[KeyPressEvent], Any]
keys: List[str]
@dataclass
class RuntimeBinding(BaseBinding):
filter: Condition
@dataclass
class Binding(BaseBinding):
# while filter could be created by referencing variables directly (rather
# than created from strings), by using strings we ensure that users will
# be able to create filters in configuration (e.g. JSON) files too, which
# also benefits the documentation by enforcing human-readable filter names.
condition: Optional[str] = None
def __post_init__(self):
if self.condition:
self.filter = filter_from_string(self.condition)
else:
self.filter = None
def create_identifier(handler: Callable):
parts = handler.__module__.split(".")
name = handler.__name__
package = parts[0]
if len(parts) > 1:
final_module = parts[-1]
return f"{package}:{final_module}.{name}"
else:
return f"{package}:{name}"
AUTO_MATCH_BINDINGS = [
*[
Binding(
cmd, [key], "focused_insert & auto_match & followed_by_closing_paren_or_end"
)
for key, cmd in match.auto_match_parens.items()
],
*[
# raw string
Binding(cmd, [key], "focused_insert & auto_match & preceded_by_raw_str_prefix")
for key, cmd in match.auto_match_parens_raw_string.items()
],
Binding(
match.double_quote,
['"'],
"focused_insert"
" & auto_match"
" & not_inside_unclosed_string"
" & preceded_by_paired_double_quotes"
" & followed_by_closing_paren_or_end",
),
Binding(
match.single_quote,
["'"],
"focused_insert"
" & auto_match"
" & not_inside_unclosed_string"
" & preceded_by_paired_single_quotes"
" & followed_by_closing_paren_or_end",
),
Binding(
match.docstring_double_quotes,
['"'],
"focused_insert"
" & auto_match"
" & not_inside_unclosed_string"
" & preceded_by_two_double_quotes",
),
Binding(
match.docstring_single_quotes,
["'"],
"focused_insert"
" & auto_match"
" & not_inside_unclosed_string"
" & preceded_by_two_single_quotes",
),
Binding(
match.skip_over,
[")"],
"focused_insert & auto_match & followed_by_closing_round_paren",
),
Binding(
match.skip_over,
["]"],
"focused_insert & auto_match & followed_by_closing_bracket",
),
Binding(
match.skip_over,
["}"],
"focused_insert & auto_match & followed_by_closing_brace",
),
Binding(
match.skip_over, ['"'], "focused_insert & auto_match & followed_by_double_quote"
),
Binding(
match.skip_over, ["'"], "focused_insert & auto_match & followed_by_single_quote"
),
Binding(
match.delete_pair,
["backspace"],
"focused_insert"
" & preceded_by_opening_round_paren"
" & auto_match"
" & followed_by_closing_round_paren",
),
Binding(
match.delete_pair,
["backspace"],
"focused_insert"
" & preceded_by_opening_bracket"
" & auto_match"
" & followed_by_closing_bracket",
),
Binding(
match.delete_pair,
["backspace"],
"focused_insert"
" & preceded_by_opening_brace"
" & auto_match"
" & followed_by_closing_brace",
),
Binding(
match.delete_pair,
["backspace"],
"focused_insert"
" & preceded_by_double_quote"
" & auto_match"
" & followed_by_double_quote",
),
Binding(
match.delete_pair,
["backspace"],
"focused_insert"
" & preceded_by_single_quote"
" & auto_match"
" & followed_by_single_quote",
),
]
AUTO_SUGGEST_BINDINGS = [
# there are two reasons for re-defining bindings defined upstream:
# 1) prompt-toolkit does not execute autosuggestion bindings in vi mode,
# 2) prompt-toolkit checks if we are at the end of text, not end of line
# hence it does not work in multi-line mode of navigable provider
Binding(
auto_suggest.accept_or_jump_to_end,
["end"],
"has_suggestion & default_buffer_focused & emacs_like_insert_mode",
),
Binding(
auto_suggest.accept_or_jump_to_end,
["c-e"],
"has_suggestion & default_buffer_focused & emacs_like_insert_mode",
),
Binding(
auto_suggest.accept,
["c-f"],
"has_suggestion & default_buffer_focused & emacs_like_insert_mode",
),
Binding(
auto_suggest.accept,
["right"],
"has_suggestion & default_buffer_focused & emacs_like_insert_mode",
),
Binding(
auto_suggest.accept_word,
["escape", "f"],
"has_suggestion & default_buffer_focused & emacs_like_insert_mode",
),
Binding(
auto_suggest.accept_token,
["c-right"],
"has_suggestion & default_buffer_focused & emacs_like_insert_mode",
),
Binding(
auto_suggest.discard,
["escape"],
# note this one is using `emacs_insert_mode`, not `emacs_like_insert_mode`
# as in `vi_insert_mode` we do not want `escape` to be shadowed (ever).
"has_suggestion & default_buffer_focused & emacs_insert_mode",
),
Binding(
auto_suggest.discard,
["delete"],
"has_suggestion & default_buffer_focused & emacs_insert_mode",
),
Binding(
auto_suggest.swap_autosuggestion_up,
["c-up"],
"navigable_suggestions"
" & ~has_line_above"
" & has_suggestion"
" & default_buffer_focused",
),
Binding(
auto_suggest.swap_autosuggestion_down,
["c-down"],
"navigable_suggestions"
" & ~has_line_below"
" & has_suggestion"
" & default_buffer_focused",
),
Binding(
auto_suggest.up_and_update_hint,
["c-up"],
"has_line_above & navigable_suggestions & default_buffer_focused",
),
Binding(
auto_suggest.down_and_update_hint,
["c-down"],
"has_line_below & navigable_suggestions & default_buffer_focused",
),
Binding(
auto_suggest.accept_character,
["escape", "right"],
"has_suggestion & default_buffer_focused & emacs_like_insert_mode",
),
Binding(
auto_suggest.accept_and_move_cursor_left,
["c-left"],
"has_suggestion & default_buffer_focused & emacs_like_insert_mode",
),
Binding(
auto_suggest.accept_and_keep_cursor,
["escape", "down"],
"has_suggestion & default_buffer_focused & emacs_insert_mode",
),
Binding(
auto_suggest.backspace_and_resume_hint,
["backspace"],
# no `has_suggestion` here to allow resuming if no suggestion
"default_buffer_focused & emacs_like_insert_mode",
),
Binding(
auto_suggest.resume_hinting,
["right"],
"is_cursor_at_the_end_of_line"
" & default_buffer_focused"
" & emacs_like_insert_mode"
" & pass_through",
),
]
SIMPLE_CONTROL_BINDINGS = [
Binding(cmd, [key], "vi_insert_mode & default_buffer_focused & ebivim")
for key, cmd in {
"c-a": nc.beginning_of_line,
"c-b": nc.backward_char,
"c-k": nc.kill_line,
"c-w": nc.backward_kill_word,
"c-y": nc.yank,
"c-_": nc.undo,
}.items()
]
ALT_AND_COMOBO_CONTROL_BINDINGS = [
Binding(cmd, list(keys), "vi_insert_mode & default_buffer_focused & ebivim")
for keys, cmd in {
# Control Combos
("c-x", "c-e"): nc.edit_and_execute,
("c-x", "e"): nc.edit_and_execute,
# Alt
("escape", "b"): nc.backward_word,
("escape", "c"): nc.capitalize_word,
("escape", "d"): nc.kill_word,
("escape", "h"): nc.backward_kill_word,
("escape", "l"): nc.downcase_word,
("escape", "u"): nc.uppercase_word,
("escape", "y"): nc.yank_pop,
("escape", "."): nc.yank_last_arg,
}.items()
]
def add_binding(bindings: KeyBindings, binding: Binding):
bindings.add(
*binding.keys,
**({"filter": binding.filter} if binding.filter is not None else {}),
)(binding.command)
def create_ipython_shortcuts(shell, skip=None) -> KeyBindings:
"""Set up the prompt_toolkit keyboard shortcuts for IPython.
Parameters
----------
shell: InteractiveShell
The current IPython shell Instance
skip: List[Binding]
Bindings to skip.
Returns
-------
KeyBindings
the keybinding instance for prompt toolkit.
"""
kb = KeyBindings()
skip = skip or []
for binding in KEY_BINDINGS:
skip_this_one = False
for to_skip in skip:
if (
to_skip.command == binding.command
and to_skip.filter == binding.filter
and to_skip.keys == binding.keys
):
skip_this_one = True
break
if skip_this_one:
continue
add_binding(kb, binding)
def get_input_mode(self):
app = get_app()
app.ttimeoutlen = shell.ttimeoutlen
app.timeoutlen = shell.timeoutlen
return self._input_mode
def set_input_mode(self, mode):
shape = {InputMode.NAVIGATION: 2, InputMode.REPLACE: 4}.get(mode, 6)
cursor = "\x1b[{} q".format(shape)
sys.stdout.write(cursor)
sys.stdout.flush()
self._input_mode = mode
if shell.editing_mode == "vi" and shell.modal_cursor:
ViState._input_mode = InputMode.INSERT # type: ignore
ViState.input_mode = property(get_input_mode, set_input_mode) # type: ignore
return kb
def reformat_and_execute(event):
"""Reformat code and execute it"""
shell = get_ipython()
reformat_text_before_cursor(
event.current_buffer, event.current_buffer.document, shell
)
event.current_buffer.validate_and_handle()
def reformat_text_before_cursor(buffer, document, shell):
text = buffer.delete_before_cursor(len(document.text[: document.cursor_position]))
try:
formatted_text = shell.reformat_handler(text)
buffer.insert_text(formatted_text)
except Exception as e:
buffer.insert_text(text)
def handle_return_or_newline_or_execute(event):
shell = get_ipython()
if getattr(shell, "handle_return", None):
return shell.handle_return(shell)(event)
else:
return newline_or_execute_outer(shell)(event)
def newline_or_execute_outer(shell):
def newline_or_execute(event):
"""When the user presses return, insert a newline or execute the code."""
b = event.current_buffer
d = b.document
if b.complete_state:
cc = b.complete_state.current_completion
if cc:
b.apply_completion(cc)
else:
b.cancel_completion()
return
# If there's only one line, treat it as if the cursor is at the end.
# See https://github.com/ipython/ipython/issues/10425
if d.line_count == 1:
check_text = d.text
else:
check_text = d.text[: d.cursor_position]
status, indent = shell.check_complete(check_text)
# if all we have after the cursor is whitespace: reformat current text
# before cursor
after_cursor = d.text[d.cursor_position :]
reformatted = False
if not after_cursor.strip():
reformat_text_before_cursor(b, d, shell)
reformatted = True
if not (
d.on_last_line
or d.cursor_position_row >= d.line_count - d.empty_line_count_at_the_end()
):
if shell.autoindent:
b.insert_text("\n" + indent)
else:
b.insert_text("\n")
return
if (status != "incomplete") and b.accept_handler:
if not reformatted:
reformat_text_before_cursor(b, d, shell)
b.validate_and_handle()
else:
if shell.autoindent:
b.insert_text("\n" + indent)
else:
b.insert_text("\n")
return newline_or_execute
def previous_history_or_previous_completion(event):
"""
Control-P in vi edit mode on readline is history next, unlike default prompt toolkit.
If completer is open this still select previous completion.
"""
event.current_buffer.auto_up()
def next_history_or_next_completion(event):
"""
Control-N in vi edit mode on readline is history previous, unlike default prompt toolkit.
If completer is open this still select next completion.
"""
event.current_buffer.auto_down()
def dismiss_completion(event):
"""Dismiss completion"""
b = event.current_buffer
if b.complete_state:
b.cancel_completion()
def reset_buffer(event):
"""Reset buffer"""
b = event.current_buffer
if b.complete_state:
b.cancel_completion()
else:
b.reset()
def reset_search_buffer(event):
"""Reset search buffer"""
if event.current_buffer.document.text:
event.current_buffer.reset()
else:
event.app.layout.focus(DEFAULT_BUFFER)
def suspend_to_bg(event):
"""Suspend to background"""
event.app.suspend_to_background()
def quit(event):
"""
Quit application with ``SIGQUIT`` if supported or ``sys.exit`` otherwise.
On platforms that support SIGQUIT, send SIGQUIT to the current process.
On other platforms, just exit the process with a message.
"""
sigquit = getattr(signal, "SIGQUIT", None)
if sigquit is not None:
os.kill(0, signal.SIGQUIT)
else:
sys.exit("Quit")
def indent_buffer(event):
"""Indent buffer"""
event.current_buffer.insert_text(" " * 4)
def newline_autoindent(event):
"""Insert a newline after the cursor indented appropriately.
Fancier version of former ``newline_with_copy_margin`` which should
compute the correct indentation of the inserted line. That is to say, indent
by 4 extra space after a function definition, class definition, context
manager... And dedent by 4 space after ``pass``, ``return``, ``raise ...``.
"""
shell = get_ipython()
inputsplitter = shell.input_transformer_manager
b = event.current_buffer
d = b.document
if b.complete_state:
b.cancel_completion()
text = d.text[: d.cursor_position] + "\n"
_, indent = inputsplitter.check_complete(text)
b.insert_text("\n" + (" " * (indent or 0)), move_cursor=False)
def open_input_in_editor(event):
"""Open code from input in external editor"""
event.app.current_buffer.open_in_editor()
if sys.platform == "win32":
from IPython.core.error import TryNext
from IPython.lib.clipboard import (
ClipboardEmpty,
tkinter_clipboard_get,
win32_clipboard_get,
)
@undoc
def win_paste(event):
try:
text = win32_clipboard_get()
except TryNext:
try:
text = tkinter_clipboard_get()
except (TryNext, ClipboardEmpty):
return
except ClipboardEmpty:
return
event.current_buffer.insert_text(text.replace("\t", " " * 4))
else:
@undoc
def win_paste(event):
"""Stub used on other platforms"""
pass
KEY_BINDINGS = [
Binding(
handle_return_or_newline_or_execute,
["enter"],
"default_buffer_focused & ~has_selection & insert_mode",
),
Binding(
reformat_and_execute,
["escape", "enter"],
"default_buffer_focused & ~has_selection & insert_mode & ebivim",
),
Binding(quit, ["c-\\"]),
Binding(
previous_history_or_previous_completion,
["c-p"],
"vi_insert_mode & default_buffer_focused",
),
Binding(
next_history_or_next_completion,
["c-n"],
"vi_insert_mode & default_buffer_focused",
),
Binding(dismiss_completion, ["c-g"], "default_buffer_focused & has_completions"),
Binding(reset_buffer, ["c-c"], "default_buffer_focused"),
Binding(reset_search_buffer, ["c-c"], "search_buffer_focused"),
Binding(suspend_to_bg, ["c-z"], "supports_suspend"),
Binding(
indent_buffer,
["tab"], # Ctrl+I == Tab
"default_buffer_focused"
" & ~has_selection"
" & insert_mode"
" & cursor_in_leading_ws",
),
Binding(newline_autoindent, ["c-o"], "default_buffer_focused & emacs_insert_mode"),
Binding(open_input_in_editor, ["f2"], "default_buffer_focused"),
*AUTO_MATCH_BINDINGS,
*AUTO_SUGGEST_BINDINGS,
Binding(
display_completions_like_readline,
["c-i"],
"readline_like_completions"
" & default_buffer_focused"
" & ~has_selection"
" & insert_mode"
" & ~cursor_in_leading_ws",
),
Binding(win_paste, ["c-v"], "default_buffer_focused & ~vi_mode & is_windows_os"),
*SIMPLE_CONTROL_BINDINGS,
*ALT_AND_COMOBO_CONTROL_BINDINGS,
]
UNASSIGNED_ALLOWED_COMMANDS = [
nc.beginning_of_buffer,
nc.end_of_buffer,
nc.end_of_line,
nc.forward_word,
nc.unix_line_discard,
]

View File

@@ -0,0 +1,105 @@
"""
Utilities function for keybinding with prompt toolkit.
This will be bound to specific key press and filter modes,
like whether we are in edit mode, and whether the completer is open.
"""
import re
from prompt_toolkit.key_binding import KeyPressEvent
def parenthesis(event: KeyPressEvent):
"""Auto-close parenthesis"""
event.current_buffer.insert_text("()")
event.current_buffer.cursor_left()
def brackets(event: KeyPressEvent):
"""Auto-close brackets"""
event.current_buffer.insert_text("[]")
event.current_buffer.cursor_left()
def braces(event: KeyPressEvent):
"""Auto-close braces"""
event.current_buffer.insert_text("{}")
event.current_buffer.cursor_left()
def double_quote(event: KeyPressEvent):
"""Auto-close double quotes"""
event.current_buffer.insert_text('""')
event.current_buffer.cursor_left()
def single_quote(event: KeyPressEvent):
"""Auto-close single quotes"""
event.current_buffer.insert_text("''")
event.current_buffer.cursor_left()
def docstring_double_quotes(event: KeyPressEvent):
"""Auto-close docstring (double quotes)"""
event.current_buffer.insert_text('""""')
event.current_buffer.cursor_left(3)
def docstring_single_quotes(event: KeyPressEvent):
"""Auto-close docstring (single quotes)"""
event.current_buffer.insert_text("''''")
event.current_buffer.cursor_left(3)
def raw_string_parenthesis(event: KeyPressEvent):
"""Auto-close parenthesis in raw strings"""
matches = re.match(
r".*(r|R)[\"'](-*)",
event.current_buffer.document.current_line_before_cursor,
)
dashes = matches.group(2) if matches else ""
event.current_buffer.insert_text("()" + dashes)
event.current_buffer.cursor_left(len(dashes) + 1)
def raw_string_bracket(event: KeyPressEvent):
"""Auto-close bracker in raw strings"""
matches = re.match(
r".*(r|R)[\"'](-*)",
event.current_buffer.document.current_line_before_cursor,
)
dashes = matches.group(2) if matches else ""
event.current_buffer.insert_text("[]" + dashes)
event.current_buffer.cursor_left(len(dashes) + 1)
def raw_string_braces(event: KeyPressEvent):
"""Auto-close braces in raw strings"""
matches = re.match(
r".*(r|R)[\"'](-*)",
event.current_buffer.document.current_line_before_cursor,
)
dashes = matches.group(2) if matches else ""
event.current_buffer.insert_text("{}" + dashes)
event.current_buffer.cursor_left(len(dashes) + 1)
def skip_over(event: KeyPressEvent):
"""Skip over automatically added parenthesis/quote.
(rather than adding another parenthesis/quote)"""
event.current_buffer.cursor_right()
def delete_pair(event: KeyPressEvent):
"""Delete auto-closed parenthesis"""
event.current_buffer.delete()
event.current_buffer.delete_before_cursor()
auto_match_parens = {"(": parenthesis, "[": brackets, "{": braces}
auto_match_parens_raw_string = {
"(": raw_string_parenthesis,
"[": raw_string_bracket,
"{": raw_string_braces,
}

View File

@@ -0,0 +1,648 @@
import re
import asyncio
import tokenize
from io import StringIO
from typing import Callable, List, Optional, Union, Generator, Tuple, ClassVar, Any
import warnings
import prompt_toolkit
from prompt_toolkit.buffer import Buffer
from prompt_toolkit.key_binding import KeyPressEvent
from prompt_toolkit.key_binding.bindings import named_commands as nc
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion, AutoSuggest
from prompt_toolkit.document import Document
from prompt_toolkit.history import History
from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.layout.processors import (
Processor,
Transformation,
TransformationInput,
)
from IPython.core.getipython import get_ipython
from IPython.utils.tokenutil import generate_tokens
from .filters import pass_through
try:
import jupyter_ai_magics
import jupyter_ai.completions.models as jai_models
except ModuleNotFoundError:
jai_models = None
def _get_query(document: Document):
return document.lines[document.cursor_position_row]
class AppendAutoSuggestionInAnyLine(Processor):
"""
Append the auto suggestion to lines other than the last (appending to the
last line is natively supported by the prompt toolkit).
This has a private `_debug` attribute that can be set to True to display
debug information as virtual suggestion on the end of any line. You can do
so with:
>>> from IPython.terminal.shortcuts.auto_suggest import AppendAutoSuggestionInAnyLine
>>> AppendAutoSuggestionInAnyLine._debug = True
"""
_debug: ClassVar[bool] = False
def __init__(self, style: str = "class:auto-suggestion") -> None:
self.style = style
def apply_transformation(self, ti: TransformationInput) -> Transformation:
"""
Apply transformation to the line that is currently being edited.
This is a variation of the original implementation in prompt toolkit
that allows to not only append suggestions to any line, but also to show
multi-line suggestions.
As transformation are applied on a line-by-line basis; we need to trick
a bit, and elide any line that is after the line we are currently
editing, until we run out of completions. We cannot shift the existing
lines
There are multiple cases to handle:
The completions ends before the end of the buffer:
We can resume showing the normal line, and say that some code may
be hidden.
The completions ends at the end of the buffer
We can just say that some code may be hidden.
And separately:
The completions ends beyond the end of the buffer
We need to both say that some code may be hidden, and that some
lines are not shown.
"""
last_line_number = ti.document.line_count - 1
is_last_line = ti.lineno == last_line_number
noop = lambda text: Transformation(
fragments=ti.fragments + [(self.style, " " + text if self._debug else "")]
)
if ti.document.line_count == 1:
return noop("noop:oneline")
if ti.document.cursor_position_row == last_line_number and is_last_line:
# prompt toolkit already appends something; just leave it be
return noop("noop:last line and cursor")
# first everything before the current line is unchanged.
if ti.lineno < ti.document.cursor_position_row:
return noop("noop:before cursor")
buffer = ti.buffer_control.buffer
if not buffer.suggestion or not ti.document.is_cursor_at_the_end_of_line:
return noop("noop:not eol")
delta = ti.lineno - ti.document.cursor_position_row
suggestions = buffer.suggestion.text.splitlines()
if len(suggestions) == 0:
return noop("noop: no suggestions")
suggestions_longer_than_buffer: bool = (
len(suggestions) + ti.document.cursor_position_row > ti.document.line_count
)
if len(suggestions) >= 1 and prompt_toolkit.VERSION < (3, 0, 49):
if ti.lineno == ti.document.cursor_position_row:
return Transformation(
fragments=ti.fragments
+ [
(
"red",
"(Cannot show multiline suggestion; requires prompt_toolkit > 3.0.49)",
)
]
)
else:
return Transformation(fragments=ti.fragments)
if delta == 0:
suggestion = suggestions[0]
return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
if is_last_line:
if delta < len(suggestions):
extra = f"; {len(suggestions) - delta} line(s) hidden"
suggestion = f"… rest of suggestion ({len(suggestions) - delta} lines) and code hidden"
return Transformation([(self.style, suggestion)])
n_elided = len(suggestions)
for i in range(len(suggestions)):
ll = ti.get_line(last_line_number - i)
el = "".join(l[1] for l in ll).strip()
if el:
break
else:
n_elided -= 1
if n_elided:
return Transformation([(self.style, f"{n_elided} line(s) hidden")])
else:
return Transformation(
ti.get_line(last_line_number - len(suggestions) + 1)
+ ([(self.style, "shift-last-line")] if self._debug else [])
)
elif delta < len(suggestions):
suggestion = suggestions[delta]
return Transformation([(self.style, suggestion)])
else:
shift = ti.lineno - len(suggestions) + 1
return Transformation(ti.get_line(shift))
class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory):
"""
A subclass of AutoSuggestFromHistory that allow navigation to next/previous
suggestion from history. To do so it remembers the current position, but it
state need to carefully be cleared on the right events.
"""
skip_lines: int
_connected_apps: list[PromptSession]
# handle to the currently running llm task that appends suggestions to the
# current buffer; we keep a handle to it in order to cancell it when there is a cursor movement, or
# another request.
_llm_task: asyncio.Task | None = None
# This is the instance of the LLM provider from jupyter-ai to which we forward the request
# to generate inline completions.
_llm_provider: Any | None
def __init__(self):
super().__init__()
self.skip_lines = 0
self._connected_apps = []
self._llm_provider = None
def reset_history_position(self, _: Buffer):
self.skip_lines = 0
def disconnect(self) -> None:
self._cancel_running_llm_task()
for pt_app in self._connected_apps:
text_insert_event = pt_app.default_buffer.on_text_insert
text_insert_event.remove_handler(self.reset_history_position)
def connect(self, pt_app: PromptSession):
self._connected_apps.append(pt_app)
# note: `on_text_changed` could be used for a bit different behaviour
# on character deletion (i.e. resetting history position on backspace)
pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position)
pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss)
def get_suggestion(
self, buffer: Buffer, document: Document
) -> Optional[Suggestion]:
text = _get_query(document)
if text.strip():
for suggestion, _ in self._find_next_match(
text, self.skip_lines, buffer.history
):
return Suggestion(suggestion)
return None
def _dismiss(self, buffer, *args, **kwargs) -> None:
self._cancel_running_llm_task()
buffer.suggestion = None
def _find_match(
self, text: str, skip_lines: float, history: History, previous: bool
) -> Generator[Tuple[str, float], None, None]:
"""
text : str
Text content to find a match for, the user cursor is most of the
time at the end of this text.
skip_lines : float
number of items to skip in the search, this is used to indicate how
far in the list the user has navigated by pressing up or down.
The float type is used as the base value is +inf
history : History
prompt_toolkit History instance to fetch previous entries from.
previous : bool
Direction of the search, whether we are looking previous match
(True), or next match (False).
Yields
------
Tuple with:
str:
current suggestion.
float:
will actually yield only ints, which is passed back via skip_lines,
which may be a +inf (float)
"""
line_number = -1
for string in reversed(list(history.get_strings())):
for line in reversed(string.splitlines()):
line_number += 1
if not previous and line_number < skip_lines:
continue
# do not return empty suggestions as these
# close the auto-suggestion overlay (and are useless)
if line.startswith(text) and len(line) > len(text):
yield line[len(text) :], line_number
if previous and line_number >= skip_lines:
return
def _find_next_match(
self, text: str, skip_lines: float, history: History
) -> Generator[Tuple[str, float], None, None]:
return self._find_match(text, skip_lines, history, previous=False)
def _find_previous_match(self, text: str, skip_lines: float, history: History):
return reversed(
list(self._find_match(text, skip_lines, history, previous=True))
)
def up(self, query: str, other_than: str, history: History) -> None:
self._cancel_running_llm_task()
for suggestion, line_number in self._find_next_match(
query, self.skip_lines, history
):
# if user has history ['very.a', 'very', 'very.b'] and typed 'very'
# we want to switch from 'very.b' to 'very.a' because a) if the
# suggestion equals current text, prompt-toolkit aborts suggesting
# b) user likely would not be interested in 'very' anyways (they
# already typed it).
if query + suggestion != other_than:
self.skip_lines = line_number
break
else:
# no matches found, cycle back to beginning
self.skip_lines = 0
def down(self, query: str, other_than: str, history: History) -> None:
self._cancel_running_llm_task()
for suggestion, line_number in self._find_previous_match(
query, self.skip_lines, history
):
if query + suggestion != other_than:
self.skip_lines = line_number
break
else:
# no matches found, cycle to end
for suggestion, line_number in self._find_previous_match(
query, float("Inf"), history
):
if query + suggestion != other_than:
self.skip_lines = line_number
break
def _cancel_running_llm_task(self) -> None:
"""
Try to cancell the currently running llm_task if exists, and set it to None.
"""
if self._llm_task is not None:
if self._llm_task.done():
self._llm_task = None
return
cancelled = self._llm_task.cancel()
if cancelled:
self._llm_task = None
if not cancelled:
warnings.warn(
"LLM task not cancelled, does your provider support cancellation?"
)
async def _trigger_llm(self, buffer) -> None:
"""
This will ask the current llm provider a suggestion for the current buffer.
If there is a currently running llm task, it will cancel it.
"""
# we likely want to store the current cursor position, and cancel if the cursor has moved.
if not self._llm_provider:
warnings.warn("No LLM provider found, cannot trigger LLM completions")
return
if jai_models is None:
warnings.warn(
"LLM Completion requires `jupyter_ai_magics` and `jupyter_ai` to be installed"
)
self._cancel_running_llm_task()
async def error_catcher(buffer):
"""
This catches and log any errors, as otherwise this is just
lost in the void of the future running task.
"""
try:
await self._trigger_llm_core(buffer)
except Exception as e:
get_ipython().log.error("error")
raise
# here we need a cancellable task so we can't just await the error catched
self._llm_task = asyncio.create_task(error_catcher(buffer))
await self._llm_task
async def _trigger_llm_core(self, buffer: Buffer):
"""
This is the core of the current llm request.
Here we build a compatible `InlineCompletionRequest` and ask the llm
provider to stream it's response back to us iteratively setting it as
the suggestion on the current buffer.
Unlike with JupyterAi, as we do not have multiple cell, the cell number
is always set to `0`, note that we _could_ set it to a new number each
time and ignore threply from past numbers.
We set the prefix to the current cell content, but could also inset the
rest of the history or even just the non-fail history.
In the same way, we do not have cell id.
LLM provider may return multiple suggestion stream, but for the time
being we only support one.
Here we make the assumption that the provider will have
stream_inline_completions, I'm not sure it is the case for all
providers.
"""
request = jai_models.InlineCompletionRequest(
number=0,
prefix=buffer.document.text,
suffix="",
mime="text/x-python",
stream=True,
path=None,
language="python",
cell_id=None,
)
async for reply_and_chunks in self._llm_provider.stream_inline_completions(
request
):
if isinstance(reply_and_chunks, jai_models.InlineCompletionReply):
if len(reply_and_chunks.list.items) > 1:
raise ValueError(
"Terminal IPython cannot deal with multiple LLM suggestions at once"
)
buffer.suggestion = Suggestion(
reply_and_chunks.list.items[0].insertText
)
buffer.on_suggestion_set.fire()
elif isinstance(reply_and_chunks, jai_models.InlineCompletionStreamChunk):
buffer.suggestion = Suggestion(reply_and_chunks.response.insertText)
buffer.on_suggestion_set.fire()
return
_MIN_LINES = 5
async def llm_autosuggestion(event: KeyPressEvent):
"""
Ask the AutoSuggester from history to delegate to ask an LLM for completion
This will first make sure that the current buffer have _MIN_LINES (7)
available lines to insert the LLM completion
Provisional as of 8.32, may change without warnigns
"""
provider = get_ipython().auto_suggest
if not isinstance(provider, NavigableAutoSuggestFromHistory):
return
doc = event.current_buffer.document
lines_to_insert = max(0, _MIN_LINES - doc.line_count + doc.cursor_position_row)
for _ in range(lines_to_insert):
event.current_buffer.insert_text("\n", move_cursor=False)
await provider._trigger_llm(event.current_buffer)
def accept_or_jump_to_end(event: KeyPressEvent):
"""Apply autosuggestion or jump to end of line."""
buffer = event.current_buffer
d = buffer.document
after_cursor = d.text[d.cursor_position :]
lines = after_cursor.split("\n")
end_of_current_line = lines[0].strip()
suggestion = buffer.suggestion
if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""):
buffer.insert_text(suggestion.text)
else:
nc.end_of_line(event)
def _deprected_accept_in_vi_insert_mode(event: KeyPressEvent):
"""Accept autosuggestion or jump to end of line.
.. deprecated:: 8.12
Use `accept_or_jump_to_end` instead.
"""
return accept_or_jump_to_end(event)
def accept(event: KeyPressEvent):
"""Accept autosuggestion"""
buffer = event.current_buffer
suggestion = buffer.suggestion
if suggestion:
buffer.insert_text(suggestion.text)
else:
nc.forward_char(event)
def discard(event: KeyPressEvent):
"""Discard autosuggestion"""
buffer = event.current_buffer
buffer.suggestion = None
def accept_word(event: KeyPressEvent):
"""Fill partial autosuggestion by word"""
buffer = event.current_buffer
suggestion = buffer.suggestion
if suggestion:
t = re.split(r"(\S+\s+)", suggestion.text)
buffer.insert_text(next((x for x in t if x), ""))
else:
nc.forward_word(event)
def accept_character(event: KeyPressEvent):
"""Fill partial autosuggestion by character"""
b = event.current_buffer
suggestion = b.suggestion
if suggestion and suggestion.text:
b.insert_text(suggestion.text[0])
def accept_and_keep_cursor(event: KeyPressEvent):
"""Accept autosuggestion and keep cursor in place"""
buffer = event.current_buffer
old_position = buffer.cursor_position
suggestion = buffer.suggestion
if suggestion:
buffer.insert_text(suggestion.text)
buffer.cursor_position = old_position
def accept_and_move_cursor_left(event: KeyPressEvent):
"""Accept autosuggestion and move cursor left in place"""
accept_and_keep_cursor(event)
nc.backward_char(event)
def _update_hint(buffer: Buffer):
if buffer.auto_suggest:
suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document)
buffer.suggestion = suggestion
def backspace_and_resume_hint(event: KeyPressEvent):
"""Resume autosuggestions after deleting last character"""
nc.backward_delete_char(event)
_update_hint(event.current_buffer)
def resume_hinting(event: KeyPressEvent):
"""Resume autosuggestions"""
pass_through.reply(event)
# Order matters: if update happened first and event reply second, the
# suggestion would be auto-accepted if both actions are bound to same key.
_update_hint(event.current_buffer)
def up_and_update_hint(event: KeyPressEvent):
"""Go up and update hint"""
current_buffer = event.current_buffer
current_buffer.auto_up(count=event.arg)
_update_hint(current_buffer)
def down_and_update_hint(event: KeyPressEvent):
"""Go down and update hint"""
current_buffer = event.current_buffer
current_buffer.auto_down(count=event.arg)
_update_hint(current_buffer)
def accept_token(event: KeyPressEvent):
"""Fill partial autosuggestion by token"""
b = event.current_buffer
suggestion = b.suggestion
if suggestion:
prefix = _get_query(b.document)
text = prefix + suggestion.text
tokens: List[Optional[str]] = [None, None, None]
substrings = [""]
i = 0
for token in generate_tokens(StringIO(text).readline):
if token.type == tokenize.NEWLINE:
index = len(text)
else:
index = text.index(token[1], len(substrings[-1]))
substrings.append(text[:index])
tokenized_so_far = substrings[-1]
if tokenized_so_far.startswith(prefix):
if i == 0 and len(tokenized_so_far) > len(prefix):
tokens[0] = tokenized_so_far[len(prefix) :]
substrings.append(tokenized_so_far)
i += 1
tokens[i] = token[1]
if i == 2:
break
i += 1
if tokens[0]:
to_insert: str
insert_text = substrings[-2]
if tokens[1] and len(tokens[1]) == 1:
insert_text = substrings[-1]
to_insert = insert_text[len(prefix) :]
b.insert_text(to_insert)
return
nc.forward_word(event)
Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]
def _swap_autosuggestion(
buffer: Buffer,
provider: NavigableAutoSuggestFromHistory,
direction_method: Callable,
):
"""
We skip most recent history entry (in either direction) if it equals the
current autosuggestion because if user cycles when auto-suggestion is shown
they most likely want something else than what was suggested (otherwise
they would have accepted the suggestion).
"""
suggestion = buffer.suggestion
if not suggestion:
return
query = _get_query(buffer.document)
current = query + suggestion.text
direction_method(query=query, other_than=current, history=buffer.history)
new_suggestion = provider.get_suggestion(buffer, buffer.document)
buffer.suggestion = new_suggestion
def swap_autosuggestion_up(event: KeyPressEvent):
"""Get next autosuggestion from history."""
shell = get_ipython()
provider = shell.auto_suggest
if not isinstance(provider, NavigableAutoSuggestFromHistory):
return
return _swap_autosuggestion(
buffer=event.current_buffer, provider=provider, direction_method=provider.up
)
def swap_autosuggestion_down(event: KeyPressEvent):
"""Get previous autosuggestion from history."""
shell = get_ipython()
provider = shell.auto_suggest
if not isinstance(provider, NavigableAutoSuggestFromHistory):
return
return _swap_autosuggestion(
buffer=event.current_buffer,
provider=provider,
direction_method=provider.down,
)
def __getattr__(key):
if key == "accept_in_vi_insert_mode":
warnings.warn(
"`accept_in_vi_insert_mode` is deprecated since IPython 8.12 and "
"renamed to `accept_or_jump_to_end`. Please update your configuration "
"accordingly",
DeprecationWarning,
stacklevel=2,
)
return _deprected_accept_in_vi_insert_mode
raise AttributeError

View File

@@ -0,0 +1,322 @@
"""
Filters restricting scope of IPython Terminal shortcuts.
"""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import ast
import re
import signal
import sys
from typing import Callable, Dict, Union
from prompt_toolkit.application.current import get_app
from prompt_toolkit.enums import DEFAULT_BUFFER, SEARCH_BUFFER
from prompt_toolkit.key_binding import KeyPressEvent
from prompt_toolkit.filters import Condition, Filter, emacs_insert_mode, has_completions
from prompt_toolkit.filters import has_focus as has_focus_impl
from prompt_toolkit.filters import (
Always,
Never,
has_selection,
has_suggestion,
vi_insert_mode,
vi_mode,
)
from prompt_toolkit.layout.layout import FocusableElement
from IPython.core.getipython import get_ipython
from IPython.core.guarded_eval import _find_dunder, BINARY_OP_DUNDERS, UNARY_OP_DUNDERS
from IPython.terminal.shortcuts import auto_suggest
from IPython.utils.decorators import undoc
@undoc
@Condition
def cursor_in_leading_ws():
before = get_app().current_buffer.document.current_line_before_cursor
return (not before) or before.isspace()
def has_focus(value: FocusableElement):
"""Wrapper around has_focus adding a nice `__name__` to tester function"""
tester = has_focus_impl(value).func
tester.__name__ = f"is_focused({value})"
return Condition(tester)
@undoc
@Condition
def has_line_below() -> bool:
document = get_app().current_buffer.document
return document.cursor_position_row < len(document.lines) - 1
@undoc
@Condition
def is_cursor_at_the_end_of_line() -> bool:
document = get_app().current_buffer.document
return document.is_cursor_at_the_end_of_line
@undoc
@Condition
def has_line_above() -> bool:
document = get_app().current_buffer.document
return document.cursor_position_row != 0
@Condition
def ebivim():
shell = get_ipython()
return shell.emacs_bindings_in_vi_insert_mode
@Condition
def supports_suspend():
return hasattr(signal, "SIGTSTP")
@Condition
def auto_match():
shell = get_ipython()
return shell.auto_match
def all_quotes_paired(quote, buf):
paired = True
i = 0
while i < len(buf):
c = buf[i]
if c == quote:
paired = not paired
elif c == "\\":
i += 1
i += 1
return paired
_preceding_text_cache: Dict[Union[str, Callable], Condition] = {}
_following_text_cache: Dict[Union[str, Callable], Condition] = {}
def preceding_text(pattern: Union[str, Callable]):
if pattern in _preceding_text_cache:
return _preceding_text_cache[pattern]
if callable(pattern):
def _preceding_text():
app = get_app()
before_cursor = app.current_buffer.document.current_line_before_cursor
# mypy can't infer if(callable): https://github.com/python/mypy/issues/3603
return bool(pattern(before_cursor)) # type: ignore[operator]
else:
m = re.compile(pattern)
def _preceding_text():
app = get_app()
before_cursor = app.current_buffer.document.current_line_before_cursor
return bool(m.match(before_cursor))
_preceding_text.__name__ = f"preceding_text({pattern!r})"
condition = Condition(_preceding_text)
_preceding_text_cache[pattern] = condition
return condition
def following_text(pattern):
try:
return _following_text_cache[pattern]
except KeyError:
pass
m = re.compile(pattern)
def _following_text():
app = get_app()
return bool(m.match(app.current_buffer.document.current_line_after_cursor))
_following_text.__name__ = f"following_text({pattern!r})"
condition = Condition(_following_text)
_following_text_cache[pattern] = condition
return condition
@Condition
def not_inside_unclosed_string():
app = get_app()
s = app.current_buffer.document.text_before_cursor
# remove escaped quotes
s = s.replace('\\"', "").replace("\\'", "")
# remove triple-quoted string literals
s = re.sub(r"(?:\"\"\"[\s\S]*\"\"\"|'''[\s\S]*''')", "", s)
# remove single-quoted string literals
s = re.sub(r"""(?:"[^"]*["\n]|'[^']*['\n])""", "", s)
return not ('"' in s or "'" in s)
@Condition
def navigable_suggestions():
shell = get_ipython()
return isinstance(shell.auto_suggest, auto_suggest.NavigableAutoSuggestFromHistory)
@Condition
def readline_like_completions():
shell = get_ipython()
return shell.display_completions == "readlinelike"
@Condition
def is_windows_os():
return sys.platform == "win32"
class PassThrough(Filter):
"""A filter allowing to implement pass-through behaviour of keybindings.
Prompt toolkit key processor dispatches only one event per binding match,
which means that adding a new shortcut will suppress the old shortcut
if the keybindings are the same (unless one is filtered out).
To stop a shortcut binding from suppressing other shortcuts:
- add the `pass_through` filter to list of filter, and
- call `pass_through.reply(event)` in the shortcut handler.
"""
def __init__(self):
self._is_replying = False
def reply(self, event: KeyPressEvent):
self._is_replying = True
try:
event.key_processor.reset()
event.key_processor.feed_multiple(event.key_sequence)
event.key_processor.process_keys()
finally:
self._is_replying = False
def __call__(self):
return not self._is_replying
pass_through = PassThrough()
# these one is callable and re-used multiple times hence needs to be
# only defined once beforehand so that transforming back to human-readable
# names works well in the documentation.
default_buffer_focused = has_focus(DEFAULT_BUFFER)
KEYBINDING_FILTERS = {
"always": Always(),
# never is used for exposing commands which have no default keybindings
"never": Never(),
"has_line_below": has_line_below,
"has_line_above": has_line_above,
"is_cursor_at_the_end_of_line": is_cursor_at_the_end_of_line,
"has_selection": has_selection,
"has_suggestion": has_suggestion,
"vi_mode": vi_mode,
"vi_insert_mode": vi_insert_mode,
"emacs_insert_mode": emacs_insert_mode,
# https://github.com/ipython/ipython/pull/12603 argued for inclusion of
# emacs key bindings with a configurable `emacs_bindings_in_vi_insert_mode`
# toggle; when the toggle is on user can access keybindigns like `ctrl + e`
# in vi insert mode. Because some of the emacs bindings involve `escape`
# followed by another key, e.g. `escape` followed by `f`, prompt-toolkit
# needs to wait to see if there will be another character typed in before
# executing pure `escape` keybinding; in vi insert mode `escape` switches to
# command mode which is common and performance critical action for vi users.
# To avoid the delay users employ a workaround:
# https://github.com/ipython/ipython/issues/13443#issuecomment-1032753703
# which involves switching `emacs_bindings_in_vi_insert_mode` off.
#
# For the workaround to work:
# 1) end users need to toggle `emacs_bindings_in_vi_insert_mode` off
# 2) all keybindings which would involve `escape` need to respect that
# toggle by including either:
# - `vi_insert_mode & ebivim` for actions which have emacs keybindings
# predefined upstream in prompt-toolkit, or
# - `emacs_like_insert_mode` for actions which do not have existing
# emacs keybindings predefined upstream (or need overriding of the
# upstream bindings to modify behaviour), defined below.
"emacs_like_insert_mode": (vi_insert_mode & ebivim) | emacs_insert_mode,
"has_completions": has_completions,
"insert_mode": vi_insert_mode | emacs_insert_mode,
"default_buffer_focused": default_buffer_focused,
"search_buffer_focused": has_focus(SEARCH_BUFFER),
# `ebivim` stands for emacs bindings in vi insert mode
"ebivim": ebivim,
"supports_suspend": supports_suspend,
"is_windows_os": is_windows_os,
"auto_match": auto_match,
"focused_insert": (vi_insert_mode | emacs_insert_mode) & default_buffer_focused,
"not_inside_unclosed_string": not_inside_unclosed_string,
"readline_like_completions": readline_like_completions,
"preceded_by_paired_double_quotes": preceding_text(
lambda line: all_quotes_paired('"', line)
),
"preceded_by_paired_single_quotes": preceding_text(
lambda line: all_quotes_paired("'", line)
),
"preceded_by_raw_str_prefix": preceding_text(r".*(r|R)[\"'](-*)$"),
"preceded_by_two_double_quotes": preceding_text(r'^.*""$'),
"preceded_by_two_single_quotes": preceding_text(r"^.*''$"),
"followed_by_closing_paren_or_end": following_text(r"[,)}\]]|$"),
"preceded_by_opening_round_paren": preceding_text(r".*\($"),
"preceded_by_opening_bracket": preceding_text(r".*\[$"),
"preceded_by_opening_brace": preceding_text(r".*\{$"),
"preceded_by_double_quote": preceding_text('.*"$'),
"preceded_by_single_quote": preceding_text(r".*'$"),
"followed_by_closing_round_paren": following_text(r"^\)"),
"followed_by_closing_bracket": following_text(r"^\]"),
"followed_by_closing_brace": following_text(r"^\}"),
"followed_by_double_quote": following_text('^"'),
"followed_by_single_quote": following_text("^'"),
"navigable_suggestions": navigable_suggestions,
"cursor_in_leading_ws": cursor_in_leading_ws,
"pass_through": pass_through,
}
def eval_node(node: Union[ast.AST, None]):
if node is None:
return None
if isinstance(node, ast.Expression):
return eval_node(node.body)
if isinstance(node, ast.BinOp):
left = eval_node(node.left)
right = eval_node(node.right)
dunders = _find_dunder(node.op, BINARY_OP_DUNDERS)
if dunders:
return getattr(left, dunders[0])(right)
raise ValueError(f"Unknown binary operation: {node.op}")
if isinstance(node, ast.UnaryOp):
value = eval_node(node.operand)
dunders = _find_dunder(node.op, UNARY_OP_DUNDERS)
if dunders:
return getattr(value, dunders[0])()
raise ValueError(f"Unknown unary operation: {node.op}")
if isinstance(node, ast.Name):
if node.id in KEYBINDING_FILTERS:
return KEYBINDING_FILTERS[node.id]
else:
sep = "\n - "
known_filters = sep.join(sorted(KEYBINDING_FILTERS))
raise NameError(
f"{node.id} is not a known shortcut filter."
f" Known filters are: {sep}{known_filters}."
)
raise ValueError("Unhandled node", ast.dump(node))
def filter_from_string(code: str):
expression = ast.parse(code, mode="eval")
return eval_node(expression)
__all__ = ["KEYBINDING_FILTERS", "filter_from_string"]