Utilities for encoding and decoding frames using async/await.
Find a file
Repository files (latest commit first)
Filename Latest commit message Latest commit date
2026-04-10 13:14:11 +02:00
benches chore: sync dependencies (monorepo) 2026-04-10 13:14:11 +02:00
src chore: sync dependencies (monorepo) 2026-04-10 13:14:11 +02:00
tests chore: sync dependencies (monorepo) 2026-04-10 13:14:11 +02:00
.gitignore Initial Commit 2019-04-25 22:18:11 -04:00
Cargo.toml chore: sync dependencies (monorepo) 2026-04-07 18:06:14 +02:00
LICENSE *: Prepare fork "asynchronous-codec" 2021-01-06 12:04:15 +01:00
README.md chore: sync dependencies (monorepo) 2026-04-10 13:14:11 +02:00

asynchronous-codec

Utilities for encoding and decoding frames using async/await. This library provides adapters to transform byte streams (AsyncRead and AsyncWrite) into framed streams implementing Sink and Stream traits. Framed streams are also known as transports.

This is a fork of futures-codec by Matt Hunzinger, borrowing concepts from tokio-codec.

What This Library Does

This library provides tools for:

  • Encoding structured data into byte frames for transmission
  • Decoding byte frames back into structured data
  • Managing back-pressure on async I/O streams
  • Handling partial data and EOF conditions gracefully
  • Working with multiple serialization formats (JSON, CBOR)

The design follows Rust's async/await patterns with zero-cost abstractions and efficient buffer management.

Installation

Current (Vendored Path-Based)

Currently using vendored crates with path dependencies:

[dependencies]
asynchronous-codec = { path = "../asynchronous-codec" }
bytes = { path = "../bytes" }
futures-util = { path = "../futures-rs/futures-util", features = ["io"] }

# Optional features
asynchronous-codec = { path = "../asynchronous-codec", features = ["json"] }  # For JSON codec
asynchronous-codec = { path = "../asynchronous-codec", features = ["cbor"] }  # For CBOR codec

Future (Git-Based Release System)

Transitioning to git-based dependencies hosted at https://git.sly.so/kade/:

[dependencies]
asynchronous-codec = { git = "https://git.sly.so/kade/asynchronous-codec", branch = "master" }
bytes = { git = "https://git.sly.so/kade/bytes", branch = "master" }
futures-util = { git = "https://git.sly.so/kade/futures-rs", branch = "master", features = ["io"] }

# Optional features
asynchronous-codec = { git = "https://git.sly.so/kade/asynchronous-codec", branch = "master", features = ["json"] }
asynchronous-codec = { git = "https://git.sly.so/kade/asynchronous-codec", branch = "master", features = ["cbor"] }

Note: The git-based release system is under development. See ./vendor/garou-repos.toml for the complete repository database.

Architecture Overview

