Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions gum/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def parse_args():
parser.add_argument('--limit', '-l', type=int, help='Limit the number of results', default=10)
parser.add_argument('--model', '-m', type=str, help='Model to use')
parser.add_argument('--reset-cache', action='store_true', help='Reset the GUM cache and exit') # Add this line
parser.add_argument('--list-apps', action='store_true', help='List currently visible application names and exit')

# Batching configuration arguments
parser.add_argument('--min-batch-size', type=int, help='Minimum number of observations to trigger batch processing')
Expand Down Expand Up @@ -66,6 +67,17 @@ async def main():
min_batch_size = args.min_batch_size or int(os.getenv('MIN_BATCH_SIZE', '5'))
max_batch_size = args.max_batch_size or int(os.getenv('MAX_BATCH_SIZE', '15'))

if getattr(args, 'list_apps', False):
screen = Screen(model)
windows = screen.capture.get_window_list()
# Use a set to get unique owner names and filter out empty strings
apps = sorted(list(set(w['owner_name'] for w in windows if w.get('owner_name'))))
print("\nVisible applications:")
for app in apps:
print(f" - {app}")
print("-" * 20)
return

# you need one of: user_name for listening mode, --query, or --recent
if user_name is None and args.query is None and not getattr(args, 'recent', False):
print("Please provide a user name (-u), a query (-q), or use --recent to list latest propositions")
Expand Down
7 changes: 5 additions & 2 deletions gum/observers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

from .observer import Observer
from .screen import Screen
from .calendar import Calendar
# TODO: Calendar observer disabled due to ics/tatsu incompatibility with Python 3.10+
# See: tatsu 4.4.0 uses `from collections import Mapping` (removed in 3.10)
# Fix: upgrade ics to >=0.8 and update calendar.py for the new API.
# from .calendar import Calendar

__all__ = ["Observer", "Screen", "Calendar"]
__all__ = ["Observer", "Screen"]
56 changes: 56 additions & 0 deletions gum/observers/_capture_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from abc import ABC, abstractmethod
from typing import List, Dict, Optional

class CaptureBase(ABC):
    """
    Abstract base class for platform-specific screen capture and window management.

    Concrete backends must implement every abstract method below; the class
    cannot be instantiated until all of them are overridden.
    """

    @abstractmethod
    def get_monitor_geometries(self) -> List[Dict[str, int]]:
        """
        Returns a list of dictionaries containing geometry for all active monitors.

        Expected keys: 'left', 'top', 'width', 'height'.
        Coordinates should be in the OS's global coordinate system.
        """

    @abstractmethod
    def is_any_app_visible(self, app_names: List[str]) -> bool:
        """
        Checks if any application in the provided list has at least one
        visible, non-minimized window on any screen.

        Args:
            app_names: A list of application names/titles to check for.

        Returns:
            True if at least one matching application window is visible.
        """

    @abstractmethod
    def get_monitor_at_point(self, x: float, y: float) -> Optional[Dict[str, int]]:
        """
        Returns the geometry dictionary of the monitor containing the given global coordinates.

        Args:
            x: The horizontal global coordinate.
            y: The vertical global coordinate.

        Returns:
            A dictionary with 'left', 'top', 'width', 'height' keys, or None if the point
            is off-screen.
        """

    @abstractmethod
    def get_window_list(self) -> List[Dict]:
        """
        Returns a raw list of metadata for all currently onscreen windows.

        Used primarily for debugging and advanced filtering.
        Expected keys: 'owner_name', 'title', 'bounds', 'is_visible'.
        'bounds' is a mapping describing the window rectangle (the Quartz
        backend uses the sub-keys 'X', 'Y', 'Width', 'Height').
        """
121 changes: 121 additions & 0 deletions gum/observers/_capture_mac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from __future__ import annotations
from typing import List, Dict, Optional, Iterable

import Quartz
from shapely.geometry import box
from shapely.ops import unary_union

from ._capture_base import CaptureBase

