LangChain의 Stream과 StreamEvents에 대해 살펴보고,
이를 클라이언트에 전달하는 효과적인 방법인 Server-Sent Events 방식에 대해 알아본다.
LangChain의 Streaming
LLM의 토큰 생성
Causal Language Model들은 자동 회귀 방식으로 토큰을 생성한다. 이전까지 만들어진 토큰들을 토대로 다음에 나올 토큰을 예측(디코딩)하면서 문장을 완성한다. 가령
"GPT는 자동 회귀 -- --" 라는 토큰이 생성되었다면, 그 다음에 나온 토큰은 현재까지 생성된 토큰들을 토대로 확률적으로 결정한다. 만일 "GPT는 자동 회귀 -- 생성한다"와 같이 마스킹된 내용 이후를 알고 있는 상황에서도 "--"에 들어갈 말을 확률적으로 예측하기 위해 그 다음 단어인 "생성한다" 부분을 마스킹한다. (https://huggingface.co/learn/llm-course/en/chapter7/6). 이처럼 GPT는 왼쪽에서 오른쪽으로 토큰을 생성하고 확정해 나간다.
stream()
이처럼 토큰을 순차적으로 확정해나가는 LM들의 특성을 고려하면, 확정된 토큰들을 먼저 클라이언트에게 보내는 것은 합리적이다.
이에 LangChain은 TTFB를 최소화하기 위해 stream 메서드를 지원한다.(https://js.langchain.com/docs/how_to/streaming/)
const stream = await model.stream("Hello! Tell me about yourself.");
const chunks = [];
for await (const chunk of stream) {
chunks.push(chunk);
console.log(`${chunk.content}|`);
}
stream에서 반환된 청크는 아래와 같은 형태의 객체이다.
AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: '',
tool_call_chunks: [],
additional_kwargs: {},
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ 'langchain_core', 'messages' ],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
또한 MessageChunk객체는 concat
등의 메서드를 지원한다.
let finalChunk = chunks[0];
for (const chunk of chunks.slice(1, 5)) {
finalChunk = finalChunk.concat(chunk);
}
AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "Hello! I'm a",
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_call_chunks: [],
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: []
},
lc_namespace: [ 'langchain_core', 'messages' ],
content: "Hello! I'm a",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
concat 메서드를 확인하면, 나머지 메타데이터는 가장 나중의 메타데이터가 덮어 씌우고, content가 병합되는 것을 알 수 있다.
StreamMode는 기본적으로 messages이며 아래와 같은 청크들을 반환한다.
// AIMessageChunk - AI 응답
{
id: "run-12345",
content: "Hello",
additional_kwargs: {},
response_metadata: {
tokenUsage: { completionTokens: 1, promptTokens: 10, totalTokens: 11 },
finish_reason: null
},
tool_calls: [],
invalid_tool_calls: [],
usage_metadata: {
input_tokens: 10,
output_tokens: 1,
total_tokens: 11
}
}
update, debug 같은 스트림모드를 추가로 지정해서 사용할 수 있으며, 배열 형태로 stream 모드를 지정할 수도 있다. stream 모드를 배열로 지정하는 경우 배열 형태로 반환되며, 배열의 첫번째 인자는 stream 모드의 이름이고, 두번째는 stream된 객체이다.
for await (const chunk of await app.stream(inputs, {
streamMode: ["checkpoints", "debug"],
configurable: { thread_id: sessionId },
})) {
console.log(chunk);
}
[
'checkpoints',
{
config: {
configurable: [Object],
metadata: [Object],
recursion_limit: 25,
tags: []
},
values: {
messages: [Array],
routeType: 'chat',
routingQuery: null,
postId: '818a9bbd-1d07-46d5-95dd-4e0ca25063e8',
postSummary: null
},
metadata: { source: 'loop', step: 2, parents: {} },
next: [ 'routingNode' ],
tasks: [ [Object] ],
parentConfig: {
configurable: [Object],
metadata: [Object],
recursion_limit: 25,
tags: []
}
}
]
비동기 이터레이터
LangChain js가 TTFB를 최소화하는 방식은 비동기 제너레이터이다.
가령 Ollama의 응답을 비동기 이터레이터로 처리한다면 제너레이터는 아래와 같은 형태의 코드가 될 것이다.
async function* stream(model, prompt) {
const response = await fetch('http://localhost:11434/api/generate', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ model, prompt, stream: true })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const lines = decoder.decode(value).split('\n');
for (const line of lines) {
if (line.trim()) {
const data = JSON.parse(line);
yield data.response;
if (data.done) return;
}
}
}
}
Ollama에서 생성되는 응답을 즉시 yield와 반환하고 처리할 수 있다.
LangChain의 stream 역시 이와 같은 비동기 이터레이터를 반환한다.
StreamEvents
for await (const chunk of await app.streamEvents(inputs, {
version: "v2",
configurable: { thread_id: sessionId },
})) {
console.log(chunk);
}
LangChain.js는 streamEvents라는 인터페이스를 제공한다.
런너블 객체 안에 있는 다양한 이벤트들을 모두 캡처한다.
streamEvents를 사용할 때 version을 streamOption으로 넘겨주게 되는데, 현재 제공되는 버전은 v2이다. streamEvents는 지속적으로 업데이트 중에 있고, 업데이트는 하위 호환을 지원하지 않을 수 있다. 따라서 버전을 지정한다.
v2는 아래와 같은 청크를 반환한다.
{
event: 'on_chat_model_stream',
data: { chunk: [AIMessageChunk] },
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
일반적인 어플리케이션에서는 .stream 인터페이스만으로 충분하지만, 다양한 이벤트들을 캡처할 필요가 있는 경우에 streamEvents를 사용하면 상대적으로 일관된 인터페이스를 통해 유지보수하기 좋다.
Server-Sent Events
클라이언트가 서버와의 연결을 열면, 서버 측에서 응답을 푸시하는 구조를 의미한다.
클라이언트에서는 EventSource를 통해 서버 센트 이벤트를 위한 연결을 열 수 있다.
EventSource API는 onmessge를 제공한다.
const es = new EventSource("https://api.example.com/stream", { withCredentials: true });
es.addEventListener("message", (e) => {
// 기본 이벤트 (event 필드가 없는 메시지)
console.log("message:", e.data);
});
// 혹은
es.onmessage = (e) => {
console.log("message:", e.data);
}
서버 측에서는 헤더에 'text/event-stream' 를 콘텐츠 타입으로 담아 응답한다.
const app = express();
app.get("/stream", (req, res) => {
res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
res.setHeader("Cache-Control", "no-cache");
res.write("event: start");
res.write("id: 1")
res.write("data: 이벤트 타입을 지정할 수 있습니다\n\n");
res.write("id: 2")
res.write("data: 이벤트 타입을 지정하지 않으면 기본 message 채널로 전송된다.\n\n");
res.end();
req.on("close", () => {
// 클라이언트가 연결을 끊었을 때의 로직을 입력한다.
});
retry와 Last-Event-ID
SSE는 클라이언트의 연결이 끊겼을 때를 대비하기 위한 사양이 규정되어 있다.
만일 클라이언트가 "close"를 호출하지 않고 연결이 끊겼을 때에는 대략 3~5초 간격으로 리트라이를 시도한다. (크롬은 3초 간격, 최대 30초)
"retry: 1000"과 같이 명시적으로 넣을 수 있다.
한번 이벤트에 id를 명시하면, "Last-Event-ID" 헤더를 이용해 수신에 성공했던 id를 전달한다.
https://html.spec.whatwg.org/multipage/server-sent-events.html
서버에서 연결복구 로직을 수행하고 싶다면 Last-Event-ID를 헤더로부터 읽어 로직을 수행하면 된다.
const app = express();
const events = []
app.get("/stream", (req, res) => {
res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
res.setHeader("Cache-Control", "no-cache");
// 재연결시의 로직 추가
const lastEventId = req.headers['last-event-id'];
if(lastEventId) {
const missedEvents = 놓친이벤트가져오기(lastEventId)
missedEvents.forEach((event)=>{
res.write(`id: ${event.id}`)
res.write(`data: ${event.data}\n\n`);
}
}
res.write("event: start");
res.write("id: 1")
res.write("data: 이벤트 타입을 지정할 수 있습니다\n\n");
res.write("id: 2")
res.write("data: 이벤트 타입을 지정하지 않으면 기본 message 채널로 전송된다.\n\n");
res.end();
req.on("close", () => {
// 클라이언트가 연결을 끊었을 때의 로직을 입력한다.
});
}
Runnerble.streamEvents와 SSE 연동하기
export function GET(request) {
const stream = new ReadableStream({
start(controller) {
controller.enqueue('첫 번째 청크');
controller.enqueue('두 번째 청크');
controller.close();
}
});
}
Stream API의 ReadableStream을 이용해 청크 단위의 처리가 가능하다.
해당 스트림을 서버 센트 이벤트로 보낼 때 아래와 같이 반환하면 된다.
export function GET(request) {
const stream = new ReadableStream({
start(controller) {
controller.enqueue('첫 번째 청크');
controller.enqueue('두 번째 청크');
}
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive", // controller.close() 없이 연결을 유지하고 싶을 때
// "X-Accel-Buffering": 'no', // nginx 용
},
});
}
참고로 각 서버마다 스트림을 버퍼로 내보내는 설정이 있다. 가령 nginx의 경우 "X-Accel-Buffering"이고, Cloudflare의 경우 "CF-Cache-Status"이다. 스트림에 대한 버퍼를 끄고 싶다면 이와 관련된 설정을 헤더에 추가한다.
랭그래프를 이용하여 구현하면 아래와 같은 형태가 된다.
// 서버에서 GET 요청으로 클라이언트와 연결한다.
export function GET(request) {
const stream = new ReadableStream({
start(controller) {
for await (const chunk of llm.streamEvents(inputs, {
version: "v2",
configurable: { thread_id: sessionId },
})) {
chatEventHander({ controller, chunk });
}
}
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream; charset=utf-8",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
// 이벤트 청크를 처리하는 로직
const encoder = new TextEncoder(); // 응답을 utf-8로 안전하게 유지하기 위함.
function eventHandler({
controller,
chunk,
}: {
controller: ReadableStreamDefaultController;
chunk: StreamEvent;
}) {
const event = chunk.event
if(event === "on_chat_model_start") {
// ...
}
if(event === "on_chat_model_stream") {
const id = chunk.run_id // streamEvents에서 run_id라는 무작위 id를 생성함.
const data = chunk.data
controller.enqueue(encoder.encode(`id: ${JSON.stringify(id)}\n\n`));
controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`));
}
if(event === "on_chat_model_end") {
// ...
}
}
streamEvents의 청크에 "event"라는 프로퍼티로 이벤트명이 나오는 것을 이용하여 조건 분기를 처리한다.
이후 각각의 이벤트에서 ReadableStream의 controller를 이용하여 enqueue에 적절한 데이터를 넣어주면 된다.
만일 id가 필요한 경우 streamEvents의 청크에 run_id라는 무작위 id가 생성되어 있으니 이를 포함하여 전송하면 된다.
abort 처리
const abortController = new AbortController();
const stream = new ReadableStream({
async start(controller) {
for await (const chunk of app.streamEvents(inputs, {
version: "v2",
configurable: { thread_id: sessionId },
signal: abortController.signal, // abortController의 시그널 추가
})) {
chatEventHander({ controller, chunk });
}
},
cancel() {
// 사용자의 요청으로 SSE가 중단된 경우 실행될 이벤트
abortController.abort();
},
});
사용자의 중단 요청을 처리할 때에는 Web API인 AbortController를 사용하면 된다.
cancle요청에서 abort()를 통해 signal.aborted를 true로 만들면 Web API들이 중단되는데,
런너블 객체의 signal에 넘겨주면 런너블 객체 역시 스트리밍을 중단한다. 단, LLM 서버에서는 런너블 객체의 stream이 중단된 이후에도 지속적으로 청크를 전송
참고
whatwg - server sent events
MDN — Server-sent events
MDN — Using server-sent events