async/await.
- Rust 100%
| Filename | Latest commit message | Latest commit date |
|---|---|---|
| benches | ||
| src | ||
| tests | ||
| .gitignore | ||
| Cargo.toml | ||
| LICENSE | ||
| README.md | ||
asynchronous-codec
Utilities for encoding and decoding frames using async/await. This library provides adapters to transform byte streams (AsyncRead and AsyncWrite) into framed streams implementing Sink and Stream traits. Framed streams are also known as transports.
This is a fork of futures-codec by Matt Hunzinger, borrowing concepts from tokio-codec.
What This Library Does
This library provides tools for:
- Encoding structured data into byte frames for transmission
- Decoding byte frames back into structured data
- Managing back-pressure on async I/O streams
- Handling partial data and EOF conditions gracefully
- Working with multiple serialization formats (JSON, CBOR)
The design follows Rust's async/await patterns with zero-cost abstractions and efficient buffer management.
Installation
Current (Vendored Path-Based)
Currently using vendored crates with path dependencies:
[dependencies]
asynchronous-codec = { path = "../asynchronous-codec" }
bytes = { path = "../bytes" }
futures-util = { path = "../futures-rs/futures-util", features = ["io"] }
# Optional features
asynchronous-codec = { path = "../asynchronous-codec", features = ["json"] } # For JSON codec
asynchronous-codec = { path = "../asynchronous-codec", features = ["cbor"] } # For CBOR codec
Future (Git-Based Release System)
Transitioning to git-based dependencies hosted at https://git.sly.so/kade/:
[dependencies]
asynchronous-codec = { git = "https://git.sly.so/kade/asynchronous-codec", branch = "master" }
bytes = { git = "https://git.sly.so/kade/bytes", branch = "master" }
futures-util = { git = "https://git.sly.so/kade/futures-rs", branch = "master", features = ["io"] }
# Optional features
asynchronous-codec = { git = "https://git.sly.so/kade/asynchronous-codec", branch = "master", features = ["json"] }
asynchronous-codec = { git = "https://git.sly.so/kade/asynchronous-codec", branch = "master", features = ["cbor"] }
Note: The git-based release system is under development. See ./vendor/garou-repos.toml for the complete repository database.
Architecture Overview
graph TB
subgraph "Core Traits"
Encoder[Encoder Trait<br/>encode]
Decoder[Decoder Trait<br/>decode, decode_eof]
end
subgraph "Framed Adapters"
Framed[Framed<br/>Stream + Sink]
FramedRead[FramedRead<br/>Stream only]
FramedWrite[FramedWrite<br/>Sink only]
end
subgraph "Internal Types"
Fuse[Fuse<br/>I/O + Codec]
FramedRead2[FramedRead2<br/>Internal impl]
FramedWrite2[FramedWrite2<br/>Internal impl]
end
subgraph "Codecs"
Bytes[BytesCodec<br/>Pass-through]
Length[LengthCodec<br/>Length-prefixed]
Lines[LinesCodec<br/>Line-delimited]
Json[JsonCodec<br/>JSON serde]
Cbor[CborCodec<br/>CBOR serde]
end
subgraph "I/O Traits"
AsyncRead[AsyncRead]
AsyncWrite[AsyncWrite]
Stream[Stream]
Sink[Sink]
end
Encoder --> Framed
Decoder --> Framed
Encoder --> FramedRead
Decoder --> FramedRead
Encoder --> FramedWrite
Decoder --> FramedWrite
Framed --> Fuse
FramedRead --> Fuse
FramedWrite --> Fuse
Fuse --> FramedRead2
Fuse --> FramedWrite2
Bytes --> Encoder
Bytes --> Decoder
Length --> Encoder
Length --> Decoder
Lines --> Encoder
Lines --> Decoder
Json --> Encoder
Json --> Decoder
Cbor --> Encoder
Cbor --> Decoder
AsyncRead --> FramedRead
AsyncWrite --> FramedWrite
AsyncRead --> Framed
AsyncWrite --> Framed
Framed --> Stream
Framed --> Sink
FramedRead --> Stream
FramedWrite --> Sink
style Encoder fill:#e1f5ff
style Decoder fill:#e1f5ff
style Framed fill:#fff4e1
style Fuse fill:#ffe1f5
style Bytes fill:#e1ffe1
style Length fill:#e1ffe1
style Lines fill:#e1ffe1
Core Concepts
Encoder Trait
The Encoder trait defines how to convert items into bytes for transmission.
pub trait Encoder {
type Item<'a>;
type Error: From<Error>;
fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error>;
}
The encoder takes an item and writes its byte representation into a BytesMut buffer. The Item<'a> associated type is generic over lifetime to support borrowed data.
Decoder Trait
The Decoder trait defines how to extract items from bytes received from the network.
pub trait Decoder {
type Item;
type Error: From<Error>;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
self.decode(src)
}
}
The decoder reads from a BytesMut buffer and returns Some(item) when a complete frame is available, None when more data is needed, or an error if the data is invalid. The decode_eof method is called when the stream reaches EOF to handle any remaining buffered data.
graph LR
A[BytesMut Buffer] -->|decode| B{Complete Frame?}
B -->|Yes| C[Some Item]
B -->|No| D[None]
B -->|Invalid| E[Error]
A -->|decode_eof| F{EOF Handling}
F -->|Remaining Data| G[Final Item]
F -->|Empty| H[Stream End]
F -->|Invalid| I[Error]
style C fill:#e1ffe1
style G fill:#e1ffe1
style E fill:#ffe1e1
style I fill:#ffe1e1
Framed Types
The library provides three main framed types:
- Framed<T, U>: Combines both reading and writing (Stream + Sink)
- FramedRead<T, D>: Read-only (Stream)
- FramedWrite<T, E>: Write-only (Sink)
These types wrap an underlying I/O object and a codec, handling buffer management and back-pressure automatically.
graph TB
subgraph "Framed Read Path"
IO[AsyncRead] -->|poll_read| Buffer[BytesMut Buffer]
Buffer -->|decode| Codec[Decoder]
Codec -->|Item| Stream[Stream]
end
subgraph "Framed Write Path"
Sink[Sink] -->|Item| Codec2[Encoder]
Codec2 -->|encode| Buffer2[BytesMut Buffer]
Buffer2 -->|poll_write| IO2[AsyncWrite]
end
subgraph "Back-Pressure"
Buffer2 -->|len >= HWM| Block[Block send]
Block -->|flush progress| Unblock[Unblock]
end
style Buffer fill:#fff4e1
style Buffer2 fill:#fff4e1
style Block fill:#ffe1e1
style Unblock fill:#e1ffe1
Built-in Codecs
BytesCodec
A pass-through codec that ships bytes around without modification.
pub struct BytesCodec;
Encoding: Writes bytes directly to buffer Decoding: Returns all available bytes as a single frame
Use case: When you want to work with raw byte frames without any framing protocol.
LengthCodec
Prefixes data with its length as a u64 (8 bytes) for frame delimiting.
pub struct LengthCodec;
Encoding format: [u64 length][data bytes]
Decoding: Reads length prefix, then extracts exactly that many bytes
Use case: Binary protocols where you need reliable frame boundaries. Often wrapped in custom codecs for type-specific serialization.
graph LR
A[Data] -->|encode| B[Length Prefix<br/>8 bytes]
B --> C[Data Bytes]
C --> D[Frame]
E[Frame] -->|decode| F[Read Length]
F -->|length bytes| G[Extract Data]
G --> H[Data]
style B fill:#fff4e1
style F fill:#fff4e1
LinesCodec
Splits data into lines delimited by newline characters (\n).
pub struct LinesCodec;
Encoding: Writes string bytes to buffer
Decoding: Uses memchr for efficient newline search
Use case: Text-based protocols like HTTP headers, IRC, or any line-oriented protocol.
JsonCodec (feature: json)
Serializes and deserializes types using JSON via serde.
pub struct JsonCodec<Enc, Dec>;
Encoding: serde_json::to_string
Decoding: Streaming JSON deserializer with EOF handling
Use case: REST APIs, websockets with JSON payloads, configuration protocols.
Error handling: JsonCodecError wraps both IO and JSON errors.
CborCodec (feature: cbor)
Serializes and deserializes types using CBOR (Concise Binary Object Representation).
pub struct CborCodec<Enc, Dec>;
Encoding: serde_cbor::to_vec
Decoding: Streaming CBOR deserializer with EOF handling
Use case: Binary protocols requiring compact serialization, IoT devices, resource-constrained environments.
Error handling: CborCodecError wraps both IO and CBOR errors.
Buffer Management and Back-Pressure
High-Water Mark
The FramedWrite implements back-pressure using a high-water mark to prevent unbounded buffer growth.
\text{HWM} = 2^{17} \text{ bytes} = 131,072 \text{ bytes} \approx 60\% \text{ of default TCP SO\_SNDBUF}
When the write buffer length exceeds the high-water mark, poll_ready returns Pending until buffer space is freed by flushing to the underlying I/O.
graph TB
subgraph "Back-Pressure Lifecycle"
Send[send called] -->|encode| Buffer[Buffer grows]
Buffer -->|len < HWM| Accept[Accept item]
Buffer -->|len >= HWM| Block[Block sender]
Block -->|flush| Write[Write to I/O]
Write -->|progress| Space[Buffer space freed]
Space -->|len < HWM| Unblock[Unblock sender]
end
style Block fill:#ffe1e1
style Unblock fill:#e1ffe1
style Buffer fill:#fff4e1
Buffer Capacities
Read buffer initial capacity:
C_{read} = 8 \times 1024 \text{ bytes} = 8 \text{ KB}
Write buffer initial capacity:
C_{write} = 1028 \times 8 \text{ bytes} \approx 8 \text{ KB}
These values balance memory usage with read efficiency for typical network packet sizes.
Buffer Operations
The library uses BytesMut from the bytes crate for efficient buffer management:
extend_from_slice: Append data to buffersplit_to: Remove prefix from buffer and return asBytesadvance: Remove prefix without returning (consumed data)reserve: Ensure capacity for future writes
graph LR
A[BytesMut] -->|extend_from_slice| B[Append Data]
B -->|split_to| C[Extract Frame]
C -->|advance| D[Consume Header]
D -->|reserve| E[Pre-allocate Space]
style B fill:#fff4e1
style C fill:#e1ffe1
style D fill:#ffe1f5
Advanced Concepts
Fuse Type
The Fuse<T, U> type combines an I/O object with a codec, implementing AsyncRead and AsyncWrite by delegating to the I/O object while providing access to the codec via Deref.
pub(crate) struct Fuse<T, U> {
#[pin]
pub t: T, // I/O object
pub u: U, // Codec
}
This allows the framed types to treat the I/O+codec combination as a single object that implements both async I/O traits and codec traits.
Parts Pattern
All framed types support deconstruction into parts and reconstruction:
pub struct FramedParts<T, U> {
pub io: T,
pub codec: U,
pub read_buffer: BytesMut,
pub write_buffer: BytesMut,
}
This enables:
- Codec switching without losing buffered data
- Inspecting internal state for debugging
- Custom framed type construction
graph TB
A[Framed] -->|into_parts| B[Parts]
B -->|map_codec| C[New Codec]
C -->|from_parts| D[New Framed]
B -->|io| E[Underlying I/O]
B -->|codec| F[Original Codec]
B -->|read_buffer| G[Read Buffer]
B -->|write_buffer| H[Write Buffer]
style B fill:#fff4e1
style C fill:#e1ffe1
EOF Handling
The decode_eof method allows codecs to handle remaining data when the stream reaches EOF:
- Default implementation calls
decode - Custom implementations can handle partial frames
- Returns error if invalid data remains in buffer
graph TB
A[Stream EOF] -->|decode_eof| B{Buffer Empty?}
B -->|Yes| C[Return None<br/>Clean EOF]
B -->|No| D{Valid Frame?}
D -->|Yes| E[Return Item<br/>Final frame]
D -->|No| F[Return Error<br/>Truncated data]
style C fill:#e1ffe1
style E fill:#e1ffe1
style F fill:#ffe1e1
Streaming Decode
Decoders support streaming by returning None when insufficient data is available:
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < MIN_REQUIRED {
return Ok(None); // Need more data
}
// ... decode frame
}
The framed read loop automatically reads more data from the underlying I/O when decode returns None.
Quick Start
Basic Framed Usage
use asynchronous_codec::{BytesCodec, Framed};
use bytes::Bytes;
use futures::{SinkExt, TryStreamExt};
use futures::io::Cursor;
# futures::executor::block_on(async move {
let mut buf = vec![];
let cur = Cursor::new(&mut buf);
let mut framed = Framed::new(cur, BytesCodec {});
// Send bytes
framed.send(Bytes::from("Hello world!")).await?;
// Receive bytes
while let Some(bytes) = framed.try_next().await? {
println!("{:?}", bytes);
}
# Ok::<_, std::io::Error>(())
# }).unwrap();
Line-Delimited Protocol
use asynchronous_codec::{LinesCodec, FramedRead};
use futures::TryStreamExt;
# futures::executor::block_on(async move {
let input = "hello\nworld\nthis\nis\ndog\n".as_bytes();
let mut lines = FramedRead::new(input, LinesCodec);
while let Some(line) = lines.try_next().await? {
println!("{}", line);
}
# Ok::<_, std::io::Error>(())
# }).unwrap();
Length-Prefixed Custom Codec
use asynchronous_codec::{Decoder, Encoder, LengthCodec};
use bytes::{Bytes, BytesMut};
use std::io::{Error, ErrorKind};
pub struct MyStringCodec(LengthCodec);
impl Encoder for MyStringCodec {
type Item<'a> = String;
type Error = Error;
fn encode(&mut self, src: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = Bytes::from(src);
self.0.encode(bytes, dst)
}
}
impl Decoder for MyStringCodec {
type Item = String;
type Error = Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.0.decode(src)? {
Some(bytes) => {
match String::from_utf8(bytes.to_vec()) {
Ok(string) => Ok(Some(string)),
Err(e) => Err(Error::new(ErrorKind::InvalidData, e))
}
},
None => Ok(None),
}
}
}
JSON Serialization
use asynchronous_codec::{JsonCodec, Framed};
use serde::{Serialize, Deserialize};
use futures::{SinkExt, TryStreamExt};
#[derive(Serialize, Deserialize)]
struct Message {
pub id: u32,
pub data: String,
}
# futures::executor::block_on(async move {
# let mut buf = vec![];
# let stream = futures::io::Cursor::new(&mut buf);
let codec = JsonCodec::<Message, Message>::new();
let mut framed = Framed::new(stream, codec);
// Send message
framed.send(Message { id: 1, data: "hello".to_string() }).await?;
// Receive messages
while let Some(msg) = framed.try_next().await? {
println!("Received: {} - {}", msg.id, msg.data);
}
# Ok::<_, std::io::Error>(())
# }).unwrap();
Performance Considerations
Buffer Sizing
The default buffer sizes are tuned for typical network scenarios:
- Small messages: 8KB buffers reduce per-message overhead
- Large messages: Buffers grow dynamically as needed
- Back-pressure: High-water mark prevents memory exhaustion
Zero-Copy Operations
The Bytes type enables zero-copy operations:
split_toreturns aBytesthat shares the underlying memory- No data copying when extracting frames from buffer
- Reference counting manages memory lifetime
Async Overhead
The library uses pin_project_lite for minimal async overhead:
- No heap allocations for pinned data
- Compiler optimizations inline critical paths
- Poll-based async integrates with executors efficiently
API Reference
Encoder Trait
pub trait Encoder {
type Item<'a>;
type Error: From<Error>;
fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error>;
}
Decoder Trait
pub trait Decoder {
type Item;
type Error: From<Error>;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
}
Framed
impl<T, U> Framed<T, U>
where
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
{
pub fn new(inner: T, codec: U) -> Self;
pub fn from_parts(parts: FramedParts<T, U>) -> Self;
pub fn into_parts(self) -> FramedParts<T, U>;
pub fn into_inner(self) -> T;
pub fn codec(&self) -> &U;
pub fn codec_mut(&mut self) -> &mut U;
pub const fn read_buffer(&self) -> &BytesMut;
pub fn send_high_water_mark(&self) -> usize;
pub fn set_send_high_water_mark(&mut self, hwm: usize);
}
FramedRead
impl<T, D> FramedRead<T, D>
where
T: AsyncRead,
D: Decoder,
{
pub fn new(inner: T, decoder: D) -> Self;
pub fn from_parts(parts: FramedReadParts<T, D>) -> Self;
pub fn into_parts(self) -> FramedReadParts<T, D>;
pub fn into_inner(self) -> T;
pub fn decoder(&self) -> &D;
pub fn decoder_mut(&mut self) -> &mut D;
pub const fn read_buffer(&self) -> &BytesMut;
}
FramedWrite
impl<T, E> FramedWrite<T, E>
where
T: AsyncWrite,
E: Encoder,
{
pub fn new(inner: T, encoder: E) -> Self;
pub fn from_parts(parts: FramedWriteParts<T, E>) -> Self;
pub fn into_parts(self) -> FramedWriteParts<T, E>;
pub fn into_inner(self) -> T;
pub fn encoder(&self) -> E;
pub fn encoder_mut(&mut self) -> &mut E;
pub const fn send_high_water_mark(&self) -> usize;
pub const fn set_send_high_water_mark(&mut self, hwm: usize);
}
Codecs
BytesCodec: pub struct BytesCodec;
LengthCodec: pub struct LengthCodec;
LinesCodec: pub struct LinesCodec;
JsonCodec: pub struct JsonCodec<Enc, Dec>;
CborCodec: pub struct CborCodec<Enc, Dec>;
Testing
The library includes comprehensive tests for all codecs and framed types. Run tests with:
cargo test
Run benchmarks with:
cargo bench
License
MIT