The stream() method is used to make streaming chat completion requests to the Edgee AI Gateway. It returns a Result containing a Stream that yields Result<StreamChunk> objects as they arrive from the API.

Arguments

| Parameter | Type | Description |
| --- | --- | --- |
| model | impl Into<String> | The model identifier to use (e.g., "gpt-5.2") |
| input | impl Into<Input> | The input for the completion. Can be a string (&str or String), a Vec<Message>, or an InputObject |

Input Types

The stream() method accepts the same input types as send():

String Input

When input is a string, it’s automatically converted to a user message:
use tokio_stream::StreamExt;

let mut stream = client.stream("gpt-5.2", "Tell me a story").await?;

while let Some(result) = stream.next().await {
    match result {
        Ok(chunk) => {
            if let Some(text) = chunk.text() {
                print!("{}", text);
            }
            
            if let Some(reason) = chunk.finish_reason() {
                println!("\nFinished: {}", reason);
            }
        }
        Err(e) => eprintln!("Stream error: {}", e),
    }
}
// Equivalent to: input: InputObject::new(vec![Message::user("Tell me a story")])

Vec<Message> or InputObject

When input is a Vec<Message> or InputObject, you have full control over the conversation:
| Property | Type | Description |
| --- | --- | --- |
| messages | Vec<Message> | Array of conversation messages |
| tools | Option<Vec<Tool>> | Array of function tools available to the model |
| tool_choice | Option<serde_json::Value> | Controls which tool (if any) the model should call. See the Tools documentation for details |
| tags | Option<Vec<String>> | Optional tags to categorize and label the request for analytics and filtering. Can also be sent via the x-edgee-tags header (comma-separated) |
| compression_model | Option<String> | Compression model for this request: "agentic", "claude", "opencode", "cursor", or "customer". Each model is a bundle of compression strategies. Overrides API key settings when present |
| compression_configuration | Option<CompressionConfiguration> | Configuration for the compression model. Currently only available for "agentic". Contains optional rate (0.0-1.0, default 0.8) and semantic_preservation_threshold (0-100) |
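The documented ranges for compression_configuration can be checked client-side before a request is built. The following is a minimal sketch: the helper function and stand-in types are ours, not the edgee crate's API; only the ranges and the 0.8 default come from the table above.

```rust
// Hypothetical client-side check mirroring the documented ranges for
// CompressionConfiguration: rate in 0.0-1.0 (default 0.8) and
// semantic_preservation_threshold in 0-100. Stand-in code, not the crate's API.
fn validate_compression(rate: Option<f64>, threshold: Option<u8>) -> Result<(f64, Option<u8>), String> {
    let rate = rate.unwrap_or(0.8); // documented default
    if !(0.0..=1.0).contains(&rate) {
        return Err(format!("rate {rate} outside 0.0-1.0"));
    }
    if let Some(t) = threshold {
        // No default is documented for the threshold, so it stays optional.
        if t > 100 {
            return Err(format!("semantic_preservation_threshold {t} outside 0-100"));
        }
    }
    Ok((rate, threshold))
}

fn main() {
    assert_eq!(validate_compression(None, Some(90)), Ok((0.8, Some(90))));
    assert!(validate_compression(Some(1.5), None).is_err());
    println!("compression configuration checks passed");
}
```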
For details about the Message type, see the Send Method documentation. For details about the Tool and ToolChoice types, see the Tools documentation.

Example - Streaming with Messages:
use edgee::Message;
use tokio_stream::StreamExt;

let messages = vec![
    Message::system("You are a helpful assistant."),
    Message::user("Write a poem about coding"),
];

let mut stream = client.stream("gpt-5.2", messages).await?;

while let Some(result) = stream.next().await {
    if let Ok(chunk) = result {
        if let Some(text) = chunk.text() {
            print!("{}", text);
        }
    }
}

Return Value

The stream() method returns a Result containing a Stream that yields Result<StreamChunk>. Each chunk contains incremental updates to the response.

StreamChunk Object

Each chunk yielded by the stream has the following structure:
| Property | Type | Description |
| --- | --- | --- |
| id | String | Unique identifier for the completion |
| object | String | Object type (typically "chat.completion.chunk") |
| created | u64 | Unix timestamp of when the chunk was created |
| model | String | Model identifier used for the completion |
| choices | Vec<StreamChoice> | Array of streaming choices (typically one) |
| compression | Option<Compression> | Token compression metrics (if compression was applied) |

StreamChoice Object

Each choice in the choices array contains:
| Property | Type | Description |
| --- | --- | --- |
| index | u32 | The index of this choice in the array |
| delta | StreamDelta | The incremental update to the message |
| finish_reason | Option<String> | Reason why the generation stopped. Only present in the final chunk. Possible values: "stop", "length", "tool_calls", "content_filter", or None |
Example - Handling Multiple Choices:
use tokio_stream::StreamExt;

let mut stream = client.stream("gpt-5.2", "Give me creative ideas").await?;

while let Some(result) = stream.next().await {
    if let Ok(chunk) = result {
        for choice in &chunk.choices {
            if let Some(content) = &choice.delta.content {
                println!("Choice {}: {}", choice.index, content);
            }
        }
    }
}

StreamDelta Object

The delta object contains incremental updates:
| Property | Type | Description |
| --- | --- | --- |
| role | Option<Role> | The role of the message (typically Role::Assistant). Only present in the first chunk |
| content | Option<String> | Incremental text content. Each chunk contains a portion of the full response |
| tool_calls | Option<Vec<ToolCall>> | Array of tool calls (if any). See the Tools documentation for details |
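Because tool_calls arrives incrementally, the argument strings for a given call are typically delivered as fragments that must be concatenated per call index, as in other OpenAI-style streaming APIs. The sketch below illustrates that accumulation pattern with local stand-in types; they are not the crate's ToolCall type, and the exact fragment shape is an assumption.

```rust
// Illustrative stand-in for a streamed tool-call fragment. These types are
// ours, defined locally for the sketch; they are not the edgee crate's.
struct ToolCallFragment {
    index: usize,          // which tool call this fragment belongs to
    name: Option<String>,  // present on the first fragment only
    arguments: String,     // partial JSON arguments text
}

#[derive(Default, Debug, PartialEq)]
struct AssembledToolCall {
    name: String,
    arguments: String,
}

// Concatenate fragments into complete tool calls, keyed by index.
fn assemble(fragments: &[ToolCallFragment]) -> Vec<AssembledToolCall> {
    let mut calls: Vec<AssembledToolCall> = Vec::new();
    for frag in fragments {
        if frag.index >= calls.len() {
            calls.resize_with(frag.index + 1, Default::default);
        }
        if let Some(name) = &frag.name {
            calls[frag.index].name = name.clone();
        }
        calls[frag.index].arguments.push_str(&frag.arguments);
    }
    calls
}

fn main() {
    let frags = vec![
        ToolCallFragment { index: 0, name: Some("get_weather".into()), arguments: "{\"city\":".into() },
        ToolCallFragment { index: 0, name: None, arguments: "\"Paris\"}".into() },
    ];
    let calls = assemble(&frags);
    assert_eq!(calls[0].name, "get_weather");
    assert_eq!(calls[0].arguments, "{\"city\":\"Paris\"}");
    println!("assembled {} tool call(s)", calls.len());
}
```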

Convenience Methods

The StreamChunk struct provides convenience methods for easier access:
| Method | Return Type | Description |
| --- | --- | --- |
| text() | Option<&str> | Shortcut to choices[0].delta.content.as_deref() - the incremental text content |
| role() | Option<&Role> | Shortcut to choices[0].delta.role.as_ref() - the message role (first chunk only) |
| finish_reason() | Option<&str> | Shortcut to choices[0].finish_reason.as_deref() - the finish reason (final chunk only) |
Example - Using Convenience Methods:
use tokio_stream::StreamExt;

let mut stream = client.stream("gpt-5.2", "Explain quantum computing").await?;

while let Some(result) = stream.next().await {
    match result {
        Ok(chunk) => {
            // Content chunks
            if let Some(text) = chunk.text() {
                print!("{}", text);
            }

            // First chunk contains the role
            if let Some(role) = chunk.role() {
                println!("\nRole: {:?}", role);
            }

            // Last chunk contains finish reason
            if let Some(reason) = chunk.finish_reason() {
                println!("\nFinish reason: {}", reason);
            }
        }
        Err(e) => eprintln!("Stream error: {}", e),
    }
}

Understanding Streaming Behavior

Chunk Structure

  1. First chunk: Contains role (typically Role::Assistant) and may contain initial content
  2. Content chunks: Contain incremental content updates
  3. Final chunk: Contains finish_reason indicating why generation stopped
Example - Collecting Full Response:
use tokio_stream::StreamExt;

let mut stream = client.stream("gpt-5.2", "Tell me a story").await?;
let mut full_text = String::new();

while let Some(result) = stream.next().await {
    match result {
        Ok(chunk) => {
            if let Some(text) = chunk.text() {
                full_text.push_str(text);
                print!("{}", text); // Also display as it streams
            }
        }
        Err(e) => eprintln!("Stream error: {}", e),
    }
}

println!("\n\nFull response ({} characters):", full_text.len());
println!("{}", full_text);

Finish Reasons

| Value | Description |
| --- | --- |
| "stop" | Model generated a complete response and stopped naturally |
| "length" | Response was cut off due to the token limit |
| "tool_calls" | Model requested tool/function calls |
| "content_filter" | Content was filtered by safety systems |
| None | Generation is still in progress (not the final chunk) |
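Since finish_reason() yields an Option<&str>, dispatching on these values is a plain match. A minimal self-contained sketch; the descriptive strings are ours, only the reason values come from the table above.

```rust
// Map a documented finish reason to a short human-readable note.
// Pure illustration on &str values; no crate types involved.
fn describe_finish(reason: Option<&str>) -> &'static str {
    match reason {
        Some("stop") => "completed naturally",
        Some("length") => "truncated: token limit reached",
        Some("tool_calls") => "model requested tool calls",
        Some("content_filter") => "content was filtered",
        Some(_) => "unknown finish reason",
        None => "still streaming",
    }
}

fn main() {
    assert_eq!(describe_finish(Some("stop")), "completed naturally");
    assert_eq!(describe_finish(None), "still streaming");
    println!("{}", describe_finish(Some("length")));
}
```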

Empty Chunks

Some chunks may not contain content. This is normal and can happen when:
  • The chunk only contains metadata (role, finish_reason)
  • The chunk is part of tool call processing
  • Network buffering creates empty chunks
Always check that chunk.text() returns Some before using the value:
use tokio_stream::StreamExt;

let mut stream = client.stream("gpt-5.2", "Hello").await?;

while let Some(result) = stream.next().await {
    if let Ok(chunk) = result {
        if let Some(text) = chunk.text() {  // ✅ Good: Check before using
            println!("{}", text);
        }
        // ❌ Bad: println!("{:?}", chunk.text()) - may print None
    }
}

Error Handling

The stream() method can fail at two levels:
  1. Initial error: creating the stream can fail (stream() returns a Result containing the Stream)
  2. Stream errors: individual items yielded by the stream can be errors (each item is a Result<StreamChunk>)
use edgee::Error;
use tokio_stream::StreamExt;

// Handle initial error
let mut stream = match client.stream("gpt-5.2", "Hello!").await {
    Ok(stream) => stream,
    Err(Error::Api { status, message }) => {
        eprintln!("API error {}: {}", status, message);
        return;
    }
    Err(e) => {
        eprintln!("Error creating stream: {}", e);
        return;
    }
};

// Handle stream errors
while let Some(result) = stream.next().await {
    match result {
        Ok(chunk) => {
            if let Some(text) = chunk.text() {
                print!("{}", text);
            }
        }
        Err(e) => {
            eprintln!("Stream error: {}", e);
        }
    }
}