-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtools.py
More file actions
471 lines (408 loc) · 17.7 KB
/
tools.py
File metadata and controls
471 lines (408 loc) · 17.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
"""Hermes tool handlers for nostrkey (v0.2).
Seven tools share a module-level "current identity" so a Hermes session
can generate or load once and then sign / save without re-supplying keys
on every call. Restart the agent to clear it.
v0.2 hard fix: nsec and seed_phrase are NEVER returned by
``nostrkey_generate`` or ``nostrkey_load``. They are retrievable only
via the gated ``nostrkey_export_nsec`` / ``nostrkey_export_seed_phrase``
tools, which require ``NOSTRKEY_REVEAL_CODE`` env var + matching
``confirmation_code`` arg + ``purpose`` arg ≥20 chars. Failed and
successful exports both append to ``$HERMES_HOME/.nostrkey/reveal_audit.log``.
"""
from __future__ import annotations
import datetime
import json
import os
import secrets
from pathlib import Path
from typing import Any, Optional
from nostrkey import Identity
from tools.registry import tool_error, tool_result
# Module-wide "current identity" shared by every handler in this file.
# It stays None until a handler installs one through _set_current().
_current: Optional[Identity] = None
# Seed phrase held in memory alongside the current identity. Only the
# with_seed_phrase branch of handle_nostrkey_generate assigns a non-None value.
_current_seed_phrase: Optional[str] = None # set only when generate(with_seed_phrase=True)
def _hermes_home() -> Path:
    """Return the Hermes home directory.

    The ``HERMES_HOME`` environment variable wins when it is set; otherwise
    the directory is ``.hermes`` inside the user's home directory.
    """
    fallback = Path.home() / ".hermes"
    configured = os.environ.get("HERMES_HOME", str(fallback))
    return Path(configured)
def _default_identity_path() -> Path:
    """Return the default location of the encrypted identity file.

    That is ``.nostrkey/identity.nostrkey`` under the Hermes home directory.
    """
    key_dir = _hermes_home() / ".nostrkey"
    return key_dir / "identity.nostrkey"
def _audit_log_path() -> Path:
    """Return the location of the reveal audit log.

    That is ``.nostrkey/reveal_audit.log`` under the Hermes home directory.
    """
    key_dir = _hermes_home() / ".nostrkey"
    return key_dir / "reveal_audit.log"
def _audit(action: str, outcome: str, purpose: str = "") -> None:
    """Append a single line to the reveal audit log. Best-effort —
    never raises (audit failure must not block tool dispatch).

    Each record is ``<UTC ISO timestamp>\\t<action>\\t<outcome>\\t<purpose>``
    followed by a newline.

    Args:
        action: Name of the gated operation being recorded.
        outcome: Short result tag for that operation.
        purpose: Caller-supplied justification. Non-string values are
            stringified rather than dropped, every non-printable character
            (newline, carriage return, tab, ...) is replaced by a space, and
            the result is truncated to 200 characters.
    """
    try:
        path = _audit_log_path()
        path.parent.mkdir(parents=True, exist_ok=True)
        ts = datetime.datetime.now(datetime.timezone.utc).isoformat()
        text = purpose or ""
        if not isinstance(text, str):
            # A non-string purpose used to raise here and lose the whole
            # record; keep the evidence instead.
            text = str(text)
        # The log is tab-separated, one record per line. Neutralise every
        # control/separator character so caller-supplied text cannot add
        # fields or forge extra records.
        purpose_snip = "".join(ch if ch.isprintable() else " " for ch in text)[:200]
        line = f"{ts}\t{action}\t{outcome}\t{purpose_snip}\n"
        with open(path, "a", encoding="utf-8") as f:
            f.write(line)
    except Exception:
        # Deliberate: auditing is best-effort and must never break a tool call.
        pass
def _set_current(identity: Identity) -> None:
    """Install *identity* as the module-wide current identity.

    Any seed phrase held for the previously current identity is dropped at
    the same time, so a later seed-phrase export can never return a phrase
    that belongs to a different identity than the one now loaded. A caller
    that has a phrase for the new identity must assign
    ``_current_seed_phrase`` after calling this function.
    """
    global _current, _current_seed_phrase
    _current = identity
    # The stored phrase described the identity being replaced; it is stale now.
    _current_seed_phrase = None