graph TB
    subgraph "Core Traits"
        Encoder[Encoder Trait<br/>encode]
        Decoder[Decoder Trait<br/>decode, decode_eof]
    end

    subgraph "Framed Adapters"
        Framed[Framed<br/>Stream + Sink]
        FramedRead[FramedRead<br/>Stream only]
        FramedWrite[FramedWrite<br/>Sink only]
    end

    subgraph "Internal Types"
        Fuse[Fuse<br/>I/O + Codec]
        FramedRead2[FramedRead2<br/>Internal impl]
        FramedWrite2[FramedWrite2<br/>Internal impl]
    end

    subgraph "Codecs"
        Bytes[BytesCodec<br/>Pass-through]
        Length[LengthCodec<br/>Length-prefixed]
        Lines[LinesCodec<br/>Line-delimited]
        Json[JsonCodec<br/>JSON serde]
        Cbor[CborCodec<br/>CBOR serde]
    end

    subgraph "I/O Traits"
        AsyncRead[AsyncRead]
        AsyncWrite[AsyncWrite]
        Stream[Stream]
        Sink[Sink]
    end

    Encoder --> Framed
    Decoder --> Framed
    Encoder --> FramedRead
    Decoder --> FramedRead
    Encoder --> FramedWrite
    Decoder --> FramedWrite

    Framed --> Fuse
    FramedRead --> Fuse
    FramedWrite --> Fuse

    Fuse --> FramedRead2
    Fuse --> FramedWrite2

    Bytes --> Encoder
    Bytes --> Decoder
    Length --> Encoder
    Length --> Decoder
    Lines --> Encoder
    Lines --> Decoder
    Json --> Encoder
    Json --> Decoder
    Cbor --> Encoder
    Cbor --> Decoder

    AsyncRead --> FramedRead
    AsyncWrite --> FramedWrite
    AsyncRead --> Framed
    AsyncWrite --> Framed
    Framed --> Stream
    Framed --> Sink
    FramedRead --> Stream
    FramedWrite --> Sink

    style Encoder fill:#e1f5ff
    style Decoder fill:#e1f5ff
    style Framed fill:#fff4e1
    style Fuse fill:#ffe1f5
    style Bytes fill:#e1ffe1
    style Length fill:#e1ffe1
    style Lines fill:#e1ffe1

Core Concepts

Encoder Trait

The Encoder trait defines how to convert items into bytes for transmission.

pub trait Encoder {
    type Item<'a>;
    type Error: From<Error>;

    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error>;
}

The encoder takes an item and writes its byte representation into a BytesMut buffer. The Item<'a> associated type is generic over lifetime to support borrowed data.

Decoder Trait

The Decoder trait defines how to extract items from bytes received from the network.

pub trait Decoder {
    type Item;
    type Error: From<Error>;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;

    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        self.decode(src)
    }
}

The decoder reads from a BytesMut buffer and returns Some(item) when a complete frame is available, None when more data is needed, or an error if the data is invalid. The decode_eof method is called when the stream reaches EOF to handle any remaining buffered data.

graph LR
    A[BytesMut Buffer] -->|decode| B{Complete Frame?}
    B -->|Yes| C[Some Item]
    B -->|No| D[None]
    B -->|Invalid| E[Error]

    A -->|decode_eof| F{EOF Handling}
    F -->|Remaining Data| G[Final Item]
    F -->|Empty| H[Stream End]
    F -->|Invalid| I[Error]

    style C fill:#e1ffe1
    style G fill:#e1ffe1
    style E fill:#ffe1e1
    style I fill:#ffe1e1

Framed Types

The library provides three main framed types:

  • Framed<T, U>: Combines both reading and writing (Stream + Sink)
  • FramedRead<T, D>: Read-only (Stream)
  • FramedWrite<T, E>: Write-only (Sink)

These types wrap an underlying I/O object and a codec, handling buffer management and back-pressure automatically.

graph TB
    subgraph "Framed Read Path"
        IO[AsyncRead] -->|poll_read| Buffer[BytesMut Buffer]
        Buffer -->|decode| Codec[Decoder]
        Codec -->|Item| Stream[Stream]
    end

    subgraph "Framed Write Path"
        Sink[Sink] -->|Item| Codec2[Encoder]
        Codec2 -->|encode| Buffer2[BytesMut Buffer]
        Buffer2 -->|poll_write| IO2[AsyncWrite]
    end

    subgraph "Back-Pressure"
        Buffer2 -->|len >= HWM| Block[Block send]
        Block -->|flush progress| Unblock[Unblock]
    end

    style Buffer fill:#fff4e1
    style Buffer2 fill:#fff4e1
    style Block fill:#ffe1e1
    style Unblock fill:#e1ffe1

Built-in Codecs

BytesCodec

A pass-through codec that ships bytes around without modification.

pub struct BytesCodec;

Encoding: Writes bytes directly to buffer Decoding: Returns all available bytes as a single frame

Use case: When you want to work with raw byte frames without any framing protocol.

LengthCodec

