MongoDB::Fast

A high-performance, async MongoDB driver for Raku with an optimized BSON and wire protocol implementation. Designed for speed and simplicity, with full support for single connections and connection pools.

Features

  • Async API built on native Raku Promises (use await to get results)
  • Persistent background socket reader (no per-request reconnect overhead)
  • Batched inserts via OP_MSG document sequences
  • Connection pooling
  • Auto-reconnect with exponential backoff
  • CRUD: insert, find, update, delete, replace
  • Aggregation pipeline
  • Index management
  • Cursor support with lazy pagination
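
The "persistent background socket reader" feature can be pictured as one long-lived task that matches each reply to the Promise of the request that caused it. The sketch below is an illustration in plain Raku and makes no claim about the driver's internals: `%pending`, `$incoming`, and `send-request` are hypothetical names, and a Channel stands in for the socket.

```raku
# Hypothetical sketch: none of these names come from MongoDB::Fast.
my %pending;                    # request id => vow for the caller's Promise
my $lock     = Lock.new;        # guards %pending across threads
my $incoming = Channel.new;     # stands in for replies arriving on the socket

# One long-lived reader handles every reply for the life of the connection.
my $reader = start {
    react whenever $incoming -> %reply {
        my $vow = $lock.protect: -> { %pending{%reply<responseTo>}:delete };
        .keep(%reply<body>) with $vow;
    }
}

# Register a Promise under the request id, then "send" the request.
sub send-request(Int $id, Str $body) {
    my $promise = Promise.new;
    $lock.protect: -> { %pending{$id} = $promise.vow };
    # A real driver would write to the socket; this stand-in echoes a reply.
    $incoming.send: %( responseTo => $id, body => "reply to $body" );
    $promise
}

my $reply = await send-request(1, 'ping');
say $reply;                     # reply to ping

$incoming.close;                # lets the react block finish
await $reader;
```

Because the reader outlives any single request, callers only pay for registering a Promise and writing the request, not for setting up a new read loop each time.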

Installation

zef install MongoDB::Fast

Or clone and install locally:

git clone https://github.com/Zer0-Tolerance/MongoDB-Fast
cd MongoDB-Fast
zef install .

Quick Start

use MongoDB::Fast;

my $m = MongoDB::Fast.new;
await $m.connect;

my $col = $m.db('mydb').collection('users');

# Insert
await $col.insert-one({ name => 'Alice', age => 30 });

# Find one
my $doc = await $col.find-one({ name => 'Alice' });
say $doc<name>;  # Alice

# Clean up
$m.close;

Connection Options

my $m = MongoDB::Fast.new(
    host                   => '127.0.0.1',   # default
    port                   => 27017,          # default
    use-pool               => False,          # set True to use connection pool
    max-connections        => 10,             # pool size
    enable-auto-reconnect  => True,
    max-reconnect-attempts => 5,
    initial-retry-delay    => 0.1e0,          # seconds, doubles each attempt
    connection-timeout     => 10,             # seconds
);
await $m.connect;
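
To make "doubles each attempt" concrete, the following sketch lists the delays implied by the defaults above. `retry-delays` is a hypothetical helper written for this illustration; whether the driver caps the delay, adds jitter, or waits before the first attempt is not stated here and is not assumed.

```raku
# Hypothetical helper for illustration; not part of MongoDB::Fast.
sub retry-delays(:$initial-retry-delay = 0.1, :$max-reconnect-attempts = 5) {
    # Start at the initial delay and double it for each further attempt.
    ($initial-retry-delay, * * 2 ... *).head($max-reconnect-attempts).List
}

my @delays = retry-delays();
say @delays;        # [0.1 0.2 0.4 0.8 1.6]
say @delays.sum;    # 3.1
```

Under these assumptions, five attempts starting at 0.1 seconds add up to about 3.1 seconds of waiting.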

CRUD Operations

Insert

# Insert one document — returns { insertedId => ..., acknowledged => True }
my $r = await $col.insert-one({ name => 'Bob', score => 42 });
say $r<insertedId>;

# Insert many documents — returns { insertedIds => [...], insertedCount => N }
my @docs = ({ name => 'Alice' }, { name => 'Bob' }, { name => 'Carol' });
my $r = await $col.insert-many(@docs);
say $r<insertedCount>;  # 3

Find

# Find one document
my $doc = await $col.find-one({ name => 'Alice' });

# Find one with projection and sort
my $doc = await $col.find-one(
    { status => 'active' },
    options => { sort => { score => -1 }, projection => { name => 1, score => 1 } }
);

# Cursor — lazy, paginated
my $cursor = $col.find({ status => 'active' });
while my $doc = await $cursor.next {
    say $doc<name>;
}

# Fetch all at once (uses a large batchSize internally)
my @all = await $col.find({ status => 'active' }).all;

# Find with options
my $cursor = $col.find(
    { age => { '$gte' => 18 } },
    options => { sort => { age => 1 }, limit => 50, batchSize => 100 }
);
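
The idea behind a lazy, paginated cursor can be sketched without a server: fetch the next batch only when the consumer has used up the current one. In this sketch `paged` and `fake-fetch` are hypothetical names, and the fake fetcher stands in for one round trip; it is not how MongoDB::Fast::Cursor is implemented.

```raku
# Hypothetical sketch; &fetch-batch stands in for one round trip to the server.
sub paged(&fetch-batch) {
    gather {
        my $page = 0;
        loop {
            my @batch = fetch-batch($page++);
            last unless @batch;          # an empty batch ends the result set
            take $_ for @batch;
        }
    }
}

# Fake server: five documents served two at a time, counting round trips.
my @stored = (1..5).map: { %( n => $_ ) };
my $round-trips = 0;
sub fake-fetch(Int $page) {
    $round-trips++;
    my $from = $page * 2;
    $from < @stored.elems ?? @stored[$from ..^ min($from + 2, @stored.elems)] !! ()
}

my @first = paged(&fake-fetch).head(3);
say @first.map(*<n>);      # (1 2 3)
say $round-trips;          # fewer than the 4 round trips a full scan needs
```

Taking only the first three documents stops the sequence early, so the remaining batches are never requested.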

Update

# Update one
my $r = await $col.update-one(
    { name => 'Alice' },
    { '$set' => { score => 99 } }
);
say $r<modifiedCount>;

# Update many
my $r = await $col.update-many(
    { status => 'inactive' },
    { '$set' => { archived => True } }
);

# Upsert
await $col.update-one(
    { name => 'Dave' },
    { '$set' => { score => 10 } },
    :upsert
);

# Replace one
await $col.replace-one(
    { name => 'Alice' },
    { name => 'Alice', score => 100, updated => True }
);

Delete

# Delete one
my $r = await $col.delete-one({ name => 'Bob' });
say $r<deletedCount>;

# Delete many
my $r = await $col.delete-many({ score => { '$lt' => 10 } });

Count

my $total  = await $col.count-documents;            # all
my $adults = await $col.count-documents({ age => { '$gte' => 18 } });

Aggregation

# aggregate returns an Array — receive it in a scalar, then iterate
my $results = await $col.aggregate([
    { '$match'  => { status => 'active' } },
    { '$group'  => { _id => '$country', total => { '$sum' => 1 } } },
    { '$sort'   => { total => -1 } },
    { '$limit'  => 10 },
]);

for @$results -> $r {
    say "$r<_id>: $r<total>";
}

Indexes

# Create a single-field index
await $col.create-index({ email => 1 });

# Unique index with custom name
await $col.create-index(
    { email => 1 },
    options => { unique => True, name => 'email_unique' }
);

# Compound index
await $col.create-index({ country => 1, score => -1 });

Database Operations

my $db = $m.db('mydb');

# List collections
my @cols = await $db.list-collections;
say @cols.map(*<name>).join(', ');

# Database stats
my $stats = await $db.stats;
say $stats<dataSize>;

# Drop database
await $db.drop;

Collection Operations

# Drop collection
await $col.drop;

Connection Pool

my $m = MongoDB::Fast.new(:use-pool, max-connections => 20);
await $m.connect;

# All subsequent operations automatically acquire/release connections
my $col = $m.db('mydb').collection('events');
await $col.insert-one({ type => 'login', user => 'alice' });

$m.close;  # closes all pooled connections
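
The acquire/release cycle mentioned in the comment above can be sketched with a Channel holding the idle connections. `TinyPool` and `with-connection` are hypothetical names for this illustration, and strings stand in for real connections; the actual pool may work differently.

```raku
# Hypothetical sketch; TinyPool is not MongoDB::Fast::Connection::Pool.
class TinyPool {
    has Channel $!idle = Channel.new;

    submethod TWEAK(:@connections) {
        $!idle.send($_) for @connections;
    }

    # Borrow a connection, run the code, and always hand the connection back.
    method with-connection(&code) {
        my $conn = $!idle.receive;       # blocks while every connection is busy
        LEAVE $!idle.send($conn);
        code($conn)
    }
}

my $pool = TinyPool.new(connections => ['conn-1', 'conn-2']);

# Four concurrent tasks share two connections.
my @used = await (^4).map: {
    start { $pool.with-connection(-> $conn { sleep 0.05; $conn }) }
};
say @used.elems;    # 4
```

Releasing in a LEAVE phaser means the connection goes back to the pool even if the borrowed code throws.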

Health Check

my $ok = await $m.ping;
say $ok ?? "MongoDB is up" !! "MongoDB is down";

Bulk Insert Example

For high-throughput ingestion, batch documents and insert in chunks:

use MongoDB::Fast;

sub MAIN(Int $total = 10_000, Int $batch-size = 1_000) {
    my $m = MongoDB::Fast.new;
    await $m.connect;
    my $col = $m.db('bench').collection('data');

    my $start = now;
    my @batch;

    for ^$total -> $i {
        @batch.push: { index => $i, value => $i * $i };
        if @batch.elems == $batch-size {   # flush once the batch is full
            await $col.insert-many(@batch);
            @batch = ();
        }
    }
    await $col.insert-many(@batch) if @batch;

    my $elapsed = now - $start;
    say "Inserted $total docs in {$elapsed.fmt('%.3f')}s ({($total / $elapsed).Int} docs/sec)";

    await $col.drop;
    $m.close;
}
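
When the documents are already in memory, the chunking can also be written with Raku's built-in `.batch`. In this sketch `insert-in-chunks` is a hypothetical helper and the callback stands in for a call such as `insert-many`, so it runs without a server; whether `insert-many` accepts the List that `.batch` produces is an assumption to check.

```raku
# Hypothetical helper; &insert stands in for a call such as insert-many.
sub insert-in-chunks(@docs, Int $batch-size, &insert) {
    # .batch yields lists of at most $batch-size items; the last may be shorter.
    insert($_) for @docs.batch($batch-size);
}

my @docs = (^2500).map: -> $i { %( index => $i, value => $i * $i ) };

my @chunk-sizes;
insert-in-chunks(@docs, 1000, -> @chunk { @chunk-sizes.push: @chunk.elems });
say @chunk-sizes;   # [1000 1000 500]
```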

Error Handling

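Asynchronous methods return a Promise. A failed operation throws when its Promise is awaited, so wrap the await in try/CATCH. A query that matches nothing is not an error: find-one resolves to an undefined value.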

my $doc = await $col.find-one({ _id => 'missing' });
say $doc.defined ?? $doc<name> !! "not found";

try {
    await $col.insert-one({ _id => 'dup' });
    await $col.insert-one({ _id => 'dup' });
    CATCH {
        default { say "Error: {$_.message}" }
    }
}
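
The same pattern can be tried without a server. Here a Promise that breaks stands in for a failed driver call; the error text is simulated and is not a real MongoDB message.

```raku
# A Promise that breaks, standing in for a failed driver call (simulated).
my $failing = start { die "simulated server error" };

my $message;
try {
    await $failing;              # rethrows the exception that broke the Promise
    CATCH {
        default { $message = .message }
    }
}
say "Error: $message";   # Error: simulated server error
```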

Architecture

| Module | Role |
| --- | --- |
| MongoDB::Fast | Top-level client; holds connection or pool |
| MongoDB::Fast::Connection | Single TCP connection, background reader, serialized send/recv |
| MongoDB::Fast::Connection::Pool | Pool of connections with acquire/release |
| MongoDB::Fast::Database | Database handle; dispatches commands |
| MongoDB::Fast::Collection | CRUD and aggregation methods |
| MongoDB::Fast::Cursor | Lazy paginated result set |
| MongoDB::Fast::Wire | OP_MSG framing and parsing |
| MongoDB::Fast::BSON | BSON encode/decode |
| MongoDB::Fast::ObjectID | ObjectID generation |
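
For readers unfamiliar with the two lowest layers, the sketch below hand-builds a one-field BSON document and wraps it in an OP_MSG frame, following the public BSON and MongoDB wire protocol layouts. The subs `int32-le`, `bson-int32-doc`, and `op-msg` are hypothetical and are not the API of MongoDB::Fast::BSON or MongoDB::Fast::Wire. It covers only a kind-0 section; document sequences use kind-1 sections, which are not shown.

```raku
# Hypothetical helpers for illustration; not the MongoDB::Fast::Wire or ::BSON API.

# Four little-endian bytes for a 32-bit integer.
sub int32-le(Int $n) {
    buf8.new($n +& 0xFF, ($n +> 8) +& 0xFF, ($n +> 16) +& 0xFF, ($n +> 24) +& 0xFF)
}

# BSON document holding one int32 field: length, type 0x10, name, NUL, value, NUL.
sub bson-int32-doc(Str $name, Int $value) {
    my $element = buf8.new(0x10)
                ~ buf8.new($name.encode('utf-8').list)
                ~ buf8.new(0)
                ~ int32-le($value);
    # The length prefix counts itself (4 bytes) and the trailing NUL (1 byte).
    int32-le($element.elems + 5) ~ $element ~ buf8.new(0)
}

# OP_MSG: 16-byte header, 4 bytes of flags, then a kind-0 section with one document.
sub op-msg(Blob $doc, Int :$request-id = 1) {
    my $payload = int32-le(0) ~ buf8.new(0) ~ $doc;    # flagBits, section kind, body
    int32-le(16 + $payload.elems)    # messageLength, header included
        ~ int32-le($request-id)      # requestID
        ~ int32-le(0)                # responseTo: 0 for a request
        ~ int32-le(2013)             # opCode 2013 is OP_MSG
        ~ $payload
}

my $doc = bson-int32-doc('ping', 1);
say $doc.list;              # (15 0 0 0 16 112 105 110 103 0 1 0 0 0 0)
say op-msg($doc).elems;     # 36
```

A command that a server would accept also needs a `$db` field in the body document; it is left out here to keep the byte layout easy to follow.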

License

Artistic-2.0