def _require_current() -> Identity:
    """Return the current identity.

    Raises:
        RuntimeError: If no identity has been installed yet.
    """
    identity = _current
    if identity is not None:
        return identity
    raise RuntimeError(
        "No identity loaded. Call nostrkey_generate or nostrkey_load first."
    )
def _check_reveal_code(supplied: str) -> tuple[bool, str | None]:
    """Constant-time compare against NOSTRKEY_REVEAL_CODE env. Returns (ok, error_msg).

    Args:
        supplied: The confirmation code given by the caller. ``None``, a
            non-string value, or a blank string is reported as missing
            rather than raising.

    Returns:
        ``(True, None)`` when the stripped code equals the stripped
        environment value; otherwise ``(False, message)`` where *message*
        explains which check failed.
    """
    expected = os.environ.get("NOSTRKEY_REVEAL_CODE", "").strip()
    if not expected:
        return False, (
            "NOSTRKEY_REVEAL_CODE env var is not set on this Hermes instance. "
            "The operator must set it (in $HERMES_HOME/.env or the shell that "
            "launched Hermes) before any nsec or seed phrase can be exported. "
            "This is intentional: the env var is the operator's proof-of-presence."
        )
    # Callers pass raw tool arguments; a number or list must produce a clean
    # refusal instead of an AttributeError on .strip().
    if not isinstance(supplied, str):
        return False, "confirmation_code is required and must be a non-empty string."
    supplied = supplied.strip()
    if not supplied:
        return False, "confirmation_code is required and must be a non-empty string."
    # compare_digest on the encoded bytes avoids leaking, via timing, how much
    # of the code matched.
    if not secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        return False, "confirmation_code does not match NOSTRKEY_REVEAL_CODE — refusing export."
    return True, None
# -----------------------------
# nostrkey_generate
# -----------------------------
# Tool schema for nostrkey_generate: one optional boolean argument,
# with_seed_phrase (default False). Where this dict is registered is not
# visible in this file — presumably the Hermes tool registry; confirm there.
NOSTRKEY_GENERATE_SCHEMA = {
    "type": "function",
    "function": {
        "name": "nostrkey_generate",
        "description": (
            "Generate a fresh Nostr keypair for this agent and make it the "
            "current identity. Returns ONLY the npub. The nsec (and seed "
            "phrase, if requested) are held in module memory — never returned "
            "by this tool. Retrieve them via the gated nostrkey_export_nsec "
            "or nostrkey_export_seed_phrase tools, which require "
            "NOSTRKEY_REVEAL_CODE env var + a matching confirmation_code "
            "argument + a purpose. Encrypt to disk via nostrkey_save."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "with_seed_phrase": {
                    "type": "boolean",
                    "description": "Also generate a 12-word BIP-39 seed phrase (held in memory; retrieve via nostrkey_export_seed_phrase). Default false.",
                    "default": False,
                },
            },
            "required": [],
        },
    },
}
def handle_nostrkey_generate(args: dict[str, Any], **kw) -> str:
    """Create a new identity, make it current, and report only its npub.

    Args:
        args: Tool arguments. ``with_seed_phrase`` (truthy/falsy, default
            False) selects ``Identity.generate_with_seed()`` over
            ``Identity.generate()``.
        **kw: Ignored.

    Returns:
        ``tool_result(...)`` carrying the npub and next-step guidance, or
        ``tool_error(...)`` if anything raised. The module-level seed phrase
        is set to the new phrase in the seed branch and reset to None
        otherwise.
    """
    global _current_seed_phrase
    try:
        want_phrase = bool(args.get("with_seed_phrase", False))

        if not want_phrase:
            fresh = Identity.generate()
            _set_current(fresh)
            # A plain keypair has no phrase; forget any earlier one.
            _current_seed_phrase = None
            plain_payload = {
                "npub": fresh.npub,
                "generated": True,
                "next_steps": (
                    "The nsec is in working memory. To persist: nostrkey_save with a "
                    "strong passphrase. To retrieve the nsec for export: set "
                    "NOSTRKEY_REVEAL_CODE in the operator's env, then call "
                    "nostrkey_export_nsec with confirmation_code + purpose."
                ),
            }
            return tool_result(plain_payload)

        fresh, words = Identity.generate_with_seed()
        _set_current(fresh)
        _current_seed_phrase = words
        seeded_payload = {
            "npub": fresh.npub,
            "generated": True,
            "seed_phrase_available": True,
            "next_steps": (
                "The nsec and 12-word seed phrase are in working memory. "
                "To persist: call nostrkey_save with a strong passphrase. "
                "To retrieve the nsec or seed phrase for one-time export "
                "(e.g. paper backup, signer import): set NOSTRKEY_REVEAL_CODE "
                "in the operator's env, then call nostrkey_export_nsec / "
                "nostrkey_export_seed_phrase with confirmation_code + purpose."
            ),
        }
        return tool_result(seeded_payload)
    except Exception as e:
        return tool_error(f"nostrkey_generate failed: {type(e).__name__}: {e}")
