Skip to content

Error thrown while streaming duplicates the error message and loses the stacktrace? (with MVP) #1216

@Sixzero

Description

@Sixzero

I wrote an MVP to show the issue, basically throwing an error while streaming with HTTP.jl:

# streaming_mvp.jl
using HTTP, JSON3, Sockets

"""
Minimal Viable Product (MVP) for HTTP streaming
Demonstrates a server that streams responses and a client that consumes them
"""

# Define a simple callback type to handle streaming responses
"""
    SimpleStreamCallback()

Field-less, mutable callback handle. It carries no state of its own and is
constructed with no arguments.
"""
mutable struct SimpleStreamCallback end
"""
    error_function(data)

Raise the simulated failure when `data` contains the character `3`; otherwise
return `nothing`.
"""
function error_function(data)
    # Guard clause: anything without a "3" passes through untouched.
    occursin("3", data) || return nothing
    error("Error: The simulated error is here, we need readable stacktrace")
end
# Simple callback function that just prints the data
"""
    process_chunk(callback::SimpleStreamCallback, data)

Pass `data` through `error_function` (which may throw), then print it to
standard output without a trailing newline. `callback` is used for dispatch only.
"""
function process_chunk(callback::SimpleStreamCallback, data)
    error_function(data)
    return print(stdout, data)
end

# Server implementation
"""
    create_streaming_server(host="127.0.0.1", port=8080)

Start an HTTP server on `host:port` with a single `POST /stream` route and return
the server object from `HTTP.serve!`.

The route reads the request body, then writes ten `data: Chunk i of data` messages
half a second apart, followed by a final `data: Streaming completed` message. Each
message is terminated by a blank line.
"""
function create_streaming_server(host="127.0.0.1", port=8080)
    # Headers set on every streamed response, in this order.
    response_headers = (
        "Content-Type" => "text/event-stream",
        "Cache-Control" => "no-cache",
        "Connection" => "keep-alive",
    )

    # Handler for the streaming endpoint.
    function stream_handler(stream::HTTP.Stream)
        foreach(header -> HTTP.setheader(stream, header), response_headers)

        # Read the request body before responding; its content is not used.
        String(read(stream))

        # Emit the numbered chunks, pausing between writes.
        for i in 1:10
            write(stream, "data: Chunk $i of data\n\n")
            sleep(0.5)
        end

        # Closing message of the stream.
        write(stream, "data: Streaming completed\n\n")

        return nothing
    end

    # Wire the handler into a router and start serving in stream mode.
    router = HTTP.Router()
    HTTP.register!(router, "POST", "/stream", stream_handler)
    server = HTTP.serve!(router, host, port; stream=true)

    println("Server running at http://$host:$port")
    return server
end

# Simplified client implementation
"""
    streamed_request!(callback::SimpleStreamCallback, url, headers, input)

POST the bytes buffered in `input` to `url` and pass the payload of each complete
server-sent message to `process_chunk(callback, data)` as it arrives.

# Arguments
- `callback`: forwarded unchanged to `process_chunk` for every message.
- `url`, `headers`: forwarded to `HTTP.open`.
- `input`: buffer holding the request body; it is emptied with `take!`.

# Returns
The value returned by `HTTP.open`.

# Throws
- `AssertionError` (raised inside the `HTTP.open` block) when the response does not
  carry exactly one `Content-Type` header or it is not `text/event-stream`.
- Whatever `process_chunk` throws.
NOTE(review): HTTP.jl appears to wrap errors raised inside the block (e.g. in
`HTTP.RequestError`) before they reach the caller — confirm against the HTTP.jl version
in use.
"""
function streamed_request!(callback::SimpleStreamCallback, url, headers, input)
    resp = HTTP.open("POST", url, headers) do stream
        # Send request body
        write(stream, String(take!(input)))
        HTTP.closewrite(stream)

        # Start reading the response
        response = HTTP.startread(stream)

        # Verify content type
        content_type = [header[2]
                        for header in response.headers
                        if lowercase(header[1]) == "content-type"]
        @assert length(content_type)==1 "Content-Type header must be present and unique"
        @assert occursin("text/event-stream", lowercase(content_type[1])) "Content-Type must be text/event-stream"

        # Accumulate raw bytes; a message is complete once a blank line ("\n\n") is seen.
        buffer = ""
        while !eof(stream)
            buffer *= String(readavailable(stream))

            while occursin("\n\n", buffer)
                # The delimiter is known to be present, so `split` yields two parts:
                # the finished message and the still-unprocessed remainder.
                message, buffer = split(buffer, "\n\n"; limit=2)

                # Only messages that carry a `data: ` line are handed to the callback;
                # the previous `startswith(message, "")` check was always true.
                data = _sse_payload(message)
                data === nothing || process_chunk(callback, data)
            end
        end

        HTTP.closeread(stream)
        return response
    end

    # Add a newline for better formatting
    println()

    return resp
end

