diff --git a/CHANGELOG.md b/CHANGELOG.md index e1da867..69e18d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,32 @@ All notable changes to this project are documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Security / hardening + +- Lower the default per-frame cap (`DefaultMaxFrameSize`) from 1 GiB to + 64 MiB and stop pre-allocating the attacker-declared frame size in + `ReadFrame` — payloads now grow incrementally as bytes arrive, so a single + hostile peer can no longer OOM the receiver by announcing a giant frame. + Raise the cap with `PILOT_DATAEXCHANGE_MAX_FRAME` (both ends must agree). +- Add a per-connection idle/read deadline (`ServiceConfig.IdleTimeout`, + default 2 min) reset before every frame so slowloris peers can no longer + hold connections open indefinitely. +- Add a total-byte inbox cap (`ServiceConfig.InboxMaxBytes`, default 256 MiB) + enforced on every receipt, alongside the existing file-count cap. +- Add a received-files disk quota (`ServiceConfig.ReceivedMaxBytes`, default + 2 GiB) covering completed files and retained `.partial` stream fragments, + enforced before every write (legacy `TypeFile` and chunked `TypeFileStream`). +- Store binary / non-UTF-8 inbox payloads as base64 (`data_b64`) by default + with a `data_encoding` marker, instead of silently corrupting them into a + lossy JSON string. +- Reject over-long filenames in `WriteFrame` before the `uint16` length cast + so a name cannot wrap/truncate onto the wire. + +A negative value for any of `InboxMaxBytes`, `ReceivedMaxBytes`, or +`IdleTimeout` disables that limit (escape hatch); zero selects the default. + ## [v0.1.0] Initial release. 
diff --git a/dataexchange.go b/dataexchange.go index 00beb46..d32c158 100644 --- a/dataexchange.go +++ b/dataexchange.go @@ -3,6 +3,7 @@ package dataexchange import ( + "bytes" "encoding/binary" "fmt" "io" @@ -68,19 +69,27 @@ func ReadTracePayload(f *Frame) (*TraceFrame, error) { // maxFilenameLen limits filename length to prevent abuse. const maxFilenameLen = 255 -// DefaultMaxFrameSize caps a single data-exchange frame at 1 GiB. +// DefaultMaxFrameSize caps a single data-exchange frame at 64 MiB. // // The wire format's length field is a uint32 (see Frame docstring below), -// so the absolute ceiling is ~4 GiB; this cap exists purely to bound the -// memory the receiver allocates for any single transfer. Raising it -// trades RAM for the ability to ship larger artefacts in one frame. +// so the absolute ceiling is ~4 GiB; this cap exists to bound the memory a +// single hostile peer can make the receiver commit for one transfer. The +// receiver no longer pre-allocates the attacker-declared length up front +// (see ReadFrame) — it grows the buffer as bytes actually arrive — but the +// cap still bounds the steady-state worst case, so it is deliberately set +// well below the wire ceiling. // -// History: was 256 MiB pre-2026-06-14 (sized for the test fleet's 100 MiB -// payloads). Raised after operators on multi-GiB-RAM hosts hit the cap on -// real workloads; the chunked streaming protocol described in -// docs/PROPOSAL-reliable-file-transfer.md (web4) will eventually remove -// the need for a per-frame cap entirely. -const DefaultMaxFrameSize uint32 = 1 << 30 +// 64 MiB comfortably covers inbox messages and the deprecated legacy +// single-frame TypeFile path; anything larger should ride the chunked +// TypeFileStream protocol (filestream.go), which never buffers a whole +// file in memory. Operators who genuinely need bigger single frames can +// raise the cap via PILOT_DATAEXCHANGE_MAX_FRAME (both ends must agree). 
+// +// History: 256 MiB pre-2026-06-14 (sized for the test fleet's 100 MiB +// payloads), then briefly 1 GiB after operators on multi-GiB-RAM hosts hit +// that cap. 1 GiB let a single hostile peer OOM the receiver, so the default +// was lowered to 64 MiB and the up-front allocation was removed. +const DefaultMaxFrameSize uint32 = 64 << 20 // MaxFrameSize is the runtime-effective frame cap. Set at package init // from the PILOT_DATAEXCHANGE_MAX_FRAME env var (in bytes) if present @@ -118,8 +127,14 @@ type Frame struct { func WriteFrame(w io.Writer, f *Frame) error { payload := f.Payload if f.Type == TypeFile { - // Prepend filename + // Prepend filename. Validate the length BEFORE the uint16 cast: a + // name longer than 65535 bytes would wrap the 2-byte length field + // and silently truncate, and any name over maxFilenameLen is + // rejected by ReadFrame anyway — fail fast on the writer side. name := []byte(f.Filename) + if len(name) > maxFilenameLen { + return fmt.Errorf("filename too long: %d bytes (max %d)", len(name), maxFilenameLen) + } payload = make([]byte, 2+len(name)+len(f.Payload)) binary.BigEndian.PutUint16(payload[0:2], uint16(len(name))) copy(payload[2:], name) @@ -149,8 +164,14 @@ func ReadFrame(r io.Reader) (*Frame, error) { return nil, fmt.Errorf("frame too large: %d (max %d)", length, MaxFrameSize) } - payload := make([]byte, length) - if _, err := io.ReadFull(r, payload); err != nil { + // Do NOT pre-allocate `length` bytes — that is the attacker-declared + // size, so a single hostile peer could announce a huge (but in-cap) + // frame and never send the bytes, pinning that much RAM per connection. + // Instead grow incrementally as bytes actually arrive, with a bounded + // starting capacity. io.CopyN stops at exactly `length`, and the final + // length is re-checked so a truncated stream surfaces as ErrUnexpectedEOF. 
+ payload, err := readBounded(r, int64(length)) + if err != nil { return nil, err } @@ -183,6 +204,40 @@ func ReadFrame(r io.Reader) (*Frame, error) { return f, nil } +// readBoundedInitialCap bounds the initial buffer capacity reserved before +// any payload bytes arrive. A frame can legitimately be up to MaxFrameSize, +// but reserving that much for the declared (untrusted) length is exactly the +// OOM vector we are closing — so we reserve at most this much up front and +// let bytes.Buffer grow geometrically as real data lands. +const readBoundedInitialCap = 64 * 1024 + +// readBounded reads exactly n bytes from r without trusting n to size the +// initial allocation. It grows a buffer as bytes arrive (capped initial +// reservation), so an attacker who declares a large length but never sends +// the bytes only ties up the small initial buffer, not n bytes. Any short +// read, even of zero bytes, returns io.ErrUnexpectedEOF (never bare io.EOF). +func readBounded(r io.Reader, n int64) ([]byte, error) { + if n == 0 { + return []byte{}, nil + } + initial := n + if initial > readBoundedInitialCap { + initial = readBoundedInitialCap + } + buf := bytes.NewBuffer(make([]byte, 0, initial)) + read, err := io.CopyN(buf, r, n) + if err != nil { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + return nil, err + } + if read != n { + return nil, io.ErrUnexpectedEOF + } + return buf.Bytes(), nil +} + // TypeName returns a human-readable name for a frame type. func TypeName(t uint32) string { switch t { diff --git a/filestream.go b/filestream.go index c11091f..99046e2 100644 --- a/filestream.go +++ b/filestream.go @@ -43,13 +43,13 @@ import ( // Stream control-frame kinds (the first byte of a TypeFileStream payload). 
const ( - streamKindInit byte = 0x01 // sender→receiver: filename, size, full hash, chunk size - streamKindChunk byte = 0x02 // sender→receiver: offset + chunk bytes - streamKindAck byte = 0x03 // receiver→sender: highest contiguous offset received - streamKindDone byte = 0x04 // sender→receiver: end of stream; verify full hash - streamKindInitAck byte = 0x05 // receiver→sender: resume offset (presence ⇒ peer supports stream) + streamKindInit byte = 0x01 // sender→receiver: filename, size, full hash, chunk size + streamKindChunk byte = 0x02 // sender→receiver: offset + chunk bytes + streamKindAck byte = 0x03 // receiver→sender: highest contiguous offset received + streamKindDone byte = 0x04 // sender→receiver: end of stream; verify full hash + streamKindInitAck byte = 0x05 // receiver→sender: resume offset (presence ⇒ peer supports stream) streamKindComplete byte = 0x06 // receiver→sender: final status after DONE - streamKindAbort byte = 0x07 // either direction: cancel + reason + streamKindAbort byte = 0x07 // either direction: cancel + reason ) // Defaults. Chunk size is deliberately held below 64 KiB: on the Mac↔GCP-VM @@ -61,11 +61,11 @@ const ( // blackhole heuristic does not flip the link mid-transfer. Window bounds the // in-flight (unacked) bytes; 16 × 48 KiB = 768 KiB. 
const ( - StreamChunkSize = 48 * 1024 - streamWindow = 16 - streamNegTimeout = 5 * time.Second // wait for INIT-ACK before falling back to TypeFile + StreamChunkSize = 48 * 1024 + streamWindow = 16 + streamNegTimeout = 5 * time.Second // wait for INIT-ACK before falling back to TypeFile streamStepTimeout = 60 * time.Second // max wait for an ACK / the final COMPLETE - transferIDLen = 16 + transferIDLen = 16 ) // ErrStreamUnsupported is returned by SendFileStream when the peer does not @@ -392,32 +392,56 @@ type StreamReceiver struct { onSaved func(name, path string, size int64) nameSuffix func(base string) string // produces the final unique filename + // quotaBytes caps total on-disk bytes under receivedDir (completed files + // + .partial fragments). Enforced on INIT (declared size) and on every + // chunk write so a peer cannot overrun the disk mid-stream. Zero ⇒ no + // quota. + quotaBytes int64 + mu sync.Mutex transfers map[[transferIDLen]byte]*recvTransfer } type recvTransfer struct { - file *os.File - partial string - name string - size uint64 - hash [32]byte - cursor uint64 // highest contiguous byte written - pending map[uint64][]byte // out-of-order chunks held until contiguous + file *os.File + partial string + name string + size uint64 + hash [32]byte + cursor uint64 // highest contiguous byte written + pending map[uint64][]byte // out-of-order chunks held until contiguous pendBytes int + // quotaBudget is the number of additional bytes this transfer may still + // write to disk before tripping the receiver quota. Snapshotted at INIT + // (quota minus everything already on disk other than this .partial) and + // debited as contiguous chunks land. -1 ⇒ unlimited (quota disabled). + quotaBudget int64 } // NewStreamReceiver builds a receiver writing into receivedDir. nameSuffix // maps a base filename to a final unique name (nil ⇒ a timestamped default). -// onSaved (nil ok) fires after a verified file is renamed into place. 
+// onSaved (nil ok) fires after a verified file is renamed into place. No disk +// quota is enforced — use NewStreamReceiverWithQuota to bound on-disk bytes. func NewStreamReceiver(receivedDir string, nameSuffix func(base string) string, onSaved func(name, path string, size int64)) *StreamReceiver { + return NewStreamReceiverWithQuota(receivedDir, nameSuffix, onSaved, 0) +} + +// NewStreamReceiverWithQuota is NewStreamReceiver with a disk quota: +// quotaBytes caps the total on-disk bytes under receivedDir (completed files +// plus retained .partial fragments). Enforced on INIT and on every chunk +// write so a peer cannot fill the disk mid-stream. Zero or negative ⇒ unlimited. +func NewStreamReceiverWithQuota(receivedDir string, nameSuffix func(base string) string, onSaved func(name, path string, size int64), quotaBytes int64) *StreamReceiver { if nameSuffix == nil { nameSuffix = defaultStreamName } + if quotaBytes < 0 { + quotaBytes = 0 + } return &StreamReceiver{ receivedDir: receivedDir, onSaved: onSaved, nameSuffix: nameSuffix, + quotaBytes: quotaBytes, transfers: make(map[[transferIDLen]byte]*recvTransfer), } } @@ -467,6 +491,24 @@ func (sr *StreamReceiver) handleInit(id [transferIDLen]byte, body []byte) *Frame } partial := filepath.Join(partialDir, hex.EncodeToString(id[:])) + // Quota gate: reject up front if the declared size cannot fit alongside what + // is on disk (excluding this transfer's own .partial, which resume credits + // back). Compared as uint64: int64(size) wraps negative for sizes ≥ 2^63. 
+ quotaBudget := int64(-1) + if sr.quotaBytes > 0 { + used := dirSizeExcluding(sr.receivedDir, partial) + budget := sr.quotaBytes - used + if budget < 0 { + budget = 0 + } + if size > uint64(budget) { + return encodeComplete(id, false, + fmt.Sprintf("receiver disk quota exceeded: need %d, %d of %d available", + size, budget, sr.quotaBytes)) + } + quotaBudget = budget + } + file, err := os.OpenFile(partial, os.O_RDWR|os.O_CREATE, 0600) if err != nil { return encodeComplete(id, false, "open partial: "+err.Error()) @@ -489,13 +531,14 @@ func (sr *StreamReceiver) handleInit(id [transferIDLen]byte, body []byte) *Frame _ = old.file.Close() } sr.transfers[id] = &recvTransfer{ - file: file, - partial: partial, - name: sanitizeBase(name), - size: size, - hash: hash, - cursor: resume, - pending: make(map[uint64][]byte), + file: file, + partial: partial, + name: sanitizeBase(name), + size: size, + hash: hash, + cursor: resume, + pending: make(map[uint64][]byte), + quotaBudget: quotaBudget, } sr.mu.Unlock() @@ -548,8 +591,16 @@ func (sr *StreamReceiver) handleChunk(id [transferIDLen]byte, body []byte) *Fram } // writeAt appends a contiguous chunk at off (== cursor) and advances cursor. -// Caller holds sr.mu. +// Caller holds sr.mu. Debits the per-transfer disk-quota budget and refuses +// the write if it would overrun, so a peer that lies about its declared size +// (sends more chunk bytes than INIT promised) still cannot exceed the quota. 
func (sr *StreamReceiver) writeAt(t *recvTransfer, off uint64, data []byte) error { + if t.quotaBudget >= 0 { + if int64(len(data)) > t.quotaBudget { + return fmt.Errorf("receiver disk quota exceeded at offset %d", off) + } + t.quotaBudget -= int64(len(data)) + } if _, err := t.file.WriteAt(data, int64(off)); err != nil { return fmt.Errorf("write at %d: %w", off, err) } @@ -628,6 +679,27 @@ func (sr *StreamReceiver) Close() { sr.mu.Unlock() } +// dirSizeExcluding sums the on-disk size of every regular file under dir, +// recursively, skipping the single file at excludePath (this transfer's own +// .partial, whose resume bytes are credited back into the budget). A missing +// directory counts as zero. Best-effort: unreadable entries are skipped. +func dirSizeExcluding(dir, excludePath string) int64 { + var total int64 + _ = filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error { + if err != nil || d.IsDir() { + return nil + } + if path == excludePath { + return nil + } + if info, ierr := d.Info(); ierr == nil { + total += info.Size() + } + return nil + }) + return total +} + func sanitizeBase(name string) string { b := filepath.Base(name) if b == "." || b == "/" || b == "" { diff --git a/service.go b/service.go index b6b7599..c500d23 100644 --- a/service.go +++ b/service.go @@ -16,6 +16,7 @@ import ( "sort" "sync/atomic" "time" + "unicode/utf8" "github.com/pilot-protocol/common/coreapi" "github.com/pilot-protocol/common/protocol" @@ -38,10 +39,79 @@ type ServiceConfig struct { // disk indefinitely. InboxMaxFiles int // InboxMaxBytes caps the total on-disk bytes used by the inbox. - // When > 0, saveInboxMessage checks the accumulated size before - // every write and evictExpandOverflow uses bytes instead of file - // count. Zero ⇒ no byte cap (backward-compatible). + // saveInboxMessage checks the accumulated size before every write and + // evicts oldest-first when over. 
Zero ⇒ DefaultInboxMaxBytes; a + // negative value disables the byte cap entirely (escape hatch). InboxMaxBytes int64 + // ReceivedMaxBytes caps the total on-disk bytes used by the received- + // files directory (completed TypeFile / TypeFileStream files plus any + // retained .partial fragments). Enforced before every write so a peer + // cannot fill the disk. Zero ⇒ DefaultReceivedMaxBytes; a negative + // value disables the quota entirely (escape hatch). + ReceivedMaxBytes int64 + // IdleTimeout bounds how long a single connection may sit without + // delivering the next frame. Reset before every read, so a slowloris + // peer that opens a connection and dribbles (or stalls) is dropped + // instead of pinning the goroutine and its buffers indefinitely. + // Zero ⇒ DefaultIdleTimeout; a negative value disables the deadline. + IdleTimeout time.Duration +} + +// Defensible defaults applied when the corresponding ServiceConfig field +// is left at its zero value. A negative field value disables the limit. +const ( + // DefaultInboxMaxBytes caps inbox JSON at 256 MiB total. + DefaultInboxMaxBytes int64 = 256 << 20 + // DefaultReceivedMaxBytes caps received files + partials at 2 GiB total. + DefaultReceivedMaxBytes int64 = 2 << 30 + // DefaultIdleTimeout drops a connection idle for 2 minutes between frames. + DefaultIdleTimeout = 2 * time.Minute +) + +// readDeadliner is the optional deadline surface a Stream may expose. The +// production transport (*driver.Conn) implements it; test pipes generally do +// not, so we type-assert and skip the deadline when it is unavailable. +type readDeadliner interface { + SetReadDeadline(time.Time) error +} + +// effectiveInboxMaxBytes resolves the configured inbox byte cap: zero ⇒ +// default, negative ⇒ disabled (returns 0). 
+func (s *Service) effectiveInboxMaxBytes() int64 { + switch { + case s.cfg.InboxMaxBytes < 0: + return 0 + case s.cfg.InboxMaxBytes == 0: + return DefaultInboxMaxBytes + default: + return s.cfg.InboxMaxBytes + } +} + +// effectiveReceivedMaxBytes resolves the received-files quota: zero ⇒ +// default, negative ⇒ disabled (returns 0). +func (s *Service) effectiveReceivedMaxBytes() int64 { + switch { + case s.cfg.ReceivedMaxBytes < 0: + return 0 + case s.cfg.ReceivedMaxBytes == 0: + return DefaultReceivedMaxBytes + default: + return s.cfg.ReceivedMaxBytes + } +} + +// effectiveIdleTimeout resolves the per-connection idle deadline: zero ⇒ +// default, negative ⇒ disabled (returns 0). +func (s *Service) effectiveIdleTimeout() time.Duration { + switch { + case s.cfg.IdleTimeout < 0: + return 0 + case s.cfg.IdleTimeout == 0: + return DefaultIdleTimeout + default: + return s.cfg.IdleTimeout + } } // inboxEvictCheckEvery: only run the eviction-scan once every N saves @@ -153,7 +223,15 @@ func (s *Service) handleConn(ctx context.Context, conn coreapi.Stream) { } } + idle := s.effectiveIdleTimeout() + dl, canDeadline := conn.(readDeadliner) for { + // Reset the idle/read deadline before every frame so a slowloris + // peer that opens a connection and then dribbles or stalls is torn + // down instead of holding the goroutine and its buffers forever. + if canDeadline && idle > 0 { + _ = dl.SetReadDeadline(time.Now().Add(idle)) + } frame, err := ReadFrame(conn) // Capture right after the IO read so receiver-side timestamps are as // close to the wire as possible. 
@@ -183,7 +261,7 @@ func (s *Service) handleConn(ctx context.Context, conn coreapi.Stream) { _ = WriteFrame(conn, &Frame{Type: TypeText, Payload: []byte("ERR mkdir: " + mderr.Error())}) return } - sr = NewStreamReceiver(dir, streamNameSuffix, streamOnSaved) + sr = NewStreamReceiverWithQuota(dir, streamNameSuffix, streamOnSaved, s.effectiveReceivedMaxBytes()) } if resp := sr.HandleFrame(frame); resp != nil { if werr := WriteFrame(conn, resp); werr != nil { @@ -281,6 +359,27 @@ func (s *Service) saveReceivedFile(frame *Frame) error { return fmt.Errorf("mkdir: %w", err) } + // Disk quota: reject the file up front if storing it would push the + // received directory (completed files + retained .partial fragments) + // past the quota, so a peer cannot fill the disk one file at a time. + if quota := s.effectiveReceivedMaxBytes(); quota > 0 { + current, _ := dirTotalBytes(dir) + if current+int64(len(frame.Payload)) > quota { + slog.Warn("received-files quota exceeded", + "current_bytes", current, "max_bytes", quota, + "frame_bytes", len(frame.Payload)) + if s.deps.Events != nil { + s.deps.Events.Publish("received.full", map[string]any{ + "type": TypeName(frame.Type), + "frame_bytes": len(frame.Payload), + "max_bytes": quota, + }) + } + return fmt.Errorf("received-files quota exceeded: %d + %d > %d", + current, len(frame.Payload), quota) + } + } + safeName := filepath.Base(frame.Filename) ts := time.Now().Format("20060102-150405.000") seq := s.seq.Add(1) @@ -312,31 +411,32 @@ func (s *Service) saveInboxMessage(frame *Frame, from protocol.Addr) error { return fmt.Errorf("mkdir: %w", err) } - // Byte-budget check: if InboxMaxBytes is set, confirm there is room - // BEFORE writing. Evict if over, then re-check. - if s.cfg.InboxMaxBytes > 0 { + // Byte-budget check: confirm there is room BEFORE writing. Evict if + // over, then re-check. The cap is always on (defaulted) unless the + // operator explicitly disables it with a negative config value. 
+ if maxBytes := s.effectiveInboxMaxBytes(); maxBytes > 0 { current, _ := inboxTotalBytes(dir) // Estimate the JSON overhead: type, from, data, bytes, received_at, // and possibly data_b64. estimated := int64(len(frame.Payload)) + 256 - if current+estimated > s.cfg.InboxMaxBytes { + if current+estimated > maxBytes { s.evictInboxOverflowByBytes(dir) after, _ := inboxTotalBytes(dir) - if after+estimated > s.cfg.InboxMaxBytes { + if after+estimated > maxBytes { slog.Warn("inbox byte budget exceeded after eviction", "current_bytes", after, - "max_bytes", s.cfg.InboxMaxBytes, + "max_bytes", maxBytes, "frame_bytes", len(frame.Payload)) if s.deps.Events != nil { s.deps.Events.Publish("inbox.full", map[string]any{ "from": from.String(), "type": TypeName(frame.Type), "frame_bytes": len(frame.Payload), - "max_bytes": s.cfg.InboxMaxBytes, + "max_bytes": maxBytes, }) } return fmt.Errorf("inbox byte budget exceeded: %d + %d > %d", - after, estimated, s.cfg.InboxMaxBytes) + after, estimated, maxBytes) } } } @@ -348,10 +448,19 @@ func (s *Service) saveInboxMessage(frame *Frame, from protocol.Addr) error { "bytes": len(frame.Payload), "received_at": ts.Format(time.RFC3339Nano), } - if s.cfg.IncludeBase64 { + // Store the payload losslessly. JSON cannot represent arbitrary bytes in + // a string — invalid UTF-8 is replaced with U+FFFD by encoding/json — so + // any payload that is not valid UTF-8 (all binary frames, in practice) + // is written as base64 instead of being silently corrupted. IncludeBase64 + // forces base64 for every payload (back-compat for consumers that always + // expect data_b64). A `data_encoding` field tells the reader which form + // to expect. 
+ if s.cfg.IncludeBase64 || frame.Type == TypeBinary || !utf8.Valid(frame.Payload) { msg["data_b64"] = base64.StdEncoding.EncodeToString(frame.Payload) + msg["data_encoding"] = "base64" } else { msg["data"] = string(frame.Payload) + msg["data_encoding"] = "utf8" } data, err := json.Marshal(msg) if err != nil { @@ -492,3 +601,29 @@ func inboxTotalBytes(dir string) (int64, error) { } return total, nil } + +// dirTotalBytes sums the on-disk size of every regular file under dir, +// recursively — so the received-files quota accounts for both completed +// files and the .partial fragments of in-flight streamed transfers. A +// missing directory counts as zero (not yet created). +func dirTotalBytes(dir string) (int64, error) { + var total int64 + err := filepath.WalkDir(dir, func(_ string, d os.DirEntry, err error) error { + if err != nil { + return nil // skip unreadable entries; best-effort accounting + } + if d.IsDir() { + return nil + } + info, ierr := d.Info() + if ierr != nil { + return nil + } + total += info.Size() + return nil + }) + if os.IsNotExist(err) { + return 0, nil + } + return total, err +} diff --git a/service_disabled.go b/service_disabled.go index e390636..e865588 100644 --- a/service_disabled.go +++ b/service_disabled.go @@ -12,17 +12,23 @@ package dataexchange import ( "context" + "time" "github.com/pilot-protocol/common/coreapi" ) // ServiceConfig mirrors the real ServiceConfig so cmd/daemon's // dataexchange.NewService(dataexchange.ServiceConfig{...}) call site -// compiles unchanged when the plugin is disabled. +// compiles unchanged when the plugin is disabled. Field set kept in sync +// with the real ServiceConfig in service.go. type ServiceConfig struct { - ReceivedDir string - InboxDir string - IncludeBase64 bool + ReceivedDir string + InboxDir string + IncludeBase64 bool + InboxMaxFiles int + InboxMaxBytes int64 + ReceivedMaxBytes int64 + IdleTimeout time.Duration } // Service is a no-op replacement for the real plugin Service. 
diff --git a/zz_harden_test.go b/zz_harden_test.go new file mode 100644 index 0000000..35163e0 --- /dev/null +++ b/zz_harden_test.go @@ -0,0 +1,424 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +//go:build !no_dataexchange +// +build !no_dataexchange + +package dataexchange + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/binary" + "encoding/json" + "errors" + "io" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/pilot-protocol/common/coreapi" + "github.com/pilot-protocol/common/protocol" +) + +// --- item 1: oversized declared frame rejected without a huge allocation --- + +// hugeDeclaredReader presents a frame header that declares `declared` payload +// bytes but only ever yields `avail` of them. It records the largest single +// Read buffer ReadFrame asks for, so the test can prove ReadFrame never sizes +// a single allocation to the attacker-declared length. +type hugeDeclaredReader struct { + hdr []byte + hdrPos int + avail int64 // payload bytes we are willing to emit + emitted int64 + maxBufAsk int +} + +func (r *hugeDeclaredReader) Read(p []byte) (int, error) { + if len(p) > r.maxBufAsk { + r.maxBufAsk = len(p) + } + // Serve the header first. + if r.hdrPos < len(r.hdr) { + n := copy(p, r.hdr[r.hdrPos:]) + r.hdrPos += n + return n, nil + } + if r.emitted >= r.avail { + // Declared more than we will ever send: stall as EOF so ReadFrame + // returns a short-read error rather than spinning. + return 0, io.EOF + } + n := len(p) + if int64(n) > r.avail-r.emitted { + n = int(r.avail - r.emitted) + } + for i := 0; i < n; i++ { + p[i] = 0 + } + r.emitted += int64(n) + return n, nil +} + +func TestReadFrame_OversizedDeclaredNoHugeAlloc(t *testing.T) { + t.Parallel() + // Declare a frame just under the cap (so the cap check passes) but never + // deliver the bytes. A correct implementation must NOT allocate the + // declared size up front; it should grow incrementally and ultimately + // fail with a short read. 
+ declared := MaxFrameSize - 1 + var hdr [8]byte + binary.BigEndian.PutUint32(hdr[0:4], TypeBinary) + binary.BigEndian.PutUint32(hdr[4:8], declared) + + r := &hugeDeclaredReader{hdr: hdr[:], avail: 0} + _, err := ReadFrame(r) + if err == nil { + t.Fatal("expected error reading a frame whose declared bytes never arrive") + } + if !errors.Is(err, io.ErrUnexpectedEOF) { + t.Fatalf("err = %v, want io.ErrUnexpectedEOF", err) + } + // The crux: ReadFrame must never ask for a single buffer anywhere near + // the declared length. The bounded initial reservation is 64 KiB, so any + // single Read ask should stay within a small multiple of that. + if int64(r.maxBufAsk) >= int64(declared) { + t.Fatalf("ReadFrame requested a %d-byte buffer for a %d-declared frame; "+ + "it must not pre-allocate the attacker-declared size", r.maxBufAsk, declared) + } + if r.maxBufAsk > readBoundedInitialCap*4 { + t.Fatalf("max single read ask = %d, want <= %d (bounded growth expected)", + r.maxBufAsk, readBoundedInitialCap*4) + } +} + +// TestReadFrame_OverCapStillRejected keeps the header-level cap guarantee: +// a frame whose declared length exceeds the cap is rejected before any +// payload read. 
+func TestReadFrame_OverCapStillRejected(t *testing.T) { + t.Parallel() + var hdr [8]byte + binary.BigEndian.PutUint32(hdr[0:4], TypeBinary) + binary.BigEndian.PutUint32(hdr[4:8], MaxFrameSize+1) + _, err := ReadFrame(bytes.NewReader(hdr[:])) + if err == nil || !bytes.Contains([]byte(err.Error()), []byte("too large")) { + t.Fatalf("err = %v, want 'frame too large'", err) + } +} + +// --- item 6: over-long filename rejected on the writer side ----------------- + +func TestWriteFrame_OverLongFilenameRejected(t *testing.T) { + t.Parallel() + long := string(bytes.Repeat([]byte("a"), maxFilenameLen+1)) + var buf bytes.Buffer + err := WriteFrame(&buf, &Frame{Type: TypeFile, Filename: long, Payload: []byte("x")}) + if err == nil { + t.Fatal("expected WriteFrame to reject an over-long filename before the uint16 cast") + } + if buf.Len() != 0 { + t.Fatalf("WriteFrame wrote %d bytes despite rejecting the filename", buf.Len()) + } + + // A name longer than 65535 bytes would have wrapped the uint16 length + // field; confirm that is rejected too (and never truncated onto the wire). + wrapping := string(bytes.Repeat([]byte("b"), 1<<16+10)) + buf.Reset() + if err := WriteFrame(&buf, &Frame{Type: TypeFile, Filename: wrapping, Payload: []byte("x")}); err == nil { + t.Fatal("expected WriteFrame to reject a name that would wrap the uint16 length") + } + + // A name at the limit still works and round-trips. 
+ ok := string(bytes.Repeat([]byte("c"), maxFilenameLen)) + buf.Reset() + if err := WriteFrame(&buf, &Frame{Type: TypeFile, Filename: ok, Payload: []byte("ok")}); err != nil { + t.Fatalf("WriteFrame rejected a max-length name: %v", err) + } + got, err := ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame: %v", err) + } + if got.Filename != ok { + t.Fatalf("filename round-trip: got %q (len %d), want len %d", got.Filename, len(got.Filename), maxFilenameLen) + } +} + +// --- item 5: binary payloads round-trip losslessly through the inbox -------- + +func TestSaveInboxMessage_BinaryLossless(t *testing.T) { + t.Parallel() + tmp := t.TempDir() + s := NewService(ServiceConfig{InboxDir: tmp}) // IncludeBase64 off by default + + // Bytes that are NOT valid UTF-8 — would be mangled to U+FFFD if stored + // as a JSON string. + payload := []byte{0x00, 0xFF, 0xDE, 0xAD, 0xBE, 0xEF, 0x80, 0xC3, 0x28} + if err := s.saveInboxMessage(&Frame{Type: TypeBinary, Payload: payload}, protocol.Addr{Node: 1}); err != nil { + t.Fatalf("saveInboxMessage: %v", err) + } + entries, _ := os.ReadDir(tmp) + if len(entries) != 1 { + t.Fatalf("inbox files = %d, want 1", len(entries)) + } + body, _ := os.ReadFile(filepath.Join(tmp, entries[0].Name())) + var msg map[string]any + if err := json.Unmarshal(body, &msg); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if msg["data_encoding"] != "base64" { + t.Fatalf("data_encoding = %v, want base64 for binary payload", msg["data_encoding"]) + } + b64, ok := msg["data_b64"].(string) + if !ok { + t.Fatalf("data_b64 missing/wrong type for binary payload: %v", msg["data_b64"]) + } + decoded, err := base64.StdEncoding.DecodeString(b64) + if err != nil { + t.Fatalf("decode data_b64: %v", err) + } + if !bytes.Equal(decoded, payload) { + t.Fatalf("binary payload corrupted: got %x, want %x", decoded, payload) + } + // And it must NOT have been written as a lossy string field. 
+ if _, hasRaw := msg["data"]; hasRaw { + t.Errorf("binary payload also stored as lossy 'data' string: %v", msg["data"]) + } +} + +// --- item 3: inbox total-byte cap enforced on receipt ---------------------- + +func TestSaveInboxMessage_ByteCapEnforced(t *testing.T) { + t.Parallel() + tmp := t.TempDir() + // Small cap so a couple of writes blow past it. Each message is the + // payload + ~256 estimated JSON overhead, so cap at ~1 KiB. + s := NewService(ServiceConfig{InboxDir: tmp, InboxMaxBytes: 1024}) + + from := protocol.Addr{Node: 1} + payload := bytes.Repeat([]byte("x"), 400) // valid UTF-8 + + // First write fits. + if err := s.saveInboxMessage(&Frame{Type: TypeText, Payload: payload}, from); err != nil { + t.Fatalf("first write should fit: %v", err) + } + // Keep writing; the cap must eventually reject (after eviction can no + // longer make room, which here happens because every file is ~same size + // and the cap holds only ~1-2 of them). + rejected := false + for i := 0; i < 20; i++ { + if err := s.saveInboxMessage(&Frame{Type: TypeText, Payload: payload}, from); err != nil { + rejected = true + break + } + } + if !rejected { + t.Fatal("expected the inbox byte cap to reject a write once full") + } + // On-disk total must never have exceeded the cap by more than one message. + total, _ := inboxTotalBytes(tmp) + if total > 1024+int64(len(payload))+512 { + t.Fatalf("inbox grew to %d bytes, well past the 1024 cap", total) + } +} + +// TestEffectiveInboxMaxBytes_Defaults checks the defaulting / disable logic. 
+func TestEffectiveInboxMaxBytes_Defaults(t *testing.T) {
+	t.Parallel()
+	// Zero value selects the package default.
+	if got := (&Service{cfg: ServiceConfig{}}).effectiveInboxMaxBytes(); got != DefaultInboxMaxBytes {
+		t.Errorf("zero ⇒ %d, want default %d", got, DefaultInboxMaxBytes)
+	}
+	// A negative value is reported as 0, which this test treats as "disabled".
+	if got := (&Service{cfg: ServiceConfig{InboxMaxBytes: -1}}).effectiveInboxMaxBytes(); got != 0 {
+		t.Errorf("negative ⇒ %d, want 0 (disabled)", got)
+	}
+	// An explicit positive value is passed through unchanged.
+	if got := (&Service{cfg: ServiceConfig{InboxMaxBytes: 99}}).effectiveInboxMaxBytes(); got != 99 {
+		t.Errorf("explicit ⇒ %d, want 99", got)
+	}
+}
+
+// --- item 4: received-files quota enforced ----------------------------------
+
+// TestSaveReceivedFile_QuotaEnforced writes two 600-byte files against a
+// 1000-byte ReceivedMaxBytes and requires the second to be rejected and the
+// directory total to stay within the quota.
+func TestSaveReceivedFile_QuotaEnforced(t *testing.T) {
+	t.Parallel()
+	tmp := t.TempDir()
+	s := NewService(ServiceConfig{ReceivedDir: tmp, ReceivedMaxBytes: 1000})
+
+	// First 600-byte file fits.
+	if err := s.saveReceivedFile(&Frame{Type: TypeFile, Filename: "a.bin", Payload: bytes.Repeat([]byte("a"), 600)}); err != nil {
+		t.Fatalf("first file should fit: %v", err)
+	}
+	// Second 600-byte file would push past the 1000-byte quota.
+	if err := s.saveReceivedFile(&Frame{Type: TypeFile, Filename: "b.bin", Payload: bytes.Repeat([]byte("b"), 600)}); err == nil {
+		t.Fatal("expected the received-files quota to reject the second file")
+	}
+	// NOTE(review): the second return value of dirTotalBytes is discarded
+	// here; if it is an error, assert it is nil — confirm against the helper.
+	total, _ := dirTotalBytes(tmp)
+	if total > 1000 {
+		t.Fatalf("received dir grew to %d bytes, past the 1000 quota", total)
+	}
+}
+
+// TestStreamReceiver_QuotaRejectsOversizedInit proves the streamed path also
+// honours the disk quota: an INIT declaring more than the quota is rejected
+// before any chunk is written.
+func TestStreamReceiver_QuotaRejectsOversizedInit(t *testing.T) {
+	t.Parallel()
+	tmp := t.TempDir()
+	sr := NewStreamReceiverWithQuota(tmp, nil, nil, 500)
+	defer sr.Close()
+
+	var id [transferIDLen]byte
+	id[0] = 0xAB
+	var hash [32]byte
+	// Declare a 10_000-byte transfer against a 500-byte quota.
+	resp := sr.HandleFrame(encodeInit(id, 10_000, hash, StreamChunkSize, "big.bin"))
+	if resp == nil {
+		t.Fatal("expected a COMPLETE response rejecting the oversized INIT")
+	}
+	// Guard the slice below: a truncated response should fail the test with a
+	// message rather than panic with an out-of-range slice.
+	if len(resp.Payload) < int(1+transferIDLen) {
+		t.Fatalf("COMPLETE response payload too short: %d bytes", len(resp.Payload))
+	}
+	ok, msg := decodeComplete(resp.Payload[1+transferIDLen:])
+	if ok {
+		t.Fatalf("oversized INIT was accepted; msg=%q", msg)
+	}
+	if !bytes.Contains([]byte(msg), []byte("quota")) {
+		t.Fatalf("rejection message = %q, want it to mention the quota", msg)
+	}
+	// No partial should have been left on disk for this transfer.
+	total, _ := dirTotalBytes(tmp)
+	if total != 0 {
+		t.Fatalf("quota-rejected transfer left %d bytes on disk", total)
+	}
+}
+
+// --- item 2: per-connection read deadline fires on a slowloris peer ---------
+
+// deadlineStream models *driver.Conn's deadline behaviour: Read blocks until
+// either bytes arrive on `in` or the read deadline elapses, in which case it
+// returns os.ErrDeadlineExceeded. This is the surface the production transport
+// exposes and the handler relies on.
+type deadlineStream struct {
+	mu        sync.Mutex    // guards deadline and writes to out
+	in        chan []byte   // inbound data fed by the test
+	out       *bytes.Buffer // everything written by the handler
+	buf       []byte        // leftover of a chunk that did not fit the last Read
+	deadline  time.Time     // current read deadline; zero means none
+	closed    chan struct{} // closed by Close to unblock a pending Read
+	closeOnce sync.Once     // makes Close idempotent
+}
+
+// newDeadlineStream returns a stream with a 16-slot inbound channel, an empty
+// output buffer and no deadline set.
+func newDeadlineStream() *deadlineStream {
+	return &deadlineStream{in: make(chan []byte, 16), out: &bytes.Buffer{}, closed: make(chan struct{})}
+}
+
+// Read returns leftover bytes first; otherwise it waits for the next chunk on
+// in, for the deadline captured at call time, or for Close. It returns
+// os.ErrDeadlineExceeded when the deadline has passed or elapses, and io.EOF
+// when in is closed or the stream is closed.
+func (s *deadlineStream) Read(p []byte) (int, error) {
+	// s.buf is accessed without holding mu.
+	// NOTE(review): presumably there is a single reader — confirm.
+	if len(s.buf) > 0 {
+		n := copy(p, s.buf)
+		s.buf = s.buf[n:]
+		return n, nil
+	}
+	s.mu.Lock()
+	dl := s.deadline
+	s.mu.Unlock()
+	// A nil timer channel never fires, so with no deadline the select below
+	// only waits on data and close.
+	var timer <-chan time.Time
+	if !dl.IsZero() {
+		if !time.Now().Before(dl) {
+			return 0, os.ErrDeadlineExceeded
+		}
+		t := time.NewTimer(time.Until(dl))
+		defer t.Stop()
+		timer = t.C
+	}
+	select {
+	case data, ok := <-s.in:
+		if !ok {
+			return 0, io.EOF
+		}
+		n := copy(p, data)
+		if n < len(data) {
+			// Keep the remainder for the next Read call.
+			s.buf = data[n:]
+		}
+		return n, nil
+	case <-timer:
+		return 0, os.ErrDeadlineExceeded
+	case <-s.closed:
+		return 0, io.EOF
+	}
+}
+
+// Write appends p to the output buffer under mu.
+func (s *deadlineStream) Write(p []byte) (int, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.out.Write(p)
+}
+
+// Close closes the closed channel exactly once and always returns nil.
+func (s *deadlineStream) Close() error {
+	s.closeOnce.Do(func() { close(s.closed) })
+	return nil
+}
+func (s *deadlineStream) LocalAddr() coreapi.Addr  { return protocol.Addr{} }
+func (s *deadlineStream) LocalPort() uint16        { return 1001 }
+func (s *deadlineStream) RemoteAddr() coreapi.Addr { return protocol.Addr{Node: 0xBAD} }
+func (s *deadlineStream) RemotePort() uint16       { return 5555 }
+
+// SetDeadline only sets the read deadline; writes never time out here.
+func (s *deadlineStream) SetDeadline(t time.Time) error {
+	return s.SetReadDeadline(t)
+}
+
+// SetReadDeadline stores t under mu. A Read that is already blocked keeps
+// the deadline it captured when it started.
+func (s *deadlineStream) SetReadDeadline(t time.Time) error {
+	s.mu.Lock()
+	s.deadline = t
+	s.mu.Unlock()
+	return nil
+}
+func (s *deadlineStream) SetWriteDeadline(time.Time) error { return nil }
+
+// TestHandleConn_ReadDeadlineFires runs handleConn against a stream that never
+// delivers data, with a 100 ms IdleTimeout, and requires the handler to
+// return within 3 s.
+func TestHandleConn_ReadDeadlineFires(t *testing.T) {
+	t.Parallel()
+	tmp := t.TempDir()
+	// Tiny idle timeout — a peer that connects and then never sends a frame
+	// must be torn down.
+	svc := NewService(ServiceConfig{InboxDir: tmp, ReceivedDir: tmp, IdleTimeout: 100 * time.Millisecond})
+	svc.deps = coreapi.Deps{}
+
+	stream := newDeadlineStream() // never feeds any bytes ⇒ slowloris
+	done := make(chan struct{})
+	go func() {
+		svc.handleConn(context.Background(), stream)
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// handleConn returned because the read deadline fired — good.
+	case <-time.After(3 * time.Second):
+		// Unblock the handler goroutine before failing so it does not leak.
+		stream.Close()
+		t.Fatal("handleConn did not return: read deadline never fired on an idle (slowloris) connection")
+	}
+}
+
+// TestHandleConn_NoDeadlineWhenDisabled ensures a negative IdleTimeout opts
+// out cleanly (handler still tears down on EOF, not on a deadline).
+func TestHandleConn_NoDeadlineWhenDisabled(t *testing.T) {
+	t.Parallel()
+	tmp := t.TempDir()
+	svc := NewService(ServiceConfig{InboxDir: tmp, ReceivedDir: tmp, IdleTimeout: -1})
+	svc.deps = coreapi.Deps{}
+
+	stream := newDeadlineStream()
+	done := make(chan struct{})
+	go func() {
+		svc.handleConn(context.Background(), stream)
+		close(done)
+	}()
+	// With the deadline disabled the handler should still be blocked after a
+	// short wait (no premature teardown).
+	select {
+	case <-done:
+		t.Fatal("handleConn returned despite the idle deadline being disabled and no EOF")
+	case <-time.After(300 * time.Millisecond):
+	}
+	// Closing the stream EOFs the read loop and unblocks the handler.
+	stream.Close()
+	select {
+	case <-done:
+	case <-time.After(2 * time.Second):
+		t.Fatal("handleConn did not return after Close")
+	}
+}
diff --git a/zz_max_frame_env_test.go b/zz_max_frame_env_test.go
index 51ae235..8a8544b 100644
--- a/zz_max_frame_env_test.go
+++ b/zz_max_frame_env_test.go
@@ -14,8 +14,8 @@ import "testing"
 // review.
 func TestMaxFrameSize_DefaultAndConstants(t *testing.T) {
 	t.Parallel()
-	if DefaultMaxFrameSize != 1<<30 {
-		t.Errorf("DefaultMaxFrameSize = %d; want %d (1 GiB)", DefaultMaxFrameSize, 1<<30)
+	if DefaultMaxFrameSize != 64<<20 {
+		t.Errorf("DefaultMaxFrameSize = %d; want %d (64 MiB)", DefaultMaxFrameSize, 64<<20)
 	}
 	// When PILOT_DATAEXCHANGE_MAX_FRAME is unset (as it is in CI),
 	// MaxFrameSize must equal the default. A non-default value here