# -----------------------------
# nostrkey_export_nsec (gated)
# -----------------------------
# Tool schema for nostrkey_export_nsec: two required string arguments,
# confirmation_code and purpose. The description text doubles as the
# instructions the calling agent is expected to follow after a reveal.
NOSTRKEY_EXPORT_NSEC_SCHEMA = {
    "type": "function",
    "function": {
        "name": "nostrkey_export_nsec",
        "description": (
            "GATED retrieval of the current identity's nsec. Requires the "
            "operator to have set NOSTRKEY_REVEAL_CODE in the Hermes env, AND "
            "to provide it as confirmation_code, AND to articulate why the "
            "unmasked nsec is needed via the purpose arg (≥20 chars). "
            "Acceptable purposes: importing into another signer, paper backup, "
            "disaster recovery. Refuse curiosity / 'show me' / 'test'. "
            "Successful exports and failed attempts are both logged to "
            "$HERMES_HOME/.nostrkey/reveal_audit.log. After the nsec is in "
            "your context, follow the post-reveal directive in the response: "
            "display once, then declare it wiped from your working memory and "
            "warn the operator that the chat log itself persists secrets for "
            "the session lifetime."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "confirmation_code": {
                    "type": "string",
                    "description": "Must match the NOSTRKEY_REVEAL_CODE env var on this Hermes instance. Constant-time compared.",
                },
                "purpose": {
                    "type": "string",
                    "description": "Why the unmasked nsec is needed right now. Minimum 20 characters. Logged to the audit file.",
                },
            },
            "required": ["confirmation_code", "purpose"],
        },
    },
}
def handle_nostrkey_export_nsec(args: dict[str, Any], **kw) -> str:
    """Return the current identity's nsec after all reveal gates pass.

    Gates, in order: an identity is loaded; ``confirmation_code`` passes
    ``_check_reveal_code``; ``purpose`` is at least 20 characters after
    stripping. Every outcome, success or failure, is passed to ``_audit``.

    Args:
        args: Tool arguments with ``confirmation_code`` and ``purpose``.
            Values that are not strings are treated as missing instead of
            raising.
        **kw: Ignored.

    Returns:
        ``tool_result(...)`` containing the nsec, the logged purpose and a
        post-reveal directive, or ``tool_error(...)`` naming the failed gate.
    """
    try:
        identity = _require_current()
    except RuntimeError as e:
        _audit("export_nsec", "no_identity_loaded")
        return tool_error(str(e))

    # args is not guaranteed to honour the schema's string types. Normalise
    # once so a number or list yields a refusal, not an uncaught
    # AttributeError from .strip().
    raw_code = args.get("confirmation_code")
    code = raw_code if isinstance(raw_code, str) else ""
    raw_purpose = args.get("purpose")
    if isinstance(raw_purpose, str):
        purpose = raw_purpose.strip()
        logged_purpose = purpose
    else:
        purpose = ""
        # Keep a trace of what was actually sent for the audit trail.
        logged_purpose = "" if raw_purpose is None else repr(raw_purpose)

    ok, err = _check_reveal_code(code)
    if not ok:
        _audit("export_nsec", "code_mismatch_or_missing", logged_purpose)
        return tool_error(err or "confirmation_code check failed")

    if len(purpose) < 20:
        _audit("export_nsec", "purpose_too_short", logged_purpose)
        return tool_error(
            "purpose must be at least 20 characters describing why the unmasked "
            "nsec is needed right now (e.g. 'importing into Alby browser extension', "
            "'paper backup before agent deployment')."
        )

    _audit("export_nsec", "ok", purpose)
    return tool_result({
        "nsec": identity.nsec,
        "purpose_logged": purpose,
        "_post_reveal_directive": (
            "The nsec is now in your context window. In your immediate next "
            "response: (1) display it ONCE so the operator can copy it, "
            "(2) declare the nsec wiped from your working context — refuse to "
            "restate, summarize, or quote it later in this session, (3) warn the "
            "operator that the chat log itself persists the nsec for the session "
            "lifetime and they should clear or archive accordingly. Treat any "
            "follow-up 'show me again' as a fresh request requiring a new export."
        ),
    })