class CaptureMac(CaptureBase):
    """
    macOS-specific screen capture and window management using Quartz.

    Display and window information is read from Quartz. Shapely is used to
    decide whether a window still has an uncovered area once the windows
    listed before it have been accounted for.
    """

    # Maximum number of displays requested from CGGetActiveDisplayList.
    _MAX_DISPLAYS = 16

    # Window owners that is_any_app_visible() skips entirely; they neither
    # match a target nor count as covering other windows.
    _IGNORED_OWNERS = frozenset({"Dock", "WindowServer", "Window Server"})

    def _active_display_rects(self) -> list:
        """
        Return the Quartz bounds rectangle of every active display.

        Raises:
            OSError: if CGGetActiveDisplayList reports an error.
        """
        err, ids, cnt = Quartz.CGGetActiveDisplayList(self._MAX_DISPLAYS, None, None)
        if err != Quartz.kCGErrorSuccess:
            raise OSError(f"CGGetActiveDisplayList failed: {err}")
        return [Quartz.CGDisplayBounds(did) for did in ids[:cnt]]

    def _onscreen_window_info(self):
        """
        Return the raw Quartz info dictionaries for onscreen windows.

        CGWindowListCopyWindowInfo can return NULL (None through the
        bridge), e.g. when no window server is reachable; that case is
        normalised to an empty list so callers can always iterate.
        """
        opts = (
            Quartz.kCGWindowListOptionOnScreenOnly
            | Quartz.kCGWindowListOptionIncludingWindow
        )
        wins = Quartz.CGWindowListCopyWindowInfo(opts, Quartz.kCGNullWindowID)
        return [] if wins is None else wins

    def _get_global_bounds(self) -> tuple[float, float, float, float]:
        """
        Return a bounding box enclosing **all** physical displays.

        Returns:
            (min_x, min_y, max_x, max_y). If no display is reported the
            values stay at their +/- infinity starting points.

        Raises:
            OSError: if the display list cannot be obtained.
        """
        min_x = min_y = float("inf")
        max_x = max_y = -float("inf")
        for r in self._active_display_rects():
            x0, y0 = r.origin.x, r.origin.y
            x1, y1 = x0 + r.size.width, y0 + r.size.height
            min_x, min_y = min(min_x, x0), min(min_y, y0)
            max_x, max_y = max(max_x, x1), max(max_y, y1)
        return min_x, min_y, max_x, max_y

    def get_monitor_geometries(self) -> List[Dict[str, int]]:
        """
        Returns a list of monitor geometries using Quartz.

        Each entry has the integer keys 'left', 'top', 'width', 'height'
        taken from the display's Quartz bounds.

        Raises:
            OSError: if the display list cannot be obtained.
        """
        return [
            {
                "left": int(r.origin.x),
                "top": int(r.origin.y),
                "width": int(r.size.width),
                "height": int(r.size.height),
            }
            for r in self._active_display_rects()
        ]

    def get_window_list(self) -> List[Dict]:
        """
        List onscreen windows using Quartz.

        Returns:
            One dictionary per window with the keys 'owner_name', 'title',
            'bounds' (sub-keys 'X', 'Y', 'Width', 'Height') and
            'is_visible'. Missing Quartz fields default to "" or 0. The
            list is empty when Quartz returns no window information.
        """
        result = []
        for info in self._onscreen_window_info():
            bounds = info.get("kCGWindowBounds", {})
            result.append({
                "owner_name": info.get("kCGWindowOwnerName", ""),
                "title": info.get("kCGWindowName", ""),
                "bounds": {
                    "X": bounds.get("X", 0),
                    "Y": bounds.get("Y", 0),
                    "Width": bounds.get("Width", 0),
                    "Height": bounds.get("Height", 0),
                },
                "is_visible": True,  # Since we use kCGWindowListOptionOnScreenOnly
            })
        return result

    def is_any_app_visible(self, app_names: List[str]) -> bool:
        """
        Determines app visibility using Quartz and Shapely for area calculation.

        Windows are processed in the order Quartz returns them. Each
        window's rectangle is reduced by the union of all rectangles seen
        before it; if anything is left and the owner is one of
        ``app_names``, the app counts as visible.

        NOTE(review): this relies on the onscreen list being ordered front
        to back, as Quartz documents for kCGWindowListOptionOnScreenOnly.

        Args:
            app_names: Owner names to look for (exact string match).

        Returns:
            True as soon as a matching window with a non-empty uncovered
            area is found, otherwise False. An empty ``app_names`` or an
            unavailable window list yields False.

        Raises:
            OSError: if the display list cannot be obtained.
        """
        if not app_names:
            return False

        _, _, _, gmax_y = self._get_global_bounds()
        targets = set(app_names)

        occupied = None
        for info in self._onscreen_window_info():
            owner = info.get("kCGWindowOwnerName", "")
            if owner in self._IGNORED_OWNERS:
                continue

            bounds = info.get("kCGWindowBounds", {})
            x, y, w, h = (
                bounds.get("X", 0),
                bounds.get("Y", 0),
                bounds.get("Width", 0),
                bounds.get("Height", 0),
            )
            if w <= 0 or h <= 0:
                continue

            # Mirror Y against the bottom edge of the global display bounds.
            # The same mirror is applied to every window, so overlap between
            # rectangles is unaffected.
            inv_y = gmax_y - y - h
            poly = box(x, inv_y, x + w, inv_y + h)
            if poly.is_empty:
                continue

            visible = poly if occupied is None else poly.difference(occupied)
            if not visible.is_empty:
                if owner in targets:
                    return True  # Found a visible window for one of the target apps
                # Only windows that were themselves at least partly showing
                # are added to the covered region.
                occupied = poly if occupied is None else unary_union([occupied, poly])

        return False

    def get_monitor_at_point(self, x: float, y: float) -> Optional[Dict[str, int]]:
        """
        Finds the monitor geometry containing the point using Quartz bounds.

        The left/top edges are inclusive and the right/bottom edges are
        exclusive.

        Args:
            x: The horizontal global coordinate.
            y: The vertical global coordinate.

        Returns:
            The first matching geometry dictionary, or None if no monitor
            contains the point.

        Raises:
            OSError: if the display list cannot be obtained.
        """
        for m in self.get_monitor_geometries():
            if m["left"] <= x < m["left"] + m["width"] and m["top"] <= y < m["top"] + m["height"]:
                return m
        return None
Loading