Prefixes data with its length as a u64 (8 bytes) for frame delimiting.

pub struct LengthCodec;

Encoding format: [u64 length][data bytes] Decoding: Reads length prefix, then extracts exactly that many bytes

Use case: Binary protocols where you need reliable frame boundaries. Often wrapped in custom codecs for type-specific serialization.

graph LR
    A[Data] -->|encode| B[Length Prefix<br/>8 bytes]
    B --> C[Data Bytes]
    C --> D[Frame]

    E[Frame] -->|decode| F[Read Length]
    F -->|length bytes| G[Extract Data]
    G --> H[Data]

    style B fill:#fff4e1
    style F fill:#fff4e1

LinesCodec

Splits data into lines delimited by newline characters (\n).

pub struct LinesCodec;

Encoding: Writes string bytes to buffer Decoding: Uses memchr for efficient newline search

Use case: Text-based protocols like HTTP headers, IRC, or any line-oriented protocol.

JsonCodec (feature: json)

Serializes and deserializes types using JSON via serde.

pub struct JsonCodec<Enc, Dec>;

Encoding: serde_json::to_string Decoding: Streaming JSON deserializer with EOF handling

Use case: REST APIs, websockets with JSON payloads, configuration protocols.

Error handling: JsonCodecError wraps both IO and JSON errors.

CborCodec (feature: cbor)

Serializes and deserializes types using CBOR (Concise Binary Object Representation).

pub struct CborCodec<Enc, Dec>;

Encoding: serde_cbor::to_vec Decoding: Streaming CBOR deserializer with EOF handling

Use case: Binary protocols requiring compact serialization, IoT devices, resource-constrained environments.

Error handling: CborCodecError wraps both IO and CBOR errors.

Buffer Management and Back-Pressure

High-Water Mark

The FramedWrite implements back-pressure using a high-water mark to prevent unbounded buffer growth.

 \text{HWM} = 2^{17} \text{ bytes} = 131,072 \text{ bytes} \approx 60\% \text{ of default TCP SO\_SNDBUF} 

When the write buffer length exceeds the high-water mark, poll_ready returns Pending until buffer space is freed by flushing to the underlying I/O.

graph TB
    subgraph "Back-Pressure Lifecycle"
        Send[send called] -->|encode| Buffer[Buffer grows]
        Buffer -->|len < HWM| Accept[Accept item]
        Buffer -->|len >= HWM| Block[Block sender]
        Block -->|flush| Write[Write to I/O]
        Write -->|progress| Space[Buffer space freed]
        Space -->|len < HWM| Unblock[Unblock sender]
    end

    style Block fill:#ffe1e1
    style Unblock fill:#e1ffe1
    style Buffer fill:#fff4e1

Buffer Capacities

Read buffer initial capacity:

 C_{read} = 8 \times 1024 \text{ bytes} = 8 \text{ KB} 

Write buffer initial capacity:

 C_{write} = 1028 \times 8 \text{ bytes} \approx 8 \text{ KB} 

These values balance memory usage with read efficiency for typical network packet sizes.

Buffer Operations

The library uses BytesMut from the bytes crate for efficient buffer management:

  • extend_from_slice: Append data to buffer
  • split_to: Remove prefix from buffer and return as Bytes
  • advance: Remove prefix without returning (consumed data)
  • reserve: Ensure capacity for future writes
graph LR
    A[BytesMut] -->|extend_from_slice| B[Append Data]
    B -->|split_to| C[Extract Frame]
    C -->|advance| D[Consume Header]
    D -->|reserve| E[Pre-allocate Space]

    style B fill:#fff4e1
    style C fill:#e1ffe1
    style D fill:#ffe1f5

Advanced Concepts

Fuse Type

The Fuse<T, U> type combines an I/O object with a codec, implementing AsyncRead and AsyncWrite by delegating to the I/O object while providing access to the codec via Deref.

