Skip to content

Commit 51def88

Browse files
bloveclaude
andcommitted
feat(langgraph): translate CustomStreamEvents into structured AgentEvent
toAgent(ref) now emits events$: Observable<AgentEvent>. Translates state_update events into AgentStateUpdateEvent (with data: Record), all others into the structured AgentCustomEvent escape hatch. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 4c36c85 commit 51def88

2 files changed

Lines changed: 52 additions & 26 deletions

File tree

libs/langgraph/src/lib/to-agent.spec.ts

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import { signal } from '@angular/core';
33
import { TestBed } from '@angular/core/testing';
44
import { HumanMessage, AIMessage } from '@langchain/core/messages';
5-
import type { Agent, AgentCustomEvent } from '@cacheplane/chat';
5+
import type { Agent, AgentEvent } from '@cacheplane/chat';
66
import type { AgentRef, CustomStreamEvent } from './agent.types';
77
import { ResourceStatus } from './agent.types';
88
import { toAgent } from './to-agent';
@@ -115,7 +115,39 @@ describe('toAgent (LangGraph adapter)', () => {
115115
});
116116
});
117117

118-
it('exposes customEvents$ that emits newly-appended events with type aliased from name', () => {
118+
it('translates a state_update CustomStreamEvent into AgentStateUpdateEvent', () => {
119+
TestBed.runInInjectionContext(() => {
120+
const customEvents = signal<any[]>([]);
121+
const ref = stubAgentRef({ customEvents } as any);
122+
const chat = toAgent(ref);
123+
124+
const received: any[] = [];
125+
chat.events$.subscribe((e) => received.push(e));
126+
127+
customEvents.set([{ name: 'state_update', data: { count: 1 } }]);
128+
TestBed.flushEffects();
129+
130+
expect(received).toEqual([{ type: 'state_update', data: { count: 1 } }]);
131+
});
132+
});
133+
134+
it('wraps non-state_update CustomStreamEvent as AgentCustomEvent', () => {
135+
TestBed.runInInjectionContext(() => {
136+
const customEvents = signal<any[]>([]);
137+
const ref = stubAgentRef({ customEvents } as any);
138+
const chat = toAgent(ref);
139+
140+
const received: any[] = [];
141+
chat.events$.subscribe((e) => received.push(e));
142+
143+
customEvents.set([{ name: 'tick', data: 42 }]);
144+
TestBed.flushEffects();
145+
146+
expect(received).toEqual([{ type: 'custom', name: 'tick', data: 42 }]);
147+
});
148+
});
149+
150+
it('exposes events$ that emits newly-appended events as structured AgentEvent', () => {
119151
const customSig = signal<CustomStreamEvent[]>([]);
120152
const ref = stubAgentRef({ customEvents: customSig });
121153

@@ -124,8 +156,8 @@ describe('toAgent (LangGraph adapter)', () => {
124156
adapter = toAgent(ref);
125157
});
126158

127-
const received: AgentCustomEvent[] = [];
128-
adapter.customEvents$!.subscribe((e) => received.push(e));
159+
const received: AgentEvent[] = [];
160+
adapter.events$.subscribe((e) => received.push(e));
129161

130162
customSig.set([{ name: 'state_update', data: { counter: 1 } }]);
131163
TestBed.flushEffects();
@@ -142,7 +174,7 @@ describe('toAgent (LangGraph adapter)', () => {
142174

143175
expect(received).toEqual([
144176
{ type: 'state_update', data: { counter: 1 } },
145-
{ type: 'a2ui.surface', data: { surfaceId: 'main' } },
177+
{ type: 'custom', name: 'a2ui.surface', data: { surfaceId: 'main' } },
146178
]);
147179
});
148180
});

libs/langgraph/src/lib/to-agent.ts

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,9 @@ import { Subject, type Observable } from 'rxjs';
44
import type { BaseMessage } from '@langchain/core/messages';
55
import type { ToolCallWithResult, Interrupt } from '@langchain/langgraph-sdk';
66
import type {
7-
AgentWithHistory,
8-
AgentCheckpoint,
9-
AgentCustomEvent,
10-
Message,
11-
Role,
12-
AgentStatus,
13-
ToolCall,
14-
ToolCallStatus,
15-
AgentInterrupt,
16-
Subagent,
17-
AgentSubmitInput,
18-
AgentSubmitOptions,
7+
AgentWithHistory, AgentCheckpoint, AgentEvent,
8+
Message, Role, ToolCall, ToolCallStatus, AgentStatus,
9+
AgentInterrupt, Subagent, AgentSubmitInput, AgentSubmitOptions,
1910
} from '@cacheplane/chat';
2011
import type { AgentRef, CustomStreamEvent, SubagentStreamRef, ThreadState } from './agent.types';
2112
import { ResourceStatus } from './agent.types';
@@ -56,7 +47,7 @@ export function toAgent<T>(ref: AgentRef<T, any>): AgentWithHistory {
5647
return out;
5748
});
5849

59-
const customEvents$ = buildCustomEvents$(ref);
50+
const events$ = buildEvents$(ref);
6051

6152
const history = computed<AgentCheckpoint[]>(() =>
6253
ref.history().map(toCheckpoint),
@@ -71,7 +62,7 @@ export function toAgent<T>(ref: AgentRef<T, any>): AgentWithHistory {
7162
state,
7263
interrupt,
7364
subagents,
74-
customEvents$,
65+
events$,
7566
history,
7667
submit: (input: AgentSubmitInput, opts?: AgentSubmitOptions) =>
7768
ref.submit(buildSubmitPayload(input), opts ? { signal: opts.signal } as never : undefined),
@@ -80,15 +71,15 @@ export function toAgent<T>(ref: AgentRef<T, any>): AgentWithHistory {
8071
}
8172

8273
/**
83-
* Build an Observable<AgentCustomEvent> that bridges LangGraph's
74+
* Build an Observable<AgentEvent> that bridges LangGraph's
8475
* `Signal<CustomStreamEvent[]>` (append-only array) into a stream of newly
8576
* emitted events. Each effect firing compares against a cursor tracking the
8677
* previously-seen length and emits only the tail slice.
8778
*/
88-
function buildCustomEvents$(
79+
function buildEvents$(
8980
ref: AgentRef<unknown, any>,
90-
): Observable<AgentCustomEvent> {
91-
const subject = new Subject<AgentCustomEvent>();
81+
): Observable<AgentEvent> {
82+
const subject = new Subject<AgentEvent>();
9283
let seen = 0;
9384
effect(() => {
9485
const all = ref.customEvents();
@@ -97,15 +88,18 @@ function buildCustomEvents$(
9788
seen = 0;
9889
}
9990
for (let i = seen; i < all.length; i++) {
100-
subject.next(toCustomEvent(all[i]));
91+
subject.next(toAgentEvent(all[i]));
10192
}
10293
seen = all.length;
10394
});
10495
return subject.asObservable();
10596
}
10697

107-
function toCustomEvent(e: CustomStreamEvent): AgentCustomEvent {
108-
return { type: e.name, data: e.data };
98+
function toAgentEvent(e: CustomStreamEvent): AgentEvent {
99+
if (e.name === 'state_update' && isRecord(e.data)) {
100+
return { type: 'state_update', data: e.data };
101+
}
102+
return { type: 'custom', name: e.name, data: e.data };
109103
}
110104

111105
function mapStatus(s: ResourceStatus): AgentStatus {

0 commit comments

Comments
 (0)