
<!-- SPDX-FileCopyrightText: 2021 Anders Rune Jensen SPDX-License-Identifier: CC0-1.0 -->

JITDB

A database on top of [async-append-only-log] with automatic index generation and maintenance.

The motivation for this database is that it should be:

  • fast
  • easy to understand
  • run in the browser and in node

Async append only log takes care of persistence of the main log. It is expected to use [bipf] to encode data. On top of this, JITDB lazily creates and maintains indexes based on the way the data is queried. This means that if you search for messages of type post and author x, two indexes will be created the first time: one for type and one for author. A specific index will only be updated when it is queried again. These indexes are tiny compared to normal [flume] indexes. An index of type post is 80 KB.

For this to be feasible it must be really fast to do a full log scan. It turns out that the combination of push streams and bipf makes streaming the full log not much slower than reading the file, so a 350 MB log can be scanned in a few seconds.
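The lazy indexing idea described above can be illustrated with a minimal in-memory sketch. This is not JITDB's implementation: the `log` array, the `indexes` map, the `lookup` function and the `scans` counter are all hypothetical names, and plain objects stand in for bipf-encoded records. It only shows an index being created on first query and caught up when it is queried again.

```javascript
// Hypothetical in-memory stand-in for the log: an array of records.
const log = [
  { type: 'post', author: 'alice' },
  { type: 'contact', author: 'bob' },
  { type: 'post', author: 'bob' },
]

// "field:value" -> { offsets: number[], scannedUpTo: number }
const indexes = new Map()
let scans = 0 // counts log scans, only to make the laziness observable

function lookup(field, value) {
  const key = field + ':' + value
  let index = indexes.get(key)
  if (!index) {
    // First query for this field/value: create the index just in time.
    index = { offsets: [], scannedUpTo: 0 }
    indexes.set(key, index)
  }
  if (index.scannedUpTo < log.length) {
    // Catch up only on the part of the log this index has not seen yet.
    scans++
    for (let i = index.scannedUpTo; i < log.length; i++) {
      if (log[i][field] === value) index.offsets.push(i)
    }
    index.scannedUpTo = log.length
  }
  return index.offsets
}

console.log(lookup('type', 'post')) // scans the log and builds the index
console.log(lookup('type', 'post')) // served from the index, no scan
log.push({ type: 'post', author: 'carol' })
console.log(lookup('type', 'post')) // scans only the newly appended record
```

In this sketch an index that is never queried again is never updated, which mirrors the "only updated when it is queried again" behaviour described above.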

While this is mainly intended as a query engine, it is possible to base other index types on top of this, such as a reduce index on contact messages.

API

Setup

Before using JITDB, you have to set up an instance of [async-append-only-log] located at a certain path. Then you can instantiate JITDB, which requires a path to the directory where the indexes will live.

const Log = require('async-append-only-log')
const JITDB = require('jitdb')

const raf = Log('/home/me/path/to/async-log', {
  blockSize: 64 * 1024,
})
const db = JITDB(raf, '/home/me/path/to/indexes')

db.onReady(() => {
  // The db is ready to be queried
})
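If you prefer promises over callbacks, the `onReady` callback shown above can be wrapped by hand. The `whenReady` helper below is a hypothetical name, not part of JITDB; it only assumes that the object passed in exposes `onReady(cb)` as in the setup example.

```javascript
// Wrap a callback-style `onReady` in a Promise.
// `db` can be any object exposing `onReady(cb)`.
function whenReady(db) {
  return new Promise((resolve) => db.onReady(resolve))
}

// Usage (inside an async function):
//   await whenReady(db)
//   // The db is ready to be queried
```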

Operators

JITDB comes with a set of composable "operators" that allow you to query the database. You can load these operators from require('jitdb/operators').

const Log = require('async-append-only-log')
const JITDB = require('jitdb')
const {query, fromDB, where, slowEqual, toCallback} = require('jitdb/operators')

const raf = Log('/home/me/path/to/async-log', {
  blockSize: 64 * 1024,
})
const db = JITDB(raf, '/home/me/path/to/indexes')

db.onReady(() => {
  query(
    fromDB(db),
    where(slowEqual('value.content.type', 'post')),
    toCallback((err, msgs) => {
      console.log(msgs)
    })
  )
})

The essential operators are fromDB, query, and toCallback.

  • fromDB specifies which JITDB instance we are interested in
  • query wraps all the operators, chaining them together
  • where wraps descriptor operators (see below) that narrow down the data
  • toCallback delivers the results of the query to a callback

Then there are descriptor operators that help scope down the results to your desired set of messages: and, or, not, equal, slowEqual, and others.

  • and(...args) filters for messages that satisfy all args
  • or(...args) filters for messages that satisfy at least one of the args
  • not(arg) filters for messages that do not satisfy arg
  • equal(seek, value, opts) filters for messages where a seeked field matches a specific value:
    • seek is a function that takes a [bipf] buffer as input and uses bipf.seekKey to return a pointer to the field
    • value is a string or buffer which is the value we want the field's value to match
    • opts are additional configurations:
      • indexType is a name used to identify the index produced by this query
      • prefix is a boolean or a number (for example 32) that tells this query to use prefix indexes
  • slowEqual(objPath, value, opts) is a more ergonomic (but slower) way of performing equal:
    • objPath is a string in the shape "foo.bar.baz" which specifies the nested field "baz" inside "bar" inside "foo"
    • value is the same as value in the equal operator
    • opts is the same as the opts for equal()
  • includes(seek, value, opts) filters for messages where a seeked field is an array and includes a specific value
  • slowIncludes(objPath, value, opts) is to includes what slowEqual is to equal
  • predicate(seek, fn, opts) filters for messages where a seeked field is passed to a predicate function fn and the fn returns true
    • opts are additional configurations such as indexType and name. You SHOULD pass opts.name as a simple string uniquely identifying the predicate, OR the fn function should be a named function
  • slowPredicate(objPath, fn, opts) is to predicate what slowEqual is to equal
  • absent(seek, opts) filters for messages where a seeked field does not exist in the message
  • slowAbsent(objPath) is to absent what slowEqual is to equal
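To make the semantics of these descriptors concrete, here is a minimal sketch that models them as plain predicate functions over JavaScript objects. It is not how JITDB evaluates queries (JITDB works on bipf buffers and indexes); `getPath`, `pathEqual`, `pathIncludes`, `pathAbsent`, `allOf`, `anyOf` and `negate` are hypothetical names chosen so they are not confused with the real operators.

```javascript
// Resolve a dotted path such as "value.content.type" on a plain object.
function getPath(obj, objPath) {
  let current = obj
  for (const key of objPath.split('.')) {
    if (current === null || typeof current !== 'object') return undefined
    current = current[key]
  }
  return current
}

// Hypothetical predicate counterparts of the descriptor operators.
const pathEqual = (objPath, value) => (msg) => getPath(msg, objPath) === value
const pathIncludes = (objPath, value) => (msg) => {
  const field = getPath(msg, objPath)
  return Array.isArray(field) && field.includes(value)
}
const pathAbsent = (objPath) => (msg) => getPath(msg, objPath) === undefined
const allOf = (...preds) => (msg) => preds.every((p) => p(msg))
const anyOf = (...preds) => (msg) => preds.some((p) => p(msg))
const negate = (pred) => (msg) => !pred(msg)

const msgs = [
  { value: { author: '@alice', content: { type: 'post', tags: ['a'] } } },
  { value: { author: '@bob', content: { type: 'contact' } } },
  { value: { author: '@carol', content: { type: 'contact' } } },
]

// "type is contact AND (author is alice OR author is bob)"
const contactsFromAliceOrBob = allOf(
  pathEqual('value.content.type', 'contact'),
  anyOf(pathEqual('value.author', '@alice'), pathEqual('value.author', '@bob'))
)
console.log(msgs.filter(contactsFromAliceOrBob).length) // 1 (only Bob's contact message)
```

The nesting of `allOf` and `anyOf` here has the same shape as nesting and(...) and or(...) inside where(...).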

Some examples:

Get all messages of type post:

query(
  fromDB(db),
  where(slowEqual('value.content.type', 'post')),
  toCallback((err, msgs) => {
    console.log('There are ' + msgs.length + ' messages of type "post"')
  })
)

Same as above but faster performance (recommended in production):

query(
  fromDB(db),
  where(equal(seekType, 'post', { indexType: 'type' })),
  toCallback((err, msgs) => {
    console.log('There are ' + msgs.length + ' messages of type "post"')
  })
)

// The `seekType` function takes a buffer and uses `bipf` APIs to search for
// the fields we want.
const bipf = require('bipf')
const bValue = Buffer.from('value') // better for performance if defined outside
const bContent = Buffer.from('content')
const bType = Buffer.from('type')
function seekType(buffer) {
  var p = 0 // p stands for "position" in the buffer, offset from start
  p = bipf.seekKey(buffer, p, bValue)
  if (p < 0) return
  p = bipf.seekKey(buffer, p, bContent)
  if (p < 0) return
  return bipf.seekKey(buffer, p, bType)
}

Get all messages of type contact from Alice or Bob:

query(
  fromDB(db),
  where(
    and(
      slowEqual('value.content.type', 'contact'),
      or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId))
    )
  ),
  toCallback((err, msgs) => {
    console.log('There are ' + msgs.length + ' messages')
  })
)

Same as above but faster performance (recommended in production):

query(
  fromDB(db),
  where(
    and(
      equal(seekType, 'contact', { indexType: 'type' }),
      or(
        equal(seekAuthor, aliceId, { indexType: 'author' }),
        equal(seekAuthor, bobId, { indexType: 'author' })
      )
    )
  ),
  toCallback((err, msgs) => {
    console.log('There are ' + msgs.length + ' messages')
  })
)

// where seekAuthor is
const bValue = Buffer.from('value') // better for performance if defined outside
const bAuthor = Buffer.from('author')
function seekAuthor(buffer) {
  var p = 0
  p = bipf.seekKey(buffer, p, bValue)
  if (p < 0) return
  return bipf.seekKey(buffer, p, bAuthor)
}

Pagination

If you use toCallback, it gives you all results in one go. If you want to get results in batches, you should use toPullStream, paginate, and optionally startFrom and descending.

  • toPullStream creates a [pull-stream] source to stream the results
  • paginate configures the size of each array sent to the pull-stream source
  • startFrom configures the beginning seq from where to start streaming
  • descending configures the pagination stream to order results from newest to oldest (otherwise the default order is oldest to newest) based on timestamp

Example, stream all messages of type contact from Alice or Bob in pages of size 10:

const pull = require('pull-stream')

const source = query(
  fromDB(db),
  where(
    and(
      slowEqual('value.content.type', 'contact'),
      or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId)),
    ),
  ),
  paginate(10),
  toPullStream()
)

pull(
  source,
  pull.drain((msgs) => {
    console.log('next page')
    console.log(msgs)
  })
)

Stream all messages of type contact from Alice or Bob in pages of size 10, starting from the 15th message, sorted from newest to oldest:

const pull = require('pull-stream')

const source = query(
  fromDB(db),
  where(
    and(
      slowEqual('value.content.type', 'contact'),
      or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId)),
    ),
  ),
  paginate(10),
  startFrom(15),
  descending(),
  toPullStream()
)

pull(
  source,
  pull.drain((msgs) => {
    console.log('next page:')
    console.log(msgs)
  })
)

Batching with the operator batch() is similar to pagination in terms of performance, but the messages are delivered one-by-one to the final pull-stream, instead of as an array. Example:

const pull = require('pull-stream')

const source = query(
  fromDB(db),
  where(
    and(
      slowEqual('value.content.type', 'contact'),
      or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId)),
    ),
  ),
  batch(10), // Note `batch` instead of `paginate`
  descending(),
  toPullStream()
)

pull(
  source,
  // Note the below drain is `msg`, not `msgs` array:
  pull.drain((msg) => {
    console.log('next message:')
    console.log(msg)
  })
)

async/await

There are also operators that support getting the values using await. toPromise is like toCallback, delivering all results at once:

const msgs = await query(
  fromDB(db),
  where(
    and(
      slowEqual('value.content.type', 'contact'),
      or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId)),
    ),
  ),
  toPromise()
)

console.log('There are ' + msgs.length + ' messages')

With pagination, toAsyncIter is like toPullStream, streaming the results in batches:

const results = query(
  fromDB(db),
  where(
    and(
      slowEqual('value.content.type', 'contact'),
      or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId)),
    ),
  ),
  paginate(10),
  startFrom(15),
  toAsyncIter()
)

for await (let msgs of results) {
  console.log('next page:')
  console.log(msgs)
}
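Because the results arrive page by page, a consumer can stop pulling once it has enough. The sketch below shows this with a generic async iterable rather than a real query: `pagesOf` is a hypothetical stand-in for a paginated result, and `takeFirst` is a hypothetical helper. Whether JITDB stops reading the log when the loop exits early is not covered here.

```javascript
// Hypothetical stand-in for a paginated query: an async generator that
// yields arrays ("pages") of results, pageSize items at a time.
async function* pagesOf(items, pageSize) {
  for (let i = 0; i < items.length; i += pageSize) {
    yield items.slice(i, i + pageSize)
  }
}

// Collect up to `limit` results, then stop pulling further pages.
async function takeFirst(asyncPages, limit) {
  const collected = []
  for await (const page of asyncPages) {
    collected.push(...page)
    if (collected.length >= limit) break // ends the iteration early
  }
  return collected.slice(0, limit)
}

takeFirst(pagesOf([1, 2, 3, 4, 5, 6, 7], 3), 4).then((firstFour) => {
  console.log(firstFour) // [1, 2, 3, 4]
})
```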

Custom
