Skip to main content

Custom store adapters

Create custom store adapters to connect Reqon to any storage backend.

Store interface

Implement this TypeScript interface:

interface StoreAdapter {
// Get a single record by key
get(key: string): Promise<Record<string, unknown> | null>;

// Set a record with key
set(key: string, value: Record<string, unknown>): Promise<void>;

// Update a record (partial update)
update(key: string, partial: Record<string, unknown>): Promise<void>;

// Delete a record by key
delete(key: string): Promise<void>;

// List all records, optionally filtered
list(filter?: FilterOptions): Promise<Record<string, unknown>[]>;

// Clear all records
clear(): Promise<void>;
}

interface FilterOptions {
where?: WhereClause[];
limit?: number;
offset?: number;
}

interface WhereClause {
field: string;
operator: 'eq' | 'neq' | 'gt' | 'gte' | 'lt' | 'lte' | 'contains';
value: unknown;
}

Basic example

Redis adapter

import { createClient } from 'redis';
import { StoreAdapter, FilterOptions } from 'reqon';

export class RedisStoreAdapter implements StoreAdapter {
private client: ReturnType<typeof createClient>;
private prefix: string;

constructor(url: string, prefix: string) {
this.client = createClient({ url });
this.prefix = prefix;
}

async connect() {
await this.client.connect();
}

private key(id: string) {
return `${this.prefix}:${id}`;
}

async get(key: string) {
const data = await this.client.get(this.key(key));
return data ? JSON.parse(data) : null;
}

async set(key: string, value: Record<string, unknown>) {
await this.client.set(this.key(key), JSON.stringify(value));
}

async update(key: string, partial: Record<string, unknown>) {
const existing = await this.get(key);
if (existing) {
await this.set(key, { ...existing, ...partial });
}
}

async delete(key: string) {
await this.client.del(this.key(key));
}

async list(filter?: FilterOptions) {
const keys = await this.client.keys(`${this.prefix}:*`);
const items: Record<string, unknown>[] = [];

for (const key of keys) {
const data = await this.client.get(key);
if (data) {
items.push(JSON.parse(data));
}
}

return this.applyFilter(items, filter);
}

async clear() {
const keys = await this.client.keys(`${this.prefix}:*`);
if (keys.length > 0) {
await this.client.del(keys);
}
}

private applyFilter(items: Record<string, unknown>[], filter?: FilterOptions) {
let result = items;

if (filter?.where) {
result = result.filter(item =>
filter.where!.every(clause => this.evaluateClause(item, clause))
);
}

if (filter?.offset) {
result = result.slice(filter.offset);
}

if (filter?.limit) {
result = result.slice(0, filter.limit);
}

return result;
}

private evaluateClause(item: Record<string, unknown>, clause: WhereClause) {
const value = item[clause.field];

switch (clause.operator) {
case 'eq': return value === clause.value;
case 'neq': return value !== clause.value;
case 'gt': return (value as number) > (clause.value as number);
case 'gte': return (value as number) >= (clause.value as number);
case 'lt': return (value as number) < (clause.value as number);
case 'lte': return (value as number) <= (clause.value as number);
case 'contains': return String(value).includes(String(clause.value));
default: return true;
}
}
}

MongoDB adapter

import { MongoClient, Db, Collection } from 'mongodb';
import { StoreAdapter, FilterOptions } from 'reqon';