pub(crate) struct Fuse<T, U> {
    #[pin]
    pub t: T,  // I/O object
    pub u: U,  // Codec
}

This allows the framed types to treat the I/O+codec combination as a single object that implements both async I/O traits and codec traits.

Parts Pattern

All framed types support deconstruction into parts and reconstruction:

pub struct FramedParts<T, U> {
    pub io: T,
    pub codec: U,
    pub read_buffer: BytesMut,
    pub write_buffer: BytesMut,
}

This enables:

  • Codec switching without losing buffered data
  • Inspecting internal state for debugging
  • Custom framed type construction
graph TB
    A[Framed] -->|into_parts| B[Parts]
    B -->|map_codec| C[New Codec]
    C -->|from_parts| D[New Framed]

    B -->|io| E[Underlying I/O]
    B -->|codec| F[Original Codec]
    B -->|read_buffer| G[Read Buffer]
    B -->|write_buffer| H[Write Buffer]

    style B fill:#fff4e1
    style C fill:#e1ffe1

EOF Handling

The decode_eof method allows codecs to handle remaining data when the stream reaches EOF:

  • Default implementation calls decode
  • Custom implementations can handle partial frames
  • Returns error if invalid data remains in buffer
graph TB
    A[Stream EOF] -->|decode_eof| B{Buffer Empty?}
    B -->|Yes| C[Return None<br/>Clean EOF]
    B -->|No| D{Valid Frame?}
    D -->|Yes| E[Return Item<br/>Final frame]
    D -->|No| F[Return Error<br/>Truncated data]

    style C fill:#e1ffe1
    style E fill:#e1ffe1
    style F fill:#ffe1e1

Streaming Decode

Decoders support streaming by returning None when insufficient data is available:

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
    if src.len() < MIN_REQUIRED {
        return Ok(None);  // Need more data
    }
    // ... decode frame
}

The framed read loop automatically reads more data from the underlying I/O when decode returns None.

Quick Start

Basic Framed Usage

use asynchronous_codec::{BytesCodec, Framed};
use bytes::Bytes;
use futures::{SinkExt, TryStreamExt};
use futures::io::Cursor;

# futures::executor::block_on(async move {
let mut buf = vec![];
let cur = Cursor::new(&mut buf);
let mut framed = Framed::new(cur, BytesCodec {});

// Send bytes
framed.send(Bytes::from("Hello world!")).await?;

// Receive bytes
while let Some(bytes) = framed.try_next().await? {
    println!("{:?}", bytes);
}
# Ok::<_, std::io::Error>(())
# }).unwrap();

Line-Delimited Protocol

use asynchronous_codec::{LinesCodec, FramedRead};
use futures::TryStreamExt;

# futures::executor::block_on(async move {
let input = "hello\nworld\nthis\nis\ndog\n".as_bytes();
let mut lines = FramedRead::new(input, LinesCodec);

while let Some(line) = lines.try_next().await? {
    println!("{}", line);
}
# Ok::<_, std::io::Error>(())
# }).unwrap();

Length-Prefixed Custom Codec

use asynchronous_codec::{Decoder, Encoder, LengthCodec};
use bytes::{Bytes, BytesMut};
use std::io::{Error, ErrorKind};

pub struct MyStringCodec(LengthCodec);

impl Encoder for MyStringCodec {
    type Item<'a> = String;
    type Error = Error;

    fn encode(&mut self, src: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
        let bytes = Bytes::from(src);
        self.0.encode(bytes, dst)
    }
}

impl Decoder for MyStringCodec {
    type Item = String;
    type Error = Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        match self.0.decode(src)? {
            Some(bytes) => {
                match String::from_utf8(bytes.to_vec()) {
                    Ok(string) => Ok(Some(string)),
                    Err(e) => Err(Error::new(ErrorKind::InvalidData, e))
                }
            },
            None => Ok(None),
        }
    }
}

JSON Serialization

use asynchronous_codec::{JsonCodec, Framed};
use serde::{Serialize, Deserialize};
use futures::{SinkExt, TryStreamExt};

