forked from LayerLens/stratix-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_helpers.py
More file actions
189 lines (155 loc) · 6.12 KB
/
_helpers.py
File metadata and controls
189 lines (155 loc) · 6.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""Shared helpers for LayerLens SDK samples.
Provides utility functions used across multiple samples to keep
individual sample files focused on demonstrating SDK features.
"""
from __future__ import annotations
import os
import json
import time
import logging
import tempfile
from typing import Any, List, Optional
from layerlens import Stratix
logger = logging.getLogger(__name__)
def upload_trace_dict(
    client: Stratix,
    *,
    input_text: str,
    output_text: str,
    metadata: Optional[dict[str, Any]] = None,
) -> Any:
    """Upload a single trace built from in-memory data.

    The trace is serialized as one JSON line into a temporary ``.jsonl``
    file, handed to ``client.traces.upload()`` by path, and the temporary
    file is removed afterwards whether or not the upload succeeded.

    Args:
        client: An initialized :class:`Stratix` client.
        input_text: The input/prompt text; stored as a single ``user`` message.
        output_text: The output/response text for the trace.
        metadata: Optional metadata dict attached to the trace. ``None`` or
            an empty dict means no ``"metadata"`` key is written.

    Returns:
        Whatever ``client.traces.upload()`` returns (described upstream as a
        :class:`CreateTracesResponse` with ``trace_ids``).

    Raises:
        TypeError: If ``metadata`` contains values ``json.dumps`` cannot
            serialize. Exceptions from the upload call propagate unchanged.
    """
    trace_data: dict[str, Any] = {
        "input": [{"role": "user", "content": input_text}],
        "output": output_text,
    }
    if metadata:
        trace_data["metadata"] = metadata

    fd, path = tempfile.mkstemp(suffix=".jsonl")
    try:
        # Explicit UTF-8 keeps the file independent of the platform's locale
        # encoding. The handle is closed before uploading so the SDK can
        # reopen the path (required on Windows).
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(json.dumps(trace_data) + "\n")
        result = client.traces.upload(path)
    finally:
        # EAFP cleanup: a file that is already gone must not raise from
        # ``finally`` and mask the upload's result or exception.
        try:
            os.unlink(path)
        except FileNotFoundError:
            pass
    return result
def get_default_model_id(client: Stratix) -> str:
    """Get a model ID suitable for judge creation.

    Tries the public model catalog first (``client.public.models.get()``)
    and falls back to the project's own models (``client.models.get()``).
    The first ID found is cached on the function object for the lifetime of
    the process, so later calls return it without calling the API again --
    regardless of which client is passed.

    Lookups are best-effort: an exception from either source is logged at
    DEBUG level and the next source is tried.

    Args:
        client: An initialized :class:`Stratix` client.

    Returns:
        A model ID string suitable for passing to ``judges.create(model_id=...)``.

    Raises:
        RuntimeError: If neither the public catalog nor the project yields a
            model. When a lookup raised, the most recent exception is
            attached as ``__cause__``.
    """
    # Check cache
    cached = getattr(get_default_model_id, "_cached_id", None)
    if cached:
        return cached

    # Remember the most recent failure so the final error can explain itself.
    last_error: Optional[Exception] = None

    # Use public models (required for judge creation)
    try:
        public_resp = client.public.models.get()
        if public_resp and hasattr(public_resp, "models") and public_resp.models:
            model_id = public_resp.models[0].id
            get_default_model_id._cached_id = model_id  # type: ignore[attr-defined]
            return model_id
    except Exception as exc:
        last_error = exc
        logger.debug("Public model lookup failed: %s", exc)

    # Fall back to project models
    try:
        models = client.models.get()
        if models:
            model_id = models[0].id
            get_default_model_id._cached_id = model_id  # type: ignore[attr-defined]
            return model_id
    except Exception as exc:
        last_error = exc
        logger.debug("Project model lookup failed: %s", exc)

    raise RuntimeError(
        "No models available. Add a model to your project or check API connectivity."
    ) from last_error
def create_judge(
    client: Stratix,
    *,
    name: str,
    evaluation_goal: str,
    model_id: Optional[str] = None,
) -> Any:
    """Create a judge, falling back to an existing one on a name conflict.

    When ``model_id`` is omitted it is resolved via
    :func:`get_default_model_id`. If creation fails with an error whose text
    contains ``"already exists"`` or ``"409"``, the judges returned by
    ``client.judges.get_many()`` are searched for one with the same name and
    that judge is returned instead.

    Args:
        client: An initialized :class:`Stratix` client.
        name: Judge display name.
        evaluation_goal: What the judge evaluates (min 10 characters).
        model_id: Explicit model ID. If ``None``, a default is looked up.

    Returns:
        The newly created judge, or the existing judge with the same name.

    Raises:
        Exception: The original creation error is re-raised when it is not a
            name conflict, or when no judge with a matching name is found.
    """
    resolved_model_id = get_default_model_id(client) if model_id is None else model_id

    try:
        return client.judges.create(
            name=name,
            evaluation_goal=evaluation_goal,
            model_id=resolved_model_id,
        )
    except Exception as exc:
        # A 409 Conflict means the judge name is taken; anything else is a real failure.
        message = str(exc)
        if "already exists" not in message and "409" not in message:
            raise

        logger.info("Judge '%s' already exists, reusing.", name)
        listing = client.judges.get_many()
        candidates = listing.judges if listing and listing.judges else ()
        for judge in candidates:
            if judge.name == name:
                return judge

        # Conflict reported but no matching judge was listed: surface the original error.
        raise
def poll_evaluation_results(
    client: Stratix,
    evaluation_id: str,
    *,
    max_attempts: int = 60,
    initial_delay: float = 2.0,
    max_delay: float = 10.0,
    backoff_factor: float = 1.3,
) -> Optional[List[Any]]:
    """Poll for a trace evaluation result with capped exponential backoff.

    Trace evaluations are **asynchronous**. When ``trace_evaluations.create()``
    returns, the evaluation has been accepted but execution has not yet started.
    The actual LLM judge execution takes a variable amount of time (typically
    5-60 seconds depending on model and trace complexity). During this window:

    - ``get_results()`` may raise (e.g. a 404 ``NotFoundError`` while the
      result has not been written yet).
    - ``get_results()`` may return nothing, or a response whose ``score`` is
      still ``None`` because execution is in progress.

    Both cases are treated as "not ready yet". The helper keeps retrying until
    a response with a non-``None`` score appears or the attempt budget is
    exhausted. Every exception raised by ``get_results()`` is retried and
    logged at DEBUG level, so persistent failures (bad credentials, wrong ID)
    also end in ``None`` rather than an exception.

    Args:
        client: An initialized :class:`Stratix` client.
        evaluation_id: The trace evaluation ID to poll.
        max_attempts: Maximum number of poll attempts. With the default
            timing parameters, 60 attempts sleep for about 9 minutes in
            total, plus request time.
        initial_delay: Delay in seconds before the second attempt; capped at
            ``max_delay``.
        max_delay: Upper bound in seconds for any single delay.
        backoff_factor: Multiplier applied to the delay after each attempt.

    Returns:
        A one-element list holding the response returned by
        ``get_results()`` once its ``score`` is set, or ``None`` if no scored
        result appeared within the polling window.
    """
    # Apply the cap to the very first sleep as well, not only to later ones.
    delay = min(initial_delay, max_delay)
    for attempt in range(1, max_attempts + 1):
        try:
            resp = client.trace_evaluations.get_results(evaluation_id)
            if resp and resp.score is not None:
                return [resp]
            # None or missing score -- evaluation accepted but execution still in progress
        except Exception as exc:
            # 404 NotFoundError is expected while the results row hasn't been
            # created yet. Other transient errors (429, 502) are also retryable.
            logger.debug(
                "Poll attempt %d/%d for evaluation %s not ready: %s",
                attempt,
                max_attempts,
                evaluation_id,
                exc,
            )
        # No point sleeping after the final attempt.
        if attempt < max_attempts:
            time.sleep(delay)
            delay = min(delay * backoff_factor, max_delay)
    return None