The ultimate LangChain series — enable streaming
Let's add the streaming component to a chat instance
Introduction
Building on the techniques explored in the previous article of the ultimate LangChain series — Chat with Your Data — this guide will provide a concise and practical extension to enhance your user experience.
Our last tutorial walked through implementing the chat component, enabling interaction with data stored in vector databases. However, you probably noticed that the response is printed all at once, only after generation is complete, leading to a noticeable delay and a less-than-ideal user experience.
With just a few additional lines of code, we'll guide you through the process of enabling streaming for new tokens as they are generated with LangChain. This simple yet impactful modification will transform how users interact with your application, providing a smoother and more responsive experience.
What is streaming in LLMs?
In the context of large language models (LLMs), streaming refers to processing and transmitting data incrementally, as it becomes available.
Unlike traditional batch processing, where the full response is produced and returned all at once, streaming lets the model's output be consumed piece by piece as it is generated. This is great for real-time applications, such as chat interfaces or live translation, where immediate response and interaction are crucial for a good UX. By leveraging streaming, LLM-powered applications can provide more responsive and fluid experiences, adapting to new inputs and generating output on the fly.
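To make the idea concrete, here is a minimal, library-free sketch of the difference. The fake_llm generator below is a hypothetical stand-in for a model that yields its answer one token at a time; it is not a real API.
import time

def fake_llm(prompt):
    """Hypothetical stand-in for a model that yields its answer token by token."""
    for token in ["Streaming ", "feels ", "much ", "more ", "responsive."]:
        time.sleep(0.2)  # simulate per-token generation latency
        yield token

# Batch style: wait for the whole answer, then show it all at once.
print("".join(fake_llm("hello")))

# Streaming style: show each token the moment it becomes available.
for token in fake_llm("hello"):
    print(token, end="", flush=True)
print()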
Streaming with LangChain
With LangChain, we can use the StreamingStdOutCallbackHandler to stream tokens as the chain generates them.
The StreamingStdOutCallbackHandler allows you to stream chain outputs to stdout as they are generated. This is useful for long-running chains where you want to see outputs as they are produced rather than waiting for the full output at the end.
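As a quick standalone illustration (assuming the same langchain version used throughout this series and an OPENAI_API_KEY in your environment), you can attach the handler directly to a chat model and watch the tokens appear in your terminal as they are generated:
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# The callback writes each new token to stdout as soon as it is generated
chat = ChatOpenAI(
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()],
    temperature=0,
)
chat.predict("Explain what token streaming is in one sentence.")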
Adding streaming to the chat component
Let's take the chat component we made in the previous article:
import os
from dotenv import load_dotenv
from langchain.vectorstores import DeepLake
from langchain.chat_models import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain
from langchain.embeddings import OpenAIEmbeddings
from langchain.callbacks import get_openai_callback

# Load environment variables from .env file
load_dotenv()

# Set environment variables
os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY')
os.environ['ACTIVELOOP_TOKEN'] = os.getenv('ACTIVELOOP_TOKEN')
language_model = os.getenv('LANGUAGE_MODEL')

# Set DeepLake dataset path
DEEPLAKE_PATH = os.getenv('DATASET_PATH')

# Initialize OpenAI embeddings and disallow special tokens
EMBEDDINGS = OpenAIEmbeddings(disallowed_special=())

# Initialize DeepLake vector store with OpenAI embeddings
deep_lake = DeepLake(
    dataset_path=DEEPLAKE_PATH,
    read_only=True,
    embedding_function=EMBEDDINGS,
)

# Initialize retriever and set search parameters
retriever = deep_lake.as_retriever()
retriever.search_kwargs.update({
    'distance_metric': 'cos',
    'fetch_k': 100,
    'maximal_marginal_relevance': True,
    'k': 10,
})

# Initialize ChatOpenAI model
model = ChatOpenAI(model_name=language_model, temperature=0.2)  # gpt-3.5-turbo by default. Use gpt-4 for better and more accurate responses

# Initialize ConversationalRetrievalChain
qa = ConversationalRetrievalChain.from_llm(model, retriever=retriever)

# Initialize chat history
chat_history = []


def get_user_input():
    """Get user input and handle 'quit' command."""
    question = input("\nPlease enter your question (or 'quit' to stop): ")
    if question.lower() == 'quit':
        return None
    return question


def print_answer(question, answer):
    """Format and print question and answer."""
    print(f"\nQuestion: {question}\nAnswer: {answer}\n")


def main():
    """Main program loop."""
    while True:
        question = get_user_input()
        if question is None:  # User has quit
            break

        # Display token usage and approximate costs
        with get_openai_callback() as tokens_usage:
            result = qa({"question": question, "chat_history": chat_history})
            chat_history.append((question, result['answer']))
            print_answer(question, result['answer'])
            print(tokens_usage)


if __name__ == "__main__":
    main()
This code creates a chat interface that interacts with the Activeloop vector store we created in the first article about vector DBs.
Note that you will need to create a vectorDB with some data before you can run this code:
How to create a vectorDB with Activeloop
How to create a vectorDB with Pinecone
To add streaming to the chat component, simply import the StreamingStdOutCallbackHandler:
# Import streaming module
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
And add the streaming feature to the model instance:
model = ChatOpenAI(streaming=True, callbacks=[StreamingStdOutCallbackHandler()], model_name=language_model, temperature=0.0)
You might also want to remove the call that prints the answer (and the token-usage block); otherwise, you will see the streamed answer followed by the full answer printed again. The simplified loop looks like this:
while True:
    question = get_user_input()
    if question is None:
        break

    result = qa({"question": question, "chat_history": chat_history})
    chat_history.append((question, result['answer']))
Note that the token tracker does not work with this basic implementation.
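If you still want a rough sense of usage while streaming, one possible workaround (our own sketch, not a built-in LangChain feature) is to extend the handler and count the completion tokens yourself:
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

class CountingStreamHandler(StreamingStdOutCallbackHandler):
    """Streams tokens to stdout while keeping a running count of them."""

    def __init__(self):
        super().__init__()
        self.token_count = 0

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.token_count += 1  # tally each completion token as it arrives
        super().on_llm_new_token(token, **kwargs)  # still print to stdout

# Usage: pass an instance via callbacks=[handler] when creating ChatOpenAI and
# read handler.token_count after each question. This counts only completion
# tokens (not prompt tokens), so treat it as an approximation of usage.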
Conclusion
In this guide, we've explored the world of streaming with LangChain, building upon our previous topics of chat components and vector databases. We've seen how a simple yet powerful modification to our existing code can transform the user experience, enabling real-time interaction and responsiveness.
By understanding the concept of streaming in the context of large language models (LLMs) and implementing it with the StreamingStdOutCallbackHandler, we've unlocked a new level of dynamism in our application. The step-by-step instructions and code snippets should empower you to integrate streaming into your own projects, enhancing users' engagement with your application.
In the next articles, we'll build CLI-based applications with LangChain to practice what you learn from this series. Stay tuned!