diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c6895a2a6..71a84d5c7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,6 +44,9 @@ jobs: version: ${{ matrix.version }} arch: ${{ matrix.arch }} - uses: julia-actions/cache@v2 + - name: Use Reseau Windows IOCP experiment + if: matrix.os == 'windows-latest' + run: julia --project=. --startup-file=no --history-file=no -e 'ENV["JULIA_PKG_PRECOMPILE_AUTO"]="0"; using Pkg; Pkg.add(PackageSpec(name="Reseau", url="https://github.com/JuliaServices/Reseau.jl", rev="d5350e16e770515c58cc21f317424584c2336e35")); Pkg.instantiate()' - uses: julia-actions/julia-buildpkg@v1 - uses: julia-actions/julia-runtest@v1 - uses: julia-actions/julia-processcoverage@v1 diff --git a/src/http_transport.jl b/src/http_transport.jl index dd65039fd..26d58d1fb 100644 --- a/src/http_transport.jl +++ b/src/http_transport.jl @@ -1663,6 +1663,7 @@ function _roundtrip_incoming!( ) catch err writer_err[] = err isa Exception ? err : ProtocolError("request upload failed") + _request_write_head_written(write_state) && return nothing _request_write_allows_close(write_state) || return nothing @try_ignore begin _close_conn!(conn) @@ -1686,7 +1687,9 @@ function _roundtrip_incoming!( if _request_write_done(write_state) wait(writer_task::Task) err = writer_err[] - err === nothing || throw(err::Exception) + if err !== nothing && !_request_write_head_written(write_state) + throw(err::Exception) + end end else try @@ -1713,6 +1716,10 @@ function _roundtrip_incoming!( end _set_conn_read_deadline!(conn, request_deadline) early_final = false + upload_err = has_request_body ? writer_err[] : nothing + if upload_err !== nothing && _request_upload_abort_error(upload_err::Exception) + early_final = true + end if _request_write_should_wait_for_continue(write_state) && _request_write_continue_state(write_state::_RequestWriteState) == _REQUEST_WRITE_CONTINUE_PENDING _request_write_mark_continue_suppressed!(write_state::_RequestWriteState) early_final = true diff --git a/src/http_websockets.jl b/src/http_websockets.jl index 1adeceef3..944dc87cc 100644 --- a/src/http_websockets.jl +++ b/src/http_websockets.jl @@ -11,7 +11,7 @@ with [`WebSocket`](@ref) values directly rather than the lower-level """ module WebSockets -import Base: close, iterate +import Base: close, iterate, isready import ..Headers import ..HTTPError @@ -516,6 +516,19 @@ function receive(ws::WebSocket) return take!(ws.readchannel) end +""" + isready(ws) -> Bool + +Return `true` when at least one complete message is buffered and can be read with +[`receive`](@ref) without blocking, and `false` otherwise. Useful for polling a +WebSocket for incoming messages without committing to a blocking `receive`. + +A `false` result does not imply the connection is closed — more messages may +still arrive. Conversely, messages buffered before the connection closed remain +`isready` and are returned by `receive` before it throws. +""" +isready(ws::WebSocket)::Bool = isready(ws.readchannel) + function Base.iterate(ws::WebSocket, st=nothing) # Note: do not early-return on isclosed(ws) here: messages may still be # buffered in ws.readchannel after the read task has set readclosed=true diff --git a/test/http_client_tests.jl b/test/http_client_tests.jl index b3b9b4d29..94040611c 100644 --- a/test/http_client_tests.jl +++ b/test/http_client_tests.jl @@ -339,6 +339,7 @@ end req1 = HT.read_request(HT._ConnReader(conn1)) push!(seen_methods, req1.method) push!(seen_targets, req1.target) + _ = _read_all_body_bytes_client(req1.body) headers1 = HT.Headers() HT.setheader(headers1, "Location", "/final") HT.setheader(headers1, "Connection", "close") diff --git a/test/http_parity_tests.jl b/test/http_parity_tests.jl index 20c1c1706..b693dd90a 100644 --- a/test/http_parity_tests.jl +++ b/test/http_parity_tests.jl @@ -24,6 +24,14 @@ function _wait_task_parity!(task::Task; timeout_s::Float64 = 5.0) return nothing end +function _write_response_parity!(conn::NC.Conn, response::HT.Response) + io = IOBuffer() + HT.write_response!(io, response) + write(conn, take!(io)) + HTTP.@try_ignore closewrite(conn) + return nothing +end + @testset "HTTP parity framing guards" begin raw_204 = "HTTP/1.1 204 No Content\r\nContent-Length: 10\r\n\r\nignored" response_204 = HT._read_response(IOBuffer(codeunits(raw_204))) @@ -58,15 +66,13 @@ end conn1 = NC.accept(listener) try req1 = HT.read_request(HT._ConnReader(conn1)) + _ = _read_all_parity(req1.body) push!(seen_methods, req1.method) headers = HT.Headers() HT.setheader(headers, "Location", "/next") HT.setheader(headers, "Connection", "close") resp1 = HT.Response(307, HT.EmptyBody(); reason = "Temporary Redirect", headers = headers, content_length = 0, close = true, request = req1) - io1 = IOBuffer() - HT.write_response!(io1, resp1) - bytes1 = take!(io1) - write(conn1, bytes1) + _write_response_parity!(conn1, resp1) finally HTTP.@try_ignore NC.close(conn1) end @@ -75,10 +81,7 @@ end req2 = HT.read_request(HT._ConnReader(conn2)) push!(seen_methods, req2.method) resp2 = HT.Response(200, HT.BytesBody(UInt8[0x6f, 0x6b]); content_length = 2, request = req2) - io2 = IOBuffer() - HT.write_response!(io2, resp2) - bytes2 = take!(io2) - write(conn2, bytes2) + _write_response_parity!(conn2, resp2) finally HTTP.@try_ignore NC.close(conn2) end @@ -111,9 +114,7 @@ end HT.setheader(headers, "Location", "/next") HT.setheader(headers, "Connection", "close") resp = HT.Response(307, HT.BytesBody(UInt8[0x72]); reason = "Temporary Redirect", headers = headers, content_length = 1, close = true, request = req) - io = IOBuffer() - HT.write_response!(io, resp) - write(conn, take!(io)) + _write_response_parity!(conn, resp) finally HTTP.@try_ignore NC.close(conn) end @@ -177,9 +178,7 @@ end HT.setheader(headers, "Location", "/next") HT.setheader(headers, "Connection", "close") resp1 = HT.Response(307, HT.EmptyBody(); reason = "Temporary Redirect", headers = headers, content_length = 0, close = true, request = req1) - io1 = IOBuffer() - HT.write_response!(io1, resp1) - write(conn1, take!(io1)) + _write_response_parity!(conn1, resp1) finally HTTP.@try_ignore NC.close(conn1) end @@ -189,9 +188,7 @@ end push!(seen_bodies, String(_read_all_parity(req2.body))) push!(seen_content_types, HT.header(req2.headers, "Content-Type")) resp2 = HT.Response(200, HT.BytesBody(UInt8[0x6f, 0x6b]); content_length = 2, request = req2) - io2 = IOBuffer() - HT.write_response!(io2, resp2) - write(conn2, take!(io2)) + _write_response_parity!(conn2, resp2) finally HTTP.@try_ignore NC.close(conn2) end @@ -222,9 +219,7 @@ end HT.setheader(headers, "Location", "/next") HT.setheader(headers, "Connection", "close") resp = HT.Response(307, HT.BytesBody(UInt8[0x72]); reason = "Temporary Redirect", headers = headers, content_length = 1, close = true, request = req) - io = IOBuffer() - HT.write_response!(io, resp) - write(conn, take!(io)) + _write_response_parity!(conn, resp) finally HTTP.@try_ignore NC.close(conn) end diff --git a/test/http_websocket_integration_tests.jl b/test/http_websocket_integration_tests.jl index a895bd032..d85323ba0 100644 --- a/test/http_websocket_integration_tests.jl +++ b/test/http_websocket_integration_tests.jl @@ -213,3 +213,23 @@ end stream = HT.Stream(bad) @test_throws W.WebSocketError W.upgrade(ws -> nothing, stream) end + +@testset "HTTP.WebSockets.isready reports buffered messages" begin + server = W.listen!("127.0.0.1", 0) do ws + msg = W.receive(ws) + W.send(ws, msg) + end + try + address = W.server_addr(server) + W.open("ws://$address/isready") do ws + @test !isready(ws) # nothing buffered yet + W.send(ws, "ping") + @test timedwait(() -> isready(ws), 5.0) != :timed_out # echo arrives in the channel + @test isready(ws) # a message is ready; receive won't block + @test W.receive(ws) == "ping" + @test !isready(ws) # drained + end + finally + close(server) + end +end