JITDB
A database on top of [async-append-only-log] with automatic index generation and maintenance.
The motivation for this database is that it should be:
- fast
- easy to understand
- run in the browser and in node
Async append only log takes care of persistence of the main log. It is
expected to use [bipf] to encode data. On top of this, JITDB lazily
creates and maintains indexes based on the way the data is queried.
This means that if you search for messages of type post and author x, two
indexes will be created the first time: one for type and one for
author. A specific index is only updated when it is queried again.
These indexes are tiny compared to normal [flume] indexes. An index of
type post is 80kb.
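The lazy behavior described above can be illustrated with a toy model. This is only a sketch under stated assumptions, not JITDB's actual implementation: the log is a plain in-memory array, each index is a simple array of booleans keyed by sequence number, and the names `getIndex`, `queryAnd`, `indexes`, and `scans` are hypothetical.

```javascript
// Toy in-memory "log": the array position plays the role of a sequence number.
const log = [
  { type: 'post', author: 'x' },
  { type: 'contact', author: 'x' },
  { type: 'post', author: 'y' },
  { type: 'post', author: 'x' },
]

// Cache of indexes keyed by "field=value". Nothing is built up front.
const indexes = new Map()
let scans = 0 // counts how often (part of) the log had to be scanned

// Return the bitmap for field=value, creating it or catching it up lazily.
function getIndex(field, value) {
  const key = field + '=' + value
  let index = indexes.get(key)
  if (!index) {
    index = { bits: [], scannedUpTo: 0 }
    indexes.set(key, index)
  }
  if (index.scannedUpTo < log.length) {
    scans++
    // Only the part of the log this index has not seen yet is scanned.
    for (let seq = index.scannedUpTo; seq < log.length; seq++) {
      index.bits[seq] = log[seq][field] === value
    }
    index.scannedUpTo = log.length
  }
  return index.bits
}

// Intersect two indexes and return the matching sequence numbers.
function queryAnd(a, b) {
  const left = getIndex(a.field, a.value)
  const right = getIndex(b.field, b.value)
  const seqs = []
  for (let seq = 0; seq < log.length; seq++) {
    if (left[seq] && right[seq]) seqs.push(seq)
  }
  return seqs
}

const typePost = { field: 'type', value: 'post' }
const authorX = { field: 'author', value: 'x' }

// First query: two indexes are created, one for type and one for author.
const first = queryAnd(typePost, authorX)
console.log(first, 'indexes:', indexes.size)

// New data arrives. The existing indexes are not touched until queried again.
log.push({ type: 'post', author: 'x' })
const second = queryAnd(typePost, authorX)
console.log(second, 'scans:', scans)
```

In this toy model each index scans the log on its own; it only shows the "build on first query, catch up on later queries" idea.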
For this to be feasible, a full log scan must be really fast. It turns out that the combination of push streams and bipf makes streaming the full log not much slower than reading the file. As a result, a 350mb log can be scanned in a few seconds.
While this is mainly intended as a query engine, it is possible to build other index types on top of it, such as a reduce index on contact messages.
API
Setup
Before using JITDB, you have to set up an instance of [async-append-only-log] located at a certain path. Then you can instantiate JITDB, which requires a path to the directory where the indexes will live.
const Log = require('async-append-only-log')
const JITDB = require('jitdb')
const raf = Log('/home/me/path/to/async-log', {
  blockSize: 64 * 1024,
})
const db = JITDB(raf, '/home/me/path/to/indexes')
db.onReady(() => {
  // The db is ready to be queried
})
Operators
JITDB comes with a set of composable "operators" that allow you to
query the database. You can load these operators from
require('jitdb/operators').
const Log = require('async-append-only-log')
const JITDB = require('jitdb')
const {query, fromDB, where, slowEqual, toCallback} = require('jitdb/operators')
const raf = Log('/home/me/path/to/async-log', {
  blockSize: 64 * 1024,
})
const db = JITDB(raf, '/home/me/path/to/indexes')
db.onReady(() => {
  query(
    fromDB(db),
    where(slowEqual('value.content.type', 'post')),
    toCallback((err, msgs) => {
      console.log(msgs)
    })
  )
})
The essential operators are fromDB, query, and toCallback.
- fromDB specifies which JITDB instance we are interested in
- query wraps all the operators, chaining them together
- where wraps descriptor operators (see below) that narrow down the data
- toCallback delivers the results of the query to a callback
Then there are descriptor operators that help scope down the results to your
desired set of messages: and, or, not, equal, slowEqual, and others.
- and(...args) filters for messages that satisfy all args
- or(...args) filters for messages that satisfy at least one of the args
- not(arg) filters for messages that do not satisfy arg
- equal(seek, value, opts) filters for messages where a seeked field matches a specific value:
  - seek is a function that takes a [bipf] buffer as input and uses bipf.seekKey to return a pointer to the field
  - value is a string or buffer which is the value we want the field's value to match
  - opts are additional configurations:
    - indexType is a name used to identify the index produced by this query
    - prefix is a boolean or number (32) that tells this query to use prefix indexes
- slowEqual(objPath, value, opts) is a more ergonomic (but slower) way of performing equal:
  - objPath is a string in the shape "foo.bar.baz" which specifies the nested field "baz" inside "bar" inside "foo"
  - value is the same as value in the equal operator
  - opts are the same as the opts for equal()
- includes(seek, value, opts) filters for messages where a seeked field is an array and includes a specific value
- slowIncludes(objPath, value, opts) is to includes what slowEqual is to equal
- predicate(seek, fn, opts) filters for messages where a seeked field is passed to a predicate function fn and the fn returns true
  - opts are additional configurations such as indexType and name. You SHOULD pass opts.name as a simple string uniquely identifying the predicate, OR the fn function should be a named function
- slowPredicate(objPath, fn, opts) is to predicate what slowEqual is to equal
- absent(seek, opts) filters for messages where a seeked field does not exist in the message
- slowAbsent(objPath) is to absent what slowEqual is to equal
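To make the semantics of objPath and of and, or, and not concrete, here is a minimal sketch that mimics them on plain JavaScript objects. It does not use JITDB, bipf, or any index. The helpers `getPath`, `toyEqual`, `toyAnd`, `toyOr`, and `toyNot` are hypothetical stand-ins written for illustration only.

```javascript
// Resolve a dotted path such as "value.content.type" against a plain object.
// Returns undefined as soon as an intermediate field is missing.
function getPath(obj, objPath) {
  let current = obj
  for (const key of objPath.split('.')) {
    if (current === null || typeof current !== 'object') return undefined
    current = current[key]
  }
  return current
}

// Toy descriptor operators: each one returns a test function over a message.
const toyEqual = (objPath, value) => (msg) => getPath(msg, objPath) === value
const toyAnd = (...args) => (msg) => args.every((test) => test(msg))
const toyOr = (...args) => (msg) => args.some((test) => test(msg))
const toyNot = (arg) => (msg) => !arg(msg)

const msgs = [
  { value: { author: 'alice', content: { type: 'contact' } } },
  { value: { author: 'bob', content: { type: 'post' } } },
  { value: { author: 'carol', content: { type: 'contact' } } },
]

// Messages of type contact from alice or bob.
const matches = msgs.filter(
  toyAnd(
    toyEqual('value.content.type', 'contact'),
    toyOr(toyEqual('value.author', 'alice'), toyEqual('value.author', 'bob'))
  )
)

// Messages that are not of type contact.
const notContact = msgs.filter(toyNot(toyEqual('value.content.type', 'contact')))

console.log(matches.map((m) => m.value.author))
console.log(notContact.map((m) => m.value.author))
```

The real operators build and combine indexes instead of filtering an array, but the way the descriptors nest inside each other follows the same shape.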
Some examples:
Get all messages of type post:
query(
  fromDB(db),
  where(slowEqual('value.content.type', 'post')),
  toCallback((err, msgs) => {
    console.log('There are ' + msgs.length + ' messages of type "post"')
  })
)
Same as above, but faster (recommended in production):
query(
  fromDB(db),
  where(equal(seekType, 'post', { indexType: 'type' })),
  toCallback((err, msgs) => {
    console.log('There are ' + msgs.length + ' messages of type "post"')
  })
)
// The `seekType` function takes a buffer and uses `bipf` APIs to search for
// the fields we want.
const bipf = require('bipf')
const bValue = Buffer.from('value') // better for performance if defined outside
const bContent = Buffer.from('content')
const bType = Buffer.from('type')
function seekType(buffer) {
  var p = 0 // p stands for "position" in the buffer, offset from start
  p = bipf.seekKey(buffer, p, bValue)
  if (p < 0) return
  p = bipf.seekKey(buffer, p, bContent)
  if (p < 0) return
  return bipf.seekKey(buffer, p, bType)
}
Get all messages of type contact from Alice or Bob:
query(
  fromDB(db),
  where(
    and(
      slowEqual('value.content.type', 'contact'),
      or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId))
    )
  ),
  toCallback((err, msgs) => {
    console.log('There are ' + msgs.length + ' messages')
  })
)
Same as above, but faster (recommended in production):
query(
  fromDB(db),
  where(
    and(
      equal(seekType, 'contact', { indexType: 'type' }),
      or(
        equal(seekAuthor, aliceId, { indexType: 'author' }),
        equal(seekAuthor, bobId, { indexType: 'author' })
      )
    )
  ),
  toCallback((err, msgs) => {
    console.log('There are ' + msgs.length + ' messages')
  })
)
// where seekAuthor is
const bipf = require('bipf')
const bValue = Buffer.from('value') // better for performance if defined outside
const bAuthor = Buffer.from('author')
function seekAuthor(buffer) {
  var p = 0 // position in the buffer, offset from start
  p = bipf.seekKey(buffer, p, bValue)
  if (p < 0) return
  return bipf.seekKey(buffer, p, bAuthor)
}
Pagination
If you use toCallback, it gives you all results in one go. If you
want to get results in batches, you should use toPullStream,
paginate, and optionally startFrom and descending.
- toPullStream creates a [pull-stream] source to stream the results
- paginate configures the size of each array sent to the pull-stream source
- startFrom configures the beginning seq from where to start streaming
- descending configures the pagination stream to order results from newest to oldest (otherwise the default order is oldest to newest) based on timestamp
Example, stream all messages of type contact from Alice or Bob in pages of size 10:
const pull = require('pull-stream')
const source = query(
  fromDB(db),
  where(
    and(
      slowEqual('value.content.type', 'contact'),
      or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId)),
    ),
  ),
  paginate(10),
  toPullStream()
)
pull(
  source,
  pull.drain((msgs) => {
    console.log('next page')
    console.log(msgs)
  })
)
Stream all messages of type contact from Alice or Bob in pages of
size 10, starting from the 15th message, sorted from newest to
oldest:
const pull = require('pull-stream')
const source = query(
  fromDB(db),
  where(
    and(
      slowEqual('value.content.type', 'contact'),
      or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId)),
    ),
  ),
  paginate(10),
  startFrom(15),
  descending(),
  toPullStream()
)
pull(
  source,
  pull.drain((msgs) => {
    console.log('next page:')
    console.log(msgs)
  })
)
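As a rough mental model of what paginate, startFrom, and descending do to a result set, the sketch below applies the same three options to a plain array. It is not how JITDB implements pagination. The function `toyPaginate` is hypothetical, and treating startFrom as a zero-based offset into the ordered results is an assumption made for this illustration.

```javascript
// Split an ordered list of results into pages.
// Assumption: `startFrom` is a zero-based offset into the ordered results.
function toyPaginate(results, { pageSize, startFrom = 0, descending = false }) {
  // Copy before reversing so the caller's array is left untouched.
  const ordered = descending ? [...results].reverse() : results
  const pages = []
  for (let i = startFrom; i < ordered.length; i += pageSize) {
    pages.push(ordered.slice(i, i + pageSize))
  }
  return pages
}

// 40 fake results, oldest first: msg0, msg1, ..., msg39
const results = Array.from({ length: 40 }, (_, i) => 'msg' + i)

const pages = toyPaginate(results, {
  pageSize: 10,
  startFrom: 15,
  descending: true,
})

for (const page of pages) {
  console.log('next page:', page)
}
```

Each element of `pages` corresponds to one array delivered to the drain callback in the examples above. The last page may be shorter than the page size.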
Batching with the operator batch() is similar to pagination in terms of
performance, but the messages are delivered one-by-one to the final pull-stream,
instead of as an array. Example:
const pull = require('pull-stream')
const source = query(
  fromDB(db),
  where(
    and(
      slowEqual('value.content.type', 'contact'),
      or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId)),
    ),
  ),
  batch(10), // Note `batch` instead of `paginate`
  descending(),
  toPullStream()
)
pull(
  source,
  // Note the below drain is `msg`, not `msgs` array:
  pull.drain((msg) => {
    console.log('next message:')
    console.log(msg)
  })
)
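The difference between batch and paginate can be pictured with a generator that still reads results in chunks internally but hands them to the consumer one at a time. This is a sketch on a plain array, not JITDB code. `toyBatch` and the `fetches` counter are hypothetical names used only here.

```javascript
let fetches = 0 // counts how many chunks were read internally

// Read `size` results at a time, but yield them to the consumer one by one.
function* toyBatch(results, size) {
  for (let offset = 0; offset < results.length; offset += size) {
    fetches++
    const chunk = results.slice(offset, offset + size)
    for (const msg of chunk) {
      yield msg
    }
  }
}

const results = ['a', 'b', 'c', 'd', 'e']
const delivered = []
for (const msg of toyBatch(results, 2)) {
  // The consumer sees single messages, never arrays.
  delivered.push(msg)
}

console.log(delivered, 'chunks read:', fetches)
```

The consumer receives five individual messages even though only three chunk reads happened, which is why the cost is comparable to pagination.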
async/await
There are also operators that support getting the values using
await. toPromise is like toCallback, delivering all results
at once:
const msgs = await query(
  fromDB(db),
  where(
    and(
      slowEqual('value.content.type', 'contact'),
      or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId)),
    ),
  ),
  toPromise()
)
console.log('There are ' + msgs.length + ' messages')
With pagination, toAsyncIter is like toPullStream, streaming the results in batches:
const results = query(
  fromDB(db),
  where(
    and(
      slowEqual('value.content.type', 'contact'),
      or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId)),
    ),
  ),
  paginate(10),
  startFrom(15),
  toAsyncIter()
)
for await (let msgs of results) {
  console.log('next page:')
  console.log(msgs)
}
Custom
