Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ jobs:
version: ${{ matrix.version }}
arch: ${{ matrix.arch }}
- uses: julia-actions/cache@v2
- name: Use Reseau Windows IOCP experiment
if: matrix.os == 'windows-latest'
run: julia --project=. --startup-file=no --history-file=no -e 'ENV["JULIA_PKG_PRECOMPILE_AUTO"]="0"; using Pkg; Pkg.add(PackageSpec(name="Reseau", url="https://github.com/JuliaServices/Reseau.jl", rev="d5350e16e770515c58cc21f317424584c2336e35")); Pkg.instantiate()'
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
- uses: julia-actions/julia-processcoverage@v1
Expand Down
9 changes: 8 additions & 1 deletion src/http_transport.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,7 @@ function _roundtrip_incoming!(
)
catch err
writer_err[] = err isa Exception ? err : ProtocolError("request upload failed")
_request_write_head_written(write_state) && return nothing
_request_write_allows_close(write_state) || return nothing
@try_ignore begin
_close_conn!(conn)
Expand All @@ -1686,7 +1687,9 @@ function _roundtrip_incoming!(
if _request_write_done(write_state)
wait(writer_task::Task)
err = writer_err[]
err === nothing || throw(err::Exception)
if err !== nothing && !_request_write_head_written(write_state)
throw(err::Exception)
end
end
else
try
Expand All @@ -1713,6 +1716,10 @@ function _roundtrip_incoming!(
end
_set_conn_read_deadline!(conn, request_deadline)
early_final = false
upload_err = has_request_body ? writer_err[] : nothing
if upload_err !== nothing && _request_upload_abort_error(upload_err::Exception)
early_final = true
end
if _request_write_should_wait_for_continue(write_state) && _request_write_continue_state(write_state::_RequestWriteState) == _REQUEST_WRITE_CONTINUE_PENDING
_request_write_mark_continue_suppressed!(write_state::_RequestWriteState)
early_final = true
Expand Down
15 changes: 14 additions & 1 deletion src/http_websockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ with [`WebSocket`](@ref) values directly rather than the lower-level
"""
module WebSockets

import Base: close, iterate
import Base: close, iterate, isready

import ..Headers
import ..HTTPError
Expand Down Expand Up @@ -516,6 +516,19 @@ function receive(ws::WebSocket)
return take!(ws.readchannel)
end

"""
isready(ws) -> Bool

Return `true` when at least one complete message is buffered and can be read with
[`receive`](@ref) without blocking, and `false` otherwise. Useful for polling a
WebSocket for incoming messages without committing to a blocking `receive`.

A `false` result does not imply the connection is closed — more messages may
still arrive. Conversely, messages buffered before the connection closed remain
`isready` and are returned by `receive` before it throws.
"""
isready(ws::WebSocket)::Bool = isready(ws.readchannel)

function Base.iterate(ws::WebSocket, st=nothing)
# Note: do not early-return on isclosed(ws) here: messages may still be
# buffered in ws.readchannel after the read task has set readclosed=true
Expand Down
1 change: 1 addition & 0 deletions test/http_client_tests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ end
req1 = HT.read_request(HT._ConnReader(conn1))
push!(seen_methods, req1.method)
push!(seen_targets, req1.target)
_ = _read_all_body_bytes_client(req1.body)
headers1 = HT.Headers()
HT.setheader(headers1, "Location", "/final")
HT.setheader(headers1, "Connection", "close")
Expand Down
35 changes: 15 additions & 20 deletions test/http_parity_tests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ function _wait_task_parity!(task::Task; timeout_s::Float64 = 5.0)
return nothing
end

function _write_response_parity!(conn::NC.Conn, response::HT.Response)
io = IOBuffer()
HT.write_response!(io, response)
write(conn, take!(io))
HTTP.@try_ignore closewrite(conn)
return nothing
end

@testset "HTTP parity framing guards" begin
raw_204 = "HTTP/1.1 204 No Content\r\nContent-Length: 10\r\n\r\nignored"
response_204 = HT._read_response(IOBuffer(codeunits(raw_204)))
Expand Down Expand Up @@ -58,15 +66,13 @@ end
conn1 = NC.accept(listener)
try
req1 = HT.read_request(HT._ConnReader(conn1))
_ = _read_all_parity(req1.body)
push!(seen_methods, req1.method)
headers = HT.Headers()
HT.setheader(headers, "Location", "/next")
HT.setheader(headers, "Connection", "close")
resp1 = HT.Response(307, HT.EmptyBody(); reason = "Temporary Redirect", headers = headers, content_length = 0, close = true, request = req1)
io1 = IOBuffer()
HT.write_response!(io1, resp1)
bytes1 = take!(io1)
write(conn1, bytes1)
_write_response_parity!(conn1, resp1)
finally
HTTP.@try_ignore NC.close(conn1)
end
Expand All @@ -75,10 +81,7 @@ end
req2 = HT.read_request(HT._ConnReader(conn2))
push!(seen_methods, req2.method)
resp2 = HT.Response(200, HT.BytesBody(UInt8[0x6f, 0x6b]); content_length = 2, request = req2)
io2 = IOBuffer()
HT.write_response!(io2, resp2)
bytes2 = take!(io2)
write(conn2, bytes2)
_write_response_parity!(conn2, resp2)
finally
HTTP.@try_ignore NC.close(conn2)
end
Expand Down Expand Up @@ -111,9 +114,7 @@ end
HT.setheader(headers, "Location", "/next")
HT.setheader(headers, "Connection", "close")
resp = HT.Response(307, HT.BytesBody(UInt8[0x72]); reason = "Temporary Redirect", headers = headers, content_length = 1, close = true, request = req)
io = IOBuffer()
HT.write_response!(io, resp)
write(conn, take!(io))
_write_response_parity!(conn, resp)
finally
HTTP.@try_ignore NC.close(conn)
end
Expand Down Expand Up @@ -177,9 +178,7 @@ end
HT.setheader(headers, "Location", "/next")
HT.setheader(headers, "Connection", "close")
resp1 = HT.Response(307, HT.EmptyBody(); reason = "Temporary Redirect", headers = headers, content_length = 0, close = true, request = req1)
io1 = IOBuffer()
HT.write_response!(io1, resp1)
write(conn1, take!(io1))
_write_response_parity!(conn1, resp1)
finally
HTTP.@try_ignore NC.close(conn1)
end
Expand All @@ -189,9 +188,7 @@ end
push!(seen_bodies, String(_read_all_parity(req2.body)))
push!(seen_content_types, HT.header(req2.headers, "Content-Type"))
resp2 = HT.Response(200, HT.BytesBody(UInt8[0x6f, 0x6b]); content_length = 2, request = req2)
io2 = IOBuffer()
HT.write_response!(io2, resp2)
write(conn2, take!(io2))
_write_response_parity!(conn2, resp2)
finally
HTTP.@try_ignore NC.close(conn2)
end
Expand Down Expand Up @@ -222,9 +219,7 @@ end
HT.setheader(headers, "Location", "/next")
HT.setheader(headers, "Connection", "close")
resp = HT.Response(307, HT.BytesBody(UInt8[0x72]); reason = "Temporary Redirect", headers = headers, content_length = 1, close = true, request = req)
io = IOBuffer()
HT.write_response!(io, resp)
write(conn, take!(io))
_write_response_parity!(conn, resp)
finally
HTTP.@try_ignore NC.close(conn)
end
Expand Down
20 changes: 20 additions & 0 deletions test/http_websocket_integration_tests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,23 @@ end
stream = HT.Stream(bad)
@test_throws W.WebSocketError W.upgrade(ws -> nothing, stream)
end

@testset "HTTP.WebSockets.isready reports buffered messages" begin
server = W.listen!("127.0.0.1", 0) do ws
msg = W.receive(ws)
W.send(ws, msg)
end
try
address = W.server_addr(server)
W.open("ws://$address/isready") do ws
@test !isready(ws) # nothing buffered yet
W.send(ws, "ping")
@test timedwait(() -> isready(ws), 5.0) != :timed_out # echo arrives in the channel
@test isready(ws) # a message is ready; receive won't block
@test W.receive(ws) == "ping"
@test !isready(ws) # drained
end
finally
close(server)
end
end
Loading