# Return the payload of one SSE message: the text after the `data: ` prefix of each
# data line, joined by "\n". Returns `nothing` when the message has no `data: ` line.
# Only the leading prefix of a line is removed, so payload text that itself contains
# "data: " is preserved.
function _sse_payload(message)
    prefix = "data: "
    payload_lines = [SubString(line, ncodeunits(prefix) + 1)
                     for line in split(message, '\n')
                     if startswith(line, prefix)]
    return isempty(payload_lines) ? nothing : join(payload_lines, '\n')
end

# Example usage
"""
    run_example()

Start the streaming server, issue one streaming POST request against
`http://127.0.0.1:8080/stream`, print the final response status, and always close
the server afterwards.
"""
function run_example()
    server = create_streaming_server()

    try
        # JSON request body sent to the streaming endpoint.
        payload = JSON3.write(Dict(:prompt => "Test streaming"))

        println("Making streaming request...")
        resp = streamed_request!(
            SimpleStreamCallback(),
            "http://127.0.0.1:8080/stream",
            ["Content-Type" => "application/json"],
            IOBuffer(payload),
        )

        println("\nRequest completed with status: $(resp.status)")
    finally
        # Runs whether or not the request failed: stop the server and check
        # that its task has finished.
        close(server)
        @assert istaskdone(server.task)
    end
end

# Run the example. This is a top-level call, so it executes whenever the file is evaluated.
run_example()

Output:

Server running at http://127.0.0.1:8080
Making streaming request...
Chunk 1 of dataChunk 2 of data[ Info: Server on 127.0.0.1:8080 closing
ERROR: HTTP.RequestError:
HTTP.Request:
HTTP.Messages.Request:
"""
POST /stream HTTP/1.1
Content-Type: application/json
Host: 127.0.0.1:8080
Accept: */*
User-Agent: HTTP.jl/1.11.3
Accept-Encoding: gzip
Transfer-Encoding: chunked

[Message Body was streamed]"""Underlying error:
Error: The simulated error is here, we need readable stacktrace
Stacktrace:
  [1] (::HTTP.ConnectionRequest.var"#connections#4"{…})(req::HTTP.Messages.Request; proxy::Nothing, socket_type::Type, socket_type_tls::Nothing, readtimeout::Int64, connect_timeout::Int64, logerrors::Bool, logtag::Nothing, closeimmediately::Bool, kw::@Kwargs{…})
    @ HTTP.ConnectionRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ConnectionRequest.jl:143
  [2] connections
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ConnectionRequest.jl:60 [inlined]
  [3] (::Base.var"#106#108"{…})(args::HTTP.Messages.Request; kwargs::@Kwargs{…})
    @ Base ./error.jl:300
  [4] (::HTTP.RetryRequest.var"#manageretries#3"{…})(req::HTTP.Messages.Request; retry::Bool, retries::Int64, retry_delays::ExponentialBackOff, retry_check::Function, retry_non_idempotent::Bool, kw::@Kwargs{…})
    @ HTTP.RetryRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RetryRequest.jl:75
  [5] manageretries
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RetryRequest.jl:30 [inlined]
  [6] (::HTTP.CookieRequest.var"#managecookies#4"{…})(req::HTTP.Messages.Request; cookies::Bool, cookiejar::HTTP.Cookies.CookieJar, kw::@Kwargs{…})
    @ HTTP.CookieRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/CookieRequest.jl:42
  [7] (::HTTP.HeadersRequest.var"#defaultheaders#2"{…})(req::HTTP.Messages.Request; iofunction::Function, decompress::Nothing, basicauth::Bool, detect_content_type::Bool, canonicalize_headers::Bool, kw::@Kwargs{…})
    @ HTTP.HeadersRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/HeadersRequest.jl:71
  [8] defaultheaders
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/HeadersRequest.jl:14 [inlined]
  [9] (::HTTP.RedirectRequest.var"#redirects#3"{…})(req::HTTP.Messages.Request; redirect::Bool, redirect_limit::Int64, redirect_method::Nothing, forwardheaders::Bool, response_stream::Nothing, kw::@Kwargs{…})
    @ HTTP.RedirectRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RedirectRequest.jl:25
 [10] redirects
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RedirectRequest.jl:14 [inlined]
 [11] (::HTTP.MessageRequest.var"#makerequest#3"{…})(method::String, url::URIs.URI, headers::Vector{…}, body::Nothing; copyheaders::Bool, response_stream::Nothing, http_version::HTTP.Strings.HTTPVersion, verbose::Int64, kw::@Kwargs{…})
    @ HTTP.MessageRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/MessageRequest.jl:35
 [12] makerequest
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/MessageRequest.jl:24 [inlined]
 [13] request(stack::HTTP.MessageRequest.var"#makerequest#3"{…}, method::String, url::String, h::Vector{…}, b::Nothing, q::Nothing; headers::Vector{…}, body::Nothing, query::Nothing, kw::@Kwargs{…})
    @ HTTP ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:457
 [14] #request#20
    @ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:315 [inlined]
 [15] request
    @ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:313 [inlined]
 [16] open
    @ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:579 [inlined]
 [17] streamed_request!
    @ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:57 [inlined]
 [18] run_example()
    @ Main ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:120
 [19] top-level scope
    @ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:132

caused by: Error: The simulated error is here, we need readable stacktrace
Stacktrace:
  [1] error(s::String)
    @ Base ./error.jl:35
  [2] error_function
    @ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:12 [inlined]
  [3] process_chunk
    @ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:15 [inlined]
  [4] (::var"#29#32"{…})(stream::HTTP.Streams.Stream{…})
    @ Main ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:89
  [5] macro expansion
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/StreamRequest.jl:65 [inlined]
  [6] macro expansion
    @ ./task.jl:498 [inlined]
  [7] streamlayer(stream::HTTP.Streams.Stream{…}; iofunction::var"#29#32"{…}, decompress::Nothing, logerrors::Bool, logtag::Nothing, timedout::Nothing, kw::@Kwargs{…})
    @ HTTP.StreamRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/StreamRequest.jl:35
  [8] streamlayer
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/StreamRequest.jl:21 [inlined]
  [9] (::HTTP.ExceptionRequest.var"#exceptions#2"{…})(stream::HTTP.Streams.Stream{…}; status_exception::Bool, timedout::Nothing, logerrors::Bool, logtag::Nothing, kw::@Kwargs{…})
    @ HTTP.ExceptionRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ExceptionRequest.jl:14
 [10] exceptions
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ExceptionRequest.jl:13 [inlined]
 [11] (::HTTP.TimeoutRequest.var"#timeouts#3"{…})(stream::HTTP.Streams.Stream{…}; readtimeout::Int64, logerrors::Bool, logtag::Nothing, kw::@Kwargs{…})
    @ HTTP.TimeoutRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/TimeoutRequest.jl:18
 [12] (::HTTP.ConnectionRequest.var"#connections#4"{…})(req::HTTP.Messages.Request; proxy::Nothing, socket_type::Type, socket_type_tls::Nothing, readtimeout::Int64, connect_timeout::Int64, logerrors::Bool, logtag::Nothing, closeimmediately::Bool, kw::@Kwargs{…})
    @ HTTP.ConnectionRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ConnectionRequest.jl:122
 [13] connections
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ConnectionRequest.jl:60 [inlined]
 [14] (::Base.var"#106#108"{…})(args::HTTP.Messages.Request; kwargs::@Kwargs{…})
    @ Base ./error.jl:300
 [15] (::HTTP.RetryRequest.var"#manageretries#3"{…})(req::HTTP.Messages.Request; retry::Bool, retries::Int64, retry_delays::ExponentialBackOff, retry_check::Function, retry_non_idempotent::Bool, kw::@Kwargs{…})
    @ HTTP.RetryRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RetryRequest.jl:75
 [16] manageretries
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RetryRequest.jl:30 [inlined]
 [17] (::HTTP.CookieRequest.var"#managecookies#4"{…})(req::HTTP.Messages.Request; cookies::Bool, cookiejar::HTTP.Cookies.CookieJar, kw::@Kwargs{…})
    @ HTTP.CookieRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/CookieRequest.jl:42
 [18] (::HTTP.HeadersRequest.var"#defaultheaders#2"{…})(req::HTTP.Messages.Request; iofunction::Function, decompress::Nothing, basicauth::Bool, detect_content_type::Bool, canonicalize_headers::Bool, kw::@Kwargs{…})
    @ HTTP.HeadersRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/HeadersRequest.jl:71
 [19] defaultheaders
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/HeadersRequest.jl:14 [inlined]
 [20] (::HTTP.RedirectRequest.var"#redirects#3"{…})(req::HTTP.Messages.Request; redirect::Bool, redirect_limit::Int64, redirect_method::Nothing, forwardheaders::Bool, response_stream::Nothing, kw::@Kwargs{…})
    @ HTTP.RedirectRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RedirectRequest.jl:25
 [21] redirects
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RedirectRequest.jl:14 [inlined]
 [22] (::HTTP.MessageRequest.var"#makerequest#3"{…})(method::String, url::URIs.URI, headers::Vector{…}, body::Nothing; copyheaders::Bool, response_stream::Nothing, http_version::HTTP.Strings.HTTPVersion, verbose::Int64, kw::@Kwargs{…})
    @ HTTP.MessageRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/MessageRequest.jl:35
 [23] makerequest
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/MessageRequest.jl:24 [inlined]
 [24] request(stack::HTTP.MessageRequest.var"#makerequest#3"{…}, method::String, url::String, h::Vector{…}, b::Nothing, q::Nothing; headers::Vector{…}, body::Nothing, query::Nothing, kw::@Kwargs{…})
    @ HTTP ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:457
 [25] #request#20
    @ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:315 [inlined]
 [26] request
    @ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:313 [inlined]
 [27] open
    @ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:579 [inlined]
 [28] streamed_request!
    @ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:57 [inlined]
 [29] run_example()
    @ Main ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:120
 [30] top-level scope
    @ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:132
Some type information was truncated. Use `show(err)` to see complete types.

What is cool is that I can actually see a stacktrace here, but only the second message shows the 3 lines we need. If things are written correctly here, why would we get duplicated stacktrace lines?

Also, I feel the stacktrace is a little too deep. I wonder if we could make it a bit simpler.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions