Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,32 @@ All notable changes to this project are documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Security / hardening

- Lower the default per-frame cap (`DefaultMaxFrameSize`) from 1 GiB to
64 MiB and stop pre-allocating the attacker-declared frame size in
`ReadFrame` — payloads now grow incrementally as bytes arrive, so a single
hostile peer can no longer OOM the receiver by announcing a giant frame.
Raise the cap with `PILOT_DATAEXCHANGE_MAX_FRAME` (both ends must agree).
- Add a per-connection idle/read deadline (`ServiceConfig.IdleTimeout`,
default 2 min) reset before every frame so slowloris peers can no longer
hold connections open indefinitely.
- Add a total-byte inbox cap (`ServiceConfig.InboxMaxBytes`, default 256 MiB)
enforced on every receipt, alongside the existing file-count cap.
- Add a received-files disk quota (`ServiceConfig.ReceivedMaxBytes`, default
2 GiB) covering completed files and retained `.partial` stream fragments,
enforced before every write (legacy `TypeFile` and chunked `TypeFileStream`).
- Store binary / non-UTF-8 inbox payloads as base64 (`data_b64`) by default
with a `data_encoding` marker, instead of silently corrupting them into a
lossy JSON string.
- Reject over-long filenames in `WriteFrame` before the `uint16` length cast
so a name cannot wrap/truncate onto the wire.

A negative value for any of `InboxMaxBytes`, `ReceivedMaxBytes`, or
`IdleTimeout` disables that limit (escape hatch); zero selects the default.

## [v0.1.0]

Initial release.
81 changes: 68 additions & 13 deletions dataexchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package dataexchange