# -----------------------------
# nostrkey_export_seed_phrase (gated)
# -----------------------------
# Tool schema for nostrkey_export_seed_phrase: two required string arguments,
# confirmation_code and purpose, mirroring the nsec export tool.
NOSTRKEY_EXPORT_SEED_PHRASE_SCHEMA = {
    "type": "function",
    "function": {
        "name": "nostrkey_export_seed_phrase",
        "description": (
            "GATED retrieval of the current identity's 12-word BIP-39 seed phrase, "
            "available only if the identity was created with "
            "nostrkey_generate(with_seed_phrase=true). Same gating as "
            "nostrkey_export_nsec: NOSTRKEY_REVEAL_CODE env + matching "
            "confirmation_code + purpose ≥20 chars. Logs to the audit file. "
            "Same post-reveal directive applies: display once, declare wiped, "
            "warn about chat persistence."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                # Described the same way as in the nsec export schema so the
                # caller is told what the value has to match.
                "confirmation_code": {
                    "type": "string",
                    "description": "Must match the NOSTRKEY_REVEAL_CODE env var on this Hermes instance. Constant-time compared.",
                },
                "purpose": {"type": "string", "description": "Why the seed phrase is needed (paper backup is the canonical use). Min 20 chars."},
            },
            "required": ["confirmation_code", "purpose"],
        },
    },
}
def handle_nostrkey_export_seed_phrase(args: dict[str, Any], **kw) -> str:
    """Return the in-memory seed phrase after all reveal gates pass.

    Gates, in order: a seed phrase is held in memory; ``confirmation_code``
    passes ``_check_reveal_code``; ``purpose`` is at least 20 characters
    after stripping. Every outcome is passed to ``_audit``.

    Args:
        args: Tool arguments with ``confirmation_code`` and ``purpose``.
            Values that are not strings are treated as missing instead of
            raising.
        **kw: Ignored.

    Returns:
        ``tool_result(...)`` containing the seed phrase, the logged purpose
        and a post-reveal directive, or ``tool_error(...)`` naming the
        failed gate.
    """
    if _current_seed_phrase is None:
        _audit("export_seed_phrase", "no_seed_phrase_in_memory")
        return tool_error(
            "No seed phrase is held in memory. Generate the identity with "
            "nostrkey_generate(with_seed_phrase=true) to make a phrase available. "
            "Existing identities loaded from disk via nostrkey_load have the nsec "
            "but not the original 12-word phrase."
        )

    # args is not guaranteed to honour the schema's string types. Normalise
    # once so a number or list yields a refusal, not an uncaught
    # AttributeError from .strip().
    raw_code = args.get("confirmation_code")
    code = raw_code if isinstance(raw_code, str) else ""
    raw_purpose = args.get("purpose")
    if isinstance(raw_purpose, str):
        purpose = raw_purpose.strip()
        logged_purpose = purpose
    else:
        purpose = ""
        # Keep a trace of what was actually sent for the audit trail.
        logged_purpose = "" if raw_purpose is None else repr(raw_purpose)

    ok, err = _check_reveal_code(code)
    if not ok:
        _audit("export_seed_phrase", "code_mismatch_or_missing", logged_purpose)
        return tool_error(err or "confirmation_code check failed")

    if len(purpose) < 20:
        _audit("export_seed_phrase", "purpose_too_short", logged_purpose)
        return tool_error(
            "purpose must be at least 20 characters (e.g. 'paper backup before "
            "deploying to production', 'rotating to a hardware signer')."
        )

    _audit("export_seed_phrase", "ok", purpose)
    return tool_result({
        "seed_phrase": _current_seed_phrase,
        "purpose_logged": purpose,
        "_post_reveal_directive": (
            "The 12-word seed phrase is now in your context. Display it ONCE for "
            "the operator to write down on paper, then declare it wiped from your "
            "working memory and refuse to restate. The seed phrase is the only "
            "recovery path — if it's lost AND the encrypted identity file is lost, "
            "the identity cannot be recovered."
        ),
    })
# -----------------------------
# nostrkey_whoami
# -----------------------------
# Tool schema for nostrkey_whoami: takes no arguments.
NOSTRKEY_WHOAMI_SCHEMA = {
    "type": "function",
    "function": {
        "name": "nostrkey_whoami",
        "description": (
            "Return the npub and public-key hex of the currently loaded "
            "Nostr identity. Errors if no identity is loaded — call "
            "nostrkey_generate or nostrkey_load first."
        ),
        "parameters": {"type": "object", "properties": {}, "required": []},
    },
}
def handle_nostrkey_whoami(args: dict[str, Any], **kw) -> str:
    """Report the public identifiers of the current identity.

    Args:
        args: Tool arguments; none are read.
        **kw: Ignored.

    Returns:
        ``tool_result(...)`` with ``npub`` and ``public_key_hex``, or
        ``tool_error(...)`` carrying the exception text if anything raised
        (including when no identity is loaded).
    """
    try:
        me = _require_current()
        summary = {
            "npub": me.npub,
            "public_key_hex": me.public_key_hex,
        }
        return tool_result(summary)
    except Exception as e:
        return tool_error(str(e))
# -----------------------------
# nostrkey_save
# -----------------------------
# Tool schema for nostrkey_save: required string passphrase, optional
# string filepath overriding the default save location.
NOSTRKEY_SAVE_SCHEMA = {
    "type": "function",
    "function": {
        "name": "nostrkey_save",
        "description": (
            "Encrypt and persist the currently loaded identity to disk. "
            "Default path is $HERMES_HOME/.nostrkey/identity.nostrkey. "
            "Requires a strong passphrase — there is no recovery if lost."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "passphrase": {
                    "type": "string",
                    "description": "Passphrase to encrypt the identity file.",
                },
                "filepath": {
                    "type": "string",
                    "description": "Override save path. Defaults to $HERMES_HOME/.nostrkey/identity.nostrkey.",
                },
            },
            "required": ["passphrase"],
        },
    },
}
def handle_nostrkey_save(args: dict[str, Any], **kw) -> str:
    """Persist the current identity to disk under a passphrase.

    Args:
        args: Tool arguments. ``passphrase`` must be a non-empty string;
            ``filepath`` optionally overrides the default identity path.
        **kw: Ignored.

    Returns:
        ``tool_result(...)`` with the path written and the npub, or
        ``tool_error(...)`` when the passphrase is unusable or anything
        raised (including when no identity is loaded).
    """
    try:
        identity = _require_current()

        secret = args.get("passphrase")
        if not (secret and isinstance(secret, str)):
            return tool_error("passphrase is required and must be a string")

        target = Path(args.get("filepath") or _default_identity_path())
        # Create the containing directory up front so the save can write.
        target.parent.mkdir(parents=True, exist_ok=True)
        identity.save(str(target), secret)

        report = {"saved_to": str(target), "npub": identity.npub}
        return tool_result(report)
    except Exception as e:
        return tool_error(f"nostrkey_save failed: {type(e).__name__}: {e}")
# -----------------------------
# nostrkey_load
# -----------------------------
# Tool schema for nostrkey_load: required string passphrase, optional
# string filepath overriding the default load location.
NOSTRKEY_LOAD_SCHEMA = {
    "type": "function",
    "function": {
        "name": "nostrkey_load",
        "description": (
            "Decrypt and load an identity from disk, making it the current "
            "identity. Default path is $HERMES_HOME/.nostrkey/identity.nostrkey."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "passphrase": {
                    "type": "string",
                    "description": "Passphrase that encrypted the identity file.",
                },
                "filepath": {
                    "type": "string",
                    "description": "Override load path. Defaults to $HERMES_HOME/.nostrkey/identity.nostrkey.",
                },
            },
            "required": ["passphrase"],
        },
    },
}
def handle_nostrkey_load(args: dict[str, Any], **kw) -> str:
    """Load an identity from disk and make it the current identity.

    Args:
        args: Tool arguments. ``passphrase`` must be a non-empty string;
            ``filepath`` optionally overrides the default identity path.
        **kw: Ignored.

    Returns:
        ``tool_result(...)`` with the path read and the npub, or
        ``tool_error(...)`` when the passphrase is unusable, the file does
        not exist, or anything raised while loading.
    """
    try:
        secret = args.get("passphrase")
        if not (secret and isinstance(secret, str)):
            return tool_error("passphrase is required and must be a string")

        source = Path(args.get("filepath") or _default_identity_path())
        if source.exists():
            loaded = Identity.load(str(source), secret)
            _set_current(loaded)
            return tool_result({"loaded_from": str(source), "npub": loaded.npub})

        return tool_error(f"identity file not found: {source}")
    except Exception as e:
        return tool_error(f"nostrkey_load failed: {type(e).__name__}: {e}")
# -----------------------------
# nostrkey_sign_event
# -----------------------------
# Tool schema for nostrkey_sign_event: required integer kind and string
# content, optional tags given as an array of string arrays.
NOSTRKEY_SIGN_EVENT_SCHEMA = {
    "type": "function",
    "function": {
        "name": "nostrkey_sign_event",
        "description": (
            "Sign a Nostr event with the currently loaded identity. Returns "
            "the full signed event as JSON, ready to publish to a relay."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "kind": {
                    "type": "integer",
                    "description": "Nostr event kind (e.g. 1 for text note, 0 for profile metadata).",
                },
                "content": {
                    "type": "string",
                    "description": "Event content payload.",
                },
                "tags": {
                    "type": "array",
                    "description": "Nostr event tags as a list of string lists. Defaults to [].",
                    "items": {"type": "array", "items": {"type": "string"}},
                },
            },
            "required": ["kind", "content"],
        },
    },
}
def handle_nostrkey_sign_event(args: dict[str, Any], **kw) -> str:
    """Sign a Nostr event with the current identity.

    Args:
        args: Tool arguments. ``kind`` must be an integer (booleans are
            rejected), ``content`` must be a string, and ``tags``, when
            given, must be a list of lists of strings; a missing or empty
            ``tags`` becomes ``[]``.
        **kw: Ignored.

    Returns:
        ``tool_result(...)`` with the signed event decoded from its JSON
        form, or ``tool_error(...)`` when an argument is invalid or anything
        raised (including when no identity is loaded).
    """
    try:
        identity = _require_current()
        kind = args.get("kind")
        content = args.get("content")
        tags = args.get("tags") or []

        # bool is a subclass of int, so a bare isinstance(kind, int) would
        # let True/False through as kinds 1/0.
        if isinstance(kind, bool) or not isinstance(kind, int):
            return tool_error("kind must be an integer")
        if not isinstance(content, str):
            return tool_error("content must be a string")
        # Check the shape the schema promises before anything gets signed.
        tags_ok = isinstance(tags, (list, tuple)) and all(
            isinstance(tag, (list, tuple))
            and all(isinstance(item, str) for item in tag)
            for tag in tags
        )
        if not tags_ok:
            return tool_error("tags must be a list of lists of strings")

        event = identity.sign_event(kind=kind, content=content, tags=tags)
        return tool_result({"event": json.loads(event.to_json())})
    except Exception as e:
        return tool_error(f"nostrkey_sign_event failed: {type(e).__name__}: {e}")