#[derive(Serialize, Deserialize)]
struct Message {
    pub id: u32,
    pub data: String,
}

# futures::executor::block_on(async move {
# let mut buf = vec![];
# let stream = futures::io::Cursor::new(&mut buf);
let codec = JsonCodec::<Message, Message>::new();
let mut framed = Framed::new(stream, codec);

// Send message
framed.send(Message { id: 1, data: "hello".to_string() }).await?;

// Receive messages
while let Some(msg) = framed.try_next().await? {
    println!("Received: {} - {}", msg.id, msg.data);
}
# Ok::<_, std::io::Error>(())
# }).unwrap();

Performance Considerations

Buffer Sizing

The default buffer sizes are tuned for typical network scenarios:

  • Small messages: 8KB buffers reduce per-message overhead
  • Large messages: Buffers grow dynamically as needed
  • Back-pressure: High-water mark prevents memory exhaustion

Zero-Copy Operations

The Bytes type enables zero-copy operations:

  • split_to returns a Bytes that shares the underlying memory
  • No data copying when extracting frames from buffer
  • Reference counting manages memory lifetime

Async Overhead

The library uses pin_project_lite for minimal async overhead:

  • No heap allocations for pinned data
  • Compiler optimizations inline critical paths
  • Poll-based async integrates with executors efficiently

API Reference

Encoder Trait

pub trait Encoder {
    type Item<'a>;
    type Error: From<Error>;

    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error>;
}

Decoder Trait

pub trait Decoder {
    type Item;
    type Error: From<Error>;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;

    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
}

Framed

impl<T, U> Framed<T, U>
where
    T: AsyncRead + AsyncWrite,
    U: Decoder + Encoder,
{
    pub fn new(inner: T, codec: U) -> Self;
    pub fn from_parts(parts: FramedParts<T, U>) -> Self;
    pub fn into_parts(self) -> FramedParts<T, U>;
    pub fn into_inner(self) -> T;
    pub fn codec(&self) -> &U;
    pub fn codec_mut(&mut self) -> &mut U;
    pub const fn read_buffer(&self) -> &BytesMut;
    pub fn send_high_water_mark(&self) -> usize;
    pub fn set_send_high_water_mark(&mut self, hwm: usize);
}

FramedRead

impl<T, D> FramedRead<T, D>
where
    T: AsyncRead,
    D: Decoder,
{
    pub fn new(inner: T, decoder: D) -> Self;
    pub fn from_parts(parts: FramedReadParts<T, D>) -> Self;
    pub fn into_parts(self) -> FramedReadParts<T, D>;
    pub fn into_inner(self) -> T;
    pub fn decoder(&self) -> &D;
    pub fn decoder_mut(&mut self) -> &mut D;
    pub const fn read_buffer(&self) -> &BytesMut;
}

FramedWrite

impl<T, E> FramedWrite<T, E>
where
    T: AsyncWrite,
    E: Encoder,
{
    pub fn new(inner: T, encoder: E) -> Self;
    pub fn from_parts(parts: FramedWriteParts<T, E>) -> Self;
    pub fn into_parts(self) -> FramedWriteParts<T, E>;
    pub fn into_inner(self) -> T;
    pub fn encoder(&self) -> E;
    pub fn encoder_mut(&mut self) -> &mut E;
    pub const fn send_high_water_mark(&self) -> usize;
    pub const fn set_send_high_water_mark(&mut self, hwm: usize);
}

Codecs

BytesCodec: pub struct BytesCodec;

LengthCodec: pub struct LengthCodec;

LinesCodec: pub struct LinesCodec;

JsonCodec: pub struct JsonCodec<Enc, Dec>;

CborCodec: pub struct CborCodec<Enc, Dec>;

Testing

The library includes comprehensive tests for all codecs and framed types. Run tests with:

cargo test

Run benchmarks with:

cargo bench

License

MIT