import (
"bytes"
"encoding/binary"
"fmt"
"io"
Expand Down Expand Up @@ -68,19 +69,27 @@ func ReadTracePayload(f *Frame) (*TraceFrame, error) {
// maxFilenameLen limits filename length to prevent abuse.
const maxFilenameLen = 255

// DefaultMaxFrameSize caps a single data-exchange frame at 1 GiB.
// DefaultMaxFrameSize caps a single data-exchange frame at 64 MiB.
//
// The wire format's length field is a uint32 (see Frame docstring below),
// so the absolute ceiling is ~4 GiB; this cap exists purely to bound the
// memory the receiver allocates for any single transfer. Raising it
// trades RAM for the ability to ship larger artefacts in one frame.
// so the absolute ceiling is ~4 GiB; this cap exists to bound the memory a
// single hostile peer can make the receiver commit for one transfer. The
// receiver no longer pre-allocates the attacker-declared length up front
// (see ReadFrame) — it grows the buffer as bytes actually arrive — but the
// cap still bounds the steady-state worst case, so it is deliberately set
// well below the wire ceiling.
//
// History: was 256 MiB pre-2026-06-14 (sized for the test fleet's 100 MiB
// payloads). Raised after operators on multi-GiB-RAM hosts hit the cap on
// real workloads; the chunked streaming protocol described in
// docs/PROPOSAL-reliable-file-transfer.md (web4) will eventually remove
// the need for a per-frame cap entirely.
const DefaultMaxFrameSize uint32 = 1 << 30
// 64 MiB comfortably covers inbox messages and the deprecated legacy
// single-frame TypeFile path; anything larger should ride the chunked
// TypeFileStream protocol (filestream.go), which never buffers a whole
// file in memory. Operators who genuinely need bigger single frames can
// raise the cap via PILOT_DATAEXCHANGE_MAX_FRAME (both ends must agree).
//
// History: 256 MiB pre-2026-06-14, then briefly 1 GiB (sized for the test
// fleet's 100 MiB payloads on multi-GiB-RAM hosts). 1 GiB let a single
// hostile peer OOM the receiver, so the default was lowered to 64 MiB and
// the up-front allocation was removed.
const DefaultMaxFrameSize uint32 = 64 << 20

// MaxFrameSize is the runtime-effective frame cap. Set at package init
// from the PILOT_DATAEXCHANGE_MAX_FRAME env var (in bytes) if present
Expand Down Expand Up @@ -118,8 +127,14 @@ type Frame struct {
func WriteFrame(w io.Writer, f *Frame) error {
payload := f.Payload
if f.Type == TypeFile {
// Prepend filename
// Prepend filename. Validate the length BEFORE the uint16 cast: a
// name longer than 65535 bytes would wrap the 2-byte length field
// and silently truncate, and any name over maxFilenameLen is
// rejected by ReadFrame anyway — fail fast on the writer side.
name := []byte(f.Filename)
if len(name) > maxFilenameLen {
return fmt.Errorf("filename too long: %d bytes (max %d)", len(name), maxFilenameLen)
}
payload = make([]byte, 2+len(name)+len(f.Payload))
binary.BigEndian.PutUint16(payload[0:2], uint16(len(name)))
copy(payload[2:], name)
Expand Down Expand Up @@ -149,8 +164,14 @@ func ReadFrame(r io.Reader) (*Frame, error) {
return nil, fmt.Errorf("frame too large: %d (max %d)", length, MaxFrameSize)
}

payload := make([]byte, length)
if _, err := io.ReadFull(r, payload); err != nil {
// Do NOT pre-allocate `length` bytes — that is the attacker-declared
// size, so a single hostile peer could announce a huge (but in-cap)
// frame and never send the bytes, pinning that much RAM per connection.
// Instead grow incrementally as bytes actually arrive, with a bounded
// starting capacity. io.CopyN stops at exactly `length`, and the final
// length is re-checked so a truncated stream surfaces as ErrUnexpectedEOF.
payload, err := readBounded(r, int64(length))
if err != nil {
return nil, err
}

Expand Down Expand Up @@ -183,6 +204,40 @@ func ReadFrame(r io.Reader) (*Frame, error) {
return f, nil
}

// readBoundedInitialCap bounds the initial buffer capacity reserved before
// any payload bytes arrive. A frame can legitimately be up to MaxFrameSize,
// but reserving that much for the declared (untrusted) length is exactly the
// OOM vector we are closing — so we reserve at most this much up front and
// let bytes.Buffer grow geometrically as real data lands.
const readBoundedInitialCap = 64 * 1024

// readBounded reads exactly n bytes from r without trusting n to size the
// initial allocation. It grows a buffer as bytes arrive (capped initial
// reservation), so an attacker who declares a large length but never sends
// the bytes only ties up the small initial buffer, not n bytes. A short
// read returns io.ErrUnexpectedEOF, matching io.ReadFull's contract.
func readBounded(r io.Reader, n int64) ([]byte, error) {
if n == 0 {
return []byte{}, nil
}
initial := n
if initial > readBoundedInitialCap {
initial = readBoundedInitialCap
}
buf := bytes.NewBuffer(make([]byte, 0, initial))
read, err := io.CopyN(buf, r, n)
if err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return nil, err
}
if read != n {
return nil, io.ErrUnexpectedEOF
}
return buf.Bytes(), nil
}

// TypeName returns a human-readable name for a frame type.
func TypeName(t uint32) string {
switch t {
Expand Down
124 changes: 98 additions & 26 deletions filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ import (

// Stream control-frame kinds (the first byte of a TypeFileStream payload).
const (
streamKindInit byte = 0x01 // sender→receiver: filename, size, full hash, chunk size
streamKindChunk byte = 0x02 // sender→receiver: offset + chunk bytes
streamKindAck byte = 0x03 // receiver→sender: highest contiguous offset received
streamKindDone byte = 0x04 // sender→receiver: end of stream; verify full hash
streamKindInitAck byte = 0x05 // receiver→sender: resume offset (presence ⇒ peer supports stream)
streamKindInit byte = 0x01 // sender→receiver: filename, size, full hash, chunk size
streamKindChunk byte = 0x02 // sender→receiver: offset + chunk bytes
streamKindAck byte = 0x03 // receiver→sender: highest contiguous offset received
streamKindDone byte = 0x04 // sender→receiver: end of stream; verify full hash
streamKindInitAck byte = 0x05 // receiver→sender: resume offset (presence ⇒ peer supports stream)
streamKindComplete byte = 0x06 // receiver→sender: final status after DONE
streamKindAbort byte = 0x07 // either direction: cancel + reason
streamKindAbort byte = 0x07 // either direction: cancel + reason
)

// Defaults. Chunk size is deliberately held below 64 KiB: on the Mac↔GCP-VM
Expand All @@ -61,11 +61,11 @@ const (
// blackhole heuristic does not flip the link mid-transfer. Window bounds the
// in-flight (unacked) bytes; 16 × 48 KiB = 768 KiB.
const (
StreamChunkSize = 48 * 1024
streamWindow = 16
streamNegTimeout = 5 * time.Second // wait for INIT-ACK before falling back to TypeFile
StreamChunkSize = 48 * 1024
streamWindow = 16
streamNegTimeout = 5 * time.Second // wait for INIT-ACK before falling back to TypeFile
streamStepTimeout = 60 * time.Second // max wait for an ACK / the final COMPLETE
transferIDLen = 16
transferIDLen = 16
)

// ErrStreamUnsupported is returned by SendFileStream when the peer does not
Expand Down Expand Up @@ -392,32 +392,56 @@ type StreamReceiver struct {
onSaved func(name, path string, size int64)
nameSuffix func(base string) string // produces the final unique filename

// quotaBytes caps total on-disk bytes under receivedDir (completed files
// + .partial fragments). Enforced on INIT (declared size) and on every
// chunk write so a peer cannot overrun the disk mid-stream. Zero ⇒ no
// quota.
quotaBytes int64

mu sync.Mutex
transfers map[[transferIDLen]byte]*recvTransfer
}

type recvTransfer struct {
file *os.File
partial string
name string
size uint64
hash [32]byte
cursor uint64 // highest contiguous byte written
pending map[uint64][]byte // out-of-order chunks held until contiguous
file *os.File
partial string
name string
size uint64
hash [32]byte
cursor uint64 // highest contiguous byte written
pending map[uint64][]byte // out-of-order chunks held until contiguous
pendBytes int
// quotaBudget is the number of additional bytes this transfer may still
// write to disk before tripping the receiver quota. Snapshotted at INIT
// (quota minus everything already on disk other than this .partial) and
// debited as contiguous chunks land. -1 ⇒ unlimited (quota disabled).
quotaBudget int64
}

// NewStreamReceiver builds a receiver writing into receivedDir. nameSuffix
// maps a base filename to a final unique name (nil ⇒ a timestamped default).
// onSaved (nil ok) fires after a verified file is renamed into place.
// onSaved (nil ok) fires after a verified file is renamed into place. No disk
// quota is enforced — use NewStreamReceiverWithQuota to bound on-disk bytes.
func NewStreamReceiver(receivedDir string, nameSuffix func(base string) string, onSaved func(name, path string, size int64)) *StreamReceiver {
return NewStreamReceiverWithQuota(receivedDir, nameSuffix, onSaved, 0)
}

// NewStreamReceiverWithQuota is NewStreamReceiver with a disk quota:
// quotaBytes caps the total on-disk bytes under receivedDir (completed files
// plus retained .partial fragments). Enforced on INIT and on every chunk
// write so a peer cannot fill the disk mid-stream. Zero ⇒ unlimited.
func NewStreamReceiverWithQuota(receivedDir string, nameSuffix func(base string) string, onSaved func(name, path string, size int64), quotaBytes int64) *StreamReceiver {
if nameSuffix == nil {
nameSuffix = defaultStreamName
}
if quotaBytes < 0 {
quotaBytes = 0
}
return &StreamReceiver{
receivedDir: receivedDir,
onSaved: onSaved,
nameSuffix: nameSuffix,
quotaBytes: quotaBytes,
transfers: make(map[[transferIDLen]byte]*recvTransfer),
}
}
Expand Down Expand Up @@ -467,6 +491,24 @@ func (sr *StreamReceiver) handleInit(id [transferIDLen]byte, body []byte) *Frame
}
partial := filepath.Join(partialDir, hex.EncodeToString(id[:]))

// Quota gate: reject up front if the declared size cannot fit alongside
// what is already on disk (excluding this transfer's own .partial, which
// resume credits back). Bounds disk use before a single chunk lands.
quotaBudget := int64(-1)
if sr.quotaBytes > 0 {
used := dirSizeExcluding(sr.receivedDir, partial)
budget := sr.quotaBytes - used
if budget < 0 {
budget = 0
}
if int64(size) > budget {
return encodeComplete(id, false,
fmt.Sprintf("receiver disk quota exceeded: need %d, %d of %d available",
size, budget, sr.quotaBytes))
}
quotaBudget = budget
}

file, err := os.OpenFile(partial, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return encodeComplete(id, false, "open partial: "+err.Error())
Expand All @@ -489,13 +531,14 @@ func (sr *StreamReceiver) handleInit(id [transferIDLen]byte, body []byte) *Frame
_ = old.file.Close()
}
sr.transfers[id] = &recvTransfer{
file: file,
partial: partial,
name: sanitizeBase(name),
size: size,
hash: hash,
cursor: resume,
pending: make(map[uint64][]byte),
file: file,
partial: partial,
name: sanitizeBase(name),
size: size,
hash: hash,
cursor: resume,
pending: make(map[uint64][]byte),
quotaBudget: quotaBudget,
}
sr.mu.Unlock()

Expand Down Expand Up @@ -548,8 +591,16 @@ func (sr *StreamReceiver) handleChunk(id [transferIDLen]byte, body []byte) *Fram
}

// writeAt appends a contiguous chunk at off (== cursor) and advances cursor.
// Caller holds sr.mu.
// Caller holds sr.mu. Debits the per-transfer disk-quota budget and refuses
// the write if it would overrun, so a peer that lies about its declared size
// (sends more chunk bytes than INIT promised) still cannot exceed the quota.
func (sr *StreamReceiver) writeAt(t *recvTransfer, off uint64, data []byte) error {
if t.quotaBudget >= 0 {
if int64(len(data)) > t.quotaBudget {
return fmt.Errorf("receiver disk quota exceeded at offset %d", off)
}
t.quotaBudget -= int64(len(data))
}
if _, err := t.file.WriteAt(data, int64(off)); err != nil {
return fmt.Errorf("write at %d: %w", off, err)
}
Expand Down Expand Up @@ -628,6 +679,27 @@ func (sr *StreamReceiver) Close() {
sr.mu.Unlock()
}

// dirSizeExcluding sums the on-disk size of every regular file under dir,
// recursively, skipping the single file at excludePath (this transfer's own
// .partial, whose resume bytes are credited back into the budget). A missing
// directory counts as zero. Best-effort: unreadable entries are skipped.
func dirSizeExcluding(dir, excludePath string) int64 {
var total int64
_ = filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
if err != nil || d.IsDir() {
return nil
}
if path == excludePath {
return nil
}
if info, ierr := d.Info(); ierr == nil {
total += info.Size()
}
return nil
})
return total
}

func sanitizeBase(name string) string {
b := filepath.Base(name)
if b == "." || b == "/" || b == "" {
Expand Down
Loading
Loading