RxMongo is a TypeScript/JavaScript library that wraps MongoDB operations in RxJS observables, allowing for reactive, event-driven interactions with MongoDB. It is designed to provide a seamless way of working with MongoDB using the reactive programming paradigm. With RxMongo, you can perform MongoDB operations such as querying, updating, inserting, and aggregating in a reactive way with support for RxJS streams.
- Reactive MongoDB operations: Perform CRUD operations, aggregations, and more using RxJS.
- Observable-based APIs: Get data streams from MongoDB collections and work with them reactively.
- Easy to integrate: Supports integration with Node.js, Express.js, and NestJS environments.
- TypeScript support: Fully typed for better development experience.-
To install the package, you need to add rxjs and mongodb as dependencies along with @todorivanov/rx-mongo:
npm install @todorivanov/rx-mongo rxjs mongodb- Node.js Integration
Here's how to integrate
RxMongointo a basic Node.js application:
import { RxMongo, RxMongoCollection } from "@todorivanov/rx-mongo";
import { Observable } from "rxjs";
import { MongoClient, Document } from "mongodb";
// Define an interface that extends Document
interface User extends Document {
name: string;
email: string;
age: number;
}
// Initialize RxMongo and get the collection
async function main() {
const client = await MongoClient.connect("mongodb://localhost:27017");
const db = client.db("myDatabase");
const rxMongo = new RxMongo(db);
// Get collection as Observable
rxMongo.getCollection<User>("users").subscribe((userCollection) => {
// Example: Insert a user
const newUser: User = {
name: "John Doe",
email: "john.doe@example.com",
age: 30,
};
userCollection.insertOne(newUser).subscribe((result) => {
console.log("User inserted:", result);
});
// Example: Query users
userCollection.find({ age: { $gt: 25 } }).subscribe((users) => {
console.log("Users older than 25:", users);
});
});
}
main();- Express.js Integration
You can also use RxMongo in an Express.js environment. Here's an example of integrating it into an API:
import express from "express";
import { RxMongo, RxMongoCollection } from "@todorivanov/rx-mongo";
import { MongoClient, Document } from "mongodb";
import { Observable } from "rxjs";
// Define a generic interface for a Product document
interface Product extends Document {
name: string;
price: number;
stock: number;
}
const app = express();
app.use(express.json());
let productCollection: RxMongoCollection<Product>;
// Initialize MongoDB and set up the collection
async function initMongo() {
const client = await MongoClient.connect("mongodb://localhost:27017");
const db = client.db("myDatabase");
const rxMongo = new RxMongo(db);
rxMongo.getCollection<Product>("products").subscribe((collection) => {
productCollection = collection;
});
}
// Endpoint to add a product
app.post("/products", (req, res) => {
const newProduct: Product = req.body;
productCollection.insertOne(newProduct).subscribe((result) => {
res.status(201).send(result);
});
});
// Endpoint to get all products
app.get("/products", (req, res) => {
productCollection.find({}).subscribe((products) => {
res.json(products);
});
});
// Start the server
app.listen(3000, () => {
initMongo().then(() => {
console.log("Server and MongoDB initialized on port 3000");
});
});- NestJs Integration
You can easily integrate RxMongo into a NestJS project. Below is an example module and service using RxMongo with NestJS.
Create a DatabaseModule:
import { Module } from "@nestjs/common";
import { MongoClient } from "mongodb";
import { RxMongoService } from "./rx-mongo.service";
@Module({
providers: [
{
provide: "DATABASE_CONNECTION",
useFactory: async () => {
const client = new MongoClient("mongodb://localhost:27017");
await client.connect();
return client.db("mydatabase"); // Use your MongoDB database
},
},
RxMongoService,
],
exports: [RxMongoService],
})
export class DatabaseModule {}Create RxMongoService:
import { Injectable, Inject } from "@nestjs/common";
import { RxMongo } from "@todorivanov/rx-mongo";
import { Db } from "mongodb";
@Injectable()
export class RxMongoService {
private rxMongo: RxMongo;
constructor(@Inject("DATABASE_CONNECTION") db: Db) {
this.rxMongo = new RxMongo(db); // Initialize RxMongo
}
getUsers() {
return this.rxMongo.getCollection("users"); // Return users collection as Observable
}
}- API Overview
RxMongo
constructor(db: Db): InitializeRxMongowith a MongoDBDbinstance.getCollection<T>(collectionName: string, options?: CollectionOptions): Observable<RxMongoCollection<T>>: Returns an observable for accessing a specific collection wrapped with reactive operations.
RxMongoCollection<T>
All MongoDB collection methods are wrapped as RxJS observables:
countDocuments(filter: Filter<T>, options?: CountDocumentsOptions): Observable<number>: Count the documents matching the filter.find(filter: Filter<T>, options?: FindOptions): Observable<WithId<T>[]>: Find documents in the collection that match the provided filter.findOne(filter: Filter<T>, options?: FindOptions): Observable<T | null>: Find a single document.insertOne(document: OptionalUnlessRequiredId<T>): Observable<InsertOneResult<T>>: Insert a single document into the collection.insertMany(docs: ReadonlyArray<OptionalUnlessRequiredId<T>>, options?: BulkWriteOptions): Observable<InsertManyResult<T>>: Insert multiple documents.updateOne(filter: Filter<T>, update: UpdateFilter<T>, options?: UpdateOptions): Observable<UpdateResult<T>>: Update a single document using update operators ($set, $inc, etc.).replaceOne(filter: Filter<T>, replacement: WithoutId<T>, options?: ReplaceOptions): Observable<UpdateResult<T>>: Replace an entire document.updateMany(filter: Filter<T>, update: UpdateFilter<T> | T[], options?: UpdateOptions): Observable<UpdateResult<T>>: Update multiple documents.deleteOne(filter?: Filter<T>, options?: DeleteOptions): Observable<DeleteResult>: Delete a single document from the collection.deleteMany(filter?: Filter<T>, options?: DeleteOptions): Observable<DeleteResult>: Delete multiple documents.distinct(key: string, filter: Filter<T>, options?: DistinctOptions): Observable<any[]>: Get distinct values for a field.aggregate(pipeline?: T[], options?: AggregateOptions): Observable<AggregationCursor<T>>: Perform aggregation operations.bulkWrite(operations: ReadonlyArray<AnyBulkWriteOperation<T>>, options?: BulkWriteOptions): Observable<BulkWriteResult>: Perform bulk write operations.- Index management methods:
createIndex,createIndexes,dropIndex,dropIndexes,indexes,listIndexes,indexExists
For a complete list of available operations, see the MongoDB Node.js driver documentation and the corresponding RxJS observables used in this library.
Make sure you're using MongoDB driver v6.0.0 or higher, which includes built-in TypeScript types. The @types/mongodb package is deprecated and should not be used.
This library requires rxjs (^7.0.0) and mongodb (^6.0.0) as peer dependencies. Install them if you haven't already:
npm install @todorivanov/rx-mongo rxjs mongodbEnsure your MongoDB connection string is correct and the MongoDB server is running. For development, you can use MongoDB locally or a cloud service like MongoDB Atlas.
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
This project is licensed under the MIT License - see the LICENSE file for details.
Todor Ivanov
- GitHub: @todorivanov