export class MongoStoreAdapter implements StoreAdapter {
private client: MongoClient;
private db: Db;
private collection: Collection;

constructor(url: string, database: string, collectionName: string) {
this.client = new MongoClient(url);
this.db = this.client.db(database);
this.collection = this.db.collection(collectionName);
}

async connect() {
await this.client.connect();
}

async get(key: string) {
const doc = await this.collection.findOne({ _id: key });
if (!doc) return null;
const { _id, ...data } = doc;
return { id: _id, ...data };
}

async set(key: string, value: Record<string, unknown>) {
await this.collection.replaceOne(
{ _id: key },
{ _id: key, ...value },
{ upsert: true }
);
}

async update(key: string, partial: Record<string, unknown>) {
await this.collection.updateOne(
{ _id: key },
{ $set: partial }
);
}

async delete(key: string) {
await this.collection.deleteOne({ _id: key });
}

async list(filter?: FilterOptions) {
const query = this.buildQuery(filter?.where);
let cursor = this.collection.find(query);

if (filter?.offset) {
cursor = cursor.skip(filter.offset);
}

if (filter?.limit) {
cursor = cursor.limit(filter.limit);
}

const docs = await cursor.toArray();
return docs.map(({ _id, ...data }) => ({ id: _id, ...data }));
}

async clear() {
await this.collection.deleteMany({});
}

private buildQuery(clauses?: WhereClause[]) {
if (!clauses || clauses.length === 0) return {};

const query: Record<string, unknown> = {};

for (const clause of clauses) {
const field = clause.field;
switch (clause.operator) {
case 'eq': query[field] = clause.value; break;
case 'neq': query[field] = { $ne: clause.value }; break;
case 'gt': query[field] = { $gt: clause.value }; break;
case 'gte': query[field] = { $gte: clause.value }; break;
case 'lt': query[field] = { $lt: clause.value }; break;
case 'lte': query[field] = { $lte: clause.value }; break;
case 'contains': query[field] = { $regex: clause.value }; break;
}
}

return query;
}
}

Registering custom adapters

import { execute, registerStoreAdapter } from 'reqon';
import { RedisStoreAdapter } from './redis-adapter';

// Register the adapter
registerStoreAdapter('redis', async (name: string, config: any) => {
const adapter = new RedisStoreAdapter(config.url, name);
await adapter.connect();
return adapter;
});

// Use in mission
await execute(`
mission Test {
store cache: redis("my-cache")

action Fetch {
get "/data"
store response -> cache { key: .id }
}

run Fetch
}
`, {
storeConfig: {
redis: {
url: 'redis://localhost:6379'
}
}
});

Best practices

Connection management

class MyAdapter implements StoreAdapter {
private connected = false;

async ensureConnected() {
if (!this.connected) {
await this.connect();
this.connected = true;
}
}

async get(key: string) {
await this.ensureConnected();
// ...
}
}

Error handling

async set(key: string, value: Record<string, unknown>) {
try {
await this.client.set(key, value);
} catch (error) {
throw new StoreError(`Failed to set ${key}: ${error.message}`);
}
}

Connection pooling

class PooledAdapter implements StoreAdapter {
private pool: Pool;

constructor(config: PoolConfig) {
this.pool = createPool(config);
}

async get(key: string) {
const conn = await this.pool.acquire();
try {
return await conn.get(key);
} finally {
this.pool.release(conn);
}
}
}

Batch operations

async setMany(items: Array<{ key: string; value: unknown }>) {
// Override for efficient batch writes
await this.client.mset(items);
}

Testing adapters

import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { MyCustomAdapter } from './my-adapter';

describe('MyCustomAdapter', () => {
let adapter: MyCustomAdapter;

beforeEach(async () => {
adapter = new MyCustomAdapter(/* config */);
await adapter.connect();
await adapter.clear();
});

afterEach(async () => {
await adapter.clear();
await adapter.disconnect();
});

it('should set and get', async () => {
await adapter.set('key1', { name: 'test' });
const result = await adapter.get('key1');
expect(result).toEqual({ name: 'test' });
});

it('should list with filter', async () => {
await adapter.set('1', { status: 'active' });
await adapter.set('2', { status: 'inactive' });

const result = await adapter.list({
where: [{ field: 'status', operator: 'eq', value: 'active' }]
});

expect(result).toHaveLength(1);
expect(result[0].status).toBe('active');
});
});